Unverified Commit cb6dc2ed authored by Luis Garcés-Erice, committed by GitHub

Db2 Metadata extractor (#241)

* Db2 Metadata extractor

* Fix lint requirements
parent e57e5c46
@@ -198,6 +198,25 @@ job = DefaultJob(
job.launch()
```
#### [Db2MetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/db2_metadata_extractor.py "Db2MetadataExtractor")
An extractor that extracts table and column metadata, including database, schema, table name, table description, column name and column description, from a Db2 database running on Unix, Windows or Linux, or from BigSQL.
The `where_clause_suffix` below should define which schemas you would like to query and which ones to exclude (see [the sample data loader](https://github.com/lyft/amundsendatabuilder/blob/master/example/sample_db2_data_loader.py) for an example; a minimal sketch also follows the snippet below).
The SQL query driving the extraction is defined [here](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/db2_metadata_extractor.py).
```python
job_config = ConfigFactory.from_dict({
'extractor.db2_metadata.{}'.format(Db2MetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
'extractor.db2_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=Db2MetadataExtractor(),
loader=AnyLoader()))
job.launch()
```
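For reference, a `where_clause_suffix` that skips the Db2 system catalog schemas could look like the sketch below (the schema list and variable names are illustrative; adjust them to your database):
```python
# Illustrative only: exclude the Db2 system catalog schemas from extraction.
ignored_schemas = ["'SYSIBM'", "'SYSIBMTS'", "'SYSTOOLS'", "'SYSCAT'", "'SYSIBMADM'", "'SYSSTAT'"]
where_clause_suffix = "WHERE c.TABSCHEMA not in ({0}) ;".format(','.join(ignored_schemas))
```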
#### [SnowflakeMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/snowflake_metadata_extractor.py "SnowflakeMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Snowflake database.
......
import logging
import six
from collections import namedtuple
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
TableKey = namedtuple('TableKey', ['schema', 'table_name'])
LOGGER = logging.getLogger(__name__)
class Db2MetadataExtractor(Extractor):
"""
    Extracts Db2 table and column metadata from the underlying metastore database using SQLAlchemyExtractor
"""
    # SELECT statement against the Db2 SYSCAT catalog views to extract table and column metadata
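    # VARCHAR/CHARACTER and DECIMAL columns get their length (and scale) appended to the type name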
SQL_STATEMENT = """
SELECT
{cluster_source} as cluster, c.TABSCHEMA as schema, c.TABNAME as name, t.REMARKS as description,
c.COLNAME as col_name,
CASE WHEN c.TYPENAME='VARCHAR' OR c.TYPENAME='CHARACTER' THEN
TRIM (TRAILING FROM c.TYPENAME) concat '(' concat c.LENGTH concat ')'
WHEN c.TYPENAME='DECIMAL' THEN
TRIM (TRAILING FROM c.TYPENAME) concat '(' concat c.LENGTH concat ',' concat c.SCALE concat ')'
ELSE TRIM (TRAILING FROM c.TYPENAME) END as col_type,
c.REMARKS as col_description, c.COLNO as col_sort_order
FROM SYSCAT.COLUMNS c
INNER JOIN
SYSCAT.TABLES as t on c.TABSCHEMA=t.TABSCHEMA and c.TABNAME=t.TABNAME
{where_clause_suffix}
ORDER by cluster, schema, name, col_sort_order ;
"""
# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster_key'
DATABASE_KEY = 'database_key'
# Default values
DEFAULT_CLUSTER_NAME = 'master'
DEFAULT_CONFIG = ConfigFactory.from_dict(
{WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME}
)
def init(self, conf):
# type: (ConfigTree) -> None
conf = conf.with_fallback(Db2MetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(Db2MetadataExtractor.CLUSTER_KEY))
cluster_source = "'{}'".format(self._cluster)
database = conf.get_string(Db2MetadataExtractor.DATABASE_KEY, default='db2')
if six.PY2 and isinstance(database, six.text_type):
database = database.encode('utf-8', 'ignore')
self._database = database
self.sql_stmt = Db2MetadataExtractor.SQL_STATEMENT.format(
where_clause_suffix=conf.get_string(Db2MetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
cluster_source=cluster_source
)
self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)
LOGGER.info('SQL for Db2 metadata: {}'.format(self.sql_stmt))
self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter = None # type: Union[None, Iterator]
def extract(self):
# type: () -> Union[TableMetadata, None]
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.db2_metadata'
def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
"""
        Using itertools.groupby and the raw-level iterator, it groups rows by table and yields TableMetadata
:return:
"""
for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []
for row in group:
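                # table-level fields (cluster, schema, name, description) repeat on every row of the
                # group; keep the last row so they can be read after the loop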
last_row = row
columns.append(ColumnMetadata(row['col_name'], row['col_description'],
row['col_type'], row['col_sort_order']))
yield TableMetadata(self._database, last_row['cluster'],
last_row['schema'],
last_row['name'],
last_row['description'],
columns)
def _get_raw_extract_iter(self):
# type: () -> Iterator[Dict[str, Any]]
"""
        Provides an iterator of result rows from the SQLAlchemy extractor
:return:
"""
row = self._alchemy_extractor.extract()
while row:
yield row
row = self._alchemy_extractor.extract()
def _get_table_key(self, row):
# type: (Dict[str, Any]) -> Union[TableKey, None]
"""
Table key consists of schema and table name
:param row:
:return:
"""
if row:
return TableKey(schema=row['schema'], table_name=row['name'])
return None
"""
This is an example script demonstrating how to load data into Neo4j without using an Airflow DAG.
"""
import logging
import os
from pyhocon import ConfigFactory
import sys
import uuid
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.extractor.db2_metadata_extractor import Db2MetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from elasticsearch import Elasticsearch
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
# Disable Db2 logging
logging.getLogger("db2.connector.network").disabled = True
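# SQLAlchemy connection string using the ibm_db_sa dialect; replace user, password,
# host, port and database name with your own values.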
DB2_CONN_STRING = 'db2+ibm_db://username:password@database.host.name:50000/DB;'
# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
es_host = None
neo_host = None
if len(sys.argv) > 1:
es_host = sys.argv[1]
if len(sys.argv) > 2:
neo_host = sys.argv[2]
es = Elasticsearch([
{'host': es_host if es_host else 'localhost'},
])
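# Db2 system catalog schemas that should not be loaded into Amundsen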
IGNORED_SCHEMAS = ['\'SYSIBM\'', '\'SYSIBMTS\'', '\'SYSTOOLS\'', '\'SYSCAT\'', '\'SYSIBMADM\'', '\'SYSSTAT\'']
def create_sample_db2_job():
where_clause = "WHERE c.TABSCHEMA not in ({0}) ;".format(','.join(IGNORED_SCHEMAS))
tmp_folder = '/var/tmp/amundsen/{}'.format('tables')
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
sql_extractor = Db2MetadataExtractor()
csv_loader = FsNeo4jCSVLoader()
task = DefaultTask(extractor=sql_extractor,
loader=csv_loader)
job_config = ConfigFactory.from_dict({
'extractor.db2_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): DB2_CONN_STRING,
'extractor.db2_metadata.{}'.format(Db2MetadataExtractor.DATABASE_KEY): 'DEMODB',
'extractor.db2_metadata.{}'.format(Db2MetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR): True,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.FORCE_CREATE_DIR): True,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): 'unique_tag'
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
return job
def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
cypher_query=None,
elasticsearch_mapping=None):
"""
:param elasticsearch_index_alias: alias for Elasticsearch used in
amundsensearchlibrary/search_service/config.py as an index
:param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
`table_search_index`
:param model_name: the Databuilder model class used in transporting between Extractor and Loader
:param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
           if None is given (default) it uses the `Table` mapping baked into the Publisher
"""
# loader saves data to this location and publisher reads it from here
extracted_search_data_path = '/var/tmp/amundsen/db2_data.json'
task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=NoopTransformer())
# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): model_name,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
elasticsearch_doc_type_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias,
})
# only optionally add these keys, so need to dynamically `put` them
if cypher_query:
job_config.put('extractor.search_data.{}'.format(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY),
cypher_query)
if elasticsearch_mapping:
job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
elasticsearch_mapping)
job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())
return job
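# A hypothetical variant showing how the optional cypher_query and elasticsearch_mapping
# parameters can be overridden. The values below are placeholders for illustration only;
# a real query and mapping must match your Neo4j model and the Amundsen search schema.
def create_custom_es_publisher_job():
    custom_cypher = 'MATCH (table:Table) RETURN table'  # placeholder query
    custom_mapping = '{"mappings": {"table": {}}}'      # placeholder mapping
    return create_es_publisher_sample_job(cypher_query=custom_cypher,
                                           elasticsearch_mapping=custom_mapping)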
if __name__ == "__main__":
job = create_sample_db2_job()
job.launch()
job_es_table = create_es_publisher_sample_job(
elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
job_es_table.launch()
@@ -57,4 +57,3 @@ unicodecsv==0.14.1,<1.0
httplib2~=0.9.2
unidecode
@@ -32,7 +32,12 @@ bigquery = [
jsonpath = ['jsonpath_rw==1.4.0']
all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath
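# Optional Db2 support: ibm_db is the IBM Db2 driver and ibm-db-sa-py3 provides the
# SQLAlchemy dialect used by SQLAlchemyExtractor.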
db2 = [
'ibm_db',
'ibm-db-sa-py3'
]
all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath + db2
setup(
name='amundsen-databuilder',
@@ -54,7 +59,8 @@ setup(
'snowflake': snowflake,
'athena': athena,
'bigquery': bigquery,
'jsonpath': jsonpath
'jsonpath': jsonpath,
'db2': db2
},
classifiers=[
'Programming Language :: Python :: 2.7',
......