Unverified Commit 06c1dbef authored by Tao Feng's avatar Tao Feng Committed by GitHub

chore: remove deprecate hive watermark model (#281)

* chore: remove deprecate hive watermark model

* bump version
parent 29bfceab
from databuilder.models.watermark import Watermark
import warnings
warnings.warn("HiveWatermark class is deprecated. Use Watermark instead",
DeprecationWarning, stacklevel=2)
class HiveWatermark(Watermark):
# type: (...) -> None
"""
Hive table watermark result model.
Each instance represents one row of hive table watermark result.
"""
def __init__(self,
create_time, # type: str
schema, # type: str
table_name, # type: str
part_name, # type: str
part_type='high_watermark', # type: str
cluster='gold', # type: str
):
# type: (...) -> None
super(HiveWatermark, self).__init__(create_time=create_time,
database='hive',
schema=schema,
table_name=table_name,
part_name=part_name,
part_type=part_type,
cluster=cluster,
)
...@@ -97,7 +97,7 @@ def create_table_wm_job(**kwargs): ...@@ -97,7 +97,7 @@ def create_table_wm_job(**kwargs):
job_config = ConfigFactory.from_dict({ job_config = ConfigFactory.from_dict({
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string(), 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string(),
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.EXTRACT_SQL): sql, 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.EXTRACT_SQL): sql,
'extractor.sqlalchemy.model_class': 'databuilder.models.hive_watermark.HiveWatermark', 'extractor.sqlalchemy.model_class': 'databuilder.models.watermark.Watermark',
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
node_files_folder, node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
......
...@@ -2,7 +2,7 @@ import os ...@@ -2,7 +2,7 @@ import os
from setuptools import setup, find_packages from setuptools import setup, find_packages
__version__ = '2.5.19' __version__ = '2.6.0'
requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file: with open(requirements_path) as requirements_file:
......
import unittest
from databuilder.models.hive_watermark import HiveWatermark
from databuilder.models.neo4j_csv_serde import NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
CREATE_TIME = '2017-09-18T00:00:00'
DATABASE = 'HIVE'
SCHEMA = 'BASE'
TABLE = 'TEST'
NESTED_PART = 'ds=2017-09-18/feature_id=9'
CLUSTER = 'DEFAULT'
PART_TYPE = 'LOW_WATERMARK'
class TestHiveWatermark(unittest.TestCase):
def setUp(self):
# type: () -> None
super(TestHiveWatermark, self).setUp()
self.watermark = HiveWatermark(create_time='2017-09-18T00:00:00',
schema=SCHEMA,
table_name=TABLE,
cluster=CLUSTER,
part_type=PART_TYPE,
part_name=NESTED_PART)
self.expected_node_result = {
NODE_KEY: '{database}://{cluster}.{schema}/{table}/{part_type}/'
.format(
database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower(),
part_type=PART_TYPE.lower()),
NODE_LABEL: 'Watermark',
'partition_key': 'ds',
'partition_value': '2017-09-18/feature_id=9',
'create_time': '2017-09-18T00:00:00'
}
self.expected_relation_result = {
RELATION_START_KEY: '{database}://{cluster}.{schema}/{table}/{part_type}/'
.format(
database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower(),
part_type=PART_TYPE.lower()),
RELATION_START_LABEL: 'Watermark',
RELATION_END_KEY: '{database}://{cluster}.{schema}/{table}'
.format(
database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower()),
RELATION_END_LABEL: 'Table',
RELATION_TYPE: 'BELONG_TO_TABLE',
RELATION_REVERSE_TYPE: 'WATERMARK'
}
def test_get_watermark_model_key(self):
# type: () -> None
watermark = self.watermark.get_watermark_model_key()
self.assertEquals(
watermark, '{database}://{cluster}.{schema}/{table}/{part_type}/'
.format(database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower(),
part_type=PART_TYPE.lower()))
def test_get_metadata_model_key(self):
# type: () -> None
metadata = self.watermark.get_metadata_model_key()
self.assertEquals(metadata, '{database}://{cluster}.{schema}/{table}'
.format(database=DATABASE.lower(),
cluster=CLUSTER.lower(),
schema=SCHEMA.lower(),
table=TABLE.lower()))
def test_create_nodes(self):
# type: () -> None
nodes = self.watermark.create_nodes()
self.assertEquals(len(nodes), 1)
self.assertEquals(nodes[0], self.expected_node_result)
def test_create_relation(self):
# type: () -> None
relation = self.watermark.create_relation()
self.assertEquals(len(relation), 1)
self.assertEquals(relation[0], self.expected_relation_result)
def test_create_next_node(self):
# type: () -> None
next_node = self.watermark.create_next_node()
self.assertEquals(next_node, self.expected_node_result)
def test_create_next_relation(self):
# type: () -> None
next_relation = self.watermark.create_next_relation()
self.assertEquals(next_relation, self.expected_relation_result)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment