Unverified Commit 30d4cf4b authored by Tao Feng's avatar Tao Feng Committed by GitHub

Modify es transformer to support new ES document (#19)

* Modify es transformer to support new ES document

* Update per feedback
parent 7ec686ba
import json
from typing import List, Optional # noqa: F401
from databuilder.models.elasticsearch_document import ElasticsearchDocument
class UserESDocument(ElasticsearchDocument):
"""
Schema for the Search index document for user
"""
def __init__(self,
elasticsearch_index, # type: str
elasticsearch_type, # type: str
email, # type: str
first_name, # type: str
last_name, # type: str
name, # type: str
github_username, # type: str
team_name, # type: str
employee_type, # type: str
manager_email, # type: str
slack_id, # type: str
is_active, # type: bool
total_read, # type: int
total_own, # type: int
total_follow, # type: int
):
# type: (...) -> None
self.elasticsearch_index = elasticsearch_index
self.elasticsearch_type = elasticsearch_type
self.email = email
self.first_name = first_name
self.last_name = last_name
self.name = name
self.github_username = github_username
self.team_name = team_name
self.employee_type = employee_type
self.manager_email = manager_email
self.slack_id = slack_id
self.is_active = is_active
self.total_read = total_read
self.total_own = total_own
self.total_follow = total_follow
def to_json(self):
# type: () -> str
"""
Convert object to json for elasticsearch bulk upload
Bulk load JSON format is defined here:
https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
:return:
"""
index_row = dict(index=dict(_index=self.elasticsearch_index,
_type=self.elasticsearch_type))
data = json.dumps(index_row) + "\n"
# convert rest of the object
obj_dict = {k: v for k, v in sorted(self.__dict__.items())
if k not in ['elasticsearch_index', 'elasticsearch_type']}
data += json.dumps(obj_dict) + "\n"
return data
......@@ -3,6 +3,7 @@ from typing import Optional # noqa: F401
from databuilder.transformer.base_transformer import Transformer
from databuilder.models.table_elasticsearch_document import TableESDocument
from databuilder.models.user_elasticsearch_document import UserESDocument
from databuilder.models.neo4j_data import Neo4jDataResult
......@@ -12,6 +13,11 @@ class ElasticsearchDocumentTransformer(Transformer):
"""
ELASTICSEARCH_INDEX_CONFIG_KEY = 'index'
ELASTICSEARCH_DOC_CONFIG_KEY = 'doc_type'
ELASTICSEARCH_RESOURCE_CONFIG_KEY = 'resource_type'
RESOURCE_TYPE_MAPPING = {
'table',
'user'
}
def init(self, conf):
# type: (ConfigTree) -> None
......@@ -19,6 +25,10 @@ class ElasticsearchDocumentTransformer(Transformer):
self.elasticsearch_index = self.conf.get_string(ElasticsearchDocumentTransformer.ELASTICSEARCH_INDEX_CONFIG_KEY)
self.elasticsearch_type = self.conf.get_string(ElasticsearchDocumentTransformer.ELASTICSEARCH_DOC_CONFIG_KEY)
# values: table, user
self.elasticsearch_resource_type = self.conf.get_string(ElasticsearchDocumentTransformer.
ELASTICSEARCH_RESOURCE_CONFIG_KEY,
'table')
def transform(self, record):
# type: (Neo4jDataResult) -> Optional[ElasticsearchDocument]
......@@ -28,20 +38,45 @@ class ElasticsearchDocumentTransformer(Transformer):
if not isinstance(record, Neo4jDataResult):
raise Exception("ElasticsearchDocumentTransformer expects record of type 'Neo4jDataResult'!")
elasticsearch_obj = TableESDocument(elasticsearch_index=self.elasticsearch_index,
elasticsearch_type=self.elasticsearch_type,
database=record.database,
cluster=record.cluster,
schema_name=record.schema_name,
table_name=record.table_name,
table_key=record.table_key,
table_description=record.table_description,
table_last_updated_epoch=record.table_last_updated_epoch,
column_names=record.column_names,
column_descriptions=record.column_descriptions,
total_usage=record.total_usage,
unique_usage=record.unique_usage,
tag_names=record.tag_names)
if self.elasticsearch_resource_type.lower() not in \
ElasticsearchDocumentTransformer.RESOURCE_TYPE_MAPPING:
raise Exception('resource type needs to define in RESOURCE_TYPE_MAPPING')
if self.elasticsearch_resource_type == 'table':
elasticsearch_obj = TableESDocument(elasticsearch_index=self.elasticsearch_index,
elasticsearch_type=self.elasticsearch_type,
database=record.database,
cluster=record.cluster,
schema_name=record.schema_name,
table_name=record.table_name,
table_key=record.table_key,
table_description=record.table_description,
table_last_updated_epoch=record.table_last_updated_epoch,
column_names=record.column_names,
column_descriptions=record.column_descriptions,
total_usage=record.total_usage,
unique_usage=record.unique_usage,
tag_names=record.tag_names)
elif self.elasticsearch_resource_type == 'user':
elasticsearch_obj = UserESDocument(elasticsearch_index=self.elasticsearch_index,
elasticsearch_type=self.elasticsearch_type,
email=record.email,
first_name=record.first_name,
last_name=record.last_name,
name=record.name,
github_username=record.github_username,
team_name=record.team_name,
employee_type=record.employee_type,
manager_email=record.manager_email,
slack_id=record.slack_id,
is_active=record.is_active,
total_read=record.total_read,
total_own=record.total_own,
total_follow=record.total_follow,
)
else:
raise NotImplementedError()
return elasticsearch_obj
def get_scope(self):
......
from setuptools import setup, find_packages
__version__ = '1.0.8'
__version__ = '1.0.9'
setup(
......
......@@ -4,7 +4,7 @@ import unittest
from databuilder.models.table_elasticsearch_document import TableESDocument
class TestElasticsearchDocument(unittest.TestCase):
class TestTableElasticsearchDocument(unittest.TestCase):
def test_to_json(self):
# type: () -> None
......
import json
import unittest
from databuilder.models.user_elasticsearch_document import UserESDocument
class TestUserElasticsearchDocument(unittest.TestCase):
def test_to_json(self):
# type: () -> None
"""
Test string generated from to_json method
"""
test_obj = UserESDocument(elasticsearch_index='test_index',
elasticsearch_type='test_type',
email='test@email.com',
first_name='test_firstname',
last_name='test_lastname',
name='full_name',
github_username='github_user',
team_name='team',
employee_type='fte',
manager_email='test_manager',
slack_id='test_slack',
is_active=True,
total_read=2,
total_own=3,
total_follow=1)
expected_index_dict = {"index": {"_type": "test_type", "_index": "test_index"}}
expected_document_dict = {"first_name": "test_firstname",
"last_name": "test_lastname",
"name": "full_name",
"team_name": "team",
"total_follow": 1,
"total_read": 2,
"is_active": True,
"total_own": 3,
"slack_id": 'test_slack',
"manager_email": "test_manager",
'github_username': "github_user",
"employee_type": 'fte',
"email": "test@email.com",
}
result = test_obj.to_json()
results = result.split("\n")
# verify two new line characters in result
self.assertEqual(len(results), 3, "Result from to_json() function doesn't have 2 newlines!")
self.assertDictEqual(json.loads(results[0]), expected_index_dict)
self.assertDictEqual(json.loads(results[1]), expected_document_dict)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment