Unverified commit dac8110d, authored by friendtocephalopods and committed by GitHub

update models for 2.0.0 (#188)

* update models for 2.0.0

rename schema_name -> schema in the table models
update the search model to use schema and last_updated_timestamp

* rename name -> full_name in the user search document
parent 01a0f962
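For code built on databuilder, these renames surface as constructor keyword-argument changes on the table-scoped models and as field renames in the search documents. A minimal before/after sketch of the caller-side change (hypothetical values; it assumes only the 2.0.0 signatures shown in the diff below):

    from databuilder.models.table_metadata import TableMetadata

    # databuilder 1.x:
    # table = TableMetadata('hive', 'gold', schema_name='test_schema',
    #                       name='test_table', description='a table for testing')

    # databuilder 2.0.0 renames the keyword argument:
    table = TableMetadata(database='hive',
                          cluster='gold',
                          schema='test_schema',  # was schema_name in 1.x
                          name='test_table',
                          description='a table for testing')

The search documents follow suit: TableESDocument now takes schema and last_updated_timestamp (previously schema_name and last_updated_epoch), and UserESDocument takes full_name (previously name), so Elasticsearch indexes built against the old field names will not match the new mapping.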
@@ -10,7 +10,7 @@ from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
-TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
+TableKey = namedtuple('TableKey', ['schema', 'table_name'])
LOGGER = logging.getLogger(__name__)
@@ -22,11 +22,11 @@ class AthenaMetadataExtractor(Extractor):
SQL_STATEMENT = """
SELECT
-{catalog_source} as cluster, table_schema as schema_name, table_name as name, column_name as col_name,
+{catalog_source} as cluster, table_schema as schema, table_name as name, column_name as col_name,
data_type as col_type,ordinal_position as col_sort_order,
comment as col_description, extra_info as extras from information_schema.columns
{where_clause_suffix}
-ORDER by cluster, schema_name, name, col_sort_order ;
+ORDER by cluster, schema, name, col_sort_order ;
"""
# CONFIG KEYS
@@ -89,7 +89,7 @@ class AthenaMetadataExtractor(Extractor):
row['col_type'], row['col_sort_order']))
yield TableMetadata('athena', last_row['cluster'],
-last_row['schema_name'],
+last_row['schema'],
last_row['name'],
'',
columns)
@@ -113,6 +113,6 @@ class AthenaMetadataExtractor(Extractor):
:return:
"""
if row:
-return TableKey(schema_name=row['schema_name'], table_name=row['name'])
+return TableKey(schema=row['schema'], table_name=row['name'])
return None
@@ -13,7 +13,7 @@ from databuilder.extractor.base_extractor import Extractor
DatasetRef = namedtuple('DatasetRef', ['datasetId', 'projectId'])
-TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
+TableKey = namedtuple('TableKey', ['schema', 'table_name'])
LOGGER = logging.getLogger(__name__)
......
@@ -9,7 +9,7 @@ from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
DatasetRef = namedtuple('DatasetRef', ['datasetId', 'projectId'])
-TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
+TableKey = namedtuple('TableKey', ['schema', 'table_name'])
LOGGER = logging.getLogger(__name__)
@@ -75,7 +75,7 @@ class BigQueryMetadataExtractor(BaseBigQueryExtractor):
table_meta = TableMetadata(
database='bigquery',
cluster=tableRef['projectId'],
-schema_name=tableRef['datasetId'],
+schema=tableRef['datasetId'],
name=table_id,
description=table.get('description', ''),
columns=cols,
......
@@ -11,7 +11,7 @@ from databuilder.extractor.base_bigquery_extractor import BaseBigQueryExtractor
from databuilder.models.watermark import Watermark
DatasetRef = namedtuple('DatasetRef', ['datasetId', 'projectId'])
-TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
+TableKey = namedtuple('TableKey', ['schema', 'table_name'])
PartitionInfo = namedtuple('PartitionInfo', ['partition_id', 'epoch_created'])
LOGGER = logging.getLogger(__name__)
......
@@ -57,27 +57,27 @@ class HiveTableLastUpdatedExtractor(Extractor):
"""
PARTITION_TABLE_SQL_STATEMENT = """
SELECT
-DBS.NAME as schema_name,
+DBS.NAME as schema,
TBL_NAME as table_name,
MAX(PARTITIONS.CREATE_TIME) as last_updated_time
FROM TBLS
JOIN DBS ON TBLS.DB_ID = DBS.DB_ID
JOIN PARTITIONS ON TBLS.TBL_ID = PARTITIONS.TBL_ID
{where_clause_suffix}
-GROUP BY schema_name, table_name
-ORDER BY schema_name, table_name;
+GROUP BY schema, table_name
+ORDER BY schema, table_name;
"""
NON_PARTITIONED_TABLE_SQL_STATEMENT = """
SELECT
-DBS.NAME as schema_name,
+DBS.NAME as schema,
TBL_NAME as table_name,
SDS.LOCATION as location
FROM TBLS
JOIN DBS ON TBLS.DB_ID = DBS.DB_ID
JOIN SDS ON TBLS.SD_ID = SDS.SD_ID
{where_clause_suffix}
-ORDER BY schema_name, table_name;
+ORDER BY schema, table_name;
"""
# Additional where clause for non partitioned table SQL
@@ -206,7 +206,7 @@ class HiveTableLastUpdatedExtractor(Extractor):
while partitioned_tbl_row:
yield TableLastUpdated(table_name=partitioned_tbl_row['table_name'],
last_updated_time_epoch=partitioned_tbl_row['last_updated_time'],
-schema_name=partitioned_tbl_row['schema_name'],
+schema=partitioned_tbl_row['schema'],
db=HiveTableLastUpdatedExtractor.DATABASE,
cluster=self._cluster)
partitioned_tbl_row = self._partitioned_table_extractor.extract()
@@ -227,7 +227,7 @@ class HiveTableLastUpdatedExtractor(Extractor):
start = time.time()
table_last_updated = self._get_last_updated_datetime_from_filesystem(
table=non_partitioned_tbl_row['table_name'],
-schema=non_partitioned_tbl_row['schema_name'],
+schema=non_partitioned_tbl_row['schema'],
storage_location=non_partitioned_tbl_row['location'])
LOGGER.info('Elapsed: {} seconds'.format(time.time() - start))
@@ -293,7 +293,7 @@ class HiveTableLastUpdatedExtractor(Extractor):
result = TableLastUpdated(table_name=table,
last_updated_time_epoch=int((last_updated - OLDEST_TIMESTAMP).total_seconds()),
-schema_name=schema,
+schema=schema,
db=HiveTableLastUpdatedExtractor.DATABASE,
cluster=self._cluster)
......
@@ -11,7 +11,7 @@ from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
-TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
+TableKey = namedtuple('TableKey', ['schema', 'table_name'])
LOGGER = logging.getLogger(__name__)
@@ -27,7 +27,7 @@ class HiveTableMetadataExtractor(Extractor):
# Using UNION to combine above two statements and order by table & partition identifier.
SQL_STATEMENT = """
SELECT source.* FROM
-(SELECT t.TBL_ID, d.NAME as schema_name, t.TBL_NAME name, t.TBL_TYPE, tp.PARAM_VALUE as description,
+(SELECT t.TBL_ID, d.NAME as schema, t.TBL_NAME name, t.TBL_TYPE, tp.PARAM_VALUE as description,
p.PKEY_NAME as col_name, p.INTEGER_IDX as col_sort_order,
p.PKEY_TYPE as col_type, p.PKEY_COMMENT as col_description, 1 as "is_partition_col"
FROM TBLS t
@@ -36,7 +36,7 @@ class HiveTableMetadataExtractor(Extractor):
LEFT JOIN TABLE_PARAMS tp ON (t.TBL_ID = tp.TBL_ID AND tp.PARAM_KEY='comment')
{where_clause_suffix}
UNION
-SELECT t.TBL_ID, d.NAME as schema_name, t.TBL_NAME name, t.TBL_TYPE, tp.PARAM_VALUE as description,
+SELECT t.TBL_ID, d.NAME as schema, t.TBL_NAME name, t.TBL_TYPE, tp.PARAM_VALUE as description,
c.COLUMN_NAME as col_name, c.INTEGER_IDX as col_sort_order,
c.TYPE_NAME as col_type, c.COMMENT as col_description, 0 as "is_partition_col"
FROM TBLS t
@@ -101,7 +101,7 @@ class HiveTableMetadataExtractor(Extractor):
row['col_type'], row['col_sort_order']))
yield TableMetadata('hive', self._cluster,
-last_row['schema_name'],
+last_row['schema'],
last_row['name'],
last_row['description'],
columns)
@@ -125,6 +125,6 @@ class HiveTableMetadataExtractor(Extractor):
:return:
"""
if row:
-return TableKey(schema_name=row['schema_name'], table_name=row['name'])
+return TableKey(schema=row['schema'], table_name=row['name'])
return None
@@ -26,9 +26,9 @@ class Neo4jSearchDataExtractor(Extractor):
OPTIONAL MATCH (cols)-[:DESCRIPTION]->(col_description:Description)
OPTIONAL MATCH (table)-[:TAGGED_BY]->(tags:Tag)
OPTIONAL MATCH (table)-[:LAST_UPDATED_AT]->(time_stamp:Timestamp)
-RETURN db.name as database, cluster.name AS cluster, schema.name AS schema_name,
+RETURN db.name as database, cluster.name AS cluster, schema.name AS schema,
table.name AS name, table.key AS key, table_description.description AS description,
-time_stamp.last_updated_timestamp AS last_updated_epoch,
+time_stamp.last_updated_timestamp AS last_updated_timestamp,
EXTRACT(c in COLLECT(DISTINCT cols)| c.name) AS column_names,
EXTRACT(cd IN COLLECT(DISTINCT col_description)| cd.description) AS column_descriptions,
REDUCE(sum_r = 0, r in COLLECT(DISTINCT read)| sum_r + r.read_count) AS total_usage,
......
@@ -12,7 +12,7 @@ from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
-TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
+TableKey = namedtuple('TableKey', ['schema', 'table_name'])
LOGGER = logging.getLogger(__name__)
@@ -24,7 +24,7 @@ class PostgresMetadataExtractor(Extractor):
# SELECT statement from postgres information_schema to extract table and column metadata
SQL_STATEMENT = """
SELECT
-{cluster_source} as cluster, c.table_schema as schema_name, c.table_name as name, pgtd.description as description
+{cluster_source} as cluster, c.table_schema as schema, c.table_name as name, pgtd.description as description
,c.column_name as col_name, c.data_type as col_type
, pgcd.description as col_description, ordinal_position as col_sort_order
FROM INFORMATION_SCHEMA.COLUMNS c
@@ -35,7 +35,7 @@ class PostgresMetadataExtractor(Extractor):
LEFT JOIN
pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0
{where_clause_suffix}
-ORDER by cluster, schema_name, name, col_sort_order ;
+ORDER by cluster, schema, name, col_sort_order ;
"""
# CONFIG KEYS
@@ -111,7 +111,7 @@ class PostgresMetadataExtractor(Extractor):
row['col_type'], row['col_sort_order']))
yield TableMetadata(self._database, last_row['cluster'],
-last_row['schema_name'],
+last_row['schema'],
last_row['name'],
last_row['description'],
columns)
@@ -135,6 +135,6 @@ class PostgresMetadataExtractor(Extractor):
:return:
"""
if row:
-return TableKey(schema_name=row['schema_name'], table_name=row['name'])
+return TableKey(schema=row['schema'], table_name=row['name'])
return None
@@ -22,7 +22,7 @@ class PrestoViewMetadataExtractor(Extractor):
# SQL statement to extract View metadata
# {where_clause_suffix} could be used to filter schemas
SQL_STATEMENT = """
-SELECT t.TBL_ID, d.NAME as schema_name, t.TBL_NAME name, t.TBL_TYPE, t.VIEW_ORIGINAL_TEXT as view_original_text
+SELECT t.TBL_ID, d.NAME as schema, t.TBL_NAME name, t.TBL_TYPE, t.VIEW_ORIGINAL_TEXT as view_original_text
FROM TBLS t
JOIN DBS d ON t.DB_ID = d.DB_ID
WHERE t.VIEW_EXPANDED_TEXT = '/* Presto View */'
@@ -83,7 +83,7 @@ class PrestoViewMetadataExtractor(Extractor):
columns = self._get_column_metadata(row['view_original_text'])
yield TableMetadata(database='presto',
cluster=self._cluster,
-schema_name=row['schema_name'],
+schema=row['schema'],
name=row['name'],
description=None,
columns=columns,
......
@@ -13,7 +13,7 @@ from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
-TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
+TableKey = namedtuple('TableKey', ['schema', 'table_name'])
LOGGER = logging.getLogger(__name__)
@@ -34,7 +34,7 @@ class SnowflakeMetadataExtractor(Extractor):
lower(c.ordinal_position) AS col_sort_order,
lower(c.table_catalog) AS database,
lower({cluster_source}) AS cluster,
-lower(c.table_schema) AS schema_name,
+lower(c.table_schema) AS schema,
lower(c.table_name) AS name,
t.comment AS description,
decode(lower(t.table_type), 'view', 'true', 'false') AS is_view
@@ -124,7 +124,7 @@ class SnowflakeMetadataExtractor(Extractor):
)
yield TableMetadata(self._database, last_row['cluster'],
-last_row['schema_name'],
+last_row['schema'],
last_row['name'],
unidecode(last_row['description']) if last_row['description'] else None,
columns,
@@ -149,6 +149,6 @@ class SnowflakeMetadataExtractor(Extractor):
:return:
"""
if row:
-return TableKey(schema_name=row['schema_name'], table_name=row['name'])
+return TableKey(schema=row['schema'], table_name=row['name'])
return None
@@ -29,7 +29,7 @@ class Application(Neo4jCsvSerializable):
dag_id, # type: str,
application_url_template, # type: str
db_name='hive', # type: str
-schema_name='', # type: str
+schema='', # type: str
table_name='', # type: str
exec_date='', # type: str
):
@@ -38,7 +38,7 @@ class Application(Neo4jCsvSerializable):
# todo: need to modify this hack
self.application_url = application_url_template.format(dag_id=dag_id)
-self.database, self.schema, self.table = db_name, schema_name, table_name
+self.database, self.schema, self.table = db_name, schema, table_name
self.dag = dag_id
......
@@ -27,7 +27,7 @@ class ColumnUsageModel(Neo4jCsvSerializable):
def __init__(self,
database, # type: str
cluster, # type: str
-schema_name, # type: str
+schema, # type: str
table_name, # type: str
column_name, # type: str
user_email, # type: str
@@ -36,7 +36,7 @@ class ColumnUsageModel(Neo4jCsvSerializable):
# type: (...) -> None
self.database = database
self.cluster = cluster
-self.schema_name = schema_name
+self.schema = schema
self.table_name = table_name
self.column_name = column_name
self.user_email = user_email
@@ -86,7 +86,7 @@ class ColumnUsageModel(Neo4jCsvSerializable):
# type: (ColumnReader) -> str
return TableMetadata.TABLE_KEY_FORMAT.format(db=self.database,
cluster=self.cluster,
-schema=self.schema_name,
+schema=self.schema,
tbl=self.table_name)
def _get_user_key(self, email):
@@ -97,7 +97,7 @@ class ColumnUsageModel(Neo4jCsvSerializable):
# type: () -> str
return 'TableColumnUsage({!r}, {!r}, {!r}, {!r}, {!r}, {!r}, {!r})'.format(self.database,
self.cluster,
-self.schema_name,
+self.schema,
self.table_name,
self.column_name,
self.user_email,
......
@@ -13,7 +13,7 @@ class HiveWatermark(Watermark):
def __init__(self,
create_time, # type: str
-schema_name, # type: str
+schema, # type: str
table_name, # type: str
part_name, # type: str
part_type='high_watermark', # type: str
@@ -22,7 +22,7 @@ class HiveWatermark(Watermark):
# type: (...) -> None
super(HiveWatermark, self).__init__(create_time=create_time,
database='hive',
-schema_name=schema_name,
+schema=schema,
table_name=table_name,
part_name=part_name,
part_type=part_type,
......
@@ -10,11 +10,11 @@ class TableESDocument(ElasticsearchDocument):
def __init__(self,
database, # type: str
cluster, # type: str
-schema_name, # type: str
+schema, # type: str
name, # type: str
key, # type: str
description, # type: str
-last_updated_epoch, # type: Optional[int]
+last_updated_timestamp, # type: Optional[int]
column_names, # type: List[str]
column_descriptions, # type: List[str]
total_usage, # type: int
@@ -25,13 +25,13 @@ class TableESDocument(ElasticsearchDocument):
# type: (...) -> None
self.database = database
self.cluster = cluster
-self.schema_name = schema_name
+self.schema = schema
self.name = name
-self.display_name = display_name if display_name else '{schema}.{table}'.format(schema=schema_name, table=name)
+self.display_name = display_name if display_name else '{schema}.{table}'.format(schema=schema, table=name)
self.key = key
self.description = description
# todo: use last_updated_timestamp to match the record in metadata
-self.last_updated_epoch = int(last_updated_epoch) if last_updated_epoch else None
+self.last_updated_timestamp = int(last_updated_timestamp) if last_updated_timestamp else None
self.column_names = column_names
self.column_descriptions = column_descriptions
self.total_usage = total_usage
......
@@ -20,14 +20,14 @@ class TableLastUpdated(Neo4jCsvSerializable):
def __init__(self,
table_name, # type: str
last_updated_time_epoch, # type: int
-schema_name, # type: str
+schema, # type: str
db='hive', # type: str
cluster='gold' # type: str
):
# type: (...) -> None
self.table_name = table_name
self.last_updated_time = int(last_updated_time_epoch)
-self.schema = schema_name
+self.schema = schema
self.db = db
self.cluster = cluster
@@ -37,7 +37,7 @@ class TableLastUpdated(Neo4jCsvSerializable):
def __repr__(self):
# type: (...) -> str
return \
-"""TableLastUpdated(table_name={!r}, last_updated_time={!r}, schema_name={!r}, db={!r}, cluster={!r})"""\
+"""TableLastUpdated(table_name={!r}, last_updated_time={!r}, schema={!r}, db={!r}, cluster={!r})"""\
.format(self.table_name, self.last_updated_time, self.schema, self.db, self.cluster)
def create_next_node(self):
......
@@ -20,14 +20,14 @@ class TableLineage(Neo4jCsvSerializable):
def __init__(self,
db_name, # type: str
-schema_name, # type: str
+schema, # type: str
table_name, # type: str
cluster, # type: str
downstream_deps=None, # type: List
):
# type: (...) -> None
self.db = db_name.lower()
-self.schema = schema_name.lower()
+self.schema = schema.lower()
self.table = table_name.lower()
self.cluster = cluster.lower() if cluster else 'gold'
......
@@ -229,7 +229,7 @@ class TableMetadata(Neo4jCsvSerializable):
def __init__(self,
database, # type: str
cluster, # type: str
-schema_name, # type: str
+schema, # type: str
name, # type: str
description, # type: Union[str, None]
columns=None, # type: Iterable[ColumnMetadata]
@@ -242,7 +242,7 @@ class TableMetadata(Neo4jCsvSerializable):
"""
:param database:
:param cluster:
-:param schema_name:
+:param schema:
:param name:
:param description:
:param columns:
@@ -253,7 +253,7 @@ class TableMetadata(Neo4jCsvSerializable):
"""
self.database = database
self.cluster = cluster
-self.schema_name = schema_name
+self.schema = schema
self.name = name
self.description = DescriptionMetadata.create_description_metadata(text=description, source=description_source)
self.columns = columns if columns else []
@@ -276,7 +276,7 @@ class TableMetadata(Neo4jCsvSerializable):
return 'TableMetadata({!r}, {!r}, {!r}, {!r} ' \
'{!r}, {!r}, {!r}, {!r})'.format(self.database,
self.cluster,
-self.schema_name,
+self.schema,
self.name,
self.description,
self.columns,
@@ -287,14 +287,14 @@ class TableMetadata(Neo4jCsvSerializable):
# type: () -> str
return TableMetadata.TABLE_KEY_FORMAT.format(db=self.database,
cluster=self.cluster,
-schema=self.schema_name,
+schema=self.schema,
tbl=self.name)
def _get_table_description_key(self, description):
# type: (DescriptionMetadata) -> str
return TableMetadata.TABLE_DESCRIPTION_FORMAT.format(db=self.database,
cluster=self.cluster,
-schema=self.schema_name,
+schema=self.schema,
tbl=self.name,
description_id=description.get_description_id())
@@ -311,13 +311,13 @@ class TableMetadata(Neo4jCsvSerializable):
# type: () -> str
return TableMetadata.SCHEMA_KEY_FORMAT.format(db=self.database,
cluster=self.cluster,
-schema=self.schema_name)
+schema=self.schema)
def _get_col_key(self, col):
# type: (ColumnMetadata) -> str
return ColumnMetadata.COLUMN_KEY_FORMAT.format(db=self.database,
cluster=self.cluster,
-schema=self.schema_name,
+schema=self.schema,
tbl=self.name,
col=col.name)
@@ -325,7 +325,7 @@ class TableMetadata(Neo4jCsvSerializable):
# type: (ColumnMetadata, DescriptionMetadata) -> str
return ColumnMetadata.COLUMN_DESCRIPTION_FORMAT.format(db=self.database,
cluster=self.cluster,
-schema=self.schema_name,
+schema=self.schema,
tbl=self.name,
col=col.name,
description_id=description.get_description_id())
@@ -385,7 +385,7 @@ class TableMetadata(Neo4jCsvSerializable):
name=self.cluster,
label=TableMetadata.CLUSTER_NODE_LABEL),
NodeTuple(key=self._get_schema_key(),
-name=self.schema_name,
+name=self.schema,
label=TableMetadata.SCHEMA_NODE_LABEL)
]
......
@@ -17,14 +17,14 @@ class TableOwner(Neo4jCsvSerializable):
def __init__(self,
db_name, # type: str
-schema_name, # type: str
+schema, # type: str
table_name, # type: str
owners, # type: Union[List, str]
cluster='gold', # type: str
):
# type: (...) -> None
self.db = db_name.lower()
-self.schema = schema_name.lower()
+self.schema = schema.lower()
self.table = table_name.lower()
if isinstance(owners, str):
owners = owners.split(',')
......
@@ -19,7 +19,7 @@ class TableSource(Neo4jCsvSerializable):
def __init__(self,
db_name, # type: str
-schema_name, # type: str
+schema, # type: str
table_name, # type: str
cluster, # type: str
source, # type: str
@@ -27,7 +27,7 @@ class TableSource(Neo4jCsvSerializable):
):
# type: (...) -> None
self.db = db_name.lower()
-self.schema = schema_name.lower()
+self.schema = schema.lower()
self.table = table_name.lower()
self.cluster = cluster.lower() if cluster else 'gold'
......
@@ -27,14 +27,14 @@ class TableColumnStats(Neo4jCsvSerializable):
end_epoch, # type: str
db='hive', # type: str
cluster='gold', # type: str
-schema_name=None # type: str
+schema=None # type: str
):
# type: (...) -> None
-if schema_name is None:
+if schema is None:
self.schema, self.table = table_name.split('.')
else:
self.table = table_name.lower()
-self.schema = schema_name.lower()
+self.schema = schema.lower()
self.db = db
self.col_name = col_name.lower()
self.start_epoch = start_epoch
......
@@ -9,7 +9,7 @@ class UserESDocument(ElasticsearchDocument):
email, # type: str
first_name, # type: str
last_name, # type: str
-name, # type: str
+full_name, # type: str
github_username, # type: str
team_name, # type: str
employee_type, # type: str
@@ -24,7 +24,7 @@ class UserESDocument(ElasticsearchDocument):
self.email = email
self.first_name = first_name
self.last_name = last_name
-self.name = name
+self.full_name = full_name
self.github_username = github_username
self.team_name = team_name
self.employee_type = employee_type
......
@@ -20,7 +20,7 @@ class Watermark(Neo4jCsvSerializable):
def __init__(self,
create_time, # type: str
database, # type: str
-schema_name, # type: str
+schema, # type: str
table_name, # type: str
part_name, # type: str
part_type='high_watermark', # type: str
@@ -29,7 +29,7 @@ class Watermark(Neo4jCsvSerializable):
# type: (...) -> None
self.create_time = create_time
self.database = database.lower()
-self.schema = schema_name.lower()
+self.schema = schema.lower()
self.table = table_name.lower()
self.parts = [] # type: list
......
@@ -51,7 +51,7 @@ class ElasticsearchPublisher(Publisher):
}
}
},
-"schema_name": {
+"schema": {
"type":"text",
"analyzer": "simple",
"fields": {
@@ -63,7 +63,7 @@ class ElasticsearchPublisher(Publisher):
"display_name": {
"type": "keyword"
},
-"last_updated_epoch": {
+"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
......
@@ -197,7 +197,7 @@ class SqlToTblColUsageTransformer(Transformer):
table_metadata = table_metadata_extractor.extract()
while table_metadata:
# TODO: deal with collision
-table_to_schema[table_metadata.name.lower()] = table_metadata.schema_name.lower()
+table_to_schema[table_metadata.name.lower()] = table_metadata.schema.lower()
table_metadata = table_metadata_extractor.extract()
return table_to_schema
......
@@ -63,7 +63,7 @@ def connection_string():
def create_table_wm_job(**kwargs):
sql = textwrap.dedent("""
SELECT From_unixtime(A0.create_time) as create_time,
-C0.NAME as schema_name,
+C0.NAME as schema,
B0.tbl_name as table_name,
{func}(A0.part_name) as part_name,
{watermark} as part_type
......
-task_id,dag_id,exec_date,application_url_template,db_name,schema_name,table_name
+task_id,dag_id,exec_date,application_url_template,db_name,schema,table_name
hive.test_schema.test_table1,event_test,"2018-05-31T00:00:00","https://airflow_host.net/admin/airflow/tree?dag_id={dag_id}",hive,test_schema,test_table1
-name,description,col_type,sort_order,database,cluster,schema_name,table_name
+name,description,col_type,sort_order,database,cluster,schema,table_name
col1,"col1 description","string",1,hive,gold,test_schema,test_table1
col2,"col2 description","string",2,hive,gold,test_schema,test_table1
col3,"col3 description","string",3,hive,gold,test_schema,test_table1
@@ -9,4 +9,4 @@ col2,"col2 description","string",2,dynamo,gold,test_schema,test_table2
col3,"col3 description","string",3,dynamo,gold,test_schema,test_table2
col4,"col4 description","int",4,dynamo,gold,test_schema,test_table2
col1,"view col description","int",1,hive,gold,test_schema,test_view1
-col1,"col1 description","int",1,hive,gold,test_schema,test_table3,""
\ No newline at end of file
+col1,"col1 description","int",1,hive,gold,test_schema,test_table3,""
-database,cluster,schema_name,table_name,column_name,user_email,read_count
+database,cluster,schema,table_name,column_name,user_email,read_count
hive,gold,test_schema,test_table1,col1,roald.amundsen@example.org,100
hive,gold,test_schema,test_table3,col1,aoald0@example.org,10
hive,gold,test_schema,test_table3,col1,boald1@example.org,10
......
-db_name,cluster,schema_name,table_name,source,source_type
+db_name,cluster,schema,table_name,source,source_type
hive,gold,test_schema,test_table1,"https://github.com/lyft/amundsen/",github
-database,cluster,schema_name,name,description,tags,is_view,description_source
+database,cluster,schema,name,description,tags,is_view,description_source
hive,gold,test_schema,test_table1,"1st test table","tag1,tag2,pii,high_quality",false,
dynamo,gold,test_schema,test_table2,"2nd test table","high_quality,recommended",false,
hive,gold,test_schema,test_view1,"1st test view","tag1",true,
-hive,gold,test_schema,test_table3,"3rd test","needs_documentation",false,
\ No newline at end of file
+hive,gold,test_schema,test_table3,"3rd test","needs_documentation",false,
-cluster,db,schema_name,table_name,col_name,stat_name,stat_val,start_epoch,end_epoch
+cluster,db,schema,table_name,col_name,stat_name,stat_val,start_epoch,end_epoch
gold,hive,test_schema,test_table1,col1,"distinct values","8",1432300762,1562300762
gold,hive,test_schema,test_table1,col1,"min","""aardvark""",1432300762,1562300762
gold,hive,test_schema,test_table1,col1,"max","""zebra""",1432300762,1562300762
......
-cluster,db,schema_name,table_name,last_updated_time_epoch
+cluster,db,schema,table_name,last_updated_time_epoch
gold,hive,test_schema,test_table1,1570230473
gold,dynamo,test_schema,test_table2,1070230473
-db_name,schema_name,cluster,table_name,owners
+db_name,schema,cluster,table_name,owners
hive,test_schema,gold,test_table1,"roald.amundsen@example.org,chrisc@example.org"
dynamo,test_schema,gold,test_table2,
-create_time,database,schema_name,table_name,part_name,part_type,cluster
+create_time,database,schema,table_name,part_name,part_type,cluster
2019-10-01T12:13:14,hive,test_schema,test_table1,col3=2017-04-22/col4=0,LOW_WATERMARK,gold
2019-10-01T12:13:14,hive,test_schema,test_table1,col3=2019-09-30/col4=11,HIGH_WATERMARK,gold
2019-10-01T12:13:14,dynamo,test_schema,test_table2,col3=2018-01-01,LOW_WATERMARK,gold
......
@@ -259,7 +259,7 @@ if __name__ == "__main__":
}
}
},
-"name": {
+"full_name": {
"type":"text",
"analyzer": "simple",
"fields": {
......
@@ -2,7 +2,7 @@ import os
from setuptools import setup, find_packages
-__version__ = '1.6.2'
+__version__ = '2.0.0'
requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
......
@@ -43,7 +43,7 @@ class TestAthenaMetadataExtractor(unittest.TestCase):
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
-table = {'schema_name': 'test_schema',
+table = {'schema': 'test_schema',
'name': 'test_table',
'description': '',
'cluster': self.conf['extractor.athena_metadata.{}'.format(AthenaMetadataExtractor.CATALOG_KEY)],
@@ -109,19 +109,19 @@ class TestAthenaMetadataExtractor(unittest.TestCase):
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
-table = {'schema_name': 'test_schema1',
+table = {'schema': 'test_schema1',
'name': 'test_table1',
'description': '',
'cluster': self.conf['extractor.athena_metadata.{}'.format(AthenaMetadataExtractor.CATALOG_KEY)],
}
-table1 = {'schema_name': 'test_schema1',
+table1 = {'schema': 'test_schema1',
'name': 'test_table2',
'description': '',
'cluster': self.conf['extractor.athena_metadata.{}'.format(AthenaMetadataExtractor.CATALOG_KEY)],
}
-table2 = {'schema_name': 'test_schema2',
+table2 = {'schema': 'test_schema2',
'name': 'test_table3',
'description': '',
'cluster': self.conf['extractor.athena_metadata.{}'.format(AthenaMetadataExtractor.CATALOG_KEY)],
......
@@ -187,7 +187,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
self.assertEquals(result.database, 'bigquery')
self.assertEquals(result.cluster, 'your-project-here')
-self.assertEquals(result.schema_name, 'fdgdfgh')
+self.assertEquals(result.schema, 'fdgdfgh')
self.assertEquals(result.name, 'nested_recs')
self.assertEquals(result.description._text, '')
self.assertEquals(result.columns, [])
@@ -203,7 +203,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
self.assertEquals(result.database, 'bigquery')
self.assertEquals(result.cluster, 'your-project-here')
-self.assertEquals(result.schema_name, 'fdgdfgh')
+self.assertEquals(result.schema, 'fdgdfgh')
self.assertEquals(result.name, 'nested_recs')
self.assertEquals(result.description._text, "")
self.assertEquals(result.columns, [])
@@ -229,7 +229,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
self.assertEquals(result.database, 'bigquery')
self.assertEquals(result.cluster, 'your-project-here')
-self.assertEquals(result.schema_name, 'fdgdfgh')
+self.assertEquals(result.schema, 'fdgdfgh')
self.assertEquals(result.name, 'nested_recs')
self.assertEquals(result.description._text, "")
......
@@ -30,4 +30,4 @@ class TestCsvExtractor(unittest.TestCase):
self.assertEquals(result.description._text, '1st test table')
self.assertEquals(result.database, 'hive')
self.assertEquals(result.cluster, 'gold')
-self.assertEquals(result.schema_name, 'test_schema')
+self.assertEquals(result.schema, 'test_schema')
@@ -50,10 +50,10 @@ class TestHiveTableLastUpdatedExtractor(unittest.TestCase):
patch.object(HiveTableLastUpdatedExtractor, '_get_non_partitioned_table_sql_alchemy_extractor',
return_value=non_pt_alchemy_extractor_instance):
pt_alchemy_extractor_instance.extract = MagicMock(side_effect=[
-{'schema_name': 'foo_schema',
+{'schema': 'foo_schema',
'table_name': 'table_1',
'last_updated_time': 1},
-{'schema_name': 'foo_schema',
+{'schema': 'foo_schema',
'table_name': 'table_2',
'last_updated_time': 2}
])
@@ -64,11 +64,11 @@ class TestHiveTableLastUpdatedExtractor(unittest.TestCase):
extractor.init(conf)
result = extractor.extract()
-expected = TableLastUpdated(schema_name='foo_schema', table_name='table_1', last_updated_time_epoch=1,
+expected = TableLastUpdated(schema='foo_schema', table_name='table_1', last_updated_time_epoch=1,
db='hive', cluster='gold')
self.assertEqual(result.__repr__(), expected.__repr__())
result = extractor.extract()
-expected = TableLastUpdated(schema_name='foo_schema', table_name='table_2', last_updated_time_epoch=2,
+expected = TableLastUpdated(schema='foo_schema', table_name='table_2', last_updated_time_epoch=2,
db='hive', cluster='gold')
self.assertEqual(result.__repr__(), expected.__repr__())
@@ -100,7 +100,7 @@ class TestHiveTableLastUpdatedExtractor(unittest.TestCase):
pt_alchemy_extractor_instance.extract = MagicMock(return_value=None)
non_pt_alchemy_extractor_instance.extract = MagicMock(side_effect=[
-{'schema_name': 'foo_schema',
+{'schema': 'foo_schema',
'table_name': 'table_1',
'location': '/foo/bar'}
])
@@ -109,7 +109,7 @@ class TestHiveTableLastUpdatedExtractor(unittest.TestCase):
extractor.init(ConfigFactory.from_dict({}))
result = extractor.extract()
-expected = TableLastUpdated(schema_name='foo_schema', table_name='table_1',
+expected = TableLastUpdated(schema='foo_schema', table_name='table_1',
last_updated_time_epoch=1542168723,
db='hive', cluster='gold')
self.assertEqual(result.__repr__(), expected.__repr__())
......
@@ -40,7 +40,7 @@ class TestHiveTableMetadataExtractor(unittest.TestCase):
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
-table = {'schema_name': 'test_schema',
+table = {'schema': 'test_schema',
'name': 'test_table',
'description': 'a table for testing'}
@@ -97,15 +97,15 @@ class TestHiveTableMetadataExtractor(unittest.TestCase):
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
-table = {'schema_name': 'test_schema1',
+table = {'schema': 'test_schema1',
'name': 'test_table1',
'description': 'test table 1'}
-table1 = {'schema_name': 'test_schema1',
+table1 = {'schema': 'test_schema1',
'name': 'test_table2',
'description': 'test table 2'}
-table2 = {'schema_name': 'test_schema2',
+table2 = {'schema': 'test_schema2',
'name': 'test_table3',
'description': 'test table 3'}
......
@@ -102,12 +102,12 @@ class TestNeo4jExtractor(unittest.TestCase):
result_dict = dict(database='test_database',
cluster='test_cluster',
-schema_name='test_schema',
+schema='test_schema',
name='test_table_name',
display_name='test_schema.test_table_name',
key='test_table_key',
description='test_table_description',
-last_updated_epoch=123456789,
+last_updated_timestamp=123456789,
column_names=['test_col1', 'test_col2', 'test_col3'],
column_descriptions=['test_description1', 'test_description2', ''],
total_usage=100,
......
@@ -46,7 +46,7 @@ class TestPostgresMetadataExtractor(unittest.TestCase):
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
-table = {'schema_name': 'test_schema',
+table = {'schema': 'test_schema',
'name': 'test_table',
'description': 'a table for testing',
'cluster':
@@ -107,21 +107,21 @@ class TestPostgresMetadataExtractor(unittest.TestCase):
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
-table = {'schema_name': 'test_schema1',
+table = {'schema': 'test_schema1',
'name': 'test_table1',
'description': 'test table 1',
'cluster':
self.conf['extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.CLUSTER_KEY)]
}
-table1 = {'schema_name': 'test_schema1',
+table1 = {'schema': 'test_schema1',
'name': 'test_table2',
'description': 'test table 2',
'cluster':
self.conf['extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.CLUSTER_KEY)]
}
-table2 = {'schema_name': 'test_schema2',
+table2 = {'schema': 'test_schema2',
'name': 'test_table3',
'description': 'test table 3',
'cluster':
......
@@ -54,12 +54,12 @@ class TestPrestoViewMetadataExtractor(unittest.TestCase):
sql_execute.return_value = [
{'tbl_id': 2,
-'schema_name': 'test_schema2',
+'schema': 'test_schema2',
'name': 'test_view2',
'tbl_type': 'virtual_view',
'view_original_text': base64.b64encode(json.dumps(columns2).encode()).decode("utf-8")},
{'tbl_id': 1,
-'schema_name': 'test_schema1',
+'schema': 'test_schema1',
'name': 'test_view1',
'tbl_type': 'virtual_view',
'view_original_text': base64.b64encode(json.dumps(columns1).encode()).decode("utf-8")},
......
@@ -46,7 +46,7 @@ class TestSnowflakeMetadataExtractor(unittest.TestCase):
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
-table = {'schema_name': 'test_schema',
+table = {'schema': 'test_schema',
'name': 'test_table',
'description': 'a table for testing',
'cluster':
@@ -109,7 +109,7 @@ class TestSnowflakeMetadataExtractor(unittest.TestCase):
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
-table = {'schema_name': 'test_schema1',
+table = {'schema': 'test_schema1',
'name': 'test_table1',
'description': 'test table 1',
'cluster':
@@ -117,7 +117,7 @@ class TestSnowflakeMetadataExtractor(unittest.TestCase):
'is_view': 'nottrue'
}
-table1 = {'schema_name': 'test_schema1',
+table1 = {'schema': 'test_schema1',
'name': 'test_table2',
'description': 'test table 2',
'cluster':
@@ -125,7 +125,7 @@ class TestSnowflakeMetadataExtractor(unittest.TestCase):
'is_view': 'false'
}
-table2 = {'schema_name': 'test_schema2',
+table2 = {'schema': 'test_schema2',
'name': 'test_table3',
'description': 'test table 3',
'cluster':
......
@@ -63,10 +63,10 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
data = dict(database='test_database',
cluster='test_cluster',
-schema_name='test_schema',
+schema='test_schema',
name='test_table',
key='test_table_key',
-last_updated_epoch=123456789,
+last_updated_timestamp=123456789,
description='test_description',
column_names=['test_col1', 'test_col2'],
column_descriptions=['test_comment1', 'test_comment2'],
@@ -91,10 +91,10 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
data = TableESDocument(database='test_database',
cluster='test_cluster',
-schema_name='test_schema',
+schema='test_schema',
name='test_table',
key='test_table_key',
-last_updated_epoch=123456789,
+last_updated_timestamp=123456789,
description='test_description',
column_names=['test_col1', 'test_col2'],
column_descriptions=['test_comment1', 'test_comment2'],
@@ -106,9 +106,9 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
expected = [
('{"key": "test_table_key", "column_descriptions": ["test_comment1", "test_comment2"], '
-'"schema_name": "test_schema", "database": "test_database", "cluster": "test_cluster", '
+'"schema": "test_schema", "database": "test_database", "cluster": "test_cluster", '
'"column_names": ["test_col1", "test_col2"], "name": "test_table", '
-'"last_updated_epoch": 123456789, "display_name": "test_schema.test_table", '
+'"last_updated_timestamp": 123456789, "display_name": "test_schema.test_table", '
'"description": "test_description", "unique_usage": 5, "total_usage": 10, '
'"tags": ["test_tag1", "test_tag2"]}')
]
@@ -127,10 +127,10 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
data = [TableESDocument(database='test_database',
cluster='test_cluster',
-schema_name='test_schema',
+schema='test_schema',
name='test_table',
key='test_table_key',
-last_updated_epoch=123456789,
+last_updated_timestamp=123456789,
description='test_description',
column_names=['test_col1', 'test_col2'],
column_descriptions=['test_comment1', 'test_comment2'],
@@ -144,9 +144,9 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
expected = [
('{"key": "test_table_key", "column_descriptions": ["test_comment1", "test_comment2"], '
-'"schema_name": "test_schema", "database": "test_database", "cluster": "test_cluster", '
+'"schema": "test_schema", "database": "test_database", "cluster": "test_cluster", '
'"column_names": ["test_col1", "test_col2"], "name": "test_table", '
-'"last_updated_epoch": 123456789, "display_name": "test_schema.test_table", '
+'"last_updated_timestamp": 123456789, "display_name": "test_schema.test_table", '
'"description": "test_description", "unique_usage": 5, "total_usage": 10, '
'"tags": ["test_tag1", "test_tag2"]}')
] * 5
......
@@ -16,7 +16,7 @@ class TestApplication(unittest.TestCase):
self.application = Application(task_id='hive.default.test_table',
dag_id='event_test',
-schema_name='default',
+schema='default',
table_name='test_table',
application_url_template='airflow_host.net/admin/airflow/tree?dag_id={dag_id}')
......
@@ -20,7 +20,7 @@ class TestHiveWatermark(unittest.TestCase):
# type: () -> None
super(TestHiveWatermark, self).setUp()
self.watermark = HiveWatermark(create_time='2017-09-18T00:00:00',
-schema_name=SCHEMA,
+schema=SCHEMA,
table_name=TABLE,
cluster=CLUSTER,
part_type=PART_TYPE,
......
@@ -13,10 +13,10 @@ class TestTableElasticsearchDocument(unittest.TestCase):
"""
test_obj = TableESDocument(database='test_database',
cluster='test_cluster',
-schema_name='test_schema',
+schema='test_schema',
name='test_table',
key='test_table_key',
-last_updated_epoch=123456789,
+last_updated_timestamp=123456789,
description='test_table_description',
column_names=['test_col1', 'test_col2'],
column_descriptions=['test_description1', 'test_description2'],
@@ -26,11 +26,11 @@ class TestTableElasticsearchDocument(unittest.TestCase):
expected_document_dict = {"database": "test_database",
"cluster": "test_cluster",
-"schema_name": "test_schema",
+"schema": "test_schema",
"name": "test_table",
"display_name": "test_schema.test_table",
"key": "test_table_key",
-"last_updated_epoch": 123456789,
+"last_updated_timestamp": 123456789,
"description": "test_table_description",
"column_names": ["test_col1", "test_col2"],
"column_descriptions": ["test_description1", "test_description2"],
......
@@ -14,7 +14,7 @@ class TestTableLastUpdated(unittest.TestCase):
self.tableLastUpdated = TableLastUpdated(table_name='test_table',
last_updated_time_epoch=25195665,
-schema_name='default')
+schema='default')
self.expected_node_result = {
NODE_KEY: 'hive://gold.default/test_table/timestamp',
......
@@ -17,7 +17,7 @@ class TestTableLineage(unittest.TestCase):
# type: () -> None
super(TestTableLineage, self).setUp()
self.table_lineage = TableLineage(db_name='hive',
-schema_name=SCHEMA,
+schema=SCHEMA,
table_name=TABLE,
cluster=CLUSTER,
downstream_deps=['hive://default.test_schema/test_table1',
......
@@ -22,7 +22,7 @@ class TestTableOwner(unittest.TestCase):
# type: () -> None
super(TestTableOwner, self).setUp()
self.table_owner = TableOwner(db_name='hive',
-schema_name=SCHEMA,
+schema=SCHEMA,
table_name=TABLE,
cluster=CLUSTER,
owners="user1@1, UsER2@2 ")
@@ -84,7 +84,7 @@ class TestTableOwner(unittest.TestCase):
def test_create_nodes_with_owners_list(self):
# type: () -> None
self.table_owner_list = TableOwner(db_name='hive',
-schema_name=SCHEMA,
+schema=SCHEMA,
table_name=TABLE,
cluster=CLUSTER,
owners=['user1@1', ' UsER2@2 '])
......
@@ -18,7 +18,7 @@ class TestTableSource(unittest.TestCase):
# type: () -> None
super(TestTableSource, self).setUp()
self.table_source = TableSource(db_name='hive',
-schema_name=SCHEMA,
+schema=SCHEMA,
table_name=TABLE,
cluster=CLUSTER,
source=SOURCE)
......
@@ -14,7 +14,7 @@ class TestUserElasticsearchDocument(unittest.TestCase):
test_obj = UserESDocument(email='test@email.com',
first_name='test_firstname',
last_name='test_lastname',
-name='full_name',
+full_name='full_name',
github_username='github_user',
team_name='team',
employee_type='fte',
@@ -27,7 +27,7 @@ class TestUserElasticsearchDocument(unittest.TestCase):
expected_document_dict = {"first_name": "test_firstname",
"last_name": "test_lastname",
-"name": "full_name",
+"full_name": "full_name",
"team_name": "team",
"total_follow": 1,
"total_read": 2,
......
@@ -22,7 +22,7 @@ class TestWatermark(unittest.TestCase):
super(TestWatermark, self).setUp()
self.watermark = Watermark(create_time='2017-09-18T00:00:00',
database=DATABASE,
-schema_name=SCHEMA,
+schema=SCHEMA,
table_name=TABLE,
cluster=CLUSTER,
part_type=PART_TYPE,
......