# Copyright Contributors to the Amundsen project. # SPDX-License-Identifier: Apache-2.0 import textwrap from datetime import datetime, timedelta import uuid from elasticsearch import Elasticsearch from airflow import DAG # noqa from airflow import macros # noqa from airflow.operators.python_operator import PythonOperator # noqa from pyhocon import ConfigFactory from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor from databuilder.extractor.postgres_metadata_extractor import PostgresMetadataExtractor from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher from databuilder.extractor.neo4j_extractor import Neo4jExtractor from databuilder.job.job import DefaultJob from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader from databuilder.publisher import neo4j_csv_publisher from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher from databuilder.task.task import DefaultTask from databuilder.transformer.base_transformer import NoopTransformer dag_args = { 'concurrency': 10, # One dagrun at a time 'max_active_runs': 1, # 4AM, 4PM PST 'schedule_interval': '0 11 * * *', 'catchup': False } default_args = { 'owner': 'amundsen', 'start_date': datetime(2018, 6, 18), 'depends_on_past': False, 'email': [''], 'email_on_failure': False, 'email_on_retry': False, 'retries': 3, 'priority_weight': 10, 'retry_delay': timedelta(minutes=5), 'execution_timeout': timedelta(minutes=120) } # NEO4J cluster endpoints NEO4J_ENDPOINT = 'bolt://neo4j:7687' neo4j_endpoint = NEO4J_ENDPOINT neo4j_user = 'neo4j' neo4j_password = 'test' es = Elasticsearch([ {'host': 'elasticsearch'}, ]) # TODO: user provides a list of schema for indexing SUPPORTED_SCHEMAS = ['public'] # String format - ('schema1', schema2', .... 'schemaN') SUPPORTED_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_SCHEMAS)) OPTIONAL_TABLE_NAMES = '' def connection_string(): user = 'user' password = 'password' host = 'host.docker.internal' port = '5432' db = 'moviesdemo' return "postgresql://%s:%s@%s:%s/%s" % (user, password, host, port, db) def create_table_extract_job(**kwargs): where_clause_suffix = textwrap.dedent(""" where table_schema in {schemas} """).format(schemas=SUPPORTED_SCHEMA_SQL_IN_CLAUSE) tmp_folder = '/var/tmp/amundsen/table_metadata' node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder) relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder) job_config = ConfigFactory.from_dict({ 'extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, 'extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True, 'extractor.postgres_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string(), 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder, 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder, 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder, 'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder, 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint, 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user, 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password, 'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): 'unique_tag', # should use unique tag here like {ds} }) job = DefaultJob(conf=job_config, task=DefaultTask(extractor=PostgresMetadataExtractor(), loader=FsNeo4jCSVLoader()), publisher=Neo4jCsvPublisher()) job.launch() def create_es_publisher_sample_job(): # loader saves data to this location and publisher reads it from here extracted_search_data_path = '/var/tmp/amundsen/search_data.json' task = DefaultTask(loader=FSElasticsearchJSONLoader(), extractor=Neo4jSearchDataExtractor(), transformer=NoopTransformer()) # elastic search client instance elasticsearch_client = es # unique name of new index in Elasticsearch elasticsearch_new_index_key = 'tables' + str(uuid.uuid4()) # related to mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38 elasticsearch_new_index_key_type = 'table' # alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index elasticsearch_index_alias = 'table_search_index' job_config = ConfigFactory.from_dict({ 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint, 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'databuilder.models.table_elasticsearch_document.TableESDocument', 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user, 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password, 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): extracted_search_data_path, 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w', 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY): extracted_search_data_path, 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r', 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY): elasticsearch_client, 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY): elasticsearch_new_index_key, 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY): elasticsearch_new_index_key_type, 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY): elasticsearch_index_alias }) job = DefaultJob(conf=job_config, task=task, publisher=ElasticsearchPublisher()) job.launch() with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag: create_table_extract_job = PythonOperator( task_id='create_table_extract_job', python_callable=create_table_extract_job ) create_es_index_job = PythonOperator( task_id='create_es_publisher_sample_job', python_callable=create_es_publisher_sample_job )