Commit 9a261695 (unverified), authored by Tao Feng, committed by GitHub

Support table/col tag in table metadata model (#75)

parent 975d442a
...@@ -169,4 +169,4 @@ class KafkaSourceExtractor(Extractor, Callback): ...@@ -169,4 +169,4 @@ class KafkaSourceExtractor(Extractor, Callback):
def get_scope(self): def get_scope(self):
# type: () -> str # type: () -> str
return 'extractor.kafka_source_extractor' return 'extractor.kafka_source'
...@@ -11,6 +11,24 @@ from databuilder.publisher.neo4j_csv_publisher import UNQUOTED_SUFFIX ...@@ -11,6 +11,24 @@ from databuilder.publisher.neo4j_csv_publisher import UNQUOTED_SUFFIX
DESCRIPTION_NODE_LABEL = 'Description' DESCRIPTION_NODE_LABEL = 'Description'
class TagMetadata:
    """
    Metadata for a tag that can be attached to a table or a column.

    Class-level constants describe how a tag is represented as a node:
    ``TAG_NODE_LABEL`` is the node label, ``TAG_KEY_FORMAT`` is the template
    used to build the node key, and ``TAG_TYPE`` is the name of the node
    property that carries the tag type.
    """
    TAG_NODE_LABEL = 'Tag'
    TAG_KEY_FORMAT = '{tag}'
    TAG_TYPE = 'tag_type'

    def __init__(self,
                 name,  # type: str
                 tag_type='default',  # type: str
                 ):
        # type: (...) -> None
        """
        :param name: name of the tag
        :param tag_type: type of the tag; 'default' when not given
        """
        self._name = name
        self._tag_type = tag_type

    @staticmethod
    def get_tag_key(name):
        # type: (str) -> str
        """
        Build the node key for a tag.

        :param name: name of the tag
        :return: TAG_KEY_FORMAT rendered with the tag name (currently the
            name itself, since the template is just '{tag}')
        """
        return TagMetadata.TAG_KEY_FORMAT.format(tag=name)
class ColumnMetadata: class ColumnMetadata:
COLUMN_NODE_LABEL = 'Column' COLUMN_NODE_LABEL = 'Column'
COLUMN_KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}/{col}' COLUMN_KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}/{col}'
...@@ -24,11 +42,16 @@ class ColumnMetadata: ...@@ -24,11 +42,16 @@ class ColumnMetadata:
COL_DESCRIPTION_RELATION_TYPE = 'DESCRIPTION' COL_DESCRIPTION_RELATION_TYPE = 'DESCRIPTION'
DESCRIPTION_COL_RELATION_TYPE = 'DESCRIPTION_OF' DESCRIPTION_COL_RELATION_TYPE = 'DESCRIPTION_OF'
# Relation between column and tag
COL_TAG_RELATION_TYPE = 'TAGGED_BY'
TAG_COL_RELATION_TYPE = 'TAG'
def __init__(self, def __init__(self,
name, # type: str name, # type: str
description, # type: Union[str, None] description, # type: Union[str, None]
col_type, # type: str col_type, # type: str
sort_order, # type: int sort_order, # type: int
tags=None, # Union[List[str], None]
): ):
# type: (...) -> None # type: (...) -> None
""" """
...@@ -42,6 +65,7 @@ class ColumnMetadata: ...@@ -42,6 +65,7 @@ class ColumnMetadata:
self.description = description self.description = description
self.type = col_type self.type = col_type
self.sort_order = sort_order self.sort_order = sort_order
self.tags = tags
def __repr__(self): def __repr__(self):
# type: () -> str # type: () -> str
...@@ -95,6 +119,9 @@ class TableMetadata(Neo4jCsvSerializable): ...@@ -95,6 +119,9 @@ class TableMetadata(Neo4jCsvSerializable):
TABLE_COL_RELATION_TYPE = 'COLUMN' TABLE_COL_RELATION_TYPE = 'COLUMN'
COL_TABLE_RELATION_TYPE = 'COLUMN_OF' COL_TABLE_RELATION_TYPE = 'COLUMN_OF'
TABLE_TAG_RELATION_TYPE = 'TAGGED_BY'
TAG_TABLE_RELATION_TYPE = 'TAG'
# Only for deduping database, cluster, and schema (table and column will be always processed) # Only for deduping database, cluster, and schema (table and column will be always processed)
serialized_nodes = set() # type: Set[Any] serialized_nodes = set() # type: Set[Any]
serialized_rels = set() # type: Set[Any] serialized_rels = set() # type: Set[Any]
...@@ -107,6 +134,7 @@ class TableMetadata(Neo4jCsvSerializable): ...@@ -107,6 +134,7 @@ class TableMetadata(Neo4jCsvSerializable):
description, # type: Union[str, None] description, # type: Union[str, None]
columns=None, # type: Iterable[ColumnMetadata] columns=None, # type: Iterable[ColumnMetadata]
is_view=False, # type: bool is_view=False, # type: bool
tags=None, # type: List
**kwargs # type: Dict **kwargs # type: Dict
): ):
# type: (...) -> None # type: (...) -> None
...@@ -129,6 +157,8 @@ class TableMetadata(Neo4jCsvSerializable): ...@@ -129,6 +157,8 @@ class TableMetadata(Neo4jCsvSerializable):
self.columns = columns if columns else [] self.columns = columns if columns else []
self.is_view = is_view self.is_view = is_view
self.attrs = None self.attrs = None
self.tags = tags
if kwargs: if kwargs:
self.attrs = copy.deepcopy(kwargs) self.attrs = copy.deepcopy(kwargs)
...@@ -197,7 +227,7 @@ class TableMetadata(Neo4jCsvSerializable): ...@@ -197,7 +227,7 @@ class TableMetadata(Neo4jCsvSerializable):
except StopIteration: except StopIteration:
return None return None
def _create_next_node(self): def _create_next_node(self): # noqa: C901
# type: () -> Iterator[Any] # type: () -> Iterator[Any]
table_node = {NODE_LABEL: TableMetadata.TABLE_NODE_LABEL, table_node = {NODE_LABEL: TableMetadata.TABLE_NODE_LABEL,
...@@ -215,6 +245,13 @@ class TableMetadata(Neo4jCsvSerializable): ...@@ -215,6 +245,13 @@ class TableMetadata(Neo4jCsvSerializable):
NODE_KEY: self._get_table_description_key(), NODE_KEY: self._get_table_description_key(),
TableMetadata.TABLE_DESCRIPTION: self.description} TableMetadata.TABLE_DESCRIPTION: self.description}
# Create the table tag node
if self.tags:
for tag in self.tags:
yield {NODE_LABEL: TagMetadata.TAG_NODE_LABEL,
NODE_KEY: TagMetadata.get_tag_key(tag),
TagMetadata.TAG_TYPE: 'default'}
for col in self.columns: for col in self.columns:
yield { yield {
NODE_LABEL: ColumnMetadata.COLUMN_NODE_LABEL, NODE_LABEL: ColumnMetadata.COLUMN_NODE_LABEL,
...@@ -231,6 +268,14 @@ class TableMetadata(Neo4jCsvSerializable): ...@@ -231,6 +268,14 @@ class TableMetadata(Neo4jCsvSerializable):
NODE_KEY: self._get_col_description_key(col), NODE_KEY: self._get_col_description_key(col),
ColumnMetadata.COLUMN_DESCRIPTION: col.description} ColumnMetadata.COLUMN_DESCRIPTION: col.description}
if not col.tags:
continue
for tag in col.tags:
yield {NODE_LABEL: TagMetadata.TAG_NODE_LABEL,
NODE_KEY: TagMetadata.get_tag_key(tag),
TagMetadata.TAG_TYPE: 'default'}
# Database, cluster, schema # Database, cluster, schema
others = [NodeTuple(key=self._get_database_key(), others = [NodeTuple(key=self._get_database_key(),
name=self.database, name=self.database,
...@@ -281,6 +326,17 @@ class TableMetadata(Neo4jCsvSerializable): ...@@ -281,6 +326,17 @@ class TableMetadata(Neo4jCsvSerializable):
RELATION_REVERSE_TYPE: TableMetadata.DESCRIPTION_TABLE_RELATION_TYPE RELATION_REVERSE_TYPE: TableMetadata.DESCRIPTION_TABLE_RELATION_TYPE
} }
if self.tags:
for tag in self.tags:
yield {
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_END_LABEL: TagMetadata.TAG_NODE_LABEL,
RELATION_START_KEY: self._get_table_key(),
RELATION_END_KEY: TagMetadata.get_tag_key(tag),
RELATION_TYPE: TableMetadata.TABLE_TAG_RELATION_TYPE,
RELATION_REVERSE_TYPE: TableMetadata.TAG_TABLE_RELATION_TYPE,
}
for col in self.columns: for col in self.columns:
yield { yield {
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL, RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
...@@ -303,6 +359,19 @@ class TableMetadata(Neo4jCsvSerializable): ...@@ -303,6 +359,19 @@ class TableMetadata(Neo4jCsvSerializable):
RELATION_REVERSE_TYPE: ColumnMetadata.DESCRIPTION_COL_RELATION_TYPE RELATION_REVERSE_TYPE: ColumnMetadata.DESCRIPTION_COL_RELATION_TYPE
} }
if not col.tags:
continue
for tag in col.tags:
yield {
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_END_LABEL: TagMetadata.TAG_NODE_LABEL,
RELATION_START_KEY: self._get_table_key(),
RELATION_END_KEY: TagMetadata.get_tag_key(tag),
RELATION_TYPE: ColumnMetadata.COL_TAG_RELATION_TYPE,
RELATION_REVERSE_TYPE: ColumnMetadata.TAG_COL_RELATION_TYPE,
}
others = [ others = [
RelTuple(start_label=TableMetadata.DATABASE_NODE_LABEL, RelTuple(start_label=TableMetadata.DATABASE_NODE_LABEL,
end_label=TableMetadata.CLUSTER_NODE_LABEL, end_label=TableMetadata.CLUSTER_NODE_LABEL,
......
from setuptools import setup, find_packages from setuptools import setup, find_packages
__version__ = '1.2.4' __version__ = '1.2.5'
setup( setup(
......
...@@ -13,12 +13,12 @@ class TestKafkaSourceExtractor(unittest.TestCase): ...@@ -13,12 +13,12 @@ class TestKafkaSourceExtractor(unittest.TestCase):
# type: () -> None # type: () -> None
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
config_dict = { config_dict = {
'extractor.kafka_source_extractor.consumer_config': {'"group.id"': 'consumer-group', 'extractor.kafka_source.consumer_config': {'"group.id"': 'consumer-group',
'"enable.auto.commit"': False}, '"enable.auto.commit"': False},
'extractor.kafka_source_extractor.{}'.format(KafkaSourceExtractor.RAW_VALUE_TRANSFORMER): 'extractor.kafka_source.{}'.format(KafkaSourceExtractor.RAW_VALUE_TRANSFORMER):
'databuilder.transformer.base_transformer.NoopTransformer', 'databuilder.transformer.base_transformer.NoopTransformer',
'extractor.kafka_source_extractor.{}'.format(KafkaSourceExtractor.TOPIC_NAME_LIST): ['test-topic'], 'extractor.kafka_source.{}'.format(KafkaSourceExtractor.TOPIC_NAME_LIST): ['test-topic'],
'extractor.kafka_source_extractor.{}'.format(KafkaSourceExtractor.CONSUMER_TOTAL_TIMEOUT_SEC): 1, 'extractor.kafka_source.{}'.format(KafkaSourceExtractor.CONSUMER_TOTAL_TIMEOUT_SEC): 1,
} }
self.conf = ConfigFactory.from_dict(config_dict) self.conf = ConfigFactory.from_dict(config_dict)
......
import copy import copy
import unittest import unittest
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
class TestTableMetadata(unittest.TestCase): class TestTableMetadata(unittest.TestCase):
...@@ -31,6 +31,10 @@ class TestTableMetadata(unittest.TestCase): ...@@ -31,6 +31,10 @@ class TestTableMetadata(unittest.TestCase):
ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4), ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
ColumnMetadata('ds', None, 'varchar', 5)], is_view=False, attr1='uri', attr2='attr2') ColumnMetadata('ds', None, 'varchar', 5)], is_view=False, attr1='uri', attr2='attr2')
self.table_metadata4 = TableMetadata('hive', 'gold', 'test_schema4', 'test_table4', 'test_table4', [
ColumnMetadata('test_id1', 'description of test_table1', 'bigint', 0, ['col-tag1', 'col-tag2'])],
is_view=False, tags=['tag1', 'tag2'], attr1='uri', attr2='attr2')
self.expected_nodes_deduped = [ self.expected_nodes_deduped = [
{'name': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1', 'LABEL': 'Table', {'name': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1', 'LABEL': 'Table',
'is_view:UNQUOTED': False}, 'is_view:UNQUOTED': False},
...@@ -140,14 +144,57 @@ class TestTableMetadata(unittest.TestCase): ...@@ -140,14 +144,57 @@ class TestTableMetadata(unittest.TestCase):
self.assertEqual(self.expected_rels_deduped, actual) self.assertEqual(self.expected_rels_deduped, actual)
# Test additional K/V Attributes
node_row = self.table_metadata3.next_node() node_row = self.table_metadata3.next_node()
t2_actual = [] actual = []
while node_row: while node_row:
t2_actual.append(node_row) actual.append(node_row)
node_row = self.table_metadata3.next_node() node_row = self.table_metadata3.next_node()
self.assertEqual(t2_actual[0].get('attr1'), 'uri') self.assertEqual(actual[0].get('attr1'), 'uri')
self.assertEqual(t2_actual[0].get('attr2'), 'attr2') self.assertEqual(actual[0].get('attr2'), 'attr2')
# Test tag field
node_row = self.table_metadata4.next_node()
actual = []
while node_row:
actual.append(node_row)
node_row = self.table_metadata4.next_node()
self.assertEqual(actual[0].get('attr1'), 'uri')
self.assertEqual(actual[0].get('attr2'), 'attr2')
self.assertEqual(actual[2].get('LABEL'), 'Tag')
self.assertEqual(actual[2].get('KEY'), 'tag1')
self.assertEqual(actual[3].get('KEY'), 'tag2')
self.assertEqual(actual[6].get('KEY'), 'col-tag1')
self.assertEqual(actual[7].get('KEY'), 'col-tag2')
relation_row = self.table_metadata4.next_relation()
actual = []
while relation_row:
actual.append(relation_row)
relation_row = self.table_metadata4.next_relation()
# Table tag relationship
expected_tab_tag_rel1 = {'END_KEY': 'tag1', 'START_LABEL': 'Table', 'END_LABEL':
'Tag', 'START_KEY': 'hive://gold.test_schema4/test_table4',
'TYPE': 'TAGGED_BY', 'REVERSE_TYPE': 'TAG'}
expected_tab_tag_rel2 = {'END_KEY': 'tag2', 'START_LABEL': 'Table',
'END_LABEL': 'Tag', 'START_KEY': 'hive://gold.test_schema4/test_table4',
'TYPE': 'TAGGED_BY', 'REVERSE_TYPE': 'TAG'}
expected_col_tag_rel1 = {'END_KEY': 'col-tag1', 'START_LABEL': 'Table',
'END_LABEL': 'Tag',
'START_KEY': 'hive://gold.test_schema4/test_table4',
'TYPE': 'TAGGED_BY', 'REVERSE_TYPE': 'TAG'}
expected_col_tag_rel2 = {'END_KEY': 'col-tag2', 'START_LABEL': 'Table',
'END_LABEL': 'Tag',
'START_KEY': 'hive://gold.test_schema4/test_table4',
'TYPE': 'TAGGED_BY', 'REVERSE_TYPE': 'TAG'}
self.assertEqual(actual[2], expected_tab_tag_rel1)
self.assertEqual(actual[3], expected_tab_tag_rel2)
self.assertEqual(actual[6], expected_col_tag_rel1)
self.assertEqual(actual[7], expected_col_tag_rel2)
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment