diff --git a/README.md b/README.md
index e0463de38140ab66a50f2162d531ef64dadea871..9c25b8c78518e28e0bc3a5a867383d9f0e06743c 100644
--- a/README.md
+++ b/README.md
@@ -198,6 +198,25 @@ job = DefaultJob(
 job.launch()
 ```
 
+#### [Db2MetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/db2_metadata_extractor.py "Db2MetadataExtractor")
+An extractor that extracts table and column metadata, including database, schema, table name, table description, column name and column description, from a Db2 database running on Unix, Windows or Linux, or from BigSQL.
+
+The `where_clause_suffix` below should define which schemas you'd like to query, or which ones to exclude (see [the sample data loader](https://github.com/lyft/amundsendatabuilder/blob/master/example/scripts/sample_db2_data_loader.py) for an example).
+
+The SQL query driving the extraction is defined [here](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/db2_metadata_extractor.py).
+
+```python
+job_config = ConfigFactory.from_dict({
+    'extractor.db2_metadata.{}'.format(Db2MetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
+    'extractor.db2_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
+job = DefaultJob(
+    conf=job_config,
+    task=DefaultTask(
+        extractor=Db2MetadataExtractor(),
+        loader=AnyLoader()))
+job.launch()
+```
+
 
 #### [SnowflakeMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/snowflake_metadata_extractor.py "SnowflakeMetadataExtractor")
 An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Snowflake database.
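The README snippet above assumes a `connection_string()` helper and a `where_clause_suffix` value are defined elsewhere. A minimal sketch of what they might look like, modelled on the sample loader added later in this diff (host, port, credentials and database name are placeholders, not part of the change):

```python
# Hypothetical helpers for the README snippet; all concrete values are placeholders.
def connection_string():
    # SQLAlchemy URL for the ibm_db driver installed via the new `db2` extra
    return 'db2+ibm_db://username:password@database.host.name:50000/DB'


# Skip the Db2 catalog schemas so only user tables are extracted
ignored_schemas = ["'SYSIBM'", "'SYSIBMTS'", "'SYSTOOLS'", "'SYSCAT'", "'SYSIBMADM'", "'SYSSTAT'"]
where_clause_suffix = "WHERE c.TABSCHEMA not in ({0}) ;".format(','.join(ignored_schemas))
```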
diff --git a/databuilder/extractor/db2_metadata_extractor.py b/databuilder/extractor/db2_metadata_extractor.py
new file mode 100644
index 0000000000000000000000000000000000000000..6d9920f98700cf962c5d884f43c7bb240925d359
--- /dev/null
+++ b/databuilder/extractor/db2_metadata_extractor.py
@@ -0,0 +1,136 @@
+import logging
+import six
+from collections import namedtuple
+
+from pyhocon import ConfigFactory, ConfigTree  # noqa: F401
+from typing import Iterator, Union, Dict, Any  # noqa: F401
+
+from databuilder import Scoped
+from databuilder.extractor.base_extractor import Extractor
+from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
+from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
+from itertools import groupby
+
+TableKey = namedtuple('TableKey', ['schema', 'table_name'])
+
+LOGGER = logging.getLogger(__name__)
+
+
+class Db2MetadataExtractor(Extractor):
+    """
+    Extracts Db2 table and column metadata from the underlying meta store database using SQLAlchemyExtractor
+    """
+    # SELECT statement against the Db2 SYSCAT catalog views to extract table and column metadata
+    SQL_STATEMENT = """
+    SELECT
+    {cluster_source} as cluster, c.TABSCHEMA as schema, c.TABNAME as name, t.REMARKS as description,
+    c.COLNAME as col_name,
+    CASE WHEN c.TYPENAME='VARCHAR' OR c.TYPENAME='CHARACTER' THEN
+    TRIM (TRAILING FROM c.TYPENAME) concat '(' concat c.LENGTH concat ')'
+    WHEN c.TYPENAME='DECIMAL' THEN
+    TRIM (TRAILING FROM c.TYPENAME) concat '(' concat c.LENGTH concat ',' concat c.SCALE concat ')'
+    ELSE TRIM (TRAILING FROM c.TYPENAME) END as col_type,
+    c.REMARKS as col_description, c.COLNO as col_sort_order
+    FROM SYSCAT.COLUMNS c
+    INNER JOIN
+    SYSCAT.TABLES as t on c.TABSCHEMA=t.TABSCHEMA and c.TABNAME=t.TABNAME
+    {where_clause_suffix}
+    ORDER by cluster, schema, name, col_sort_order ;
+    """
+
+    # CONFIG KEYS
+    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
+    CLUSTER_KEY = 'cluster_key'
+    DATABASE_KEY = 'database_key'
+
+    # Default values
+    DEFAULT_CLUSTER_NAME = 'master'
+
+    DEFAULT_CONFIG = ConfigFactory.from_dict(
+        {WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME}
+    )
+
+    def init(self, conf):
+        # type: (ConfigTree) -> None
+        conf = conf.with_fallback(Db2MetadataExtractor.DEFAULT_CONFIG)
+        self._cluster = '{}'.format(conf.get_string(Db2MetadataExtractor.CLUSTER_KEY))
+
+        cluster_source = "'{}'".format(self._cluster)
+
+        database = conf.get_string(Db2MetadataExtractor.DATABASE_KEY, default='db2')
+        if six.PY2 and isinstance(database, six.text_type):
+            database = database.encode('utf-8', 'ignore')
+
+        self._database = database
+
+        self.sql_stmt = Db2MetadataExtractor.SQL_STATEMENT.format(
+            where_clause_suffix=conf.get_string(Db2MetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
+            cluster_source=cluster_source
+        )
+
+        self._alchemy_extractor = SQLAlchemyExtractor()
+        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
+            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
+
+        self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)
+
+        LOGGER.info('SQL for Db2 metadata: {}'.format(self.sql_stmt))
+
+        self._alchemy_extractor.init(sql_alch_conf)
+        self._extract_iter = None  # type: Union[None, Iterator]
+
+    def extract(self):
+        # type: () -> Union[TableMetadata, None]
+        if not self._extract_iter:
+            self._extract_iter = self._get_extract_iter()
+        try:
+            return next(self._extract_iter)
+        except StopIteration:
+            return None
+
+    def get_scope(self):
+        # type: () -> str
+        return 'extractor.db2_metadata'
+
+    def _get_extract_iter(self):
+        # type: () -> Iterator[TableMetadata]
+        """
+        Using itertools.groupby over the raw row iterator, groups rows by table and yields TableMetadata
+        :return:
+        """
+        for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
+            columns = []
+
+            for row in group:
+                last_row = row
+                columns.append(ColumnMetadata(row['col_name'], row['col_description'],
+                                              row['col_type'], row['col_sort_order']))
+
+            yield TableMetadata(self._database, last_row['cluster'],
+                                last_row['schema'],
+                                last_row['name'],
+                                last_row['description'],
+                                columns)
+
+    def _get_raw_extract_iter(self):
+        # type: () -> Iterator[Dict[str, Any]]
+        """
+        Provides an iterator of result rows from the SQLAlchemy extractor
+        :return:
+        """
+        row = self._alchemy_extractor.extract()
+        while row:
+            yield row
+            row = self._alchemy_extractor.extract()
+
+    def _get_table_key(self, row):
+        # type: (Dict[str, Any]) -> Union[TableKey, None]
+        """
+        Table key consists of schema and table name
+        :param row:
+        :return:
+        """
+        if row:
+            return TableKey(schema=row['schema'], table_name=row['name'])
+
+        return None
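A side note on the extractor's design, not part of the diff: `_get_extract_iter` relies on `SQL_STATEMENT` ending with `ORDER by cluster, schema, name, col_sort_order`, because `itertools.groupby` only merges consecutive rows that share a key. A self-contained sketch with made-up rows shows the grouping behaviour the extractor depends on:

```python
# Standalone illustration of the groupby-per-table pattern used by Db2MetadataExtractor.
# The rows below are hypothetical; the real rows come from the SQLAlchemy extractor.
from collections import namedtuple
from itertools import groupby

TableKey = namedtuple('TableKey', ['schema', 'table_name'])

rows = [
    {'schema': 'SALES', 'name': 'ORDERS', 'col_name': 'ID', 'col_sort_order': 0},
    {'schema': 'SALES', 'name': 'ORDERS', 'col_name': 'TOTAL', 'col_sort_order': 1},
    {'schema': 'SALES', 'name': 'CUSTOMERS', 'col_name': 'ID', 'col_sort_order': 0},
]

# Because the rows arrive sorted by table, each table surfaces exactly once
for key, group in groupby(rows, lambda row: TableKey(schema=row['schema'], table_name=row['name'])):
    print(key.schema, key.table_name, [row['col_name'] for row in group])
# SALES ORDERS ['ID', 'TOTAL']
# SALES CUSTOMERS ['ID']
```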
+""" + +import logging +import os +from pyhocon import ConfigFactory + +import sys +import uuid + +from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor +from databuilder.extractor.db2_metadata_extractor import Db2MetadataExtractor +from databuilder.job.job import DefaultJob +from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader +from databuilder.publisher import neo4j_csv_publisher +from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher +from databuilder.task.task import DefaultTask + +from elasticsearch import Elasticsearch +from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader +from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor +from databuilder.transformer.base_transformer import NoopTransformer +from databuilder.extractor.neo4j_extractor import Neo4jExtractor +from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.INFO) +# Disable Db2 logging +logging.getLogger("db2.connector.network").disabled = True + +DB2_CONN_STRING = 'db2+ibm_db://username:password@database.host.name:50000/DB;' + +# set env NEO4J_HOST to override localhost +NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost')) +neo4j_endpoint = NEO4J_ENDPOINT + +neo4j_user = 'neo4j' +neo4j_password = 'test' + +es_host = None +neo_host = None +if len(sys.argv) > 1: + es_host = sys.argv[1] +if len(sys.argv) > 2: + neo_host = sys.argv[2] + +es = Elasticsearch([ + {'host': es_host if es_host else 'localhost'}, +]) + +IGNORED_SCHEMAS = ['\'SYSIBM\'', '\'SYSIBMTS\'', '\'SYSTOOLS\'', '\'SYSCAT\'', '\'SYSIBMADM\'', '\'SYSSTAT\''] + + +def create_sample_db2_job(): + + where_clause = "WHERE c.TABSCHEMA not in ({0}) ;".format(','.join(IGNORED_SCHEMAS)) + + tmp_folder = '/var/tmp/amundsen/{}'.format('tables') + node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder) + relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder) + + sql_extractor = Db2MetadataExtractor() + csv_loader = FsNeo4jCSVLoader() + + task = DefaultTask(extractor=sql_extractor, + loader=csv_loader) + + job_config = ConfigFactory.from_dict({ + 'extractor.db2_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): DB2_CONN_STRING, + 'extractor.db2_metadata.{}'.format(Db2MetadataExtractor.DATABASE_KEY): 'DEMODB', + 'extractor.db2_metadata.{}'.format(Db2MetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause, + 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder, + 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder, + 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR): True, + 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.FORCE_CREATE_DIR): True, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): 'unique_tag' + }) + job = DefaultJob(conf=job_config, + task=task, + 
+                     publisher=Neo4jCsvPublisher())
+    return job
+
+
+def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
+                                   elasticsearch_doc_type_key='table',
+                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
+                                   cypher_query=None,
+                                   elasticsearch_mapping=None):
+    """
+    :param elasticsearch_index_alias: alias for Elasticsearch used in
+        amundsensearchlibrary/search_service/config.py as an index
+    :param elasticsearch_doc_type_key: name the Elasticsearch index is prepended with. Defaults to `table` resulting in
+        `table_search_index`
+    :param model_name: the Databuilder model class used in transporting between Extractor and Loader
+    :param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
+        it uses the `Table` query baked into the Extractor
+    :param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
+        if None is given (default) it uses the `Table` mapping baked into the Publisher
+    """
+    # loader saves data to this location and publisher reads it from here
+    extracted_search_data_path = '/var/tmp/amundsen/db2_data.json'
+
+    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
+                       extractor=Neo4jSearchDataExtractor(),
+                       transformer=NoopTransformer())
+
+    # Elasticsearch client instance
+    elasticsearch_client = es
+    # unique name of new index in Elasticsearch
+    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
+
+    job_config = ConfigFactory.from_dict({
+        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
+        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): model_name,
+        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
+        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
+        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
+            extracted_search_data_path,
+        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
+            extracted_search_data_path,
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
+            elasticsearch_client,
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
+            elasticsearch_new_index_key,
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
+            elasticsearch_doc_type_key,
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
+            elasticsearch_index_alias,
+    })
+
+    # only optionally add these keys, so need to dynamically `put` them
+    if cypher_query:
+        job_config.put('extractor.search_data.{}'.format(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY),
+                       cypher_query)
+    if elasticsearch_mapping:
+        job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
+                       elasticsearch_mapping)
+
+    job = DefaultJob(conf=job_config,
+                     task=task,
+                     publisher=ElasticsearchPublisher())
+    return job
+
+
+if __name__ == "__main__":
+    job = create_sample_db2_job()
+    job.launch()
+
+    job_es_table = create_es_publisher_sample_job(
+        elasticsearch_index_alias='table_search_index',
+        elasticsearch_doc_type_key='table',
+        model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
+    job_es_table.launch()
diff --git a/requirements.txt b/requirements.txt
index 6b0a64913a0ebde4efb5f72fbd58a1a1db95beaa..dc5b21b9d2b3945282cf7c5889cbd9fde2d3dc26 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -57,4 +57,3 @@ unicodecsv==0.14.1,<1.0
 httplib2~=0.9.2
 
 unidecode
-
diff --git a/setup.py b/setup.py
index 631da41470316cc818973b50dc7a48e8f354a3b1..bd1def6894efcd1f5b9357945d3f163dc7d29252 100644
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,12 @@ bigquery = [
 
 jsonpath = ['jsonpath_rw==1.4.0']
 
-all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath
+db2 = [
+    'ibm_db',
+    'ibm-db-sa-py3'
+]
+
+all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath + db2
 
 setup(
     name='amundsen-databuilder',
@@ -54,7 +59,8 @@ setup(
         'snowflake': snowflake,
        'athena': athena,
         'bigquery': bigquery,
-        'jsonpath': jsonpath
+        'jsonpath': jsonpath,
+        'db2': db2
     },
     classifiers=[
         'Programming Language :: Python :: 2.7',
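With the `db2` extra added to `setup.py`, the Db2 drivers can be installed alongside the package (for example `pip install amundsen-databuilder[db2]`). An optional connectivity check is sketched below, assuming those drivers are installed; the connection URL is a placeholder and the snippet is not part of the change:

```python
# Quick connectivity check, assuming ibm_db / ibm-db-sa-py3 from the `db2` extra are installed.
from sqlalchemy import create_engine, text

# Placeholder URL in the same db2+ibm_db format as the sample loader's DB2_CONN_STRING
engine = create_engine('db2+ibm_db://username:password@database.host.name:50000/DB')
with engine.connect() as connection:
    # SYSIBM.SYSDUMMY1 is Db2's built-in one-row table, convenient for a smoke test
    print(connection.execute(text('SELECT 1 FROM SYSIBM.SYSDUMMY1')).scalar())
```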