Unverified Commit 014690ea authored by Junda Yang's avatar Junda Yang Committed by GitHub

generalize ElasticsearchDocumentTransformer (#79)

* generalize ElasticsearchDocumentTransformer

* remove typo

* fix lint
parent 9a261695
import importlib
from pyhocon import ConfigTree # noqa: F401
from typing import Optional # noqa: F401
from databuilder.transformer.base_transformer import Transformer
from databuilder.models.table_elasticsearch_document import TableESDocument
from databuilder.models.user_elasticsearch_document import UserESDocument
from databuilder.models.neo4j_data import Neo4jDataResult
......@@ -13,11 +13,7 @@ class ElasticsearchDocumentTransformer(Transformer):
"""
ELASTICSEARCH_INDEX_CONFIG_KEY = 'index'
ELASTICSEARCH_DOC_CONFIG_KEY = 'doc_type'
ELASTICSEARCH_RESOURCE_CONFIG_KEY = 'resource_type'
RESOURCE_TYPE_MAPPING = {
'table',
'user'
}
ELASTICSEARCH_DOC_MODEL_CLASS_KEY = 'model_class'
def init(self, conf):
# type: (ConfigTree) -> None
......@@ -25,10 +21,15 @@ class ElasticsearchDocumentTransformer(Transformer):
self.elasticsearch_index = self.conf.get_string(ElasticsearchDocumentTransformer.ELASTICSEARCH_INDEX_CONFIG_KEY)
self.elasticsearch_type = self.conf.get_string(ElasticsearchDocumentTransformer.ELASTICSEARCH_DOC_CONFIG_KEY)
# values: table, user
self.elasticsearch_resource_type = self.conf.get_string(ElasticsearchDocumentTransformer.
ELASTICSEARCH_RESOURCE_CONFIG_KEY,
'table')
model_class = self.conf.get_string(ElasticsearchDocumentTransformer.ELASTICSEARCH_DOC_MODEL_CLASS_KEY, '')
if not model_class:
raise Exception('User needs to provide the ElasticsearchDocument model class')
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)
self.model_class = getattr(mod, class_name)
def transform(self, record):
# type: (Neo4jDataResult) -> Optional[ElasticsearchDocument]
......@@ -38,44 +39,9 @@ class ElasticsearchDocumentTransformer(Transformer):
if not isinstance(record, Neo4jDataResult):
raise Exception("ElasticsearchDocumentTransformer expects record of type 'Neo4jDataResult'!")
if self.elasticsearch_resource_type.lower() not in \
ElasticsearchDocumentTransformer.RESOURCE_TYPE_MAPPING:
raise Exception('resource type needs to define in RESOURCE_TYPE_MAPPING')
if self.elasticsearch_resource_type == 'table':
elasticsearch_obj = TableESDocument(elasticsearch_index=self.elasticsearch_index,
elasticsearch_type=self.elasticsearch_type,
database=record.database,
cluster=record.cluster,
schema_name=record.schema_name,
table_name=record.table_name,
table_key=record.table_key,
table_description=record.table_description,
table_last_updated_epoch=record.table_last_updated_epoch,
column_names=record.column_names,
column_descriptions=record.column_descriptions,
total_usage=record.total_usage,
unique_usage=record.unique_usage,
tag_names=record.tag_names)
elif self.elasticsearch_resource_type == 'user':
elasticsearch_obj = UserESDocument(elasticsearch_index=self.elasticsearch_index,
elasticsearch_type=self.elasticsearch_type,
email=record.email,
first_name=record.first_name,
last_name=record.last_name,
name=record.name,
github_username=record.github_username,
team_name=record.team_name,
employee_type=record.employee_type,
manager_email=record.manager_email,
slack_id=record.slack_id,
is_active=record.is_active,
total_read=record.total_read,
total_own=record.total_own,
total_follow=record.total_follow,
)
else:
raise NotImplementedError()
elasticsearch_obj = self.model_class(elasticsearch_index=self.elasticsearch_index,
elasticsearch_type=self.elasticsearch_type,
**vars(record))
return elasticsearch_obj
......
from setuptools import setup, find_packages
__version__ = '1.2.5'
__version__ = '1.2.6'
setup(
......
......@@ -16,7 +16,9 @@ class TestElasticsearchDocumentTransformer(unittest.TestCase):
self.elasticsearch_index = 'test_es_index'
self.elasticsearch_type = 'test_es_type'
config_dict = {'transformer.elasticsearch.index': self.elasticsearch_index,
'transformer.elasticsearch.doc_type': self.elasticsearch_type}
'transformer.elasticsearch.doc_type': self.elasticsearch_type,
'transformer.elasticsearch.model_class':
'databuilder.models.table_elasticsearch_document.TableESDocument'}
self.conf = ConfigFactory.from_dict(config_dict)
def test_empty_transform(self):
......@@ -90,3 +92,33 @@ class TestElasticsearchDocumentTransformer(unittest.TestCase):
self.assertIsInstance(result, ElasticsearchDocument)
self.assertDictEqual(vars(result), vars(expected))
def test_transform_without_model_class_conf(self):
# type: () -> None
"""
Test model_class conf is required
"""
config_dict = {'transformer.elasticsearch.index': self.elasticsearch_index,
'transformer.elasticsearch.doc_type': self.elasticsearch_type}
transformer = ElasticsearchDocumentTransformer()
with self.assertRaises(Exception) as context:
transformer.init(conf=Scoped.get_scoped_conf(conf=ConfigFactory.from_dict(config_dict),
scope=transformer.get_scope()))
self.assertTrue("User needs to provide the ElasticsearchDocument model class"
in context.exception)
def test_transform_with_invalid_model_class_conf(self):
# type: () -> None
"""
Test non existing model_class conf will throw error
"""
config_dict = {'transformer.elasticsearch.index': self.elasticsearch_index,
'transformer.elasticsearch.doc_type': self.elasticsearch_type,
'transformer.elasticsearch.model_class':
'databuilder.models.table_elasticsearch_document.NonExistingESDocument'}
transformer = ElasticsearchDocumentTransformer()
with self.assertRaises(Exception) as context:
transformer.init(conf=Scoped.get_scoped_conf(conf=ConfigFactory.from_dict(config_dict),
scope=transformer.get_scope()))
self.assertTrue("'module' object has no attribute 'NonExistingESDocument'"
in context.exception)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment