Commit 9b8e9e61 authored by Shaun Elliott, committed by Tao Feng

first class support for csv ingestion (#173)

* ISSUE-186: prep work, moving model objects to top level
* ISSUE-186: added csv extractor
* ISSUE-186: minor fix, to finish the work
* added csv extractor test
* renamed standalone column model
* fixed sample data loader and sample data
* parameterized the sample loader connections
* fixed table owner sample data file
* fixed linting errors
* added some missing load calls in new data loader
* fixed table stats data problem (quoting)
parent 1fe72e0d
import csv
import importlib

from pyhocon import ConfigTree  # noqa: F401
from typing import Any, Iterator  # noqa: F401

from databuilder.extractor.base_extractor import Extractor


class CsvExtractor(Extractor):
    """
    An Extractor that extracts records via CSV.
    """
    # Config keys
    FILE_LOCATION = 'file_location'

    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        :param conf:
        """
        self.conf = conf
        self.file_location = conf.get_string(CsvExtractor.FILE_LOCATION)

        model_class = conf.get('model_class', None)
        if model_class:
            module_name, class_name = model_class.rsplit(".", 1)
            mod = importlib.import_module(module_name)
            self.model_class = getattr(mod, class_name)
        self._load_csv()

    def _load_csv(self):
        # type: () -> None
        """
        Load the CSV file and create an iterator of its records,
        converted to the model class if one was provided.
        """
        if not hasattr(self, 'results'):
            with open(self.file_location, 'r') as fin:
                self.results = [dict(i) for i in csv.DictReader(fin)]

        if hasattr(self, 'model_class'):
            results = [self.model_class(**result)
                       for result in self.results]
        else:
            results = self.results
        self.iter = iter(results)

    def extract(self):
        # type: () -> Any
        """
        Fetch one CSV record at a time, converted to the model class
        if one was provided; return None once the records are exhausted.
        """
        try:
            return next(self.iter)
        except StopIteration:
            return None

    def get_scope(self):
        # type: () -> str
        return 'extractor.csv'
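For reference, a minimal usage sketch of the new extractor; the config scope, keys, and sample file path are taken from the unit test included later in this commit:

from pyhocon import ConfigFactory

from databuilder import Scoped
from databuilder.extractor.csv_extractor import CsvExtractor

# build a scoped config pointing at one of the sample CSVs in this commit
conf = ConfigFactory.from_dict({
    'extractor.csv.{}'.format(CsvExtractor.FILE_LOCATION): 'example/sample_data/sample_col.csv',
    'extractor.csv.model_class': 'databuilder.models.standalone_column_model.StandaloneColumnMetadata',
})

extractor = CsvExtractor()
extractor.init(Scoped.get_scoped_conf(conf=conf, scope=extractor.get_scope()))

# extract() returns one record per call and None once the file is exhausted
record = extractor.extract()
while record is not None:
    print(record.name)
    record = extractor.extract()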
@@ -9,7 +9,7 @@ from databuilder.models.user import User
 from databuilder.publisher.neo4j_csv_publisher import UNQUOTED_SUFFIX

-class TestColumnUsageModel(Neo4jCsvSerializable):
+class ColumnUsageModel(Neo4jCsvSerializable):
     """
     A model represents user <--> column graph model
@@ -77,9 +77,9 @@ class TestColumnUsageModel(Neo4jCsvSerializable):
             RELATION_END_LABEL: User.USER_NODE_LABEL,
             RELATION_START_KEY: self._get_table_key(),
             RELATION_END_KEY: self._get_user_key(self.user_email),
-            RELATION_TYPE: TestColumnUsageModel.TABLE_USER_RELATION_TYPE,
-            RELATION_REVERSE_TYPE: TestColumnUsageModel.USER_TABLE_RELATION_TYPE,
-            TestColumnUsageModel.READ_RELATION_COUNT: self.read_count
+            RELATION_TYPE: ColumnUsageModel.TABLE_USER_RELATION_TYPE,
+            RELATION_REVERSE_TYPE: ColumnUsageModel.USER_TABLE_RELATION_TYPE,
+            ColumnUsageModel.READ_RELATION_COUNT: self.read_count
         }]

     def _get_table_key(self):
...
@@ -7,7 +7,10 @@ from databuilder.models.neo4j_csv_serde import (
 from databuilder.models.table_metadata import TableMetadata, DESCRIPTION_NODE_LABEL

-class TestColumnMetadata(Neo4jCsvSerializable):
+# This class is needed to handle csv based column loading, since the main column model
+# table_metadata.ColumnMetadata requires table_metadata.TableMetadata as well, and this
+# cannot be represented in csv form
+class StandaloneColumnMetadata(Neo4jCsvSerializable):
     COLUMN_NODE_LABEL = 'Column'
     COLUMN_KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}/{col}'
     COLUMN_NAME = 'name'
@@ -69,20 +72,20 @@ class TestColumnMetadata(Neo4jCsvSerializable):
         return None

     def _get_col_key(self):
-        # type: (TestColumnMetadata) -> str
-        return TestColumnMetadata.COLUMN_KEY_FORMAT.format(db=self.database,
-                                                           cluster=self.cluster,
-                                                           schema=self.schema_name,
-                                                           tbl=self.table_name,
-                                                           col=self.name)
+        # type: (StandaloneColumnMetadata) -> str
+        return StandaloneColumnMetadata.COLUMN_KEY_FORMAT.format(db=self.database,
+                                                                 cluster=self.cluster,
+                                                                 schema=self.schema_name,
+                                                                 tbl=self.table_name,
+                                                                 col=self.name)

     def _get_col_description_key(self):
-        # type: (TestColumnMetadata) -> str
-        return TestColumnMetadata.COLUMN_DESCRIPTION_FORMAT.format(db=self.database,
-                                                                   cluster=self.cluster,
-                                                                   schema=self.schema_name,
-                                                                   tbl=self.table_name,
-                                                                   col=self.name)
+        # type: (StandaloneColumnMetadata) -> str
+        return StandaloneColumnMetadata.COLUMN_DESCRIPTION_FORMAT.format(db=self.database,
+                                                                         cluster=self.cluster,
+                                                                         schema=self.schema_name,
+                                                                         tbl=self.table_name,
+                                                                         col=self.name)

     def _get_table_key(self):
         # type: () -> str
@@ -98,18 +101,18 @@ class TestColumnMetadata(Neo4jCsvSerializable):
         :return:
         """
         results = [{
-            NODE_LABEL: TestColumnMetadata.COLUMN_NODE_LABEL,
+            NODE_LABEL: StandaloneColumnMetadata.COLUMN_NODE_LABEL,
             NODE_KEY: self._get_col_key(),
-            TestColumnMetadata.COLUMN_NAME: self.name,
-            TestColumnMetadata.COLUMN_TYPE: self.type,
-            TestColumnMetadata.COLUMN_ORDER: self.sort_order
+            StandaloneColumnMetadata.COLUMN_NAME: self.name,
+            StandaloneColumnMetadata.COLUMN_TYPE: self.type,
+            StandaloneColumnMetadata.COLUMN_ORDER: self.sort_order
         }]

         if self.description:
             results.append({
                 NODE_LABEL: DESCRIPTION_NODE_LABEL,
                 NODE_KEY: self._get_col_description_key(),
-                TestColumnMetadata.COLUMN_DESCRIPTION: self.description
+                StandaloneColumnMetadata.COLUMN_DESCRIPTION: self.description
             })

         return results
@@ -123,7 +126,7 @@ class TestColumnMetadata(Neo4jCsvSerializable):
         results = [{
             RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
-            RELATION_END_LABEL: TestColumnMetadata.COLUMN_NODE_LABEL,
+            RELATION_END_LABEL: StandaloneColumnMetadata.COLUMN_NODE_LABEL,
             RELATION_START_KEY: self._get_table_key(),
             RELATION_END_KEY: self._get_col_key(),
             RELATION_TYPE: TableMetadata.TABLE_COL_RELATION_TYPE,
...
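Note that CsvExtractor instantiates the model with model_class(**row), so the model's constructor keywords must match the CSV headers exactly. A sketch of that 1:1 mapping for one row of the sample column file (keyword names inferred from the headers changed in this commit):

from databuilder.models.standalone_column_model import StandaloneColumnMetadata

# each row carries its own table context, which is why no separate
# TableMetadata object is needed for csv based loading
col = StandaloneColumnMetadata(name='col1',
                               description='col1 description',
                               col_type='string',
                               sort_order='1',
                               database='hive',
                               cluster='gold',
                               schema_name='test_schema',
                               table_name='test_table1',
                               table_description='1st test table')
print(col.create_next_node())  # first Neo4j node record for this column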
from typing import Any, Dict, List, Union  # noqa: F401

from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
    NODE_LABEL


class TestTableModel(Neo4jCsvSerializable):
    """
    Example table model.
    Each instance represents one row of table metadata.
    """
    LABEL = 'Table'
    KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}'

    def __init__(self,
                 database,  # type: str
                 cluster,  # type: str
                 schema_name,  # type: str
                 table_name,  # type: str
                 table_desc,  # type: str
                 ):
        # type: (...) -> None
        self.database = database
        self.cluster = cluster
        self.schema_name = schema_name
        self.table_name = table_name
        self.table_desc = table_desc

        # currently we don't consider nested partitions
        self._node_iter = iter(self.create_nodes())
        self._relation_iter = iter(self.create_relation())

    def create_next_node(self):
        # type: (...) -> Union[Dict[str, Any], None]
        # return the string representation of the data
        try:
            return next(self._node_iter)
        except StopIteration:
            return None

    def create_next_relation(self):
        # type: (...) -> Union[Dict[str, Any], None]
        try:
            return next(self._relation_iter)
        except StopIteration:
            return None

    def create_nodes(self):
        # type: () -> List[Dict[str, Any]]
        """
        Create a list of Neo4j node records
        :return:
        """
        results = []
        results.append({
            NODE_KEY: '{db}://{cluster}.{schema}/{tbl}'.format(db=self.database,
                                                               cluster=self.cluster,
                                                               schema=self.schema_name,
                                                               tbl=self.table_name),
            NODE_LABEL: TestTableModel.LABEL,
            'table_desc': self.table_desc,
            'tbl_key': '{db}://{cluster}.{schema}/{tbl}'.format(db=self.database,
                                                                cluster=self.cluster,
                                                                schema=self.schema_name,
                                                                tbl=self.table_name)
        })

        return results

    def create_relation(self):
        # type: () -> List[Dict[str, Any]]
        """
        Create a list of relations; this example table model has none.
        :return:
        """
        return []
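A quick sketch of how this example model behaves once constructed (the node record keys come from the neo4j_csv_serde constants imported above):

table = TestTableModel(database='hive',
                       cluster='gold',
                       schema_name='test_schema',
                       table_name='test_table1',
                       table_desc='1st test table')
print(table.create_next_node())      # one Table node keyed as 'hive://gold.test_schema/test_table1'
print(table.create_next_relation())  # None: this example model emits no relations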
-name,description,col_type,sort_order,database,cluster,schema_name,table_name,table_desc
+name,description,col_type,sort_order,database,cluster,schema_name,table_name,table_description
 col1,"col1 description","string",1,hive,gold,test_schema,test_table1,"1st test table"
 col2,"col2 description","string",2,hive,gold,test_schema,test_table1,"1st test table"
 col3,"col3 description","string",3,hive,gold,test_schema,test_table1,"1st test table"
...
-database,cluster,schema_name,table_name,table_desc,tags
+database,cluster,schema_name,name,description,tags
 hive,gold,test_schema,test_table1,"1st test table","tag1,tag2"
 dynamo,gold,test_schema,test_table2,"2nd test table",
 cluster,db,schema_name,table_name,col_name,stat_name,stat_val,start_epoch,end_epoch
-gold,hive,test_schema,test_table1,col1,"distinct values",8,1432300762,1562300762
-gold,hive,test_schema,test_table1,col1,"min",aardvark,1432300762,1562300762
-gold,hive,test_schema,test_table1,col1,"max",zebra,1432300762,1562300762
-gold,hive,test_schema,test_table1,col1,"num nulls",500320,1432300762,1562300762
-gold,hive,test_schema,test_table1,col1,"verified",230430,1432300762,1562300762
-gold,hive,test_schema,test_table1,col5,"average",5.0,1532300762,1572300762
-gold,hive,test_schema,test_table1,col5,"max",500.0,1534300762,1572300762
-gold,hive,test_schema,test_table1,col5,"min",-500.0,1534300762,1572300762
-gold,dynamo,test_schema,test_table2,col4,"median",250,1534300762,1572300762
-gold,dynamo,test_schema,test_table2,col4,"average",400,1534300762,1572300762
+gold,hive,test_schema,test_table1,col1,"distinct values","8",1432300762,1562300762
+gold,hive,test_schema,test_table1,col1,"min","""aardvark""",1432300762,1562300762
+gold,hive,test_schema,test_table1,col1,"max","""zebra""",1432300762,1562300762
+gold,hive,test_schema,test_table1,col1,"num nulls","""500320""",1432300762,1562300762
+gold,hive,test_schema,test_table1,col1,"verified","""230430""",1432300762,1562300762
+gold,hive,test_schema,test_table1,col5,"average","""5.0""",1532300762,1572300762
+gold,hive,test_schema,test_table1,col5,"max","""500.0""",1534300762,1572300762
+gold,hive,test_schema,test_table1,col5,"min","""-500.0""",1534300762,1572300762
+gold,dynamo,test_schema,test_table2,col4,"median","""250""",1534300762,1572300762
+gold,dynamo,test_schema,test_table2,col4,"average","""400""",1534300762,1572300762
\ No newline at end of file
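The stats file now embeds the quotes in the data itself, matching the loader change below that stops wrapping stat_val in quotes in code. In CSV, a doubled quote inside a quoted field escapes to a single quote, so """aardvark""" parses to the string "aardvark", quotes included. A quick standard-library check of that behavior:

import csv
import io

# the doubled quotes collapse to literal quotes around the value
row = next(csv.reader(io.StringIO('gold,hive,col1,"min","""aardvark"""')))
print(row[4])  # -> "aardvark" (the quotes survive into the loaded value)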
-database,cluster,schema_name,table_name,owners
-hive,gold,test_schema,test_table1,"roald.amundsen@example.org,chrisc@example.org"
-dynamo,gold,test_schema,test_table2,
+db_name,schema_name,cluster,table_name,owners
+hive,test_schema,gold,test_table1,"roald.amundsen@example.org,chrisc@example.org"
+dynamo,test_schema,gold,test_table2,
...
@@ -4,6 +4,8 @@ into Neo4j and Elasticsearch without using an Airflow DAG.
 """
 import csv
+import sys
+
 from elasticsearch import Elasticsearch
 import logging
 from pyhocon import ConfigFactory
@@ -25,18 +27,22 @@ from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
 from databuilder.task.task import DefaultTask
 from databuilder.transformer.base_transformer import NoopTransformer

-# change to the address of Elasticsearch service
+es_host = None
+neo_host = None
+if len(sys.argv) > 1:
+    es_host = sys.argv[1]
+if len(sys.argv) > 2:
+    neo_host = sys.argv[2]
+
 es = Elasticsearch([
-    {'host': 'localhost'},
+    {'host': es_host if es_host else 'localhost'},
 ])

 DB_FILE = '/tmp/test.db'
 SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
 Base = declarative_base()

-# replace localhost with docker host ip
-# todo: get the ip from input argument
-NEO4J_ENDPOINT = 'bolt://localhost:7687'
+NEO4J_ENDPOINT = 'bolt://{}:7687'.format(neo_host if neo_host else 'localhost')

 neo4j_endpoint = NEO4J_ENDPOINT

 neo4j_user = 'neo4j'
@@ -70,8 +76,8 @@ def load_table_data_from_csv(file_name):
     to_db = [(i['database'],
               i['cluster'],
               i['schema_name'],
-              i['table_name'],
-              i['table_desc'],
+              i['name'],
+              i['description'],
               i['tags']) for i in dr]

     cur.executemany("INSERT INTO test_table_metadata (database, cluster, "
@@ -105,7 +111,7 @@ def load_col_data_from_csv(file_name):
               i['cluster'],
               i['schema_name'],
               i['table_name'],
-              i['table_desc']) for i in dr]
+              i['table_description']) for i in dr]

     cur.executemany("INSERT INTO test_col_metadata ("
                     "name, description, col_type, sort_order,"
@@ -139,7 +145,7 @@ def load_table_column_stats_from_csv(file_name):
               i['table_name'],
               i['col_name'],
               i['stat_name'],
-              '"' + i['stat_val'] + '"',
+              i['stat_val'],
               i['start_epoch'],
               i['end_epoch']) for i in dr]
@@ -396,15 +402,15 @@ def load_table_owner_data_from_csv(file_name):
     file_loc = 'example/sample_data/' + file_name
     with open(file_loc, 'r') as fin:
         dr = csv.DictReader(fin)
-        to_db = [(i['database'],
+        to_db = [(i['db_name'],
                   i['schema_name'],
+                  i['cluster'],
                   i['table_name'],
-                  i['owners'],
-                  i['cluster']
+                  i['owners']
                   ) for i in dr]

     cur.executemany("INSERT INTO test_table_owner_metadata "
-                    "(db_name, schema_name, table_name, owners, cluster) "
+                    "(db_name, schema_name, cluster, table_name, owners) "
                     "VALUES (?, ?, ?, ?, ?);", to_db)
     conn.commit()
@@ -533,7 +539,7 @@ if __name__ == "__main__":
     # start col job
     job2 = create_sample_job('test_col_metadata',
-                             'example.models.test_column_model.TestColumnMetadata')
+                             'databuilder.models.standalone_column_model.StandaloneColumnMetadata')
     job2.launch()

     # start table stats job
@@ -553,7 +559,7 @@ if __name__ == "__main__":
     # start usage job
     job_col_usage = create_sample_job('test_usage_metadata',
-                                      'example.models.test_column_usage_model.TestColumnUsageModel')
+                                      'databuilder.models.column_usage_model.ColumnUsageModel')
     job_col_usage.launch()

     # start user job
...
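With the parameterized connections above, the sample loader can be pointed at remote services from the command line: the first argument sets the Elasticsearch host, the second the Neo4j host, and both fall back to localhost when omitted. An invocation sketch (script path assumed from the repo layout):

python example/scripts/sample_data_loader.py my-es-host my-neo-host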
import unittest

from pyhocon import ConfigFactory  # noqa: F401

from databuilder import Scoped
from databuilder.extractor.csv_extractor import CsvExtractor


class TestCsvExtractor(unittest.TestCase):

    def setUp(self):
        # type: () -> None
        config_dict = {
            'extractor.csv.{}'.format(CsvExtractor.FILE_LOCATION): 'example/sample_data/sample_col.csv',
            'extractor.csv.model_class': 'databuilder.models.standalone_column_model.StandaloneColumnMetadata',
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_extraction_with_model_class(self):
        # type: () -> None
        """
        Test extraction using the configured model class.
        """
        extractor = CsvExtractor()
        extractor.init(Scoped.get_scoped_conf(conf=self.conf,
                                              scope=extractor.get_scope()))

        result = extractor.extract()
        self.assertEqual(result.name, 'col1')
        self.assertEqual(result.description, 'col1 description')
        self.assertEqual(result.type, 'string')
        self.assertEqual(result.sort_order, '1')
        self.assertEqual(result.database, 'hive')
        self.assertEqual(result.cluster, 'gold')
        self.assertEqual(result.schema_name, 'test_schema')
        self.assertEqual(result.table_name, 'test_table1')
        self.assertEqual(result.table_desc, '1st test table')
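The new test can be run on its own to verify the extractor end to end (test file path assumed from the repo's test layout):

python -m pytest tests/unit/extractor/test_csv_extractor.py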