Unverified Commit 146ccd0e authored by Tao Feng's avatar Tao Feng Committed by GitHub

Druid metadata extractor (#257)

* Druid metadata extractor

* add doc

* fix lint

* add extra deps

* Update version
parent 0ab3f467
...@@ -178,6 +178,28 @@ If using the filters option here is the input format ...@@ -178,6 +178,28 @@ If using the filters option here is the input format
] ]
``` ```
#### [DruidMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/druid_metadata_extractor.py)
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a [Druid](https://druid.apache.org/) DB.
The `where_clause_suffix` can be defined to restrict the extraction; normally you would want to filter out the system tables in `INFORMATION_SCHEMA`.
You could specify the following job config
```python
conn_string = "druid+https://{host}:{port}/druid/v2/sql/".format(
host=druid_broker_host,
port=443
)
job_config = ConfigFactory.from_dict({
    'extractor.druid_metadata.{}'.format(DruidMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
    'extractor.druid_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): conn_string})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=DruidMetadataExtractor(),
loader=AnyLoader()))
job.launch()
```
#### [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor") #### [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Postgres or Redshift database. An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Postgres or Redshift database.
...@@ -211,7 +233,7 @@ The `where_clause_suffix` below should define which schemas you'd like to query ...@@ -211,7 +233,7 @@ The `where_clause_suffix` below should define which schemas you'd like to query
The SQL query driving the extraction is defined [here](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mssql_metadata_extractor.py) The SQL query driving the extraction is defined [here](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mssql_metadata_extractor.py)
This extractor is highly derived from [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor"). This extractor is highly derived from [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor").
```python ```python
job_config = ConfigFactory.from_dict({ job_config = ConfigFactory.from_dict({
'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, 'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
...@@ -225,7 +247,7 @@ job = DefaultJob( ...@@ -225,7 +247,7 @@ job = DefaultJob(
job.launch() job.launch()
``` ```
#### [MysqlMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mysql_metadata_extractor.py "PostgresMetadataExtractor") #### [MysqlMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mysql_metadata_extractor.py "MysqlMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a MYSQL database. An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a MYSQL database.
By default, the MYSQL database name is used as the cluster name. To override this, set `USE_CATALOG_AS_CLUSTER_NAME` By default, the MYSQL database name is used as the cluster name. To override this, set `USE_CATALOG_AS_CLUSTER_NAME`
...@@ -239,7 +261,7 @@ The SQL query driving the extraction is defined [here](https://github.com/lyft/a ...@@ -239,7 +261,7 @@ The SQL query driving the extraction is defined [here](https://github.com/lyft/a
job_config = ConfigFactory.from_dict({ job_config = ConfigFactory.from_dict({
'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, 'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True, 'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True,
'extractor.postgres_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) 'extractor.mysql_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
job = DefaultJob(conf=job_config, job = DefaultJob(conf=job_config,
task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()), task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()),
publisher=Neo4jCsvPublisher()) publisher=Neo4jCsvPublisher())
......
import logging
from collections import namedtuple
import textwrap
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
# Grouping key for raw result rows: consecutive rows sharing the same
# (schema, table_name) pair are folded into one TableMetadata record.
TableKey = namedtuple('TableKey', ['schema', 'table_name'])
# Module-level logger, named after this module per logging convention.
LOGGER = logging.getLogger(__name__)
class DruidMetadataExtractor(Extractor):
    """
    Extracts Druid table and column metadata from druid using dbapi extractor
    """
    SQL_STATEMENT = textwrap.dedent("""
    SELECT
    TABLE_SCHEMA as schema,
    TABLE_NAME as name,
    COLUMN_NAME as col_name,
    DATA_TYPE as col_type,
    ORDINAL_POSITION as col_sort_order
    FROM INFORMATION_SCHEMA.COLUMNS
    {where_clause_suffix}
    order by TABLE_SCHEMA, TABLE_NAME, CAST(ORDINAL_POSITION AS int)
    """)

    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    CLUSTER_KEY = 'cluster'

    DEFAULT_CONFIG = ConfigFactory.from_dict({WHERE_CLAUSE_SUFFIX_KEY: ' ',
                                              CLUSTER_KEY: 'gold'})

    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        Render the SQL statement from config and wire up the underlying
        SQLAlchemy extractor that will run it.
        """
        conf = conf.with_fallback(DruidMetadataExtractor.DEFAULT_CONFIG)
        where_clause = conf.get_string(DruidMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY,
                                       default='')
        self._cluster = '{}'.format(conf.get_string(DruidMetadataExtractor.CLUSTER_KEY))
        self.sql_stmt = DruidMetadataExtractor.SQL_STATEMENT.format(
            where_clause_suffix=where_clause)

        # Delegate the actual query execution to the generic SQLAlchemy extractor,
        # scoped under this extractor's config namespace.
        self._alchemy_extractor = SQLAlchemyExtractor()
        scoped_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
        self._alchemy_extractor.init(scoped_conf)

        self._extract_iter = None  # type: Union[None, Iterator]

    def extract(self):
        # type: () -> Union[TableMetadata, None]
        """
        Return the next TableMetadata record, or None once exhausted.
        """
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def get_scope(self):
        # type: () -> str
        return 'extractor.druid_metadata'

    def _get_extract_iter(self):
        # type: () -> Iterator[TableMetadata]
        """
        Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
        :return:
        """
        for _, grouped_rows in groupby(self._get_raw_extract_iter(), self._get_table_key):
            # Materialize the group: all rows share schema/name, and Druid's
            # INFORMATION_SCHEMA carries no table or column descriptions.
            rows = list(grouped_rows)
            columns = [ColumnMetadata(name=r['col_name'],
                                      description='',
                                      col_type=r['col_type'],
                                      sort_order=r['col_sort_order'])
                       for r in rows]
            sample = rows[-1]
            yield TableMetadata(database='druid',
                                cluster=self._cluster,
                                schema=sample['schema'],
                                name=sample['name'],
                                description='',
                                columns=columns)

    def _get_raw_extract_iter(self):
        # type: () -> Iterator[Dict[str, Any]]
        """
        Provides iterator of result row from dbapi extractor
        :return:
        """
        while True:
            result = self._alchemy_extractor.extract()
            if not result:
                return
            yield result

    def _get_table_key(self, row):
        # type: (Dict[str, Any]) -> Union[TableKey, None]
        """
        Table key consists of schema and table name
        :param row:
        :return:
        """
        return TableKey(schema=row['schema'], table_name=row['name']) if row else None
...@@ -37,7 +37,11 @@ db2 = [ ...@@ -37,7 +37,11 @@ db2 = [
'ibm-db-sa-py3' 'ibm-db-sa-py3'
] ]
all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath + db2 druid = [
'pydruid'
]
all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath + db2 + druid
setup( setup(
name='amundsen-databuilder', name='amundsen-databuilder',
...@@ -60,7 +64,8 @@ setup( ...@@ -60,7 +64,8 @@ setup(
'athena': athena, 'athena': athena,
'bigquery': bigquery, 'bigquery': bigquery,
'jsonpath': jsonpath, 'jsonpath': jsonpath,
'db2': db2 'db2': db2,
'druid': druid,
}, },
classifiers=[ classifiers=[
'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 2.7',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment