Commit 2fc8bc6c authored by jornh's avatar jornh Committed by Tao Feng

feat(users): add a `user_search_index` Neo4j to ES quickstart sample sync job (#130)

* feat(user): add Neo4j to ES quickstart sample job

* lint

* Fix (user): Whitespace glitch giving quotes in UI

* Correct ES mapping, thanks Tao!

* lint

* Trailing Linefeed
parent b16d4a24
email,first_name,last_name,name,github_username,team_name,employee_type,manager_email,slack_id email,first_name,last_name,name,github_username,team_name,employee_type,manager_email,slack_id
roald.amundsen@example.org,Roald,Amundsen,"Roald Amundsen",lyft,"Team Amundsen",sailor,"phboss@example.org",ramundzn roald.amundsen@example.org,Roald,Amundsen,"Roald Amundsen",lyft,"Team Amundsen",sailor,"phboss@example.org",ramundzn
chrisc@example.org,Christopher,Columbus,"Christopher Columbus",ChristopherColumbusFAKE,"Team Amundsen",sailor,"phboss@example.org",chrisc chrisc@example.org,Christopher,Columbus,"Christopher Columbus",ChristopherColumbusFAKE,"Team Amundsen",sailor,"phboss@example.org",chrisc
buzz@example.org, Buzz, Aldrin, "Buzz Aldrin",BuzzAldrinFAKE, "Team Amundsen",astronaut,"phboss@example.org",buzz buzz@example.org, Buzz, Aldrin,"Buzz Aldrin",BuzzAldrinFAKE,"Team Amundsen",astronaut,"phboss@example.org",buzz
\ No newline at end of file
...@@ -288,7 +288,22 @@ def create_last_updated_job(): ...@@ -288,7 +288,22 @@ def create_last_updated_job():
return job return job
def create_es_publisher_sample_job(): def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
cypher_query=None,
elasticsearch_mapping=None):
"""
:param elasticsearch_index_alias: alias for Elasticsearch used in
amundsensearchlibrary/search_service/config.py as an index
:param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
`table_search_index`
:param model_name: the Databuilder model class used in transporting between Extractor and Loader
:param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
it uses the `Table` query baked into the Extractor
:param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
if None is given (default) it uses the `Table` query baked into the Publisher
"""
# loader saves data to this location and publisher reads it from here # loader saves data to this location and publisher reads it from here
extracted_search_data_path = '/var/tmp/amundsen/search_data.json' extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
...@@ -300,15 +315,10 @@ def create_es_publisher_sample_job(): ...@@ -300,15 +315,10 @@ def create_es_publisher_sample_job():
elasticsearch_client = es elasticsearch_client = es
# unique name of new index in Elasticsearch # unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4()) elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
# related to mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38
elasticsearch_new_index_key_type = 'table'
# alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
elasticsearch_index_alias = 'table_search_index'
job_config = ConfigFactory.from_dict({ job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint, 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): model_name,
'databuilder.models.table_elasticsearch_document.TableESDocument',
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user, 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password, 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
...@@ -322,11 +332,19 @@ def create_es_publisher_sample_job(): ...@@ -322,11 +332,19 @@ def create_es_publisher_sample_job():
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY): 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key, elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY): 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
elasticsearch_new_index_key_type, elasticsearch_doc_type_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY): 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias elasticsearch_index_alias,
}) })
# only optionally add these keys, so need to dynamically `put` them
if cypher_query:
job_config.put('extractor.search_data.{}'.format(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY),
cypher_query)
if elasticsearch_mapping:
job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
elasticsearch_mapping)
job = DefaultJob(conf=job_config, job = DefaultJob(conf=job_config,
task=task, task=task,
publisher=ElasticsearchPublisher()) publisher=ElasticsearchPublisher())
...@@ -342,6 +360,7 @@ if __name__ == "__main__": ...@@ -342,6 +360,7 @@ if __name__ == "__main__":
load_usage_data_from_csv('sample_column_usage.csv') load_usage_data_from_csv('sample_column_usage.csv')
load_user_data_from_csv('sample_user.csv') load_user_data_from_csv('sample_user.csv')
load_application_data_from_csv('sample_application.csv') load_application_data_from_csv('sample_application.csv')
if create_connection(DB_FILE): if create_connection(DB_FILE):
# start table job # start table job
job1 = create_sample_job('test_table_metadata', job1 = create_sample_job('test_table_metadata',
...@@ -372,6 +391,93 @@ if __name__ == "__main__": ...@@ -372,6 +391,93 @@ if __name__ == "__main__":
job_lastupdated = create_last_updated_job() job_lastupdated = create_last_updated_job()
job_lastupdated.launch() job_lastupdated.launch()
# start Elasticsearch publish job # start Elasticsearch publish jobs
job_es = create_es_publisher_sample_job() job_es_table = create_es_publisher_sample_job(
job_es.launch() elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
job_es_table.launch()
user_cypher_query = textwrap.dedent(
"""
MATCH (user:User)
OPTIONAL MATCH (user)-[read:READ]->(a)
OPTIONAL MATCH (user)-[own:OWNER_OF]->(b)
OPTIONAL MATCH (user)-[follow:FOLLOWED_BY]->(c)
OPTIONAL MATCH (user)-[manage_by:MANAGE_BY]->(manager)
with user, a, b, c, read, own, follow, manager
where user.full_name is not null
return user.email as email, user.first_name as first_name, user.last_name as last_name,
user.full_name as name, user.github_username as github_username, user.team_name as team_name,
user.employee_type as employee_type, manager.email as manager_email, user.slack_id as slack_id,
user.is_active as is_active,
REDUCE(sum_r = 0, r in COLLECT(DISTINCT read)| sum_r + r.read_count) AS total_read,
count(distinct b) as total_own,
count(distinct c) AS total_follow
order by user.email
"""
)
user_elasticsearch_mapping = """
{
"mappings":{
"user":{
"properties": {
"email": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"first_name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"last_name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"total_read":{
"type": "long"
},
"total_own": {
"type": "long"
},
"total_follow": {
"type": "long"
}
}
}
}
}
"""
job_es_user = create_es_publisher_sample_job(
elasticsearch_index_alias='user_search_index',
elasticsearch_doc_type_key='user',
model_name='databuilder.models.user_elasticsearch_document.UserESDocument',
cypher_query=user_cypher_query,
elasticsearch_mapping=user_elasticsearch_mapping)
job_es_user.launch()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment