# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is an example script demonstrating how to load data into Neo4j and
Elasticsearch without using an Airflow DAG.

It contains several jobs:
- `run_tableau_*_job`: runs a job that executes a specific Tableau extractor
  and publishes the resulting metadata to Neo4j
- `create_es_publisher_sample_job`: creates a job that extracts data from Neo4j
  and publishes it into Elasticsearch.

For other available extractors, please take a look at
https://github.com/lyft/amundsendatabuilder#list-of-extractors
"""

import logging
import os
import sys
import uuid

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base

from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher.elasticsearch_constants import DASHBOARD_ELASTICSEARCH_INDEX_MAPPING
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.extractor.dashboard.tableau.tableau_dashboard_extractor import TableauDashboardExtractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_last_modified_extractor import \
    TableauDashboardLastModifiedExtractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_query_extractor import TableauDashboardQueryExtractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_table_extractor import TableauDashboardTableExtractor
from databuilder.extractor.dashboard.tableau.tableau_external_table_extractor import \
    TableauDashboardExternalTableExtractor

es_host = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_HOST', 'localhost')
neo_host = os.getenv('CREDENTIALS_NEO4J_PROXY_HOST', 'localhost')

es_port = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_PORT', 9200)
neo_port = os.getenv('CREDENTIALS_NEO4J_PROXY_PORT', 7687)

if len(sys.argv) > 1:
    es_host = sys.argv[1]
if len(sys.argv) > 2:
    neo_host = sys.argv[2]

es = Elasticsearch([
    {'host': es_host, 'port': es_port},
])

Base = declarative_base()

NEO4J_ENDPOINT = 'bolt://{}:{}'.format(neo_host, neo_port)

neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = 'neo4j'
neo4j_password = 'test'

LOGGER = logging.getLogger(__name__)

# Placeholders describing your Tableau deployment; fill these in before running
# the Tableau jobs (see the example values sketched below).
tableau_base_url = ""
tableau_api_base_url = ""
tableau_api_version = 0
tableau_site_name = ""
tableau_personal_access_token_name = ""
tableau_personal_access_token_secret = ""
tableau_excluded_projects = []
tableau_dashboard_cluster = ""
tableau_dashboard_database = ""
tableau_external_table_cluster = ""
tableau_external_table_schema = ""
tableau_external_table_types = []
tableau_verify_request = None

common_tableau_config = {
    'publisher.neo4j.neo4j_endpoint': neo4j_endpoint,
    'publisher.neo4j.neo4j_user': neo4j_user,
    'publisher.neo4j.neo4j_password': neo4j_password,
    'publisher.neo4j.neo4j_encrypted': False,
    'publisher.neo4j.job_publish_tag': 'unique_tag',  # should use unique tag here like {ds}
}
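
# A minimal sketch of what the placeholders above might look like for a
# hypothetical Tableau deployment. Every value (URLs, token names, cluster and
# schema names, the TABLEAU_PAT_SECRET environment variable) is an illustrative
# assumption, not an Amundsen default:
#
#   tableau_base_url = 'https://tableau.example.com'
#   tableau_api_base_url = 'https://tableau.example.com'
#   tableau_api_version = 3.9
#   tableau_site_name = 'my_site'
#   tableau_personal_access_token_name = 'amundsen'
#   tableau_personal_access_token_secret = os.getenv('TABLEAU_PAT_SECRET', '')
#   tableau_excluded_projects = ['Archive', 'Scratch']
#   tableau_dashboard_cluster = 'gold'
#   tableau_dashboard_database = 'tableau'
#   tableau_external_table_cluster = 'gold'
#   tableau_external_table_schema = 'external'
#   tableau_external_table_types = ['snowflake']
#   tableau_verify_request = True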


def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   entity_type='table',
                                   elasticsearch_mapping=None):
    """
    :param elasticsearch_index_alias:  alias for Elasticsearch used in
                                       amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the Elasticsearch index is prepended with. Defaults to `table`,
                                       resulting in `table_{uuid}`
    :param model_name:                 the Databuilder model class used in transporting between Extractor and Loader
    :param entity_type:                entity type handed to the `Neo4jSearchDataExtractor` class, used to determine
                                       the Cypher query to extract data from Neo4j. Defaults to `table`.
    :param elasticsearch_mapping:      Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class;
                                       if None is given (default) it uses the `Table` mapping baked into the Publisher
    """
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = '{}_'.format(elasticsearch_doc_type_key) + str(uuid.uuid4())

    job_config = ConfigFactory.from_dict({
        'extractor.search_data.entity_type': entity_type,
        'extractor.search_data.extractor.neo4j.graph_url': neo4j_endpoint,
        'extractor.search_data.extractor.neo4j.model_class': model_name,
        'extractor.search_data.extractor.neo4j.neo4j_auth_user': neo4j_user,
        'extractor.search_data.extractor.neo4j.neo4j_auth_pw': neo4j_password,
        'extractor.search_data.extractor.neo4j.neo4j_encrypted': False,
        'loader.filesystem.elasticsearch.file_path': extracted_search_data_path,
        'loader.filesystem.elasticsearch.mode': 'w',
        'publisher.elasticsearch.file_path': extracted_search_data_path,
        'publisher.elasticsearch.mode': 'r',
        'publisher.elasticsearch.client': elasticsearch_client,
        'publisher.elasticsearch.new_index': elasticsearch_new_index_key,
        'publisher.elasticsearch.doc_type': elasticsearch_doc_type_key,
        'publisher.elasticsearch.alias': elasticsearch_index_alias,
    })

    # only optionally add these keys, so need to dynamically `put` them
    if elasticsearch_mapping:
        job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
                       elasticsearch_mapping)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job
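
# A usage sketch for the helper above; the __main__ block at the bottom of this
# script does exactly this for both table and dashboard documents (the dashboard
# variant passes DASHBOARD_ELASTICSEARCH_INDEX_MAPPING explicitly because the
# default mapping baked into ElasticsearchPublisher targets tables):
#
#   job = create_es_publisher_sample_job(
#       elasticsearch_index_alias='table_search_index',
#       elasticsearch_doc_type_key='table',
#       entity_type='table',
#       model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
#   job.launch()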


def run_tableau_metadata_job():
    task = DefaultTask(extractor=TableauDashboardExtractor(), loader=FsNeo4jCSVLoader())

    tmp_folder = '/var/tmp/amundsen/tableau_dashboard_metadata'

    node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)

    # copy the shared publisher config so per-job keys do not leak between jobs
    dict_config = common_tableau_config.copy()
    dict_config.update({
        'extractor.tableau_dashboard_metadata.api_base_url': tableau_api_base_url,
        'extractor.tableau_dashboard_metadata.tableau_base_url': tableau_base_url,
        'extractor.tableau_dashboard_metadata.api_version': tableau_api_version,
        'extractor.tableau_dashboard_metadata.site_name': tableau_site_name,
        'extractor.tableau_dashboard_metadata.tableau_personal_access_token_name':
            tableau_personal_access_token_name,
        'extractor.tableau_dashboard_metadata.tableau_personal_access_token_secret':
            tableau_personal_access_token_secret,
        'extractor.tableau_dashboard_metadata.excluded_projects': tableau_excluded_projects,
        'extractor.tableau_dashboard_metadata.cluster': tableau_dashboard_cluster,
        'extractor.tableau_dashboard_metadata.database': tableau_dashboard_database,
        'extractor.tableau_dashboard_metadata.transformer.timestamp_str_to_epoch.timestamp_format':
            "%Y-%m-%dT%H:%M:%SZ",
        'extractor.tableau_dashboard_metadata.verify_request': tableau_verify_request,
        'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
        'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
        'loader.filesystem_csv_neo4j.delete_created_directories': True,
        'task.progress_report_frequency': 100,
        'publisher.neo4j.node_files_directory': node_files_folder,
        'publisher.neo4j.relation_files_directory': relationship_files_folder,
    })
    job_config = ConfigFactory.from_dict(dict_config)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=Neo4jCsvPublisher())
    job.launch()


def run_tableau_last_modified_job():
    task = DefaultTask(extractor=TableauDashboardLastModifiedExtractor(), loader=FsNeo4jCSVLoader())

    tmp_folder = '/var/tmp/amundsen/tableau_dashboard_user'

    node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)

    dict_config = common_tableau_config.copy()
    dict_config.update({
        'extractor.tableau_dashboard_last_modified.api_base_url': tableau_api_base_url,
        'extractor.tableau_dashboard_last_modified.api_version': tableau_api_version,
        'extractor.tableau_dashboard_last_modified.site_name': tableau_site_name,
        'extractor.tableau_dashboard_last_modified.tableau_personal_access_token_name':
            tableau_personal_access_token_name,
        'extractor.tableau_dashboard_last_modified.tableau_personal_access_token_secret':
            tableau_personal_access_token_secret,
        'extractor.tableau_dashboard_last_modified.excluded_projects': tableau_excluded_projects,
        'extractor.tableau_dashboard_last_modified.cluster': tableau_dashboard_cluster,
        'extractor.tableau_dashboard_last_modified.database': tableau_dashboard_database,
        'extractor.tableau_dashboard_last_modified.transformer.timestamp_str_to_epoch.timestamp_format':
            "%Y-%m-%dT%H:%M:%SZ",
        'extractor.tableau_dashboard_last_modified.verify_request': tableau_verify_request,
        'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
        'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
        'loader.filesystem_csv_neo4j.delete_created_directories': True,
        'task.progress_report_frequency': 100,
        'publisher.neo4j.node_files_directory': node_files_folder,
        'publisher.neo4j.relation_files_directory': relationship_files_folder,
    })
    job_config = ConfigFactory.from_dict(dict_config)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=Neo4jCsvPublisher())
    job.launch()


def run_tableau_query_job():
    task = DefaultTask(extractor=TableauDashboardQueryExtractor(), loader=FsNeo4jCSVLoader())

    tmp_folder = '/var/tmp/amundsen/tableau_dashboard_query'

    node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)

    dict_config = common_tableau_config.copy()
    dict_config.update({
        'extractor.tableau_dashboard_query.api_base_url': tableau_api_base_url,
        'extractor.tableau_dashboard_query.api_version': tableau_api_version,
        'extractor.tableau_dashboard_query.site_name': tableau_site_name,
        'extractor.tableau_dashboard_query.tableau_personal_access_token_name':
            tableau_personal_access_token_name,
        'extractor.tableau_dashboard_query.tableau_personal_access_token_secret':
            tableau_personal_access_token_secret,
        'extractor.tableau_dashboard_query.excluded_projects': tableau_excluded_projects,
        'extractor.tableau_dashboard_query.cluster': tableau_dashboard_cluster,
        'extractor.tableau_dashboard_query.database': tableau_dashboard_database,
        'extractor.tableau_dashboard_query.transformer.timestamp_str_to_epoch.timestamp_format':
            "%Y-%m-%dT%H:%M:%SZ",
        'extractor.tableau_dashboard_query.verify_request': tableau_verify_request,
        'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
        'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
        'loader.filesystem_csv_neo4j.delete_created_directories': True,
        'task.progress_report_frequency': 100,
        'publisher.neo4j.node_files_directory': node_files_folder,
        'publisher.neo4j.relation_files_directory': relationship_files_folder,
    })
    job_config = ConfigFactory.from_dict(dict_config)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=Neo4jCsvPublisher())
    job.launch()


def run_tableau_table_job():
    task = DefaultTask(extractor=TableauDashboardTableExtractor(), loader=FsNeo4jCSVLoader())

    tmp_folder = '/var/tmp/amundsen/tableau_dashboard_table'

    node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)

    dict_config = common_tableau_config.copy()
    dict_config.update({
        'extractor.tableau_dashboard_table.api_base_url': tableau_api_base_url,
        'extractor.tableau_dashboard_table.api_version': tableau_api_version,
        'extractor.tableau_dashboard_table.site_name': tableau_site_name,
        'extractor.tableau_dashboard_table.tableau_personal_access_token_name':
            tableau_personal_access_token_name,
        'extractor.tableau_dashboard_table.tableau_personal_access_token_secret':
            tableau_personal_access_token_secret,
        'extractor.tableau_dashboard_table.excluded_projects': tableau_excluded_projects,
        'extractor.tableau_dashboard_table.cluster': tableau_dashboard_cluster,
        'extractor.tableau_dashboard_table.database': tableau_dashboard_database,
        'extractor.tableau_dashboard_table.external_cluster_name': tableau_external_table_cluster,
        'extractor.tableau_dashboard_table.external_schema_name': tableau_external_table_schema,
        'extractor.tableau_dashboard_table.transformer.timestamp_str_to_epoch.timestamp_format':
            "%Y-%m-%dT%H:%M:%SZ",
        'extractor.tableau_dashboard_table.verify_request': tableau_verify_request,
        'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
        'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
        'loader.filesystem_csv_neo4j.delete_created_directories': True,
        'task.progress_report_frequency': 100,
        'publisher.neo4j.node_files_directory': node_files_folder,
        'publisher.neo4j.relation_files_directory': relationship_files_folder,
    })
    job_config = ConfigFactory.from_dict(dict_config)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=Neo4jCsvPublisher())
    job.launch()


def run_tableau_external_table_job():
    task = DefaultTask(extractor=TableauDashboardExternalTableExtractor(), loader=FsNeo4jCSVLoader())

    tmp_folder = '/var/tmp/amundsen/tableau_dashboard_external_table'

    node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)

    dict_config = common_tableau_config.copy()
    dict_config.update({
        'extractor.tableau_external_table.api_base_url': tableau_api_base_url,
        'extractor.tableau_external_table.api_version': tableau_api_version,
        'extractor.tableau_external_table.site_name': tableau_site_name,
        'extractor.tableau_external_table.tableau_personal_access_token_name':
            tableau_personal_access_token_name,
        'extractor.tableau_external_table.tableau_personal_access_token_secret':
            tableau_personal_access_token_secret,
        'extractor.tableau_external_table.excluded_projects': tableau_excluded_projects,
        'extractor.tableau_external_table.cluster': tableau_dashboard_cluster,
        'extractor.tableau_external_table.database': tableau_dashboard_database,
        'extractor.tableau_external_table.external_cluster_name': tableau_external_table_cluster,
        'extractor.tableau_external_table.external_schema_name': tableau_external_table_schema,
        'extractor.tableau_external_table.external_table_types': tableau_external_table_types,
        'extractor.tableau_external_table.verify_request': tableau_verify_request,
        'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
        'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
        'loader.filesystem_csv_neo4j.delete_created_directories': True,
        'task.progress_report_frequency': 100,
        'publisher.neo4j.node_files_directory': node_files_folder,
        'publisher.neo4j.relation_files_directory': relationship_files_folder,
    })
    job_config = ConfigFactory.from_dict(dict_config)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=Neo4jCsvPublisher())
    job.launch()


if __name__ == "__main__":
    # Uncomment next line to get INFO level logging
    # logging.basicConfig(level=logging.INFO)

    run_tableau_metadata_job()
    run_tableau_external_table_job()
    run_tableau_table_job()
    run_tableau_query_job()
    run_tableau_last_modified_job()

    job_es_table = create_es_publisher_sample_job(
        elasticsearch_index_alias='table_search_index',
        elasticsearch_doc_type_key='table',
        entity_type='table',
        model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
    job_es_table.launch()

    job_es_dashboard = create_es_publisher_sample_job(
        elasticsearch_index_alias='dashboard_search_index',
        elasticsearch_doc_type_key='dashboard',
        model_name='databuilder.models.dashboard_elasticsearch_document.DashboardESDocument',
        entity_type='dashboard',
        elasticsearch_mapping=DASHBOARD_ELASTICSEARCH_INDEX_MAPPING)
    job_es_dashboard.launch()