Unverified Commit 1e01e8c7 authored by samshuster's avatar samshuster Committed by GitHub

Completely different badge approach. We can use a separate file to define what...

Completely different badge approach. We can use a separate file to define the type of each tag, in a normalized rather than a denormalized fashion. (#185)

This required making TagMetadata a Neo4jCsvSerializable as well.
Refactored Tag Node creation to live inside TagMetadata.
parent 160406b0
......@@ -11,10 +11,14 @@ from databuilder.publisher.neo4j_csv_publisher import UNQUOTED_SUFFIX
DESCRIPTION_NODE_LABEL = 'Description'
class TagMetadata:
class TagMetadata(Neo4jCsvSerializable):
TAG_NODE_LABEL = 'Tag'
TAG_KEY_FORMAT = '{tag}'
TAG_TYPE = 'tag_type'
DEFAULT_TYPE = 'default'
BADGE_TYPE = 'badge'
DASHBOARD_TYPE = 'dashboard'
METRIC_TYPE = 'metric'
def __init__(self,
name, # type: str,
......@@ -22,6 +26,8 @@ class TagMetadata:
):
self._name = name
self._tag_type = tag_type
self._nodes = iter([self.create_tag_node(self._name, self._tag_type)])
self._relations = iter([])
@staticmethod
def get_tag_key(name):
......@@ -30,6 +36,28 @@ class TagMetadata:
return ''
return TagMetadata.TAG_KEY_FORMAT.format(tag=name)
@staticmethod
def create_tag_node(name, tag_type=DEFAULT_TYPE):
    """
    Build the node dictionary describing a single tag.

    :param name: tag name; the node key is derived from it via get_tag_key
    :param tag_type: value stored under the TAG_TYPE property,
        DEFAULT_TYPE when not given
    :return: dict holding the node label, the node key and the tag type
    """
    tag_node = {}
    tag_node[NODE_LABEL] = TagMetadata.TAG_NODE_LABEL
    tag_node[NODE_KEY] = TagMetadata.get_tag_key(name)
    tag_node[TagMetadata.TAG_TYPE] = tag_type
    return tag_node
def create_next_node(self):
    # type: () -> Union[Dict[str, Any], None]
    """
    Return the next pending node for this tag.

    :return: the next node dictionary taken from the internal node
        iterator, or None once the iterator is exhausted
    """
    # next() with a default replaces the manual StopIteration handling.
    return next(self._nodes, None)
def create_next_relation(self):
    # type: () -> Union[Dict[str, Any], None]
    """
    Return the next pending relation for this tag.

    :return: the next relation dictionary taken from the internal
        relation iterator, or None once the iterator is exhausted
    """
    # We don't emit any relations for Tag ingestion, so the iterator is
    # expected to be empty; next() with a default covers exhaustion
    # without manual StopIteration handling.
    return next(self._relations, None)
class ColumnMetadata:
COLUMN_NODE_LABEL = 'Column'
......@@ -256,9 +284,7 @@ class TableMetadata(Neo4jCsvSerializable):
# Create the table tag node
if self.tags:
for tag in self.tags:
yield {NODE_LABEL: TagMetadata.TAG_NODE_LABEL,
NODE_KEY: TagMetadata.get_tag_key(tag),
TagMetadata.TAG_TYPE: 'default'}
yield TagMetadata.create_tag_node(tag)
for col in self.columns:
yield {
......
database,cluster,schema_name,name,description,tags
hive,gold,test_schema,test_table1,"1st test table","tag1,tag2"
dynamo,gold,test_schema,test_table2,"2nd test table",
hive,gold,test_schema,test_table1,"1st test table","tag1,tag2,pii,high_quality"
dynamo,gold,test_schema,test_table2,"2nd test table","high_quality,recommended"
name,tag_type
pii,badge
high_quality,badge
\ No newline at end of file
......@@ -86,6 +86,24 @@ def load_table_data_from_csv(file_name):
conn.commit()
def load_tag_data_from_csv(file_name):
    """
    Rebuild the ``test_tag_metadata`` table from a sample CSV file.

    The table is dropped and recreated, then the ``name`` and ``tag_type``
    columns of every row in ``example/sample_data/<file_name>`` are inserted
    and committed. Nothing is done when ``create_connection`` returns a
    falsy value.

    :param file_name: CSV file name, relative to ``example/sample_data/``
    """
    connection = create_connection(DB_FILE)
    if not connection:
        return

    cursor = connection.cursor()
    cursor.execute('drop table if exists test_tag_metadata')
    cursor.execute('create table if not exists test_tag_metadata '
                   '(name VARCHAR(64) NOT NULL , '
                   'tag_type VARCHAR(64) NOT NULL)')

    csv_path = 'example/sample_data/' + file_name
    rows = []
    with open(csv_path, 'r') as csv_file:
        # Collect (name, tag_type) pairs in file order for the bulk insert.
        for record in csv.DictReader(csv_file):
            rows.append((record['name'], record['tag_type']))

    cursor.executemany("INSERT INTO test_tag_metadata (name, tag_type) VALUES (?, ?);", rows)
    connection.commit()
def load_col_data_from_csv(file_name):
conn = create_connection(DB_FILE)
if conn:
......@@ -428,7 +446,6 @@ def create_last_updated_job():
job_config = ConfigFactory.from_dict({
'extractor.neo4j_es_last_updated.model_class':
'databuilder.models.neo4j_es_last_updated.Neo4jESLastUpdated',
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
......@@ -530,6 +547,7 @@ if __name__ == "__main__":
load_user_data_from_csv('sample_user.csv')
load_application_data_from_csv('sample_application.csv')
load_source_data_from_csv('sample_source.csv')
load_tag_data_from_csv('sample_tags.csv')
load_test_last_updated_data_from_csv('sample_table_last_updated.csv')
if create_connection(DB_FILE):
......@@ -578,6 +596,10 @@ if __name__ == "__main__":
'databuilder.models.table_source.TableSource')
job_source.launch()
job_tag = create_sample_job('test_tag_metadata',
'databuilder.models.table_metadata.TagMetadata')
job_tag.launch()
# start job_source job
job_table_last_updated = create_sample_job('test_table_last_updated_metadata',
'databuilder.models.table_last_updated.TableLastUpdated')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment