Commit e64a83be authored by Mikhail Ivanov's avatar Mikhail Ivanov Committed by Tao Feng

Add sample high/low watermark data to quick start (#62) (#145)

* Add sample high/low watermark data to quick start (#62)

* Update setup.py
parent ab9b1386
from typing import Any, Dict, List, Union # noqa: F401 from databuilder.models.watermark import Watermark
import warnings
warnings.warn("HiveWatermark class is deprecated. Use Watermark instead",
DeprecationWarning, stacklevel=2)
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
class HiveWatermark(Watermark):
class HiveWatermark(Neo4jCsvSerializable):
# type: (...) -> None # type: (...) -> None
""" """
Hive table watermark result model. Hive table watermark result model.
Each instance represents one row of hive watermark result. Each instance represents one row of hive table watermark result.
""" """
LABEL = 'Watermark'
KEY_FORMAT = 'hive://{cluster}.{schema}' \
'/{table}/{part_type}/'
WATERMARK_TABLE_RELATION_TYPE = 'BELONG_TO_TABLE'
TABLE_WATERMARK_RELATION_TYPE = 'WATERMARK'
def __init__(self, def __init__(self,
create_time, # type: str create_time, # type: str
...@@ -26,80 +20,11 @@ class HiveWatermark(Neo4jCsvSerializable): ...@@ -26,80 +20,11 @@ class HiveWatermark(Neo4jCsvSerializable):
cluster='gold', # type: str cluster='gold', # type: str
): ):
# type: (...) -> None # type: (...) -> None
self.create_time = create_time super(HiveWatermark, self).__init__(create_time=create_time,
self.schema = schema_name.lower() database='hive',
self.table = table_name.lower() schema_name=schema_name,
self.parts = [] # type: list table_name=table_name,
part_name=part_name,
if '=' not in part_name: part_type=part_type,
raise Exception('Only partition table has high watermark') cluster=cluster,
)
# currently we don't consider nested partitions
idx = part_name.find('=')
name, value = part_name.lower()[:idx], part_name.lower()[idx + 1:]
self.parts = [(name, value)]
self.part_type = part_type.lower()
self.cluster = cluster.lower()
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())
def create_next_node(self):
# type: (...) -> Union[Dict[str, Any], None]
# return the string representation of the data
try:
return next(self._node_iter)
except StopIteration:
return None
def create_next_relation(self):
# type: (...) -> Union[Dict[str, Any], None]
try:
return next(self._relation_iter)
except StopIteration:
return None
def get_watermark_model_key(self):
# type: (...) -> str
return HiveWatermark.KEY_FORMAT.format(cluster=self.cluster,
schema=self.schema,
table=self.table,
part_type=self.part_type)
def get_metadata_model_key(self):
# type: (...) -> str
return 'hive://{cluster}.{schema}/{table}'.format(cluster=self.cluster,
schema=self.schema,
table=self.table)
def create_nodes(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of Neo4j node records
:return:
"""
results = []
for part in self.parts:
results.append({
NODE_KEY: self.get_watermark_model_key(),
NODE_LABEL: HiveWatermark.LABEL,
'partition_key': part[0],
'partition_value': part[1],
'create_time': self.create_time
})
return results
def create_relation(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of relation map between watermark record with original hive table
:return:
"""
results = [{
RELATION_START_KEY: self.get_watermark_model_key(),
RELATION_START_LABEL: HiveWatermark.LABEL,
RELATION_END_KEY: self.get_metadata_model_key(),
RELATION_END_LABEL: 'Table',
RELATION_TYPE: HiveWatermark.WATERMARK_TABLE_RELATION_TYPE,
RELATION_REVERSE_TYPE: HiveWatermark.TABLE_WATERMARK_RELATION_TYPE
}]
return results
from typing import Any, Dict, List, Union # noqa: F401
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
class Watermark(Neo4jCsvSerializable):
# type: (...) -> None
"""
Table watermark result model.
Each instance represents one row of table watermark result.
"""
LABEL = 'Watermark'
KEY_FORMAT = '{database}://{cluster}.{schema}' \
'/{table}/{part_type}/'
WATERMARK_TABLE_RELATION_TYPE = 'BELONG_TO_TABLE'
TABLE_WATERMARK_RELATION_TYPE = 'WATERMARK'
def __init__(self,
create_time, # type: str
database, # type: str
schema_name, # type: str
table_name, # type: str
part_name, # type: str
part_type='high_watermark', # type: str
cluster='gold', # type: str
):
# type: (...) -> None
self.create_time = create_time
self.database = database.lower()
self.schema = schema_name.lower()
self.table = table_name.lower()
self.parts = [] # type: list
if '=' not in part_name:
raise Exception('Only partition table has high watermark')
# currently we don't consider nested partitions
idx = part_name.find('=')
name, value = part_name.lower()[:idx], part_name.lower()[idx + 1:]
self.parts = [(name, value)]
self.part_type = part_type.lower()
self.cluster = cluster.lower()
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())
def create_next_node(self):
# type: (...) -> Union[Dict[str, Any], None]
# return the string representation of the data
try:
return next(self._node_iter)
except StopIteration:
return None
def create_next_relation(self):
# type: (...) -> Union[Dict[str, Any], None]
try:
return next(self._relation_iter)
except StopIteration:
return None
def get_watermark_model_key(self):
# type: (...) -> str
return Watermark.KEY_FORMAT.format(database=self.database,
cluster=self.cluster,
schema=self.schema,
table=self.table,
part_type=self.part_type)
def get_metadata_model_key(self):
# type: (...) -> str
return '{database}://{cluster}.{schema}/{table}'.format(database=self.database,
cluster=self.cluster,
schema=self.schema,
table=self.table)
def create_nodes(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of Neo4j node records
:return:
"""
results = []
for part in self.parts:
results.append({
NODE_KEY: self.get_watermark_model_key(),
NODE_LABEL: Watermark.LABEL,
'partition_key': part[0],
'partition_value': part[1],
'create_time': self.create_time
})
return results
def create_relation(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of relation map between watermark record with original table
:return:
"""
results = [{
RELATION_START_KEY: self.get_watermark_model_key(),
RELATION_START_LABEL: Watermark.LABEL,
RELATION_END_KEY: self.get_metadata_model_key(),
RELATION_END_LABEL: 'Table',
RELATION_TYPE: Watermark.WATERMARK_TABLE_RELATION_TYPE,
RELATION_REVERSE_TYPE: Watermark.TABLE_WATERMARK_RELATION_TYPE
}]
return results
create_time,database,schema_name,table_name,part_name,part_type,cluster
2019-10-01T12:13:14,hive,test_schema,test_table1,col3=2017-04-22/col4=0,LOW_WATERMARK,gold
2019-10-01T12:13:14,hive,test_schema,test_table1,col3=2019-09-30/col4=11,HIGH_WATERMARK,gold
2019-10-01T12:13:14,dynamo,test_schema,test_table2,col3=2018-01-01,LOW_WATERMARK,gold
2019-10-01T12:13:14,dynamo,test_schema,test_table2,col3=2019-10-01,HIGH_WATERMARK,gold
...@@ -113,6 +113,39 @@ def load_col_data_from_csv(file_name): ...@@ -113,6 +113,39 @@ def load_col_data_from_csv(file_name):
conn.commit() conn.commit()
def load_watermark_data_from_csv(file_name):
conn = create_connection(DB_FILE)
if conn:
cur = conn.cursor()
cur.execute('drop table if exists test_watermark_metadata')
cur.execute('create table if not exists test_watermark_metadata '
'(create_time VARCHAR(64) NOT NULL , '
'database VARCHAR(64) NOT NULL , '
'schema_name VARCHAR(64) NOT NULL , '
'table_name VARCHAR(64) NOT NULL , '
'part_name VARCHAR(64) NOT NULL , '
'part_type VARCHAR(64) NOT NULL , '
'cluster VARCHAR(64) NOT NULL)')
file_loc = 'example/sample_data/' + file_name
with open(file_loc, 'r') as fin:
dr = csv.DictReader(fin)
to_db = []
for i in dr:
to_db.append((i['create_time'],
i['database'],
i['schema_name'],
i['table_name'],
i['part_name'],
i['part_type'],
i['cluster']))
cur.executemany("INSERT INTO test_watermark_metadata ("
"create_time, database, schema_name, table_name,"
"part_name, part_type, cluster) VALUES "
"(?, ?, ?, ?, ?, ?, ?);", to_db)
conn.commit()
def load_user_data_from_csv(file_name): def load_user_data_from_csv(file_name):
conn = create_connection(DB_FILE) conn = create_connection(DB_FILE)
if conn: if conn:
...@@ -357,6 +390,7 @@ if __name__ == "__main__": ...@@ -357,6 +390,7 @@ if __name__ == "__main__":
load_table_data_from_csv('sample_table.csv') load_table_data_from_csv('sample_table.csv')
load_col_data_from_csv('sample_col.csv') load_col_data_from_csv('sample_col.csv')
load_watermark_data_from_csv('sample_watermark.csv')
load_usage_data_from_csv('sample_column_usage.csv') load_usage_data_from_csv('sample_column_usage.csv')
load_user_data_from_csv('sample_user.csv') load_user_data_from_csv('sample_user.csv')
load_application_data_from_csv('sample_application.csv') load_application_data_from_csv('sample_application.csv')
...@@ -372,6 +406,11 @@ if __name__ == "__main__": ...@@ -372,6 +406,11 @@ if __name__ == "__main__":
'example.models.test_column_model.TestColumnMetadata') 'example.models.test_column_model.TestColumnMetadata')
job2.launch() job2.launch()
# # start watermark job
job3 = create_sample_job('test_watermark_metadata',
'databuilder.models.watermark.Watermark')
job3.launch()
# start usage job # start usage job
job_col_usage = create_sample_job('test_usage_metadata', job_col_usage = create_sample_job('test_usage_metadata',
'example.models.test_column_usage_model.TestColumnUsageModel') 'example.models.test_column_usage_model.TestColumnUsageModel')
......
...@@ -2,7 +2,7 @@ import os ...@@ -2,7 +2,7 @@ import os
from setuptools import setup, find_packages from setuptools import setup, find_packages
__version__ = '1.4.6' __version__ = '1.4.7'
requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
......
...@@ -5,8 +5,8 @@ from databuilder.models.neo4j_csv_serde import NODE_KEY, \ ...@@ -5,8 +5,8 @@ from databuilder.models.neo4j_csv_serde import NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \ NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
CREATE_TIME = '2017-09-18T00:00:00' CREATE_TIME = '2017-09-18T00:00:00'
DATABASE = 'HIVE'
SCHEMA = 'BASE' SCHEMA = 'BASE'
TABLE = 'TEST' TABLE = 'TEST'
NESTED_PART = 'ds=2017-09-18/feature_id=9' NESTED_PART = 'ds=2017-09-18/feature_id=9'
...@@ -19,7 +19,7 @@ class TestHiveWatermark(unittest.TestCase): ...@@ -19,7 +19,7 @@ class TestHiveWatermark(unittest.TestCase):
def setUp(self): def setUp(self):
# type: () -> None # type: () -> None
super(TestHiveWatermark, self).setUp() super(TestHiveWatermark, self).setUp()
self.hive_watermark = HiveWatermark(create_time='2017-09-18T00:00:00', self.watermark = HiveWatermark(create_time='2017-09-18T00:00:00',
schema_name=SCHEMA, schema_name=SCHEMA,
table_name=TABLE, table_name=TABLE,
cluster=CLUSTER, cluster=CLUSTER,
...@@ -27,7 +27,13 @@ class TestHiveWatermark(unittest.TestCase): ...@@ -27,7 +27,13 @@ class TestHiveWatermark(unittest.TestCase):
part_name=NESTED_PART) part_name=NESTED_PART)
self.expected_node_result = { self.expected_node_result = {
NODE_KEY: 'hive://default.base/test/low_watermark/', NODE_KEY: '{database}://{cluster}.{schema}/{table}/{part_type}/'
.format(
database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower(),
part_type=PART_TYPE.lower()),
NODE_LABEL: 'Watermark', NODE_LABEL: 'Watermark',
'partition_key': 'ds', 'partition_key': 'ds',
'partition_value': '2017-09-18/feature_id=9', 'partition_value': '2017-09-18/feature_id=9',
...@@ -35,9 +41,20 @@ class TestHiveWatermark(unittest.TestCase): ...@@ -35,9 +41,20 @@ class TestHiveWatermark(unittest.TestCase):
} }
self.expected_relation_result = { self.expected_relation_result = {
RELATION_START_KEY: 'hive://default.base/test/low_watermark/', RELATION_START_KEY: '{database}://{cluster}.{schema}/{table}/{part_type}/'
.format(
database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower(),
part_type=PART_TYPE.lower()),
RELATION_START_LABEL: 'Watermark', RELATION_START_LABEL: 'Watermark',
RELATION_END_KEY: 'hive://default.base/test', RELATION_END_KEY: '{database}://{cluster}.{schema}/{table}'
.format(
database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower()),
RELATION_END_LABEL: 'Table', RELATION_END_LABEL: 'Table',
RELATION_TYPE: 'BELONG_TO_TABLE', RELATION_TYPE: 'BELONG_TO_TABLE',
RELATION_REVERSE_TYPE: 'WATERMARK' RELATION_REVERSE_TYPE: 'WATERMARK'
...@@ -45,32 +62,42 @@ class TestHiveWatermark(unittest.TestCase): ...@@ -45,32 +62,42 @@ class TestHiveWatermark(unittest.TestCase):
def test_get_watermark_model_key(self): def test_get_watermark_model_key(self):
# type: () -> None # type: () -> None
watermark = self.hive_watermark.get_watermark_model_key() watermark = self.watermark.get_watermark_model_key()
self.assertEquals(watermark, 'hive://default.base/test/low_watermark/') self.assertEquals(
watermark, '{database}://{cluster}.{schema}/{table}/{part_type}/'
.format(database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower(),
part_type=PART_TYPE.lower()))
def test_get_metadata_model_key(self): def test_get_metadata_model_key(self):
# type: () -> None # type: () -> None
metadata = self.hive_watermark.get_metadata_model_key() metadata = self.watermark.get_metadata_model_key()
self.assertEquals(metadata, 'hive://default.base/test') self.assertEquals(metadata, '{database}://{cluster}.{schema}/{table}'
.format(database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower()))
def test_create_nodes(self): def test_create_nodes(self):
# type: () -> None # type: () -> None
nodes = self.hive_watermark.create_nodes() nodes = self.watermark.create_nodes()
self.assertEquals(len(nodes), 1) self.assertEquals(len(nodes), 1)
self.assertEquals(nodes[0], self.expected_node_result) self.assertEquals(nodes[0], self.expected_node_result)
def test_create_relation(self): def test_create_relation(self):
# type: () -> None # type: () -> None
relation = self.hive_watermark.create_relation() relation = self.watermark.create_relation()
self.assertEquals(len(relation), 1) self.assertEquals(len(relation), 1)
self.assertEquals(relation[0], self.expected_relation_result) self.assertEquals(relation[0], self.expected_relation_result)
def test_create_next_node(self): def test_create_next_node(self):
# type: () -> None # type: () -> None
next_node = self.hive_watermark.create_next_node() next_node = self.watermark.create_next_node()
self.assertEquals(next_node, self.expected_node_result) self.assertEquals(next_node, self.expected_node_result)
def test_create_next_relation(self): def test_create_next_relation(self):
# type: () -> None # type: () -> None
next_relation = self.hive_watermark.create_next_relation() next_relation = self.watermark.create_next_relation()
self.assertEquals(next_relation, self.expected_relation_result) self.assertEquals(next_relation, self.expected_relation_result)
import unittest
from databuilder.models.watermark import Watermark
from databuilder.models.neo4j_csv_serde import NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
CREATE_TIME = '2017-09-18T00:00:00'
DATABASE = 'DYNAMO'
SCHEMA = 'BASE'
TABLE = 'TEST'
NESTED_PART = 'ds=2017-09-18/feature_id=9'
CLUSTER = 'DEFAULT'
PART_TYPE = 'LOW_WATERMARK'
class TestWatermark(unittest.TestCase):
def setUp(self):
# type: () -> None
super(TestWatermark, self).setUp()
self.watermark = Watermark(create_time='2017-09-18T00:00:00',
database=DATABASE,
schema_name=SCHEMA,
table_name=TABLE,
cluster=CLUSTER,
part_type=PART_TYPE,
part_name=NESTED_PART)
self.expected_node_result = {
NODE_KEY: '{database}://{cluster}.{schema}/{table}/{part_type}/'
.format(
database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower(),
part_type=PART_TYPE.lower()),
NODE_LABEL: 'Watermark',
'partition_key': 'ds',
'partition_value': '2017-09-18/feature_id=9',
'create_time': '2017-09-18T00:00:00'
}
self.expected_relation_result = {
RELATION_START_KEY: '{database}://{cluster}.{schema}/{table}/{part_type}/'
.format(
database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower(),
part_type=PART_TYPE.lower()),
RELATION_START_LABEL: 'Watermark',
RELATION_END_KEY: '{database}://{cluster}.{schema}/{table}'
.format(
database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower()),
RELATION_END_LABEL: 'Table',
RELATION_TYPE: 'BELONG_TO_TABLE',
RELATION_REVERSE_TYPE: 'WATERMARK'
}
def test_get_watermark_model_key(self):
# type: () -> None
watermark = self.watermark.get_watermark_model_key()
self.assertEquals(
watermark, '{database}://{cluster}.{schema}/{table}/{part_type}/'
.format(database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower(),
part_type=PART_TYPE.lower()))
def test_get_metadata_model_key(self):
# type: () -> None
metadata = self.watermark.get_metadata_model_key()
self.assertEquals(metadata, '{database}://{cluster}.{schema}/{table}'
.format(database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower()))
def test_create_nodes(self):
# type: () -> None
nodes = self.watermark.create_nodes()
self.assertEquals(len(nodes), 1)
self.assertEquals(nodes[0], self.expected_node_result)
def test_create_relation(self):
# type: () -> None
relation = self.watermark.create_relation()
self.assertEquals(len(relation), 1)
self.assertEquals(relation[0], self.expected_relation_result)
def test_create_next_node(self):
# type: () -> None
next_node = self.watermark.create_next_node()
self.assertEquals(next_node, self.expected_node_result)
def test_create_next_relation(self):
# type: () -> None
next_relation = self.watermark.create_next_relation()
self.assertEquals(next_relation, self.expected_relation_result)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment