Commit b797db12 authored by samshuster, committed by Tao Feng

Adding table stats to sample data loader. (lyft/amundsen/#158) (#159)

This involved adding schema_name to the table_stats model, as well as float and int columns to the sample data.
parent 04c69a37
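In short: TableColumnStats previously required a dotted 'schema.table' name; it now also accepts an explicit schema_name, and the sample data gains a float and an int column with matching stats. A minimal sketch of the two call styles, using keyword arguments inferred from the attributes the constructor sets (only end_epoch, db, cluster, and schema_name are visible in the hunk below; the other parameter names are assumptions):

from databuilder.models.table_stats import TableColumnStats

# Legacy style: schema embedded in the table name.
stat_a = TableColumnStats(table_name='test_schema.test_table1',
                          col_name='col1',
                          stat_name='max',
                          stat_val='zebra',
                          start_epoch='1432300762',
                          end_epoch='1562300762')

# New style: schema passed explicitly via the new schema_name keyword.
stat_b = TableColumnStats(table_name='test_table1',
                          col_name='col5',
                          stat_name='average',
                          stat_val='5.0',
                          start_epoch='1532300762',
                          end_epoch='1572300762',
                          schema_name='test_schema')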
databuilder/models/table_stats.py

@@ -27,15 +27,19 @@ class TableColumnStats(Neo4jCsvSerializable):
                  end_epoch,  # type: str
                  db='hive',  # type: str
                  cluster='gold',  # type: str
+                 schema_name=None  # type: str
                  ):
         # type: (...) -> None
+        if schema_name is None:
+            self.schema, self.table = table_name.split('.')
+        else:
+            self.table = table_name.lower()
+            self.schema = schema_name.lower()
         self.db = db
-        self.schema, self.table = table_name.lower().split('.')
         self.col_name = col_name.lower()
         self.start_epoch = start_epoch
         self.end_epoch = end_epoch
         self.cluster = cluster
         self.stat_name = stat_name
         self.stat_val = stat_val
         self._node_iter = iter(self.create_nodes())
...
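The branch added above also changes case handling as a side effect: the dotted-name path no longer lower-cases its input (the removed line did), while the schema_name path lower-cases both parts. A standalone sketch of just that branch, with split_schema as a hypothetical helper for illustration, not part of the model:

def split_schema(table_name, schema_name=None):
    # Mirrors the new __init__ branch: a dotted name is split as-is;
    # an explicit schema_name lower-cases both schema and table.
    if schema_name is None:
        schema, table = table_name.split('.')
        return schema, table
    return schema_name.lower(), table_name.lower()


assert split_schema('test_schema.test_table1') == ('test_schema', 'test_table1')
assert split_schema('Test_Table1', schema_name='Test_Schema') == ('test_schema', 'test_table1')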
example/sample_data/sample_col.csv

@@ -3,7 +3,8 @@ col1,"col1 description","string",1,hive,gold,test_schema,test_table1,"1st test t
 col2,"col2 description","string",2,hive,gold,test_schema,test_table1,"1st test table"
 col3,"col3 description","string",3,hive,gold,test_schema,test_table1,"1st test table"
 col4,"col4 description","string",4,hive,gold,test_schema,test_table1,"1st test table"
+col5,"col5 description","float",5,hive,gold,test_schema,test_table1,"1st test table"
 col1,"col1 description","string",1,dynamo,gold,test_schema,test_table2,"2nd test table"
 col2,"col2 description","string",2,dynamo,gold,test_schema,test_table2,"2nd test table"
 col3,"col3 description","string",3,dynamo,gold,test_schema,test_table2,"2nd test table"
-col4,"col4 description","string",4,dynamo,gold,test_schema,test_table2,"2nd test table"
+col4,"col4 description","int",4,dynamo,gold,test_schema,test_table2,"2nd test table"
\ No newline at end of file
example/sample_data/sample_table_column_stats.csv (new file)

@@ -0,0 +1,11 @@
+cluster,db,schema_name,table_name,col_name,stat_name,stat_val,start_epoch,end_epoch
+gold,hive,test_schema,test_table1,col1,"distinct values",8,1432300762,1562300762
+gold,hive,test_schema,test_table1,col1,"min",aardvark,1432300762,1562300762
+gold,hive,test_schema,test_table1,col1,"max",zebra,1432300762,1562300762
+gold,hive,test_schema,test_table1,col1,"num nulls",500320,1432300762,1562300762
+gold,hive,test_schema,test_table1,col1,"verified",230430,1432300762,1562300762
+gold,hive,test_schema,test_table1,col5,"average",5.0,1532300762,1572300762
+gold,hive,test_schema,test_table1,col5,"max",500.0,1534300762,1572300762
+gold,hive,test_schema,test_table1,col5,"min",-500.0,1534300762,1572300762
+gold,dynamo,test_schema,test_table2,col4,"median",250,1534300762,1572300762
+gold,dynamo,test_schema,test_table2,col4,"average",400,1534300762,1572300762
\ No newline at end of file
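Each row above supplies one stat for one column; the headers line up with the loader below. Note that stat_val mixes numeric values (8, 5.0, -500.0) and strings (aardvark, zebra), which is why the loader stores it as text. A hypothetical sanity check for the new file, assuming the 'example/sample_data/' prefix the loader uses:

import csv

with open('example/sample_data/sample_table_column_stats.csv') as fin:
    rows = list(csv.DictReader(fin))

assert rows[0]['stat_name'] == 'distinct values'
assert {r['schema_name'] for r in rows} == {'test_schema'}
print(len(rows), 'stat rows')  # expect 10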
sample_data_loader.py

@@ -115,6 +115,42 @@ def load_col_data_from_csv(file_name):
         conn.commit()
 
 
+def load_table_column_stats_from_csv(file_name):
+    conn = create_connection(DB_FILE)
+    if conn:
+        cur = conn.cursor()
+        cur.execute('drop table if exists test_table_column_stats')
+        cur.execute('create table if not exists test_table_column_stats '
+                    '(cluster VARCHAR(64) NOT NULL , '
+                    'db VARCHAR(64) NOT NULL , '
+                    'schema_name VARCHAR(64) NOT NULL , '
+                    'table_name INTEGER NOT NULL , '
+                    'col_name VARCHAR(64) NOT NULL , '
+                    'stat_name VARCHAR(64) NOT NULL, '
+                    'stat_val VARCHAR(64) NOT NULL,'
+                    'start_epoch VARCHAR(64) NOT NULL,'
+                    'end_epoch VARCHAR(64) NOT NULL)')
+        file_loc = 'example/sample_data/' + file_name
+        with open(file_loc, 'r') as fin:
+            dr = csv.DictReader(fin)
+            to_db = [(i['cluster'],
+                      i['db'],
+                      i['schema_name'],
+                      i['table_name'],
+                      i['col_name'],
+                      i['stat_name'],
+                      '"' + i['stat_val'] + '"',
+                      i['start_epoch'],
+                      i['end_epoch']) for i in dr]
+            cur.executemany("INSERT INTO test_table_column_stats ("
+                            "cluster, db, schema_name, table_name,"
+                            "col_name, stat_name, "
+                            "stat_val, start_epoch, end_epoch) VALUES "
+                            "(?, ?, ?, ?, ?, ?, ?, ?, ?);", to_db)
+        conn.commit()
+
+
 def load_watermark_data_from_csv(file_name):
     conn = create_connection(DB_FILE)
     if conn:
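Two details of the loader above are worth noting: stat_val is wrapped in literal double quotes before insertion, presumably so that every stat value, numeric or textual, is treated uniformly as a string downstream; and table_name is declared INTEGER in the DDL, which looks like a copy-paste slip but is harmless under SQLite's type affinity, since TEXT values are stored as-is. A minimal smoke test, assuming the module-level DB_FILE constant used by create_connection:

import sqlite3

load_table_column_stats_from_csv('sample_table_column_stats.csv')
conn = sqlite3.connect(DB_FILE)
for stat_name, stat_val in conn.execute(
        'select stat_name, stat_val from test_table_column_stats limit 3'):
    print(stat_name, stat_val)  # stat_val arrives quoted, e.g. "8"
conn.close()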
@@ -473,6 +509,7 @@ if __name__ == "__main__":
     load_table_data_from_csv('sample_table.csv')
     load_col_data_from_csv('sample_col.csv')
+    load_table_column_stats_from_csv('sample_table_column_stats.csv')
     load_watermark_data_from_csv('sample_watermark.csv')
     load_table_owner_data_from_csv('sample_table_owner.csv')
     load_usage_data_from_csv('sample_column_usage.csv')
@@ -492,6 +529,11 @@ if __name__ == "__main__":
                              'example.models.test_column_model.TestColumnMetadata')
     job2.launch()
 
+    # start table stats job
+    job_table_stats = create_sample_job('test_table_column_stats',
+                                        'databuilder.models.table_stats.TableColumnStats')
+    job_table_stats.launch()
+
     # # start watermark job
     job3 = create_sample_job('test_watermark_metadata',
                              'databuilder.models.watermark.Watermark')
...