Unverified Commit 37ee834e authored by Tao Feng's avatar Tao Feng Committed by GitHub

Add table source model (#110)

parent 4656f236
from typing import Any, Dict, List, Union # noqa: F401
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.table_metadata import TableMetadata
class TableSource(Neo4jCsvSerializable):
# type: (...) -> None
"""
Hive table source model.
"""
LABEL = 'Source'
KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}/_source'
SOURCE_TABLE_RELATION_TYPE = 'SOURCE_OF'
TABLE_SOURCE_RELATION_TYPE = 'SOURCE'
def __init__(self,
db_name, # type: str
schema_name, # type: str
table_name, # type: str
cluster, # type: str
source, # type: str
source_type='github', # type: str
):
# type: (...) -> None
self.db = db_name.lower()
self.schema = schema_name.lower()
self.table = table_name.lower()
self.cluster = cluster.lower() if cluster else 'gold'
# source is the source file location
self.source = source
self.source_type = source_type
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())
def create_next_node(self):
# type: (...) -> Union[Dict[str, Any], None]
# return the string representation of the data
try:
return next(self._node_iter)
except StopIteration:
return None
def create_next_relation(self):
# type: (...) -> Union[Dict[str, Any], None]
try:
return next(self._relation_iter)
except StopIteration:
return None
def get_source_model_key(self):
# type: (...) -> str
return TableSource.KEY_FORMAT.format(db=self.db,
cluster=self.cluster,
schema=self.schema,
tbl=self.table)
def get_metadata_model_key(self):
# type: (...) -> str
return '{db}://{cluster}.{schema}/{table}'.format(db=self.db,
cluster=self.cluster,
schema=self.schema,
table=self.table)
def create_nodes(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of Neo4j node records
:return:
"""
results = [{
NODE_KEY: self.get_source_model_key(),
NODE_LABEL: TableSource.LABEL,
'source': self.source,
'source_type': self.source_type
}]
return results
def create_relation(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of relation map between owner record with original hive table
:return:
"""
results = [{
RELATION_START_KEY: self.get_source_model_key(),
RELATION_START_LABEL: TableSource.LABEL,
RELATION_END_KEY: self.get_metadata_model_key(),
RELATION_END_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_TYPE: TableSource.SOURCE_TABLE_RELATION_TYPE,
RELATION_REVERSE_TYPE: TableSource.TABLE_SOURCE_RELATION_TYPE
}]
return results
def __repr__(self):
# type: () -> str
return 'TableSource({!r}, {!r}, {!r}, {!r}, {!r})'.format(self.db,
self.cluster,
self.schema,
self.table,
self.source)
import unittest
from databuilder.models.table_source import TableSource
from databuilder.models.neo4j_csv_serde import RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
DB = 'hive'
SCHEMA = 'base'
TABLE = 'test'
CLUSTER = 'default'
SOURCE = '/etl/sql/file.py'
class TestTableSource(unittest.TestCase):
def setUp(self):
# type: () -> None
super(TestTableSource, self).setUp()
self.table_source = TableSource(db_name='hive',
schema_name=SCHEMA,
table_name=TABLE,
cluster=CLUSTER,
source=SOURCE)
def test_get_source_model_key(self):
# type: () -> None
source = self.table_source.get_source_model_key()
self.assertEquals(source, '{db}://{cluster}.{schema}/{tbl}/_source'.format(db=DB,
schema=SCHEMA,
tbl=TABLE,
cluster=CLUSTER,
))
def test_get_metadata_model_key(self):
# type: () -> None
metadata = self.table_source.get_metadata_model_key()
self.assertEquals(metadata, 'hive://default.base/test')
def test_create_nodes(self):
# type: () -> None
nodes = self.table_source.create_nodes()
self.assertEquals(len(nodes), 1)
def test_create_relation(self):
# type: () -> None
relations = self.table_source.create_relation()
self.assertEquals(len(relations), 1)
start_key = '{db}://{cluster}.{schema}/{tbl}/_source'.format(db=DB,
schema=SCHEMA,
tbl=TABLE,
cluster=CLUSTER)
end_key = '{db}://{cluster}.{schema}/{tbl}'.format(db=DB,
schema=SCHEMA,
tbl=TABLE,
cluster=CLUSTER)
relation = {
RELATION_START_KEY: start_key,
RELATION_START_LABEL: TableSource.LABEL,
RELATION_END_KEY: end_key,
RELATION_END_LABEL: 'Table',
RELATION_TYPE: TableSource.SOURCE_TABLE_RELATION_TYPE,
RELATION_REVERSE_TYPE: TableSource.TABLE_SOURCE_RELATION_TYPE
}
self.assertTrue(relation in relations)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment