Commit 822bd16b authored by jornh, committed by Tao Feng

Add neo4j_es_last_updated to sample_dataloader (#86)

parent 0487bb13
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
This is an example script that demonstrates how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.
"""
import csv
......@@ -11,6 +12,7 @@ from sqlalchemy.ext.declarative import declarative_base
import textwrap
import uuid
from databuilder.extractor.neo4j_es_last_updated_extractor import Neo4jEsLastUpdatedExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
......@@ -23,7 +25,7 @@ from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
# change to the address of Elasticsearh service
# change to the address of Elasticsearch service
# Client is configured with a single host entry: localhost.
es = Elasticsearch([{'host': 'localhost'}])
......@@ -193,9 +195,46 @@ def create_sample_job(table_name, model_name):
return job
def create_es_publisher_sample_job():
def create_last_updated_job():
    """
    Build (but do not launch) the sample "last updated" job.

    The job wires a Neo4jEsLastUpdatedExtractor into an FsNeo4jCSVLoader and
    publishes the resulting CSV files with Neo4jCsvPublisher. The Neo4j
    endpoint, user and password are read from the module-level names
    ``neo4j_endpoint``, ``neo4j_user`` and ``neo4j_password``.

    :return: a DefaultJob instance; the caller is expected to call launch()
    """
    # The loader writes node/relationship CSVs into these folders and the
    # publisher is pointed at the very same folders to read them back.
    base_folder = '/var/tmp/amundsen/last_updated_data'
    nodes_dir = '{tmp_folder}/nodes'.format(tmp_folder=base_folder)
    relations_dir = '{tmp_folder}/relationships'.format(tmp_folder=base_folder)

    last_updated_task = DefaultTask(extractor=Neo4jEsLastUpdatedExtractor(),
                                    loader=FsNeo4jCSVLoader())

    # Key templates for the two configuration scopes used below.
    loader_key = 'loader.filesystem_csv_neo4j.{}'
    publisher_key = 'publisher.neo4j.{}'

    settings = {
        'extractor.neo4j_es_last_updated.model_class':
            'databuilder.models.neo4j_es_last_updated.Neo4jESLastUpdated',
        loader_key.format(FsNeo4jCSVLoader.NODE_DIR_PATH): nodes_dir,
        loader_key.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relations_dir,
        publisher_key.format(neo4j_csv_publisher.NODE_FILES_DIR): nodes_dir,
        publisher_key.format(neo4j_csv_publisher.RELATION_FILES_DIR): relations_dir,
        publisher_key.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint,
        publisher_key.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user,
        publisher_key.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password,
        # should use unique tag here like {ds}
        publisher_key.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): 'unique_lastupdated_tag',
    }

    return DefaultJob(conf=ConfigFactory.from_dict(settings),
                      task=last_updated_task,
                      publisher=Neo4jCsvPublisher())
def create_es_publisher_sample_job():
# loader saves data to this location and publisher reads it from here
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
task = DefaultTask(loader=FSElasticsearchJSONLoader(),
......@@ -259,6 +298,10 @@ if __name__ == "__main__":
'databuilder.models.user.User')
job_user.launch()
# start last updated job
job_lastupdated = create_last_updated_job()
job_lastupdated.launch()
# start Elasticsearch publish job
job_es = create_es_publisher_sample_job()
job_es.launch()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment