Unverified Commit eb3e4d37 authored by Jin Hyuk Chang's avatar Jin Hyuk Chang Committed by GitHub

other: Add Dashboard sample data (#292)

* Add Dashboard sample data

* Update
parent b4c24ef8
...@@ -649,6 +649,9 @@ Transforms string timestamp into int epoch ...@@ -649,6 +649,9 @@ Transforms string timestamp into int epoch
#### [RemoveFieldTransformer](./databuilder/transformer/remove_field_transformer.py) #### [RemoveFieldTransformer](./databuilder/transformer/remove_field_transformer.py)
Remove fields from the Dict. Remove fields from the Dict.
#### [GenericTransformer](./databuilder/transformer/generic_transformer.py)
Transforms dictionary based on callback function that user provides.
## List of loader ## List of loader
#### [FsNeo4jCSVLoader](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/loader/file_system_neo4j_csv_loader.py "FsNeo4jCSVLoader") #### [FsNeo4jCSVLoader](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/loader/file_system_neo4j_csv_loader.py "FsNeo4jCSVLoader")
......
import logging
from pyhocon import ConfigTree # noqa: F401
from typing import Any, Dict # noqa: F401
from databuilder.transformer.base_transformer import Transformer
CALLBACK_FUNCTION = 'callback_function'
FIELD_NAME = 'field_name'
LOGGER = logging.getLogger(__name__)
class GenericTransformer(Transformer):
"""
A generic transformer that accepts a callback function that transforms the record on specified field.
"""
def init(self, conf):
# type: (ConfigTree) -> None
self._callback_function = conf.get(CALLBACK_FUNCTION)
self._field_name = conf.get_string(FIELD_NAME)
def transform(self, record):
# type: (Dict[str, Any]) -> Dict[str, Any]
for k, v in record.items():
if k == self._field_name:
new_val = self._callback_function(v)
record[k] = new_val
return record
def get_scope(self):
# type: () -> str
return 'transformer.generic'
product,cluster,dashboard_group,dashboard_group_id,dashboard_group_description,dashboard_group_url,dashboard_name,dashboard_id,description,created_timestamp,dashboard_url
mode,gold,test group1,test_group_id_1,test group description 1,http://mode.test_group_id_1.com,test dashboard,test_dashboard_id_1,test dashboard description,1592333799,http://mode.test_group_id_1.com/test_dashboard_id_1
mode,gold,test group1,test_group_id_1,test group description 1_2,http://mode.test_group_id_1.com,test dashboard,test_dashboard_id_1_2,test dashboard description 1_2,1592332799,http://mode.test_group_id_1.com/test_dashboard_id_1_2
mode,gold,test group2,test_group_id_2,test group description 2,http://mode.test_group_id_2.com,test dashboard,test_dashboard_id_2,test dashboard description,1592133799,http://mode.test_group_id_2.com/test_dashboard_id_2
superset,gold,test group3,test_group_id_3,test group description 1,http://mode.test_group_id_3.com,test dashboard,test_dashboard_id_3,test dashboard description,1591333799,http://mode.test_group_id_3.com/test_dashboard_id_3
product,cluster,dashboard_group_id,dashboard_id,execution_id,execution_timestamp,execution_state
mode,gold,test_group_id_1,test_dashboard_id_1,_last_successful_execution,1592351193,success
mode,gold,test_group_id_2,test_dashboard_id_2,_last_successful_execution,1592351210,success
mode,gold,test_group_id_1,test_dashboard_id_1,_last_execution,1593351193,fail
mode,gold,test_group_id_2,test_dashboard_id_2,_last_execution,1594351210,success
\ No newline at end of file
product,cluster,dashboard_group_id,dashboard_id,last_modified_timestamp
mode,gold,test_group_id_1,test_dashboard_id_1,1592351454
mode,gold,test_group_id_2,test_dashboard_id_2,1592311423
\ No newline at end of file
product,cluster,dashboard_group_id,dashboard_id,email
mode,gold,test_group_id_1,test_dashboard_id_1,roald.amundsen@example.org
mode,gold,test_group_id_2,test_dashboard_id_2,buzz@example.org
\ No newline at end of file
product,cluster,dashboard_group_id,dashboard_id,query_name,query_id,url,query_text
mode,gold,test_group_id_1,test_dashboard_id_1,first query,query_1,http://mode.test_group_id_1.com/test_dashboard_id_1/query/query_1,SELECT * FROM foo.bar
mode,gold,test_group_id_2,test_dashboard_id_2,second query,query_2,http://mode.test_group_id_2.com/test_dashboard_id_2/query/query_2,SELECT * FROM bar.foo JOIN foo.bar USING (baz)
\ No newline at end of file
product,cluster,dashboard_group_id,dashboard_id,table_ids
mode,gold,test_group_id_1,test_dashboard_id_1,"hive://gold.test_schema/test_table1"
mode,gold,test_group_id_2,test_dashboard_id_2,"hive://gold.test_schema/test_view1,hive://gold.test_schema/test_table3"
product,cluster,dashboard_group_id,dashboard_id,view_count,email
mode,gold,test_group_id_1,test_dashboard_id_1,100,roald.amundsen@example.org
mode,gold,test_group_id_2,test_dashboard_id_2,2000,chrisc@example.org
\ No newline at end of file
...@@ -22,6 +22,7 @@ import os ...@@ -22,6 +22,7 @@ import os
import sqlite3 import sqlite3
import sys import sys
import uuid import uuid
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
...@@ -32,10 +33,15 @@ from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExt ...@@ -32,10 +33,15 @@ from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExt
from databuilder.job.job import DefaultJob from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher.elasticsearch_constants import DASHBOARD_ELASTICSEARCH_INDEX_MAPPING, \
USER_ELASTICSEARCH_INDEX_MAPPING
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.base_transformer import NoopTransformer from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS
from databuilder.transformer.generic_transformer import GenericTransformer, CALLBACK_FUNCTION, FIELD_NAME
es_host = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_HOST', 'localhost') es_host = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_HOST', 'localhost')
neo_host = os.getenv('CREDENTIALS_NEO4J_PROXY_HOST', 'localhost') neo_host = os.getenv('CREDENTIALS_NEO4J_PROXY_HOST', 'localhost')
...@@ -62,18 +68,20 @@ neo4j_endpoint = NEO4J_ENDPOINT ...@@ -62,18 +68,20 @@ neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j' neo4j_user = 'neo4j'
neo4j_password = 'test' neo4j_password = 'test'
LOGGER = logging.getLogger(__name__)
def create_connection(db_file): def create_connection(db_file):
try: try:
conn = sqlite3.connect(db_file) conn = sqlite3.connect(db_file)
return conn return conn
except Exception: except Exception:
logging.exception('exception') LOGGER.exception('exception')
return None return None
def run_csv_job(file_loc, table_name, model): def run_csv_job(file_loc, job_name, model):
tmp_folder = '/var/tmp/amundsen/{table_name}'.format(table_name=table_name) tmp_folder = '/var/tmp/amundsen/{job_name}'.format(job_name=job_name)
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder) node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder) relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
...@@ -162,6 +170,52 @@ def create_last_updated_job(): ...@@ -162,6 +170,52 @@ def create_last_updated_job():
publisher=Neo4jCsvPublisher()) publisher=Neo4jCsvPublisher())
def _str_to_list(str_val):
return str_val.split(',')
def create_dashboard_tables_job():
# loader saves data to these folders and publisher reads it from here
tmp_folder = '/var/tmp/amundsen/dashboard_table'
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
csv_extractor = CsvExtractor()
csv_loader = FsNeo4jCSVLoader()
generic_transformer = GenericTransformer()
dict_to_model_transformer = DictToModel()
transformer = ChainedTransformer(transformers=[generic_transformer, dict_to_model_transformer],
is_init_transformers=True)
task = DefaultTask(extractor=csv_extractor,
loader=csv_loader,
transformer=transformer)
publisher = Neo4jCsvPublisher()
job_config = ConfigFactory.from_dict({
'{}.file_location'.format(csv_extractor.get_scope()): 'example/sample_data/sample_dashboard_table.csv',
'{}.{}.{}'.format(transformer.get_scope(), generic_transformer.get_scope(), FIELD_NAME): 'table_ids',
'{}.{}.{}'.format(transformer.get_scope(), generic_transformer.get_scope(), CALLBACK_FUNCTION): _str_to_list,
'{}.{}.{}'.format(transformer.get_scope(), dict_to_model_transformer.get_scope(), MODEL_CLASS):
'databuilder.models.dashboard.dashboard_table.DashboardTable',
'{}.node_dir_path'.format(csv_loader.get_scope()): node_files_folder,
'{}.relationship_dir_path'.format(csv_loader.get_scope()): relationship_files_folder,
'{}.delete_created_directories'.format(csv_loader.get_scope()): True,
'{}.node_files_directory'.format(publisher.get_scope()): node_files_folder,
'{}.relation_files_directory'.format(publisher.get_scope()): relationship_files_folder,
'{}.neo4j_endpoint'.format(publisher.get_scope()): neo4j_endpoint,
'{}.neo4j_user'.format(publisher.get_scope()): neo4j_user,
'{}.neo4j_password'.format(publisher.get_scope()): neo4j_password,
'{}.neo4j_encrypted'.format(publisher.get_scope()): False,
'{}.job_publish_tag'.format(publisher.get_scope()): 'unique_tag', # should use unique tag here like {ds}
})
return DefaultJob(conf=job_config,
task=task,
publisher=publisher)
def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index', def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table', elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument', model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
...@@ -171,7 +225,7 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index ...@@ -171,7 +225,7 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index
:param elasticsearch_index_alias: alias for Elasticsearch used in :param elasticsearch_index_alias: alias for Elasticsearch used in
amundsensearchlibrary/search_service/config.py as an index amundsensearchlibrary/search_service/config.py as an index
:param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
`table_search_index` `table_{uuid}`
:param model_name: the Databuilder model class used in transporting between Extractor and Loader :param model_name: the Databuilder model class used in transporting between Extractor and Loader
:param entity_type: Entity type handed to the `Neo4jSearchDataExtractor` class, used to determine :param entity_type: Entity type handed to the `Neo4jSearchDataExtractor` class, used to determine
Cypher query to extract data from Neo4j. Defaults to `table`. Cypher query to extract data from Neo4j. Defaults to `table`.
...@@ -188,7 +242,7 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index ...@@ -188,7 +242,7 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index
# elastic search client instance # elastic search client instance
elasticsearch_client = es elasticsearch_client = es
# unique name of new index in Elasticsearch # unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4()) elasticsearch_new_index_key = '{}_'.format(elasticsearch_doc_type_key) + str(uuid.uuid4())
job_config = ConfigFactory.from_dict({ job_config = ConfigFactory.from_dict({
'extractor.search_data.entity_type': entity_type, 'extractor.search_data.entity_type': entity_type,
...@@ -246,6 +300,20 @@ if __name__ == "__main__": ...@@ -246,6 +300,20 @@ if __name__ == "__main__":
'databuilder.models.table_last_updated.TableLastUpdated') 'databuilder.models.table_last_updated.TableLastUpdated')
run_csv_job('example/sample_data/sample_schema_description.csv', 'test_schema_description', run_csv_job('example/sample_data/sample_schema_description.csv', 'test_schema_description',
'databuilder.models.schema.schema.SchemaModel') 'databuilder.models.schema.schema.SchemaModel')
run_csv_job('example/sample_data/sample_dashboard_base.csv', 'test_dashboard_base',
'databuilder.models.dashboard.dashboard_metadata.DashboardMetadata')
run_csv_job('example/sample_data/sample_dashboard_usage.csv', 'test_dashboard_usage',
'databuilder.models.dashboard.dashboard_usage.DashboardUsage')
run_csv_job('example/sample_data/sample_dashboard_owner.csv', 'test_dashboard_owner',
'databuilder.models.dashboard.dashboard_owner.DashboardOwner')
run_csv_job('example/sample_data/sample_dashboard_query.csv', 'test_dashboard_query',
'databuilder.models.dashboard.dashboard_query.DashboardQuery')
run_csv_job('example/sample_data/sample_dashboard_last_execution.csv', 'test_dashboard_last_execution',
'databuilder.models.dashboard.dashboard_execution.DashboardExecution')
run_csv_job('example/sample_data/sample_dashboard_last_modified.csv', 'test_dashboard_last_modified',
'databuilder.models.dashboard.dashboard_last_modified.DashboardLastModifiedTimestamp')
create_dashboard_tables_job().launch()
create_last_updated_job().launch() create_last_updated_job().launch()
...@@ -256,66 +324,18 @@ if __name__ == "__main__": ...@@ -256,66 +324,18 @@ if __name__ == "__main__":
model_name='databuilder.models.table_elasticsearch_document.TableESDocument') model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
job_es_table.launch() job_es_table.launch()
user_elasticsearch_mapping = """
{
"mappings":{
"user":{
"properties": {
"email": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"first_name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"last_name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"full_name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"total_read":{
"type": "long"
},
"total_own": {
"type": "long"
},
"total_follow": {
"type": "long"
}
}
}
}
}
"""
job_es_user = create_es_publisher_sample_job( job_es_user = create_es_publisher_sample_job(
elasticsearch_index_alias='user_search_index', elasticsearch_index_alias='user_search_index',
elasticsearch_doc_type_key='user', elasticsearch_doc_type_key='user',
model_name='databuilder.models.user_elasticsearch_document.UserESDocument', model_name='databuilder.models.user_elasticsearch_document.UserESDocument',
entity_type='user', entity_type='user',
elasticsearch_mapping=user_elasticsearch_mapping) elasticsearch_mapping=USER_ELASTICSEARCH_INDEX_MAPPING)
job_es_user.launch() job_es_user.launch()
job_es_dashboard = create_es_publisher_sample_job(
elasticsearch_index_alias='dashboard_search_index',
elasticsearch_doc_type_key='dashboard',
model_name='databuilder.models.dashboard_elasticsearch_document.DashboardESDocument',
entity_type='dashboard',
elasticsearch_mapping=DASHBOARD_ELASTICSEARCH_INDEX_MAPPING)
job_es_dashboard.launch()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment