Commit 6c6073c4 authored by Shaun Elliott, committed by Tao Feng

Fix for ISSUE 98 - add example for usage data (#100)

* Fix for ISSUE 98 - add example for usage data

* Issue 27 linting fixes.

* Update test_column_usage_model.py

* Update sample_data_loader.py
parent 319ebcc1
from typing import Union, Dict, Any, Iterator # noqa: F401
from databuilder.models.neo4j_csv_serde import (
Neo4jCsvSerializable, RELATION_START_KEY, RELATION_END_KEY,
RELATION_START_LABEL, RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
)
from databuilder.models.table_metadata import TableMetadata
from databuilder.models.user import User
from databuilder.publisher.neo4j_csv_publisher import UNQUOTED_SUFFIX
class TestColumnUsageModel(Neo4jCsvSerializable):
    """
    A model that represents the user <--> column usage graph.

    Currently it only supports serializing at table level: ``column_name`` is
    kept on the instance but is not used in any emitted node or relation.
    """
    TABLE_NODE_LABEL = TableMetadata.TABLE_NODE_LABEL
    TABLE_NODE_KEY_FORMAT = TableMetadata.TABLE_KEY_FORMAT

    USER_TABLE_RELATION_TYPE = 'READ'
    TABLE_USER_RELATION_TYPE = 'READ_BY'

    # Property key for the read count stored on the READ / READ_BY relationship.
    # NOTE(review): UNQUOTED_SUFFIX presumably tells the publisher not to quote
    # the value (i.e. treat it as a number) -- confirm against the publisher.
    READ_RELATION_COUNT = 'read_count{}'.format(UNQUOTED_SUFFIX)

    def __init__(self,
                 database,  # type: str
                 cluster,  # type: str
                 schema_name,  # type: str
                 table_name,  # type: str
                 column_name,  # type: str
                 user_email,  # type: str
                 read_count,  # type: int
                 ):
        # type: (...) -> None
        """
        Store one usage record and prepare the node / relation iterators.

        :param database: database name, used to build the table key
        :param cluster: cluster name, used to build the table key
        :param schema_name: schema name, used to build the table key
        :param table_name: table name, used to build the table key
        :param column_name: column name (stored, not serialized)
        :param user_email: email of the user who read the table
        :param read_count: value written under READ_RELATION_COUNT
        """
        self.database = database
        self.cluster = cluster
        self.schema_name = schema_name
        self.table_name = table_name
        self.column_name = column_name
        self.user_email = user_email
        self.read_count = read_count

        # Built eagerly so that create_next_node / create_next_relation can
        # hand out one record per call.
        self._node_iter = iter(self.create_nodes())
        self._relation_iter = iter(self.create_relation())

    def create_next_node(self):
        # type: () -> Union[Dict[str, Any], None]
        """
        Return the next node record, or None once all nodes were returned.
        """
        try:
            return next(self._node_iter)
        except StopIteration:
            return None

    def create_nodes(self):
        # type: () -> List[Dict[str, Any]]
        """
        Create the Neo4j node records for this usage record.

        :return: whatever ``User(email=...).create_nodes()`` produces for the
            reading user
        """
        return User(email=self.user_email).create_nodes()

    def create_next_relation(self):
        # type: () -> Union[Dict[str, Any], None]
        """
        Return the next relation record, or None once all were returned.
        """
        try:
            return next(self._relation_iter)
        except StopIteration:
            return None

    def create_relation(self):
        # type: () -> Iterator[Any]
        """
        Create the table <--> user relation record carrying the read count.

        :return: a one-element list holding the relation dictionary
        """
        return [{
            RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
            RELATION_END_LABEL: User.USER_NODE_LABEL,
            RELATION_START_KEY: self._get_table_key(),
            RELATION_END_KEY: self._get_user_key(self.user_email),
            RELATION_TYPE: TestColumnUsageModel.TABLE_USER_RELATION_TYPE,
            RELATION_REVERSE_TYPE: TestColumnUsageModel.USER_TABLE_RELATION_TYPE,
            TestColumnUsageModel.READ_RELATION_COUNT: self.read_count
        }]

    def _get_table_key(self):
        # type: () -> str
        """
        Build the table key from database, cluster, schema and table name.
        """
        return TableMetadata.TABLE_KEY_FORMAT.format(db=self.database,
                                                     cluster=self.cluster,
                                                     schema=self.schema_name,
                                                     tbl=self.table_name)

    def _get_user_key(self, email):
        # type: (str) -> str
        """
        Return the user model key that ``User`` derives from the email.
        """
        return User.get_user_model_key(email=email)

    def __repr__(self):
        # type: () -> str
        # Only reference attributes that __init__ actually sets; the previous
        # implementation read a non-existent ``col_readers`` attribute and
        # raised AttributeError.
        return ('TestColumnUsageModel(database={!r}, cluster={!r}, '
                'schema_name={!r}, table_name={!r}, column_name={!r}, '
                'user_email={!r}, read_count={!r})').format(self.database,
                                                            self.cluster,
                                                            self.schema_name,
                                                            self.table_name,
                                                            self.column_name,
                                                            self.user_email,
                                                            self.read_count)
database,cluster,schema_name,table_name,column_name,user_email,read_count
hive,gold,test_schema,test_table1,col1,roald.amundsen@example.org,100
dynamo,gold,test_schema,test_table2,col1,chrisc@example.org,500
\ No newline at end of file
......@@ -218,6 +218,38 @@ def create_sample_job(table_name, model_name, transformer=NoopTransformer()):
return job
def load_usage_data_from_csv(file_name):
    """
    Rebuild the test_usage_metadata table from a sample CSV file.

    The table is dropped and re-created, then every row of the CSV is
    inserted and the transaction is committed.

    :param file_name: name of a CSV file under example/sample_data/ whose
        header provides database, cluster, schema_name, table_name,
        column_name, user_email and read_count
    :return: None; nothing is done when no connection could be created
    """
    connection = create_connection(DB_FILE)
    if not connection:
        return

    cursor = connection.cursor()

    # Start from a clean table on every load.
    cursor.execute('drop table if exists test_usage_metadata')
    cursor.execute('create table if not exists test_usage_metadata '
                   '(database VARCHAR(64) NOT NULL , '
                   'cluster VARCHAR(64) NOT NULL, '
                   'schema_name VARCHAR(64) NOT NULL, '
                   'table_name VARCHAR(64) NOT NULL, '
                   'column_name VARCHAR(64) NOT NULL, '
                   'user_email VARCHAR(64) NOT NULL, '
                   'read_count INTEGER NOT NULL)')

    # CSV header names, in the same order as the INSERT column list below.
    columns = ('database',
               'cluster',
               'schema_name',
               'table_name',
               'column_name',
               'user_email',
               'read_count')

    csv_path = 'example/sample_data/' + file_name
    with open(csv_path, 'r') as csv_file:
        records = csv.DictReader(csv_file)
        rows = [tuple(record[column] for column in columns)
                for record in records]

    cursor.executemany("INSERT INTO test_usage_metadata (database, cluster, "
                       "schema_name, table_name, column_name, user_email, read_count) "
                       "VALUES (?, ?, ?, ?, ?, ?, ?);", rows)
    connection.commit()
def create_last_updated_job():
# loader saves data to these folders and publisher reads it from here
tmp_folder = '/var/tmp/amundsen/last_updated_data'
......@@ -307,6 +339,7 @@ if __name__ == "__main__":
load_table_data_from_csv('sample_table.csv')
load_col_data_from_csv('sample_col.csv')
load_usage_data_from_csv('sample_column_usage.csv')
load_user_data_from_csv('sample_user.csv')
load_application_data_from_csv('sample_application.csv')
if create_connection(DB_FILE):
......@@ -320,6 +353,11 @@ if __name__ == "__main__":
'example.models.test_column_model.TestColumnMetadata')
job2.launch()
# start usage job
job_col_usage = create_sample_job('test_usage_metadata',
'example.models.test_column_usage_model.TestColumnUsageModel')
job_col_usage.launch()
# start user job
job_user = create_sample_job('test_user_metadata',
'databuilder.models.user.User')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment