Unverified Commit 61ef7474 authored by Sarthak Killedar's avatar Sarthak Killedar Committed by GitHub

MS SQL MataData Extractor (#252)

* MS SQL MataData Extractor

* Fix Python 2.7 Build Failure

* Added MS SQL How To Use Guide

* Added MS SQL How To Use Guide
parent 68d700b5
...@@ -181,7 +181,7 @@ If using the filters option here is the input format ...@@ -181,7 +181,7 @@ If using the filters option here is the input format
#### [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor") #### [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Postgres or Redshift database. An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Postgres or Redshift database.
By default, the Postgres/Redshift database name is used as the cluter name. To override this, set `USE_CATALOG_AS_CLUSTER_NAME` By default, the Postgres/Redshift database name is used as the cluster name. To override this, set `USE_CATALOG_AS_CLUSTER_NAME`
to `False`, and `CLUSTER_KEY` to what you wish to use as the cluster name. to `False`, and `CLUSTER_KEY` to what you wish to use as the cluster name.
The `where_clause_suffix` below should define which schemas you'd like to query (see [the sample dag](https://github.com/lyft/amundsendatabuilder/blob/master/example/dags/postgres_sample_dag.py) for an example). The `where_clause_suffix` below should define which schemas you'd like to query (see [the sample dag](https://github.com/lyft/amundsendatabuilder/blob/master/example/dags/postgres_sample_dag.py) for an example).
...@@ -201,6 +201,30 @@ job = DefaultJob( ...@@ -201,6 +201,30 @@ job = DefaultJob(
job.launch() job.launch()
``` ```
#### [MSSQLMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mssql_metadata_extractor.py "PostgresMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Microsoft SQL database.
By default, the Microsoft SQL Server Database name is used as the cluster name. To override this, set `USE_CATALOG_AS_CLUSTER_NAME`
to `False`, and `CLUSTER_KEY` to what you wish to use as the cluster name.
The `where_clause_suffix` below should define which schemas you'd like to query (`"('dbo','sys')"`).
The SQL query driving the extraction is defined [here](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mssql_metadata_extractor.py)
This extractor is highly derived from [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor").
```python
job_config = ConfigFactory.from_dict({
'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True,
'extractor.mssql_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=MSSQLMetadataExtractor(),
loader=AnyLoader()))
job.launch()
```
#### [MysqlMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mysql_metadata_extractor.py "PostgresMetadataExtractor") #### [MysqlMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mysql_metadata_extractor.py "PostgresMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a MYSQL database. An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a MYSQL database.
......
import logging
import six
from collections import namedtuple
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
LOGGER = logging.getLogger(__name__)
class MSSQLMetadataExtractor(Extractor):
"""
Extracts Microsoft SQL Server table and column metadata from underlying
meta store database using SQLAlchemyExtractor
"""
# SELECT statement from MS SQL to extract table and column metadata
SQL_STATEMENT = """
SELECT DISTINCT {cluster_source} AS CLUSTER,
TBL.TABLE_SCHEMA AS [SCHEMA_NAME],
TBL.TABLE_NAME AS [NAME],
CAST(PROP.VALUE AS NVARCHAR(MAX)) AS [DESCRIPTION],
COL.COLUMN_NAME AS [COL_NAME],
COL.DATA_TYPE AS [COL_TYPE],
CAST(PROP_COL.VALUE AS NVARCHAR(MAX)) AS [COL_DESCRIPTION],
COL.ORDINAL_POSITION AS COL_SORT_ORDER
FROM INFORMATION_SCHEMA.TABLES TBL
INNER JOIN INFORMATION_SCHEMA.COLUMNS COL ON COL.TABLE_NAME = TBL.TABLE_NAME
AND COL.TABLE_SCHEMA = TBL.TABLE_SCHEMA
LEFT JOIN SYS.EXTENDED_PROPERTIES PROP ON PROP.MAJOR_ID = OBJECT_ID(TBL.TABLE_SCHEMA + '.' + TBL.TABLE_NAME)
AND PROP.MINOR_ID = 0
AND PROP.NAME = 'MS_Description'
LEFT JOIN SYS.EXTENDED_PROPERTIES PROP_COL ON PROP_COL.MAJOR_ID = OBJECT_ID(TBL.TABLE_SCHEMA + '.' + TBL.TABLE_NAME)
AND PROP_COL.MINOR_ID = COL.ORDINAL_POSITION
AND PROP_COL.NAME = 'MS_Description'
WHERE TBL.TABLE_TYPE = 'base table' {where_clause_suffix}
ORDER BY CLUSTER,
SCHEMA_NAME,
NAME,
COL_SORT_ORDER;
"""
# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster_key'
USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
DATABASE_KEY = 'database_key'
# Default values
DEFAULT_CLUSTER_NAME = 'DB_NAME()'
DEFAULT_CONFIG = ConfigFactory.from_dict(
{WHERE_CLAUSE_SUFFIX_KEY: '', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: True}
)
DEFAULT_WHERE_CLAUSE_VALUE = 'and tbl.table_schema in {schemas}'
def init(self, conf):
# type: (ConfigTree) -> None
conf = conf.with_fallback(MSSQLMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(MSSQLMetadataExtractor.CLUSTER_KEY))
if conf.get_bool(MSSQLMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
cluster_source = "DB_NAME()"
else:
cluster_source = "'{}'".format(self._cluster)
database = conf.get_string(MSSQLMetadataExtractor.DATABASE_KEY, default='mssql')
if six.PY2 and isinstance(database, six.text_type):
database = database.encode('utf-8', 'ignore')
self._database = database
config_where_clause = conf.get_string(MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY)
logging.info("Crawling for Schemas %s", config_where_clause)
if len(config_where_clause) > 0:
where_clause_suffix = MSSQLMetadataExtractor.DEFAULT_WHERE_CLAUSE_VALUE.format(schemas=config_where_clause)
else:
where_clause_suffix = ''
self.sql_stmt = MSSQLMetadataExtractor.SQL_STATEMENT.format(
where_clause_suffix=where_clause_suffix,
cluster_source=cluster_source
)
LOGGER.info('SQL for MS SQL Metadata: {}'.format(self.sql_stmt))
self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope()) \
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter = None # type: Union[None, Iterator]
def extract(self):
# type: () -> Union[TableMetadata, None]
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.mssql_metadata'
def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
"""
Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
:return:
"""
for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []
for row in group:
last_row = row
columns.append(ColumnMetadata(row['col_name'], row['col_description'],
row['col_type'], row['col_sort_order']))
yield TableMetadata(self._database, last_row['cluster'],
last_row['schema_name'],
last_row['name'],
last_row['description'],
columns, tags=last_row['schema_name'])
def _get_raw_extract_iter(self):
# type: () -> Iterator[Dict[str, Any]]
"""
Provides iterator of result row from SQLAlchemy extractor
:return:
"""
row = self._alchemy_extractor.extract()
while row:
yield row
row = self._alchemy_extractor.extract()
def _get_table_key(self, row):
# type: (Dict[str, Any]) -> Union[TableKey, None]
"""
Table key consists of schema and table name
:param row:
:return:
"""
if row:
return TableKey(schema_name=row['schema_name'], table_name=row['name'])
return None
import logging
import unittest
from mock import patch, MagicMock
from pyhocon import ConfigFactory
from typing import Any, Dict # noqa: F401
from databuilder.extractor.mssql_metadata_extractor import MSSQLMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
class TestMSSQLMetadataExtractor(unittest.TestCase):
def setUp(self):
# type: () -> None
logging.basicConfig(level=logging.INFO)
config_dict = {
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
'TEST_CONNECTION',
'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.CLUSTER_KEY):
'MY_CLUSTER',
'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
False,
'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.DATABASE_KEY):
'mssql',
'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): ''
}
self.conf = ConfigFactory.from_dict(config_dict)
def test_extraction_with_empty_query_result(self):
# type: () -> None
"""
Test Extraction with empty result from query
"""
with patch.object(SQLAlchemyExtractor, '_get_connection'):
extractor = MSSQLMetadataExtractor()
extractor.init(self.conf)
results = extractor.extract()
self.assertEqual(results, None)
def test_extraction_with_single_result(self):
# type: () -> None
with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection:
connection = MagicMock()
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
table = {'schema_name': 'test_schema',
'name': 'test_table',
'description': 'a table for testing',
'cluster':
self.conf['extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.CLUSTER_KEY)]
}
sql_execute.return_value = [
self._union(
{'col_name': 'col_id1',
'col_type': 'bigint',
'col_description': 'description of id1',
'col_sort_order': 0}, table),
self._union(
{'col_name': 'col_id2',
'col_type': 'bigint',
'col_description': 'description of id2',
'col_sort_order': 1}, table),
self._union(
{'col_name': 'is_active',
'col_type': 'boolean',
'col_description': None,
'col_sort_order': 2}, table),
self._union(
{'col_name': 'source',
'col_type': 'varchar',
'col_description': 'description of source',
'col_sort_order': 3}, table),
self._union(
{'col_name': 'etl_created_at',
'col_type': 'timestamp',
'col_description': 'description of etl_created_at',
'col_sort_order': 4}, table),
self._union(
{'col_name': 'ds',
'col_type': 'varchar',
'col_description': None,
'col_sort_order': 5}, table)
]
extractor = MSSQLMetadataExtractor()
extractor.init(self.conf)
actual = extractor.extract()
expected = TableMetadata('mssql', 'MY_CLUSTER', 'test_schema', 'test_table', 'a table for testing',
[ColumnMetadata('col_id1', 'description of id1', 'bigint', 0),
ColumnMetadata('col_id2', 'description of id2', 'bigint', 1),
ColumnMetadata('is_active', None, 'boolean', 2),
ColumnMetadata('source', 'description of source', 'varchar', 3),
ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
ColumnMetadata('ds', None, 'varchar', 5)], tags='test_schema')
self.assertEqual(expected.__repr__(), actual.__repr__())
self.assertIsNone(extractor.extract())
def test_extraction_with_multiple_result(self):
# type: () -> None
with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection:
connection = MagicMock()
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
table = {'schema_name': 'test_schema1',
'name': 'test_table1',
'description': 'test table 1',
'cluster':
self.conf['extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.CLUSTER_KEY)]
}
table1 = {'schema_name': 'test_schema1',
'name': 'test_table2',
'description': 'test table 2',
'cluster':
self.conf['extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.CLUSTER_KEY)]
}
table2 = {'schema_name': 'test_schema2',
'name': 'test_table3',
'description': 'test table 3',
'cluster':
self.conf['extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.CLUSTER_KEY)]
}
sql_execute.return_value = [
self._union(
{'col_name': 'col_id1',
'col_type': 'bigint',
'col_description': 'description of col_id1',
'col_sort_order': 0}, table),
self._union(
{'col_name': 'col_id2',
'col_type': 'bigint',
'col_description': 'description of col_id2',
'col_sort_order': 1}, table),
self._union(
{'col_name': 'is_active',
'col_type': 'boolean',
'col_description': None,
'col_sort_order': 2}, table),
self._union(
{'col_name': 'source',
'col_type': 'varchar',
'col_description': 'description of source',
'col_sort_order': 3}, table),
self._union(
{'col_name': 'etl_created_at',
'col_type': 'timestamp',
'col_description': 'description of etl_created_at',
'col_sort_order': 4}, table),
self._union(
{'col_name': 'ds',
'col_type': 'varchar',
'col_description': None,
'col_sort_order': 5}, table),
self._union(
{'col_name': 'col_name',
'col_type': 'varchar',
'col_description': 'description of col_name',
'col_sort_order': 0}, table1),
self._union(
{'col_name': 'col_name2',
'col_type': 'varchar',
'col_description': 'description of col_name2',
'col_sort_order': 1}, table1),
self._union(
{'col_name': 'col_id3',
'col_type': 'varchar',
'col_description': 'description of col_id3',
'col_sort_order': 0}, table2),
self._union(
{'col_name': 'col_name3',
'col_type': 'varchar',
'col_description': 'description of col_name3',
'col_sort_order': 1}, table2)
]
extractor = MSSQLMetadataExtractor()
extractor.init(self.conf)
expected = TableMetadata('mssql',
self.conf['extractor.mssql_metadata.{}'.format(
MSSQLMetadataExtractor.CLUSTER_KEY)],
'test_schema1', 'test_table1', 'test table 1',
[ColumnMetadata('col_id1', 'description of col_id1', 'bigint', 0),
ColumnMetadata('col_id2', 'description of col_id2', 'bigint', 1),
ColumnMetadata('is_active', None, 'boolean', 2),
ColumnMetadata('source', 'description of source', 'varchar', 3),
ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
ColumnMetadata('ds', None, 'varchar', 5)], tags='test_schema1')
self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
expected = TableMetadata('mssql',
self.conf['extractor.mssql_metadata.{}'.format(
MSSQLMetadataExtractor.CLUSTER_KEY)],
'test_schema1', 'test_table2', 'test table 2',
[ColumnMetadata('col_name', 'description of col_name', 'varchar', 0),
ColumnMetadata('col_name2', 'description of col_name2', 'varchar', 1)],
tags='test_schema1')
self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
expected = TableMetadata('mssql',
self.conf['extractor.mssql_metadata.{}'.format(
MSSQLMetadataExtractor.CLUSTER_KEY)],
'test_schema2', 'test_table3', 'test table 3',
[ColumnMetadata('col_id3', 'description of col_id3', 'varchar', 0),
ColumnMetadata('col_name3', 'description of col_name3',
'varchar', 1)], tags='test_schema2')
self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
self.assertIsNone(extractor.extract())
self.assertIsNone(extractor.extract())
def _union(self, target, extra):
# type: (Dict[Any, Any], Dict[Any, Any]) -> Dict[Any, Any]
target.update(extra)
return target
class TestMSSQLMetadataExtractorWithWhereClause(unittest.TestCase):
def setUp(self):
# type: () -> None
logging.basicConfig(level=logging.INFO)
self.where_clause_suffix = """
('dbo', 'sys')
"""
config_dict = {
MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY: self.where_clause_suffix,
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
'TEST_CONNECTION'
}
self.conf = ConfigFactory.from_dict(config_dict)
def test_sql_statement(self):
# type: () -> None
"""
Test Extraction with empty result from query
"""
with patch.object(SQLAlchemyExtractor, '_get_connection'):
extractor = MSSQLMetadataExtractor()
extractor.init(self.conf)
self.assertTrue(self.where_clause_suffix in extractor.sql_stmt)
class TestMSSQLMetadataExtractorClusterKeyNoTableCatalog(unittest.TestCase):
# test when USE_CATALOG_AS_CLUSTER_NAME is false and CLUSTER_KEY is specified
def setUp(self):
# type: () -> None
logging.basicConfig(level=logging.INFO)
self.cluster_key = "not_master"
config_dict = {
MSSQLMetadataExtractor.CLUSTER_KEY: self.cluster_key,
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
'TEST_CONNECTION',
MSSQLMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME: False
}
self.conf = ConfigFactory.from_dict(config_dict)
def test_sql_statement(self):
# type: () -> None
"""
Test Extraction with empty result from query
"""
with patch.object(SQLAlchemyExtractor, '_get_connection'):
extractor = MSSQLMetadataExtractor()
extractor.init(self.conf)
self.assertTrue(self.cluster_key in extractor.sql_stmt)
class TestMSSQLMetadataExtractorNoClusterKeyNoTableCatalog(unittest.TestCase):
# test when USE_CATALOG_AS_CLUSTER_NAME is false and CLUSTER_KEY is NOT specified
def setUp(self):
# type: () -> None
logging.basicConfig(level=logging.INFO)
config_dict = {
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
'TEST_CONNECTION',
MSSQLMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME: False
}
self.conf = ConfigFactory.from_dict(config_dict)
def test_sql_statement(self):
# type: () -> None
"""
Test Extraction with empty result from query
"""
with patch.object(SQLAlchemyExtractor, '_get_connection'):
extractor = MSSQLMetadataExtractor()
extractor.init(self.conf)
self.assertTrue(MSSQLMetadataExtractor.DEFAULT_CLUSTER_NAME in extractor.sql_stmt)
class TestMSSQLMetadataExtractorTableCatalogEnabled(unittest.TestCase):
# test when USE_CATALOG_AS_CLUSTER_NAME is true (CLUSTER_KEY should be ignored)
def setUp(self):
# type: () -> None
logging.basicConfig(level=logging.INFO)
self.cluster_key = "not_master"
config_dict = {
MSSQLMetadataExtractor.CLUSTER_KEY: self.cluster_key,
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
'TEST_CONNECTION',
MSSQLMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME: True
}
self.conf = ConfigFactory.from_dict(config_dict)
def test_sql_statement(self):
# type: () -> None
"""
Test Extraction with empty result from query
"""
with patch.object(SQLAlchemyExtractor, '_get_connection'):
extractor = MSSQLMetadataExtractor()
extractor.init(self.conf)
self.assertTrue('DB_NAME()' in extractor.sql_stmt)
self.assertFalse(self.cluster_key in extractor.sql_stmt)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment