Unverified Commit 0487bb13 authored by Junda Yang's avatar Junda Yang Committed by GitHub

simplify elasticsearch databuilder workflow (#81)

* simplify elasticsearch databuilder workflow

* fix sample data loader
parent 104987a2
...@@ -187,9 +187,6 @@ An extractor that extracts table usage from SQL statements. It accept any extrac ...@@ -187,9 +187,6 @@ An extractor that extracts table usage from SQL statements. It accept any extrac
#### [ChainedTransformer](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/transformer/base_transformer.py#L41 "ChainedTransformer") #### [ChainedTransformer](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/transformer/base_transformer.py#L41 "ChainedTransformer")
A chained transformer that can take multiple transformers. A chained transformer that can take multiple transformers.
#### [ElasticsearchDocumentTransformer](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/transformer/elasticsearch_document_transformer.py "ElasticsearchDocumentTransformer")
A transformer that transforms a [Neo4j record](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/models/neo4j_data.py "Neo4j record") to an [Elasticsearch document](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/models/elasticsearch_document.py "Elasticsearch document").
#### [RegexStrReplaceTransformer](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/transformer/regex_str_replace_transformer.py "RegexStrReplaceTransformer") #### [RegexStrReplaceTransformer](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/transformer/regex_str_replace_transformer.py "RegexStrReplaceTransformer")
Generic string replacement transformer using REGEX. User can pass list of tuples where tuple contains regex and replacement pair. Generic string replacement transformer using REGEX. User can pass list of tuples where tuple contains regex and replacement pair.
```python ```python
...@@ -287,6 +284,7 @@ job_config = ConfigFactory.from_dict({ ...@@ -287,6 +284,7 @@ job_config = ConfigFactory.from_dict({
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r', 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
'publisher.elasticsearch{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY): elasticsearch_client, 'publisher.elasticsearch{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY): elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY): elasticsearch_new_index, 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY): elasticsearch_new_index,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY): elasticsearch_doc_type,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY): elasticsearch_index_alias,) 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY): elasticsearch_index_alias,)
job = DefaultJob( job = DefaultJob(
......
from abc import ABCMeta, abstractmethod import json
from abc import ABCMeta
class ElasticsearchDocument: class ElasticsearchDocument:
...@@ -8,13 +9,12 @@ class ElasticsearchDocument: ...@@ -8,13 +9,12 @@ class ElasticsearchDocument:
""" """
__metaclass__ = ABCMeta __metaclass__ = ABCMeta
@abstractmethod
def to_json(self): def to_json(self):
# type: () -> str # type: () -> str
""" """
Convert object to json for elasticsearch bulk upload Convert object to json
Bulk load JSON format is defined here:
https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
:return: :return:
""" """
pass obj_dict = {k: v for k, v in sorted(self.__dict__.items())}
data = json.dumps(obj_dict) + "\n"
return data
from typing import List, Optional # noqa: F401
class Neo4jDataResult:
    """
    Neo4j Graph data model.

    Holds one row returned by a CYPHER query (one column per row) describing
    a table together with its search/usage metadata.
    """
    def __init__(self,
                 database,                  # type: str
                 cluster,                   # type: str
                 schema_name,               # type: str
                 table_name,                # type: str
                 table_key,                 # type: str
                 table_description,         # type: str
                 table_last_updated_epoch,  # type: Optional[int]
                 column_names,              # type: List[str]
                 column_descriptions,       # type: List[str]
                 total_usage,               # type: int
                 unique_usage,              # type: int
                 tag_names,                 # type: List[str]
                 ):
        # type: (...) -> None
        self.database = database
        self.cluster = cluster
        self.schema_name = schema_name
        self.table_name = table_name
        self.table_key = table_key
        self.table_description = table_description
        # Coerce to int only when a value is present. Use an identity check so
        # a legitimate epoch of 0 is preserved instead of being collapsed to
        # None (a plain truthiness test would drop 0).
        self.table_last_updated_epoch = \
            int(table_last_updated_epoch) if table_last_updated_epoch is not None else None
        self.column_names = column_names
        self.column_descriptions = column_descriptions
        self.total_usage = total_usage
        self.unique_usage = unique_usage
        self.tag_names = tag_names
import json
from typing import List, Optional # noqa: F401 from typing import List, Optional # noqa: F401
from databuilder.models.elasticsearch_document import ElasticsearchDocument from databuilder.models.elasticsearch_document import ElasticsearchDocument
...@@ -9,8 +8,6 @@ class TableESDocument(ElasticsearchDocument): ...@@ -9,8 +8,6 @@ class TableESDocument(ElasticsearchDocument):
Schema for the Search index document Schema for the Search index document
""" """
def __init__(self, def __init__(self,
elasticsearch_index, # type: str
elasticsearch_type, # type: str
database, # type: str database, # type: str
cluster, # type: str cluster, # type: str
schema_name, # type: str schema_name, # type: str
...@@ -25,37 +22,16 @@ class TableESDocument(ElasticsearchDocument): ...@@ -25,37 +22,16 @@ class TableESDocument(ElasticsearchDocument):
tag_names, # type: List[str] tag_names, # type: List[str]
): ):
# type: (...) -> None # type: (...) -> None
self.elasticsearch_index = elasticsearch_index
self.elasticsearch_type = elasticsearch_type
self.database = database self.database = database
self.cluster = cluster self.cluster = cluster
self.schema_name = schema_name self.schema_name = schema_name
self.table_name = table_name self.table_name = table_name
self.table_key = table_key self.table_key = table_key
self.table_description = table_description self.table_description = table_description
self.table_last_updated_epoch = table_last_updated_epoch self.table_last_updated_epoch = int(table_last_updated_epoch) if table_last_updated_epoch else None
self.column_names = column_names self.column_names = column_names
self.column_descriptions = column_descriptions self.column_descriptions = column_descriptions
self.total_usage = total_usage self.total_usage = total_usage
self.unique_usage = unique_usage self.unique_usage = unique_usage
# todo: will include tag_type once we have better understanding from UI flow. # todo: will include tag_type once we have better understanding from UI flow.
self.tag_names = tag_names self.tag_names = tag_names
def to_json(self):
# type: () -> str
"""
Convert object to json for elasticsearch bulk upload
Bulk load JSON format is defined here:
https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
:return:
"""
index_row = dict(index=dict(_index=self.elasticsearch_index,
_type=self.elasticsearch_type))
data = json.dumps(index_row) + "\n"
# convert rest of the object
obj_dict = {k: v for k, v in sorted(self.__dict__.items())
if k not in ['elasticsearch_index', 'elasticsearch_type']}
data += json.dumps(obj_dict) + "\n"
return data
import json
from typing import List, Optional # noqa: F401
from databuilder.models.elasticsearch_document import ElasticsearchDocument from databuilder.models.elasticsearch_document import ElasticsearchDocument
...@@ -9,8 +6,6 @@ class UserESDocument(ElasticsearchDocument): ...@@ -9,8 +6,6 @@ class UserESDocument(ElasticsearchDocument):
Schema for the Search index document for user Schema for the Search index document for user
""" """
def __init__(self, def __init__(self,
elasticsearch_index, # type: str
elasticsearch_type, # type: str
email, # type: str email, # type: str
first_name, # type: str first_name, # type: str
last_name, # type: str last_name, # type: str
...@@ -26,8 +21,6 @@ class UserESDocument(ElasticsearchDocument): ...@@ -26,8 +21,6 @@ class UserESDocument(ElasticsearchDocument):
total_follow, # type: int total_follow, # type: int
): ):
# type: (...) -> None # type: (...) -> None
self.elasticsearch_index = elasticsearch_index
self.elasticsearch_type = elasticsearch_type
self.email = email self.email = email
self.first_name = first_name self.first_name = first_name
self.last_name = last_name self.last_name = last_name
...@@ -41,22 +34,3 @@ class UserESDocument(ElasticsearchDocument): ...@@ -41,22 +34,3 @@ class UserESDocument(ElasticsearchDocument):
self.total_read = total_read self.total_read = total_read
self.total_own = total_own self.total_own = total_own
self.total_follow = total_follow self.total_follow = total_follow
def to_json(self):
# type: () -> str
"""
Convert object to json for elasticsearch bulk upload
Bulk load JSON format is defined here:
https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
:return:
"""
index_row = dict(index=dict(_index=self.elasticsearch_index,
_type=self.elasticsearch_type))
data = json.dumps(index_row) + "\n"
# convert rest of the object
obj_dict = {k: v for k, v in sorted(self.__dict__.items())
if k not in ['elasticsearch_index', 'elasticsearch_type']}
data += json.dumps(obj_dict) + "\n"
return data
...@@ -24,6 +24,7 @@ class ElasticsearchPublisher(Publisher): ...@@ -24,6 +24,7 @@ class ElasticsearchPublisher(Publisher):
FILE_MODE_CONFIG_KEY = 'mode' FILE_MODE_CONFIG_KEY = 'mode'
ELASTICSEARCH_CLIENT_CONFIG_KEY = 'client' ELASTICSEARCH_CLIENT_CONFIG_KEY = 'client'
ELASTICSEARCH_DOC_TYPE_CONFIG_KEY = 'doc_type'
ELASTICSEARCH_NEW_INDEX_CONFIG_KEY = 'new_index' ELASTICSEARCH_NEW_INDEX_CONFIG_KEY = 'new_index'
ELASTICSEARCH_ALIAS_CONFIG_KEY = 'alias' ELASTICSEARCH_ALIAS_CONFIG_KEY = 'alias'
ELASTICSEARCH_MAPPING_CONFIG_KEY = 'mapping' ELASTICSEARCH_MAPPING_CONFIG_KEY = 'mapping'
...@@ -116,6 +117,7 @@ class ElasticsearchPublisher(Publisher): ...@@ -116,6 +117,7 @@ class ElasticsearchPublisher(Publisher):
self.file_path = self.conf.get_string(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY) self.file_path = self.conf.get_string(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY)
self.file_mode = self.conf.get_string(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY, 'w') self.file_mode = self.conf.get_string(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY, 'w')
self.elasticsearch_type = self.conf.get_string(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY)
self.elasticsearch_client = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY) self.elasticsearch_client = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY)
self.elasticsearch_new_index = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY) self.elasticsearch_new_index = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY)
self.elasticsearch_alias = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY) self.elasticsearch_alias = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY)
...@@ -153,11 +155,21 @@ class ElasticsearchPublisher(Publisher): ...@@ -153,11 +155,21 @@ class ElasticsearchPublisher(Publisher):
LOGGER.warning("received no data to upload to Elasticsearch!") LOGGER.warning("received no data to upload to Elasticsearch!")
return return
# Convert object to json for elasticsearch bulk upload
# Bulk load JSON format is defined here:
# https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
bulk_actions = []
for action in actions:
index_row = dict(index=dict(_index=self.elasticsearch_new_index,
_type=self.elasticsearch_type))
bulk_actions.append(index_row)
bulk_actions.append(action)
# create new index with mapping # create new index with mapping
self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping) self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping)
# bulk upload data # bulk upload data
self.elasticsearch_client.bulk(actions) self.elasticsearch_client.bulk(bulk_actions)
# fetch indices that have {elasticsearch_alias} as alias # fetch indices that have {elasticsearch_alias} as alias
elasticsearch_old_indices = self._fetch_old_index() elasticsearch_old_indices = self._fetch_old_index()
......
import importlib
from pyhocon import ConfigTree # noqa: F401
from typing import Optional # noqa: F401
from databuilder.transformer.base_transformer import Transformer
from databuilder.models.neo4j_data import Neo4jDataResult
class ElasticsearchDocumentTransformer(Transformer):
    """
    Converts Neo4j graph records into Elasticsearch index document objects.

    The concrete document model class is loaded dynamically from the
    ``model_class`` configuration value (a fully-qualified class path).
    """
    ELASTICSEARCH_INDEX_CONFIG_KEY = 'index'
    ELASTICSEARCH_DOC_CONFIG_KEY = 'doc_type'
    ELASTICSEARCH_DOC_MODEL_CLASS_KEY = 'model_class'

    def init(self, conf):
        # type: (ConfigTree) -> None
        self.conf = conf
        self.elasticsearch_index = conf.get_string(self.ELASTICSEARCH_INDEX_CONFIG_KEY)
        self.elasticsearch_type = conf.get_string(self.ELASTICSEARCH_DOC_CONFIG_KEY)

        fq_class_path = conf.get_string(self.ELASTICSEARCH_DOC_MODEL_CLASS_KEY, '')
        if not fq_class_path:
            raise Exception('User needs to provide the ElasticsearchDocument model class')
        # Resolve "package.module.ClassName" into the class object.
        module_path, class_name = fq_class_path.rsplit(".", 1)
        self.model_class = getattr(importlib.import_module(module_path), class_name)

    def transform(self, record):
        # type: (Neo4jDataResult) -> Optional[ElasticsearchDocument]
        if not record:
            return None
        if not isinstance(record, Neo4jDataResult):
            raise Exception("ElasticsearchDocumentTransformer expects record of type 'Neo4jDataResult'!")

        # Build the configured document model from the record's fields plus
        # the index/type taken from configuration.
        return self.model_class(elasticsearch_index=self.elasticsearch_index,
                                elasticsearch_type=self.elasticsearch_type,
                                **vars(record))

    def get_scope(self):
        # type: () -> str
        return 'transformer.elasticsearch'
...@@ -22,7 +22,6 @@ from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher ...@@ -22,7 +22,6 @@ from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.transformer.elasticsearch_document_transformer import ElasticsearchDocumentTransformer
# change to the address of Elasticsearch service # change to the address of Elasticsearch service
es = Elasticsearch([ es = Elasticsearch([
...@@ -201,7 +200,7 @@ def create_es_publisher_sample_job(): ...@@ -201,7 +200,7 @@ def create_es_publisher_sample_job():
task = DefaultTask(loader=FSElasticsearchJSONLoader(), task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(), extractor=Neo4jSearchDataExtractor(),
transformer=ElasticsearchDocumentTransformer()) transformer=NoopTransformer())
# elastic search client instance # elastic search client instance
elasticsearch_client = es elasticsearch_client = es
...@@ -215,16 +214,12 @@ def create_es_publisher_sample_job(): ...@@ -215,16 +214,12 @@ def create_es_publisher_sample_job():
job_config = ConfigFactory.from_dict({ job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint, 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY):
'databuilder.models.neo4j_data.Neo4jDataResult', 'databuilder.models.table_elasticsearch_document.TableESDocument',
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user, 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password, 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path, extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w', 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
'transformer.elasticsearch.{}'.format(ElasticsearchDocumentTransformer.ELASTICSEARCH_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'transformer.elasticsearch.{}'.format(ElasticsearchDocumentTransformer.ELASTICSEARCH_DOC_CONFIG_KEY):
elasticsearch_new_index_key_type,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY): 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path, extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r', 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
...@@ -232,6 +227,8 @@ def create_es_publisher_sample_job(): ...@@ -232,6 +227,8 @@ def create_es_publisher_sample_job():
elasticsearch_client, elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY): 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key, elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
elasticsearch_new_index_key_type,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY): 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias elasticsearch_index_alias
}) })
......
from setuptools import setup, find_packages from setuptools import setup, find_packages
__version__ = '1.2.9' __version__ = '1.3.0'
setup( setup(
......
...@@ -6,7 +6,7 @@ from typing import Any # noqa: F401 ...@@ -6,7 +6,7 @@ from typing import Any # noqa: F401
from databuilder import Scoped from databuilder import Scoped
from databuilder.extractor.neo4j_extractor import Neo4jExtractor from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.models.neo4j_data import Neo4jDataResult from databuilder.models.table_elasticsearch_document import TableESDocument
class TestNeo4jExtractor(unittest.TestCase): class TestNeo4jExtractor(unittest.TestCase):
...@@ -90,7 +90,7 @@ class TestNeo4jExtractor(unittest.TestCase): ...@@ -90,7 +90,7 @@ class TestNeo4jExtractor(unittest.TestCase):
'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): 'TEST_USER', 'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): 'TEST_USER',
'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): 'TEST_PW', 'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): 'TEST_PW',
'extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY):
'databuilder.models.neo4j_data.Neo4jDataResult' 'databuilder.models.table_elasticsearch_document.TableESDocument'
} }
self.conf = ConfigFactory.from_dict(config_dict) self.conf = ConfigFactory.from_dict(config_dict)
...@@ -116,5 +116,5 @@ class TestNeo4jExtractor(unittest.TestCase): ...@@ -116,5 +116,5 @@ class TestNeo4jExtractor(unittest.TestCase):
extractor.results = [result_dict] extractor.results = [result_dict]
result_obj = extractor.extract() result_obj = extractor.extract()
self.assertIsInstance(result_obj, Neo4jDataResult) self.assertIsInstance(result_obj, TableESDocument)
self.assertDictEqual(vars(result_obj), result_dict) self.assertDictEqual(vars(result_obj), result_dict)
...@@ -61,9 +61,7 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase): ...@@ -61,9 +61,7 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
loader.init(conf=Scoped.get_scoped_conf(conf=self.conf, loader.init(conf=Scoped.get_scoped_conf(conf=self.conf,
scope=loader.get_scope())) scope=loader.get_scope()))
data = dict(elasticsearch_index='test_es_index', data = dict(database='test_database',
elasticsearch_type='test_es_type',
database='test_database',
cluster='test_cluster', cluster='test_cluster',
schema_name='test_schema', schema_name='test_schema',
table_name='test_table', table_name='test_table',
...@@ -91,9 +89,7 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase): ...@@ -91,9 +89,7 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
loader.init(conf=Scoped.get_scoped_conf(conf=self.conf, loader.init(conf=Scoped.get_scoped_conf(conf=self.conf,
scope=loader.get_scope())) scope=loader.get_scope()))
data = TableESDocument(elasticsearch_index='test_es_index', data = TableESDocument(database='test_database',
elasticsearch_type='test_es_type',
database='test_database',
cluster='test_cluster', cluster='test_cluster',
schema_name='test_schema', schema_name='test_schema',
table_name='test_table', table_name='test_table',
...@@ -109,7 +105,6 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase): ...@@ -109,7 +105,6 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
loader.close() loader.close()
expected = [ expected = [
'{"index": {"_type": "test_es_type", "_index": "test_es_index"}}',
('{"table_key": "test_table_key", "column_descriptions": ["test_comment1", "test_comment2"], ' ('{"table_key": "test_table_key", "column_descriptions": ["test_comment1", "test_comment2"], '
'"schema_name": "test_schema", "database": "test_database", "cluster": "test_cluster", ' '"schema_name": "test_schema", "database": "test_database", "cluster": "test_cluster", '
'"column_names": ["test_col1", "test_col2"], "table_name": "test_table", ' '"column_names": ["test_col1", "test_col2"], "table_name": "test_table", '
...@@ -130,9 +125,7 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase): ...@@ -130,9 +125,7 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
loader.init(conf=Scoped.get_scoped_conf(conf=self.conf, loader.init(conf=Scoped.get_scoped_conf(conf=self.conf,
scope=loader.get_scope())) scope=loader.get_scope()))
data = [TableESDocument(elasticsearch_index='test_es_index', data = [TableESDocument(database='test_database',
elasticsearch_type='test_es_type',
database='test_database',
cluster='test_cluster', cluster='test_cluster',
schema_name='test_schema', schema_name='test_schema',
table_name='test_table', table_name='test_table',
...@@ -150,7 +143,6 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase): ...@@ -150,7 +143,6 @@ class TestFSElasticsearchJSONLoader(unittest.TestCase):
loader.close() loader.close()
expected = [ expected = [
'{"index": {"_type": "test_es_type", "_index": "test_es_index"}}',
('{"table_key": "test_table_key", "column_descriptions": ["test_comment1", "test_comment2"], ' ('{"table_key": "test_table_key", "column_descriptions": ["test_comment1", "test_comment2"], '
'"schema_name": "test_schema", "database": "test_database", "cluster": "test_cluster", ' '"schema_name": "test_schema", "database": "test_database", "cluster": "test_cluster", '
'"column_names": ["test_col1", "test_col2"], "table_name": "test_table", ' '"column_names": ["test_col1", "test_col2"], "table_name": "test_table", '
......
...@@ -11,9 +11,7 @@ class TestTableElasticsearchDocument(unittest.TestCase): ...@@ -11,9 +11,7 @@ class TestTableElasticsearchDocument(unittest.TestCase):
""" """
Test string generated from to_json method Test string generated from to_json method
""" """
test_obj = TableESDocument(elasticsearch_index='test_index', test_obj = TableESDocument(database='test_database',
elasticsearch_type='test_type',
database='test_database',
cluster='test_cluster', cluster='test_cluster',
schema_name='test_schema', schema_name='test_schema',
table_name='test_table', table_name='test_table',
...@@ -26,7 +24,6 @@ class TestTableElasticsearchDocument(unittest.TestCase): ...@@ -26,7 +24,6 @@ class TestTableElasticsearchDocument(unittest.TestCase):
unique_usage=10, unique_usage=10,
tag_names=['test']) tag_names=['test'])
expected_index_dict = {"index": {"_type": "test_type", "_index": "test_index"}}
expected_document_dict = {"database": "test_database", expected_document_dict = {"database": "test_database",
"cluster": "test_cluster", "cluster": "test_cluster",
"schema_name": "test_schema", "schema_name": "test_schema",
...@@ -45,7 +42,5 @@ class TestTableElasticsearchDocument(unittest.TestCase): ...@@ -45,7 +42,5 @@ class TestTableElasticsearchDocument(unittest.TestCase):
results = result.split("\n") results = result.split("\n")
# verify two new line characters in result # verify two new line characters in result
self.assertEqual(len(results), 3, "Result from to_json() function doesn't have 2 newlines!") self.assertEqual(len(results), 2, "Result from to_json() function doesn't have a newline!")
self.assertDictEqual(json.loads(results[0]), expected_document_dict)
self.assertDictEqual(json.loads(results[0]), expected_index_dict)
self.assertDictEqual(json.loads(results[1]), expected_document_dict)
...@@ -11,9 +11,7 @@ class TestUserElasticsearchDocument(unittest.TestCase): ...@@ -11,9 +11,7 @@ class TestUserElasticsearchDocument(unittest.TestCase):
""" """
Test string generated from to_json method Test string generated from to_json method
""" """
test_obj = UserESDocument(elasticsearch_index='test_index', test_obj = UserESDocument(email='test@email.com',
elasticsearch_type='test_type',
email='test@email.com',
first_name='test_firstname', first_name='test_firstname',
last_name='test_lastname', last_name='test_lastname',
name='full_name', name='full_name',
...@@ -27,7 +25,6 @@ class TestUserElasticsearchDocument(unittest.TestCase): ...@@ -27,7 +25,6 @@ class TestUserElasticsearchDocument(unittest.TestCase):
total_own=3, total_own=3,
total_follow=1) total_follow=1)
expected_index_dict = {"index": {"_type": "test_type", "_index": "test_index"}}
expected_document_dict = {"first_name": "test_firstname", expected_document_dict = {"first_name": "test_firstname",
"last_name": "test_lastname", "last_name": "test_lastname",
"name": "full_name", "name": "full_name",
...@@ -47,7 +44,6 @@ class TestUserElasticsearchDocument(unittest.TestCase): ...@@ -47,7 +44,6 @@ class TestUserElasticsearchDocument(unittest.TestCase):
results = result.split("\n") results = result.split("\n")
# verify two new line characters in result # verify two new line characters in result
self.assertEqual(len(results), 3, "Result from to_json() function doesn't have 2 newlines!") self.assertEqual(len(results), 2, "Result from to_json() function doesn't have a newline!")
self.assertDictEqual(json.loads(results[0]), expected_index_dict) self.assertDictEqual(json.loads(results[0]), expected_document_dict)
self.assertDictEqual(json.loads(results[1]), expected_document_dict)
...@@ -18,12 +18,14 @@ class TestElasticsearchPublisher(unittest.TestCase): ...@@ -18,12 +18,14 @@ class TestElasticsearchPublisher(unittest.TestCase):
self.mock_es_client = MagicMock() self.mock_es_client = MagicMock()
self.test_es_new_index = 'test_new_index' self.test_es_new_index = 'test_new_index'
self.test_es_alias = 'test_index_alias' self.test_es_alias = 'test_index_alias'
self.test_doc_type = 'test_doc_type'
config_dict = {'publisher.elasticsearch.file_path': self.test_file_path, config_dict = {'publisher.elasticsearch.file_path': self.test_file_path,
'publisher.elasticsearch.mode': self.test_file_mode, 'publisher.elasticsearch.mode': self.test_file_mode,
'publisher.elasticsearch.client': self.mock_es_client, 'publisher.elasticsearch.client': self.mock_es_client,
'publisher.elasticsearch.new_index': self.test_es_new_index, 'publisher.elasticsearch.new_index': self.test_es_new_index,
'publisher.elasticsearch.alias': self.test_es_alias} 'publisher.elasticsearch.alias': self.test_es_alias,
'publisher.elasticsearch.doc_type': self.test_doc_type}
self.conf = ConfigFactory.from_dict(config_dict) self.conf = ConfigFactory.from_dict(config_dict)
...@@ -69,7 +71,8 @@ class TestElasticsearchPublisher(unittest.TestCase): ...@@ -69,7 +71,8 @@ class TestElasticsearchPublisher(unittest.TestCase):
# bulk endpoint called once # bulk endpoint called once
self.mock_es_client.bulk.assert_called_once_with( self.mock_es_client.bulk.assert_called_once_with(
[{'KEY_DOESNOT_MATTER': 'NO_VALUE', 'KEY_DOESNOT_MATTER2': 'NO_VALUE2'}] [{'index': {'_type': self.test_doc_type, '_index': self.test_es_new_index}},
{'KEY_DOESNOT_MATTER': 'NO_VALUE', 'KEY_DOESNOT_MATTER2': 'NO_VALUE2'}]
) )
# update alias endpoint called once # update alias endpoint called once
...@@ -102,7 +105,8 @@ class TestElasticsearchPublisher(unittest.TestCase): ...@@ -102,7 +105,8 @@ class TestElasticsearchPublisher(unittest.TestCase):
# bulk endpoint called once # bulk endpoint called once
self.mock_es_client.bulk.assert_called_once_with( self.mock_es_client.bulk.assert_called_once_with(
[{'KEY_DOESNOT_MATTER': 'NO_VALUE', 'KEY_DOESNOT_MATTER2': 'NO_VALUE2'}] [{'index': {'_type': self.test_doc_type, '_index': self.test_es_new_index}},
{'KEY_DOESNOT_MATTER': 'NO_VALUE', 'KEY_DOESNOT_MATTER2': 'NO_VALUE2'}]
) )
# update alias endpoint called once # update alias endpoint called once
......
import unittest
from pyhocon import ConfigFactory # noqa: F401
from databuilder import Scoped
from databuilder.transformer.elasticsearch_document_transformer import ElasticsearchDocumentTransformer
from databuilder.models.elasticsearch_document import ElasticsearchDocument
from databuilder.models.table_elasticsearch_document import TableESDocument
from databuilder.models.neo4j_data import Neo4jDataResult
class TestElasticsearchDocumentTransformer(unittest.TestCase):
    """
    Unit tests for ElasticsearchDocumentTransformer covering empty input,
    invalid record types, the success path, and configuration validation.
    """

    def setUp(self):
        # type: () -> None
        self.elasticsearch_index = 'test_es_index'
        self.elasticsearch_type = 'test_es_type'

        config_dict = {'transformer.elasticsearch.index': self.elasticsearch_index,
                       'transformer.elasticsearch.doc_type': self.elasticsearch_type,
                       'transformer.elasticsearch.model_class':
                           'databuilder.models.table_elasticsearch_document.TableESDocument'}
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_empty_transform(self):
        # type: () -> None
        """
        Transforming a falsy record should yield None.
        """
        transformer = ElasticsearchDocumentTransformer()
        transformer.init(conf=Scoped.get_scoped_conf(conf=self.conf,
                                                     scope=transformer.get_scope()))

        result = transformer.transform(None)  # type: ignore
        self.assertIsNone(result)

    def test_transform_with_dict_object(self):
        # type: () -> None
        """
        A record that is not a Neo4jDataResult should raise an Exception.
        """
        transformer = ElasticsearchDocumentTransformer()
        transformer.init(conf=Scoped.get_scoped_conf(conf=self.conf,
                                                     scope=transformer.get_scope()))

        data = dict(test_key="DOES_NOT_MATTER",
                    test_key2="DOES_NOT_MATTER2")

        with self.assertRaises(Exception) as context:
            transformer.transform(data)  # type: ignore
        # Assert on the exception *message*: a membership test directly
        # against an exception object is a TypeError on Python 3.
        self.assertIn("ElasticsearchDocumentTransformer expects record of type 'Neo4jDataResult'!",
                      str(context.exception))

    def test_transform_success_case(self):
        # type: () -> None
        """
        Transforming a Neo4jDataResult should produce an equivalent
        ElasticsearchDocument built from the configured model class.
        """
        transformer = ElasticsearchDocumentTransformer()
        transformer.init(conf=Scoped.get_scoped_conf(conf=self.conf,
                                                     scope=transformer.get_scope()))

        data = Neo4jDataResult(database="test_database",
                               cluster="test_cluster",
                               schema_name="test_schema_name",
                               table_name="test_table_name",
                               table_key="test_table_key",
                               table_last_updated_epoch=123456789,
                               table_description="test_table_description",
                               column_names=["test_col1", "test_col2"],
                               column_descriptions=["test_col_description1", "test_col_description2"],
                               total_usage=10,
                               unique_usage=5,
                               tag_names=["test_tag1", "test_tag2"])
        result = transformer.transform(data)

        expected = TableESDocument(elasticsearch_index='test_es_index',
                                   elasticsearch_type='test_es_type',
                                   database="test_database",
                                   cluster="test_cluster",
                                   schema_name="test_schema_name",
                                   table_name="test_table_name",
                                   table_key="test_table_key",
                                   table_last_updated_epoch=123456789,
                                   table_description="test_table_description",
                                   column_names=["test_col1", "test_col2"],
                                   column_descriptions=["test_col_description1",
                                                        "test_col_description2"],
                                   total_usage=10,
                                   unique_usage=5,
                                   tag_names=["test_tag1", "test_tag2"])

        self.assertIsInstance(result, ElasticsearchDocument)
        self.assertDictEqual(vars(result), vars(expected))

    def test_transform_without_model_class_conf(self):
        # type: () -> None
        """
        The model_class conf is required; init without it should raise.
        """
        config_dict = {'transformer.elasticsearch.index': self.elasticsearch_index,
                       'transformer.elasticsearch.doc_type': self.elasticsearch_type}

        transformer = ElasticsearchDocumentTransformer()
        with self.assertRaises(Exception) as context:
            transformer.init(conf=Scoped.get_scoped_conf(conf=ConfigFactory.from_dict(config_dict),
                                                         scope=transformer.get_scope()))
        self.assertIn("User needs to provide the ElasticsearchDocument model class",
                      str(context.exception))

    def test_transform_with_invalid_model_class_conf(self):
        # type: () -> None
        """
        A non-existent model_class conf should raise an AttributeError.
        """
        config_dict = {'transformer.elasticsearch.index': self.elasticsearch_index,
                       'transformer.elasticsearch.doc_type': self.elasticsearch_type,
                       'transformer.elasticsearch.model_class':
                           'databuilder.models.table_elasticsearch_document.NonExistingESDocument'}

        transformer = ElasticsearchDocumentTransformer()
        with self.assertRaises(Exception) as context:
            transformer.init(conf=Scoped.get_scoped_conf(conf=ConfigFactory.from_dict(config_dict),
                                                         scope=transformer.get_scope()))
        # The full AttributeError wording differs between Python 2 and 3
        # ("'module' object has no attribute ..." vs "module '...' has no
        # attribute ..."), so assert only on the stable class-name substring.
        self.assertIn("NonExistingESDocument", str(context.exception))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment