Commit c7d6da1f authored by Jakub Hettler's avatar Jakub Hettler Committed by Tao Feng

Add Elasticseach sample publisher (#23)

parent 1a2b5859
...@@ -3,20 +3,31 @@ This is a example script which demo how to load data into neo4j without using Ai ...@@ -3,20 +3,31 @@ This is a example script which demo how to load data into neo4j without using Ai
""" """
import csv import csv
from elasticsearch import Elasticsearch
import logging import logging
from pyhocon import ConfigFactory from pyhocon import ConfigFactory
import sqlite3 import sqlite3
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
import textwrap import textwrap
import uuid
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher import neo4j_csv_publisher from databuilder.publisher import neo4j_csv_publisher
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.transformer.elasticsearch_document_transformer import ElasticsearchDocumentTransformer
# change to the address of Elasticsearh service
es = Elasticsearch([
{'host': '0.0.0.0'},
])
DB_FILE = '/tmp/test.db' DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db' SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
...@@ -147,6 +158,54 @@ def create_sample_job(table_name, model_name): ...@@ -147,6 +158,54 @@ def create_sample_job(table_name, model_name):
return job return job
def create_es_publisher_sample_job():
# loader save data to this location and publisher read if from here
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=ElasticsearchDocumentTransformer())
# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
# related to mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38
elasticsearch_new_index_key_type = 'table'
# alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
elasticsearch_index_alias = 'tables_alias'
job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY):
'databuilder.models.neo4j_data.Neo4jDataResult',
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
'transformer.elasticsearch.{}'.format(ElasticsearchDocumentTransformer.ELASTICSEARCH_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'transformer.elasticsearch.{}'.format(ElasticsearchDocumentTransformer.ELASTICSEARCH_DOC_CONFIG_KEY):
elasticsearch_new_index_key_type,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias
})
job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())
return job
if __name__ == "__main__": if __name__ == "__main__":
load_table_data_from_csv('sample_table.csv') load_table_data_from_csv('sample_table.csv')
load_col_data_from_csv('sample_col.csv') load_col_data_from_csv('sample_col.csv')
...@@ -160,3 +219,7 @@ if __name__ == "__main__": ...@@ -160,3 +219,7 @@ if __name__ == "__main__":
job2 = create_sample_job('test_col_metadata', job2 = create_sample_job('test_col_metadata',
'example.models.test_column_model.TestColumnMetadata') 'example.models.test_column_model.TestColumnMetadata')
job2.launch() job2.launch()
# start Elasticsearch publish job
job3 = create_es_publisher_sample_job()
job3.launch()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment