Unverified Commit 74dc98ec authored by Tao Feng's avatar Tao Feng Committed by GitHub

Add MYSQL metadata extractor (#247)

parent 3d01d83c
......@@ -201,6 +201,27 @@ job = DefaultJob(
job.launch()
```
#### [MysqlMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mysql_metadata_extractor.py "PostgresMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a MYSQL database.
By default, the MYSQL database name is used as the cluster name. To override this, set `USE_CATALOG_AS_CLUSTER_NAME`
to `False`, and `CLUSTER_KEY` to what you wish to use as the cluster name.
The `where_clause_suffix` below should define which schemas you'd like to query.
The SQL query driving the extraction is defined [here](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mysql_metadata_extractor.py)
```python
job_config = ConfigFactory.from_dict({
'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True,
'extractor.postgres_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
job = DefaultJob(conf=job_config,
task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()),
publisher=Neo4jCsvPublisher())
job.launch()
#### [Db2MetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/db2_metadata_extractor.py "Db2MetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Unix, Windows or Linux Db2 database or BigSQL.
......
import logging
import six
from collections import namedtuple
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
TableKey = namedtuple('TableKey', ['schema', 'table_name'])
LOGGER = logging.getLogger(__name__)
class MysqlMetadataExtractor(Extractor):
"""
Extracts mysql table and column metadata from underlying meta store database using SQLAlchemyExtractor
"""
# SELECT statement from mysql information_schema to extract table and column metadata
SQL_STATEMENT = """
SELECT
lower(c.column_name) AS col_name,
c.column_comment AS col_description,
lower(c.data_type) AS col_type,
lower(c.ordinal_position) AS col_sort_order,
{cluster_source} AS cluster,
lower(c.table_schema) AS "schema",
lower(c.table_name) AS name,
t.table_comment AS description,
case when lower(t.table_type) = "view" then "true" else "false" end AS is_view
FROM
INFORMATION_SCHEMA.COLUMNS AS c
LEFT JOIN
INFORMATION_SCHEMA.TABLES t
ON c.TABLE_NAME = t.TABLE_NAME
AND c.TABLE_SCHEMA = t.TABLE_SCHEMA
{where_clause_suffix}
ORDER by cluster, "schema", name, col_sort_order ;
"""
# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster_key'
USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
DATABASE_KEY = 'database_key'
# Default values
DEFAULT_CLUSTER_NAME = 'master'
DEFAULT_CONFIG = ConfigFactory.from_dict(
{WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: True}
)
def init(self, conf):
# type: (ConfigTree) -> None
conf = conf.with_fallback(MysqlMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(MysqlMetadataExtractor.CLUSTER_KEY))
if conf.get_bool(MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
cluster_source = "c.table_catalog"
else:
cluster_source = "'{}'".format(self._cluster)
database = conf.get_string(MysqlMetadataExtractor.DATABASE_KEY, default='mysql')
if six.PY2 and isinstance(database, six.text_type):
database = database.encode('utf-8', 'ignore')
self._database = database
self.sql_stmt = MysqlMetadataExtractor.SQL_STATEMENT.format(
where_clause_suffix=conf.get_string(MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
cluster_source=cluster_source
)
self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)
LOGGER.info('SQL for mysql metadata: {}'.format(self.sql_stmt))
self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter = None # type: Union[None, Iterator]
def extract(self):
# type: () -> Union[TableMetadata, None]
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.mysql_metadata'
def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
"""
Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
:return:
"""
for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []
for row in group:
last_row = row
columns.append(ColumnMetadata(row['col_name'], row['col_description'],
row['col_type'], row['col_sort_order']))
yield TableMetadata(self._database, last_row['cluster'],
last_row['schema'],
last_row['name'],
last_row['description'],
columns,
is_view=last_row['is_view'])
def _get_raw_extract_iter(self):
# type: () -> Iterator[Dict[str, Any]]
"""
Provides iterator of result row from SQLAlchemy extractor
:return:
"""
row = self._alchemy_extractor.extract()
while row:
yield row
row = self._alchemy_extractor.extract()
def _get_table_key(self, row):
# type: (Dict[str, Any]) -> Union[TableKey, None]
"""
Table key consists of schema and table name
:param row:
:return:
"""
if row:
return TableKey(schema=row['schema'], table_name=row['name'])
return None
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment