Commit 89b5ec3e authored by Mikhail Ivanov, committed by Tao Feng

Add sample table owner data to quick start (#139) (#150)

* Add sample table owner data to quick start (#139)

* Update example/sample_data/sample_table_owner.csv
Co-Authored-By: jornh <jornhansen@gmail.com>

* Add sample table owner data to quick start
parent 67e7495c
...@@ -19,14 +19,16 @@ class TableOwner(Neo4jCsvSerializable): ...@@ -19,14 +19,16 @@ class TableOwner(Neo4jCsvSerializable):
db_name, # type: str db_name, # type: str
schema_name, # type: str schema_name, # type: str
table_name, # type: str table_name, # type: str
owners, # type: List owners, # type: Union[List, str]
cluster='gold', # type: str cluster='gold', # type: str
): ):
# type: (...) -> None # type: (...) -> None
self.db = db_name.lower() self.db = db_name.lower()
self.schema = schema_name.lower() self.schema = schema_name.lower()
self.table = table_name.lower() self.table = table_name.lower()
self.owners = owners if isinstance(owners, str):
owners = owners.split(',')
self.owners = [owner.lower().strip() for owner in owners]
self.cluster = cluster.lower() self.cluster = cluster.lower()
self._node_iter = iter(self.create_nodes()) self._node_iter = iter(self.create_nodes())
......
database,cluster,schema_name,table_name,owners
hive,gold,test_schema,test_table1,"roald.amundsen@example.org,chrisc@example.org"
dynamo,gold,test_schema,test_table2,
...@@ -285,7 +285,7 @@ def load_usage_data_from_csv(file_name): ...@@ -285,7 +285,7 @@ def load_usage_data_from_csv(file_name):
cur = conn.cursor() cur = conn.cursor()
cur.execute('drop table if exists test_usage_metadata') cur.execute('drop table if exists test_usage_metadata')
cur.execute('create table if not exists test_usage_metadata ' cur.execute('create table if not exists test_usage_metadata '
'(database VARCHAR(64) NOT NULL , ' '(database VARCHAR(64) NOT NULL, '
'cluster VARCHAR(64) NOT NULL, ' 'cluster VARCHAR(64) NOT NULL, '
'schema_name VARCHAR(64) NOT NULL, ' 'schema_name VARCHAR(64) NOT NULL, '
'table_name VARCHAR(64) NOT NULL, ' 'table_name VARCHAR(64) NOT NULL, '
...@@ -310,6 +310,34 @@ def load_usage_data_from_csv(file_name): ...@@ -310,6 +310,34 @@ def load_usage_data_from_csv(file_name):
conn.commit() conn.commit()
def load_table_owner_data_from_csv(file_name):
    """Rebuild the ``test_table_owner_metadata`` table from a sample CSV file.

    The table is dropped and recreated on every call, then filled with one
    row per CSV record. The CSV is read from ``example/sample_data/`` and must
    provide the columns ``database``, ``schema_name``, ``table_name``,
    ``owners`` and ``cluster``; the ``database`` column is stored as
    ``db_name``.

    If ``create_connection(DB_FILE)`` returns a falsy value, nothing is done.

    :param file_name: name of the CSV file inside ``example/sample_data/``.
    """
    connection = create_connection(DB_FILE)
    if not connection:
        return

    cursor = connection.cursor()

    # Start from a clean table so repeated runs do not accumulate rows.
    cursor.execute('drop table if exists test_table_owner_metadata')
    cursor.execute('create table if not exists test_table_owner_metadata '
                   '(db_name VARCHAR(64) NOT NULL, '
                   'schema_name VARCHAR(64) NOT NULL, '
                   'table_name VARCHAR(64) NOT NULL, '
                   'owners VARCHAR(128) NOT NULL, '
                   'cluster VARCHAR(64) NOT NULL)')

    csv_path = 'example/sample_data/' + file_name
    rows = []
    with open(csv_path, 'r') as csv_file:
        for record in csv.DictReader(csv_file):
            # Tuple order must match the column list of the INSERT below.
            rows.append((record['database'],
                         record['schema_name'],
                         record['table_name'],
                         record['owners'],
                         record['cluster']))

    cursor.executemany("INSERT INTO test_table_owner_metadata "
                       "(db_name, schema_name, table_name, owners, cluster) "
                       "VALUES (?, ?, ?, ?, ?);", rows)
    connection.commit()
def create_last_updated_job(): def create_last_updated_job():
# loader saves data to these folders and publisher reads it from here # loader saves data to these folders and publisher reads it from here
tmp_folder = '/var/tmp/amundsen/last_updated_data' tmp_folder = '/var/tmp/amundsen/last_updated_data'
...@@ -418,6 +446,7 @@ if __name__ == "__main__": ...@@ -418,6 +446,7 @@ if __name__ == "__main__":
load_table_data_from_csv('sample_table.csv') load_table_data_from_csv('sample_table.csv')
load_col_data_from_csv('sample_col.csv') load_col_data_from_csv('sample_col.csv')
load_watermark_data_from_csv('sample_watermark.csv') load_watermark_data_from_csv('sample_watermark.csv')
load_table_owner_data_from_csv('sample_table_owner.csv')
load_usage_data_from_csv('sample_column_usage.csv') load_usage_data_from_csv('sample_column_usage.csv')
load_user_data_from_csv('sample_user.csv') load_user_data_from_csv('sample_user.csv')
load_application_data_from_csv('sample_application.csv') load_application_data_from_csv('sample_application.csv')
...@@ -439,6 +468,11 @@ if __name__ == "__main__": ...@@ -439,6 +468,11 @@ if __name__ == "__main__":
'databuilder.models.watermark.Watermark') 'databuilder.models.watermark.Watermark')
job3.launch() job3.launch()
# start owner job
job_table_owner = create_sample_job('test_table_owner_metadata',
'databuilder.models.table_owner.TableOwner')
job_table_owner.launch()
# start usage job # start usage job
job_col_usage = create_sample_job('test_usage_metadata', job_col_usage = create_sample_job('test_usage_metadata',
'example.models.test_column_usage_model.TestColumnUsageModel') 'example.models.test_column_usage_model.TestColumnUsageModel')
......
...@@ -3,7 +3,8 @@ from databuilder.models.user import User ...@@ -3,7 +3,8 @@ from databuilder.models.user import User
from databuilder.models.table_owner import TableOwner from databuilder.models.table_owner import TableOwner
from databuilder.models.neo4j_csv_serde import RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \ from databuilder.models.neo4j_csv_serde import NODE_KEY, NODE_LABEL, \
RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
...@@ -11,6 +12,8 @@ db = 'hive' ...@@ -11,6 +12,8 @@ db = 'hive'
SCHEMA = 'BASE' SCHEMA = 'BASE'
TABLE = 'TEST' TABLE = 'TEST'
CLUSTER = 'DEFAULT' CLUSTER = 'DEFAULT'
owner1 = 'user1@1'
owner2 = 'user2@2'
class TestTableOwner(unittest.TestCase): class TestTableOwner(unittest.TestCase):
...@@ -22,12 +25,12 @@ class TestTableOwner(unittest.TestCase): ...@@ -22,12 +25,12 @@ class TestTableOwner(unittest.TestCase):
schema_name=SCHEMA, schema_name=SCHEMA,
table_name=TABLE, table_name=TABLE,
cluster=CLUSTER, cluster=CLUSTER,
owners=['user1@1', 'user2@2']) owners="user1@1, UsER2@2 ")
def test_get_owner_model_key(self): def test_get_owner_model_key(self):
# type: () -> None # type: () -> None
owner = self.table_owner.get_owner_model_key('user1@1') owner = self.table_owner.get_owner_model_key(owner1)
self.assertEquals(owner, 'user1@1') self.assertEquals(owner, owner1)
def test_get_metadata_model_key(self): def test_get_metadata_model_key(self):
# type: () -> None # type: () -> None
...@@ -39,18 +42,65 @@ class TestTableOwner(unittest.TestCase): ...@@ -39,18 +42,65 @@ class TestTableOwner(unittest.TestCase):
nodes = self.table_owner.create_nodes() nodes = self.table_owner.create_nodes()
self.assertEquals(len(nodes), 2) self.assertEquals(len(nodes), 2)
node1 = {
NODE_KEY: User.USER_NODE_KEY_FORMAT.format(email=owner1),
NODE_LABEL: User.USER_NODE_LABEL,
User.USER_NODE_EMAIL: owner1
}
node2 = {
NODE_KEY: User.USER_NODE_KEY_FORMAT.format(email=owner2),
NODE_LABEL: User.USER_NODE_LABEL,
User.USER_NODE_EMAIL: owner2
}
self.assertTrue(node1 in nodes)
self.assertTrue(node2 in nodes)
def test_create_relation(self): def test_create_relation(self):
# type: () -> None # type: () -> None
relations = self.table_owner.create_relation() relations = self.table_owner.create_relation()
self.assertEquals(len(relations), 2) self.assertEquals(len(relations), 2)
relation = { relation1 = {
RELATION_START_KEY: 'user1@1', RELATION_START_KEY: owner1,
RELATION_START_LABEL: User.USER_NODE_LABEL, RELATION_START_LABEL: User.USER_NODE_LABEL,
RELATION_END_KEY: self.table_owner.get_metadata_model_key(), RELATION_END_KEY: self.table_owner.get_metadata_model_key(),
RELATION_END_LABEL: 'Table', RELATION_END_LABEL: 'Table',
RELATION_TYPE: TableOwner.OWNER_TABLE_RELATION_TYPE, RELATION_TYPE: TableOwner.OWNER_TABLE_RELATION_TYPE,
RELATION_REVERSE_TYPE: TableOwner.TABLE_OWNER_RELATION_TYPE RELATION_REVERSE_TYPE: TableOwner.TABLE_OWNER_RELATION_TYPE
} }
relation2 = {
RELATION_START_KEY: owner2,
RELATION_START_LABEL: User.USER_NODE_LABEL,
RELATION_END_KEY: self.table_owner.get_metadata_model_key(),
RELATION_END_LABEL: 'Table',
RELATION_TYPE: TableOwner.OWNER_TABLE_RELATION_TYPE,
RELATION_REVERSE_TYPE: TableOwner.TABLE_OWNER_RELATION_TYPE
}
self.assertTrue(relation1 in relations)
self.assertTrue(relation2 in relations)
def test_create_nodes_with_owners_list(self):
# type: () -> None
self.table_owner_list = TableOwner(db_name='hive',
schema_name=SCHEMA,
table_name=TABLE,
cluster=CLUSTER,
owners=['user1@1', ' UsER2@2 '])
nodes = self.table_owner_list.create_nodes()
self.assertEquals(len(nodes), 2)
node1 = {
NODE_KEY: User.USER_NODE_KEY_FORMAT.format(email=owner1),
NODE_LABEL: User.USER_NODE_LABEL,
User.USER_NODE_EMAIL: owner1
}
node2 = {
NODE_KEY: User.USER_NODE_KEY_FORMAT.format(email=owner2),
NODE_LABEL: User.USER_NODE_LABEL,
User.USER_NODE_EMAIL: owner2
}
self.assertTrue(relation in relations) self.assertTrue(node1 in nodes)
self.assertTrue(node2 in nodes)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment