Unverified Commit 898d067a authored by Tao Feng, committed by GitHub

Move ESDocument to TableESDocument (#17)

parent 4af09d85
import json
from typing import List, Optional  # noqa: F401
from abc import ABCMeta, abstractmethod


class ElasticsearchDocument:
    """
    Schema for the Search index document
    Base class for ElasticsearchDocument
    Each different resource ESDoc will be a subclass
    """
    def __init__(self,
                 elasticsearch_index,  # type: str
                 elasticsearch_type,  # type: str
                 database,  # type: str
                 cluster,  # type: str
                 schema_name,  # type: str
                 table_name,  # type: str
                 table_key,  # type: str
                 table_description,  # type: str
                 table_last_updated_epoch,  # type: Optional[int]
                 column_names,  # type: List[str]
                 column_descriptions,  # type: List[str]
                 total_usage,  # type: int
                 unique_usage,  # type: int
                 tag_names,  # type: List[str]
                 ):
        # type: (...) -> None
        self.elasticsearch_index = elasticsearch_index
        self.elasticsearch_type = elasticsearch_type
        self.database = database
        self.cluster = cluster
        self.schema_name = schema_name
        self.table_name = table_name
        self.table_key = table_key
        self.table_description = table_description
        self.table_last_updated_epoch = table_last_updated_epoch
        self.column_names = column_names
        self.column_descriptions = column_descriptions
        self.total_usage = total_usage
        self.unique_usage = unique_usage
        # todo: will include tag_type once we have better understanding from UI flow.
        self.tag_names = tag_names

    __metaclass__ = ABCMeta

    @abstractmethod
    def to_json(self):
        # type: () -> str
        """
@@ -47,13 +17,4 @@ class ElasticsearchDocument:
        https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
        :return:
        """
        index_row = dict(index=dict(_index=self.elasticsearch_index,
                                    _type=self.elasticsearch_type))
        data = json.dumps(index_row) + "\n"
        # convert rest of the object
        obj_dict = {k: v for k, v in sorted(self.__dict__.items())
                    if k not in ['elasticsearch_index', 'elasticsearch_type']}
        data += json.dumps(obj_dict) + "\n"
        return data
        pass
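Note: with the base class reduced to an abstract to_json, each resource type is expected to ship its own document subclass (TableESDocument below is the first). A rough sketch of that pattern for some other resource; the UserESDocument name and its fields are invented for illustration and are not part of this change:

# Hypothetical sketch only: the class name and fields below are invented to
# illustrate the subclassing pattern introduced here; not part of the commit.
import json

from databuilder.models.elasticsearch_document import ElasticsearchDocument


class UserESDocument(ElasticsearchDocument):
    def __init__(self,
                 elasticsearch_index,  # type: str
                 elasticsearch_type,  # type: str
                 email,  # type: str
                 ):
        # type: (...) -> None
        self.elasticsearch_index = elasticsearch_index
        self.elasticsearch_type = elasticsearch_type
        self.email = email

    def to_json(self):
        # type: () -> str
        # Same two-line bulk format as TableESDocument: action metadata, then the document.
        index_row = dict(index=dict(_index=self.elasticsearch_index,
                                    _type=self.elasticsearch_type))
        obj_dict = {k: v for k, v in sorted(self.__dict__.items())
                    if k not in ['elasticsearch_index', 'elasticsearch_type']}
        return json.dumps(index_row) + "\n" + json.dumps(obj_dict) + "\n"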
import json
from typing import List, Optional  # noqa: F401

from databuilder.models.elasticsearch_document import ElasticsearchDocument


class TableESDocument(ElasticsearchDocument):
    """
    Schema for the Search index document
    """
    def __init__(self,
                 elasticsearch_index,  # type: str
                 elasticsearch_type,  # type: str
                 database,  # type: str
                 cluster,  # type: str
                 schema_name,  # type: str
                 table_name,  # type: str
                 table_key,  # type: str
                 table_description,  # type: str
                 table_last_updated_epoch,  # type: Optional[int]
                 column_names,  # type: List[str]
                 column_descriptions,  # type: List[str]
                 total_usage,  # type: int
                 unique_usage,  # type: int
                 tag_names,  # type: List[str]
                 ):
        # type: (...) -> None
        self.elasticsearch_index = elasticsearch_index
        self.elasticsearch_type = elasticsearch_type
        self.database = database
        self.cluster = cluster
        self.schema_name = schema_name
        self.table_name = table_name
        self.table_key = table_key
        self.table_description = table_description
        self.table_last_updated_epoch = table_last_updated_epoch
        self.column_names = column_names
        self.column_descriptions = column_descriptions
        self.total_usage = total_usage
        self.unique_usage = unique_usage
        # todo: will include tag_type once we have better understanding from UI flow.
        self.tag_names = tag_names

    def to_json(self):
        # type: () -> str
        """
        Convert object to json for elasticsearch bulk upload
        Bulk load JSON format is defined here:
        https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
        :return:
        """
        index_row = dict(index=dict(_index=self.elasticsearch_index,
                                    _type=self.elasticsearch_type))
        data = json.dumps(index_row) + "\n"
        # convert rest of the object
        obj_dict = {k: v for k, v in sorted(self.__dict__.items())
                    if k not in ['elasticsearch_index', 'elasticsearch_type']}
        data += json.dumps(obj_dict) + "\n"
        return data
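Note: as the docstring says, to_json follows the Elasticsearch bulk API format linked above: one action line naming the index and type, then the document itself, each newline-terminated. A minimal usage sketch; all field values below are invented for illustration:

# Usage sketch only; field values are made up.
from databuilder.models.table_elasticsearch_document import TableESDocument

doc = TableESDocument(elasticsearch_index='table_search_index',
                      elasticsearch_type='table',
                      database='hive',
                      cluster='gold',
                      schema_name='core',
                      table_name='fact_rides',
                      table_key='hive://gold.core/fact_rides',
                      table_description='ride events',
                      table_last_updated_epoch=1514764800,
                      column_names=['ride_id'],
                      column_descriptions=['primary key'],
                      total_usage=10,
                      unique_usage=5,
                      tag_names=['core'])

print(doc.to_json())
# prints two newline-terminated JSON lines, roughly:
#   {"index": {"_index": "table_search_index", "_type": "table"}}
#   {"cluster": "gold", "column_names": ["ride_id"], ...}   <- every field except the two elasticsearch_* ones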
@@ -2,7 +2,7 @@
from pyhocon import ConfigTree  # noqa: F401
from typing import Optional  # noqa: F401
from databuilder.transformer.base_transformer import Transformer
from databuilder.models.elasticsearch_document import ElasticsearchDocument
from databuilder.models.table_elasticsearch_document import TableESDocument
from databuilder.models.neo4j_data import Neo4jDataResult
@@ -28,20 +28,20 @@ class ElasticsearchDocumentTransformer(Transformer):
        if not isinstance(record, Neo4jDataResult):
            raise Exception("ElasticsearchDocumentTransformer expects record of type 'Neo4jDataResult'!")

        elasticsearch_obj = ElasticsearchDocument(elasticsearch_index=self.elasticsearch_index,
                                                  elasticsearch_type=self.elasticsearch_type,
                                                  database=record.database,
                                                  cluster=record.cluster,
                                                  schema_name=record.schema_name,
                                                  table_name=record.table_name,
                                                  table_key=record.table_key,
                                                  table_description=record.table_description,
                                                  table_last_updated_epoch=record.table_last_updated_epoch,
                                                  column_names=record.column_names,
                                                  column_descriptions=record.column_descriptions,
                                                  total_usage=record.total_usage,
                                                  unique_usage=record.unique_usage,
                                                  tag_names=record.tag_names)
        elasticsearch_obj = TableESDocument(elasticsearch_index=self.elasticsearch_index,
                                            elasticsearch_type=self.elasticsearch_type,
                                            database=record.database,
                                            cluster=record.cluster,
                                            schema_name=record.schema_name,
                                            table_name=record.table_name,
                                            table_key=record.table_key,
                                            table_description=record.table_description,
                                            table_last_updated_epoch=record.table_last_updated_epoch,
                                            column_names=record.column_names,
                                            column_descriptions=record.column_descriptions,
                                            total_usage=record.total_usage,
                                            unique_usage=record.unique_usage,
                                            tag_names=record.tag_names)
        return elasticsearch_obj

    def get_scope(self):
......
from setuptools import setup, find_packages
__version__ = '1.0.7'
__version__ = '1.0.8'
setup(
......
@@ -8,7 +8,7 @@
from typing import Any, List  # noqa: F401
from databuilder import Scoped
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.models.elasticsearch_document import ElasticsearchDocument
from databuilder.models.table_elasticsearch_document import TableESDocument


class TestFSElasticsearchJSONLoader(unittest.TestCase):
@@ -91,20 +91,20 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
        loader.init(conf=Scoped.get_scoped_conf(conf=self.conf,
                                                scope=loader.get_scope()))

        data = ElasticsearchDocument(elasticsearch_index='test_es_index',
                                     elasticsearch_type='test_es_type',
                                     database='test_database',
                                     cluster='test_cluster',
                                     schema_name='test_schema',
                                     table_name='test_table',
                                     table_key='test_table_key',
                                     table_last_updated_epoch=123456789,
                                     table_description='test_description',
                                     column_names=['test_col1', 'test_col2'],
                                     column_descriptions=['test_comment1', 'test_comment2'],
                                     total_usage=10,
                                     unique_usage=5,
                                     tag_names=['test_tag1', 'test_tag2'])
        data = TableESDocument(elasticsearch_index='test_es_index',
                               elasticsearch_type='test_es_type',
                               database='test_database',
                               cluster='test_cluster',
                               schema_name='test_schema',
                               table_name='test_table',
                               table_key='test_table_key',
                               table_last_updated_epoch=123456789,
                               table_description='test_description',
                               column_names=['test_col1', 'test_col2'],
                               column_descriptions=['test_comment1', 'test_comment2'],
                               total_usage=10,
                               unique_usage=5,
                               tag_names=['test_tag1', 'test_tag2'])

        loader.load(data)
        loader.close()
@@ -130,20 +130,20 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
        loader.init(conf=Scoped.get_scoped_conf(conf=self.conf,
                                                scope=loader.get_scope()))

        data = [ElasticsearchDocument(elasticsearch_index='test_es_index',
                                      elasticsearch_type='test_es_type',
                                      database='test_database',
                                      cluster='test_cluster',
                                      schema_name='test_schema',
                                      table_name='test_table',
                                      table_key='test_table_key',
                                      table_last_updated_epoch=123456789,
                                      table_description='test_description',
                                      column_names=['test_col1', 'test_col2'],
                                      column_descriptions=['test_comment1', 'test_comment2'],
                                      total_usage=10,
                                      unique_usage=5,
                                      tag_names=['test_tag1', 'test_tag2'])] * 5
        data = [TableESDocument(elasticsearch_index='test_es_index',
                                elasticsearch_type='test_es_type',
                                database='test_database',
                                cluster='test_cluster',
                                schema_name='test_schema',
                                table_name='test_table',
                                table_key='test_table_key',
                                table_last_updated_epoch=123456789,
                                table_description='test_description',
                                column_names=['test_col1', 'test_col2'],
                                column_descriptions=['test_comment1', 'test_comment2'],
                                total_usage=10,
                                unique_usage=5,
                                tag_names=['test_tag1', 'test_tag2'])] * 5

        for d in data:
            loader.load(d)
......
import json
import unittest

from databuilder.models.elasticsearch_document import ElasticsearchDocument
from databuilder.models.table_elasticsearch_document import TableESDocument


class TestElasticsearchDocument(unittest.TestCase):
@@ -11,20 +11,20 @@ class TestElasticsearchDocument(unittest.TestCase):
        """
        Test string generated from to_json method
        """
        test_obj = ElasticsearchDocument(elasticsearch_index='test_index',
                                         elasticsearch_type='test_type',
                                         database='test_database',
                                         cluster='test_cluster',
                                         schema_name='test_schema',
                                         table_name='test_table',
                                         table_key='test_table_key',
                                         table_last_updated_epoch=123456789,
                                         table_description='test_table_description',
                                         column_names=['test_col1', 'test_col2'],
                                         column_descriptions=['test_description1', 'test_description2'],
                                         total_usage=100,
                                         unique_usage=10,
                                         tag_names=['test'])
        test_obj = TableESDocument(elasticsearch_index='test_index',
                                   elasticsearch_type='test_type',
                                   database='test_database',
                                   cluster='test_cluster',
                                   schema_name='test_schema',
                                   table_name='test_table',
                                   table_key='test_table_key',
                                   table_last_updated_epoch=123456789,
                                   table_description='test_table_description',
                                   column_names=['test_col1', 'test_col2'],
                                   column_descriptions=['test_description1', 'test_description2'],
                                   total_usage=100,
                                   unique_usage=10,
                                   tag_names=['test'])

        expected_index_dict = {"index": {"_type": "test_type", "_index": "test_index"}}
        expected_document_dict = {"database": "test_database",
......
@@ -5,6 +5,7 @@
from pyhocon import ConfigFactory  # noqa: F401
from databuilder import Scoped
from databuilder.transformer.elasticsearch_document_transformer import ElasticsearchDocumentTransformer
from databuilder.models.elasticsearch_document import ElasticsearchDocument
from databuilder.models.table_elasticsearch_document import TableESDocument
from databuilder.models.neo4j_data import Neo4jDataResult
@@ -71,21 +72,21 @@ class TestElasticsearchDocumentTransformer(unittest.TestCase):
        result = transformer.transform(data)

        expected = ElasticsearchDocument(elasticsearch_index='test_es_index',
                                         elasticsearch_type='test_es_type',
                                         database="test_database",
                                         cluster="test_cluster",
                                         schema_name="test_schema_name",
                                         table_name="test_table_name",
                                         table_key="test_table_key",
                                         table_last_updated_epoch=123456789,
                                         table_description="test_table_description",
                                         column_names=["test_col1", "test_col2"],
                                         column_descriptions=["test_col_description1",
                                                              "test_col_description2"],
                                         total_usage=10,
                                         unique_usage=5,
                                         tag_names=["test_tag1", "test_tag2"])
        expected = TableESDocument(elasticsearch_index='test_es_index',
                                   elasticsearch_type='test_es_type',
                                   database="test_database",
                                   cluster="test_cluster",
                                   schema_name="test_schema_name",
                                   table_name="test_table_name",
                                   table_key="test_table_key",
                                   table_last_updated_epoch=123456789,
                                   table_description="test_table_description",
                                   column_names=["test_col1", "test_col2"],
                                   column_descriptions=["test_col_description1",
                                                        "test_col_description2"],
                                   total_usage=10,
                                   unique_usage=5,
                                   tag_names=["test_tag1", "test_tag2"])

        self.assertIsInstance(result, ElasticsearchDocument)
        self.assertDictEqual(vars(result), vars(expected))
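Note: because TableESDocument subclasses ElasticsearchDocument, the assertIsInstance check above (and any other caller that type-checks against the base class) still passes after the swap. A quick standalone sanity check, not part of the diff:

# Not part of the diff: verifies the subclass relationship introduced by this change.
from databuilder.models.elasticsearch_document import ElasticsearchDocument
from databuilder.models.table_elasticsearch_document import TableESDocument

assert issubclass(TableESDocument, ElasticsearchDocument)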