Unverified commit 4113cfd3, authored by Nathan Lawrence, committed by GitHub.

feat: Create a RedshiftMetadataExtractor that supports late binding views (#356)

Signed-off-by: Nathan Lawrence <nathanlawrence@asana.com>
Co-authored-by: Nathan Lawrence <nathanlawrence@Nathan-Lawrences-MacBook-Pro.local>
parent 871a1763
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import abc
import logging
from collections import namedtuple
from pyhocon import ConfigFactory, ConfigTree
from typing import Iterator, Union, Dict, Any
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
# Grouping key for result rows: a table is uniquely identified by (schema, table_name).
TableKey = namedtuple('TableKey', ['schema', 'table_name'])
# Module-level logger named after this module, per logging convention.
LOGGER = logging.getLogger(__name__)
class BasePostgresMetadataExtractor(Extractor):
    """
    Extracts Postgres table and column metadata from the underlying meta store
    database using SQLAlchemyExtractor.

    Subclasses provide the dialect-specific metadata query by implementing
    get_sql_statement(); this base class handles configuration resolution,
    row extraction, and grouping of per-column rows into TableMetadata records.
    """
    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    CLUSTER_KEY = 'cluster_key'
    USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
    DATABASE_KEY = 'database_key'

    # Default values
    DEFAULT_CLUSTER_NAME = 'master'

    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: True}
    )

    @abc.abstractmethod
    def get_sql_statement(self, use_catalog_as_cluster_name: bool, where_clause_suffix: str) -> Any:
        """
        Build the SQL statement that extracts table and column metadata.

        :param use_catalog_as_cluster_name: when True, the query should take the
            cluster name from the database catalog rather than self._cluster
        :param where_clause_suffix: extra filter SQL appended to the statement
        :return: the SQL statement (None in this abstract stub)
        """
        return None

    def init(self, conf: ConfigTree) -> None:
        """
        Resolve configuration, build the metadata SQL via get_sql_statement(),
        and initialize the underlying SQLAlchemyExtractor that executes it.
        """
        conf = conf.with_fallback(BasePostgresMetadataExtractor.DEFAULT_CONFIG)
        # get_string already returns a str; no extra '{}'.format() wrapping needed.
        self._cluster = conf.get_string(BasePostgresMetadataExtractor.CLUSTER_KEY)
        self._database = conf.get_string(BasePostgresMetadataExtractor.DATABASE_KEY, default='postgres')

        self.sql_stmt = self.get_sql_statement(
            use_catalog_as_cluster_name=conf.get_bool(BasePostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME),
            where_clause_suffix=conf.get_string(BasePostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
        )

        self._alchemy_extractor = SQLAlchemyExtractor()
        # An explicitly configured EXTRACT_SQL overrides the generated statement.
        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
        self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)

        # Lazy %-style args avoid building the message when INFO logging is disabled.
        LOGGER.info('SQL for postgres metadata: %s', self.sql_stmt)

        self._alchemy_extractor.init(sql_alch_conf)
        self._extract_iter: Union[None, Iterator] = None

    def extract(self) -> Union[TableMetadata, None]:
        """
        :return: the next TableMetadata record, or None when no more remain
        """
        # 'is None' (not truthiness) distinguishes "not yet created" explicitly.
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """
        Group raw per-column rows by table and yield one TableMetadata per table.

        itertools.groupby relies on the SQL ORDER BY keeping each table's rows
        adjacent in the result stream.
        """
        for _, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
            columns = []
            for row in group:
                # Remember the last row: its table-level fields describe the group.
                last_row = row
                columns.append(ColumnMetadata(row['col_name'], row['col_description'],
                                              row['col_type'], row['col_sort_order']))
            yield TableMetadata(self._database, last_row['cluster'],
                                last_row['schema'],
                                last_row['name'],
                                last_row['description'],
                                columns)

    def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
        """
        Provides iterator of result rows from the SQLAlchemy extractor.
        """
        row = self._alchemy_extractor.extract()
        while row:
            yield row
            row = self._alchemy_extractor.extract()

    def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
        """
        Table key consists of schema and table name.

        :param row: a raw result row (may be falsy)
        :return: TableKey for the row, or None for falsy input
        """
        if row:
            return TableKey(schema=row['schema'], table_name=row['name'])
        return None
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
from collections import namedtuple
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from pyhocon import ConfigFactory, ConfigTree
from typing import Iterator, Union, Dict, Any
from databuilder.extractor.base_postgres_metadata_extractor import BasePostgresMetadataExtractor
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
# Grouping key for result rows: a table is uniquely identified by (schema, table_name).
TableKey = namedtuple('TableKey', ['schema', 'table_name'])
# Module-level logger named after this module, per logging convention.
LOGGER = logging.getLogger(__name__)
class PostgresMetadataExtractor(BasePostgresMetadataExtractor):
    """
    Extracts Postgres table and column metadata from underlying meta store database using SQLAlchemyExtractor

    Config keys, extraction, and row grouping are inherited from
    BasePostgresMetadataExtractor; this subclass only supplies the
    Postgres-specific SQL statement and extractor scope.
    """

    def get_sql_statement(self, use_catalog_as_cluster_name: bool, where_clause_suffix: str) -> str:
        """
        Build the Postgres information_schema query joining column metadata
        with pg_description comments for tables and columns.

        :param use_catalog_as_cluster_name: source the cluster name from
            c.table_catalog instead of the configured cluster key
        :param where_clause_suffix: extra filter SQL appended before ORDER BY
        :return: the formatted SQL statement
        """
        if use_catalog_as_cluster_name:
            cluster_source = "c.table_catalog"
        else:
            cluster_source = "'{}'".format(self._cluster)

        return """
        SELECT
          {cluster_source} as cluster, c.table_schema as schema, c.table_name as name, pgtd.description as description
          ,c.column_name as col_name, c.data_type as col_type
          , pgcd.description as col_description, ordinal_position as col_sort_order
        FROM INFORMATION_SCHEMA.COLUMNS c
        INNER JOIN
          pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
        LEFT JOIN
          pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
        LEFT JOIN
          pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0
        {where_clause_suffix}
        ORDER by cluster, schema, name, col_sort_order ;
        """.format(
            cluster_source=cluster_source,
            where_clause_suffix=where_clause_suffix,
        )

    def get_scope(self) -> str:
        """
        :return: the config scope under which this extractor's keys are nested
        """
        return 'extractor.postgres_metadata'
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from databuilder.extractor.base_postgres_metadata_extractor import BasePostgresMetadataExtractor
class RedshiftMetadataExtractor(BasePostgresMetadataExtractor):
    """
    Extracts Redshift table and column metadata from underlying meta store database using SQLAlchemyExtractor

    This differs from the PostgresMetadataExtractor because in order to support Redshift's late binding views,
    we need to join the INFORMATION_SCHEMA data against the function PG_GET_LATE_BINDING_VIEW_COLS().
    """

    def get_sql_statement(self, use_catalog_as_cluster_name: bool, where_clause_suffix: str) -> str:
        """
        Build the Redshift metadata query: a UNION of ordinary
        INFORMATION_SCHEMA column rows and late-binding view columns returned
        by PG_GET_LATE_BINDING_VIEW_COLS().

        :param use_catalog_as_cluster_name: source the cluster name from
            CURRENT_DATABASE() instead of the configured cluster key
        :param where_clause_suffix: extra filter SQL applied to the UNIONed result
        :return: the formatted SQL statement
        """
        if use_catalog_as_cluster_name:
            cluster_source = "CURRENT_DATABASE()"
        else:
            cluster_source = "'{}'".format(self._cluster)

        return """
        SELECT
          *
        FROM (
            SELECT
              {cluster_source} as cluster,
              c.table_schema as schema,
              c.table_name as name,
              pgtd.description as description,
              c.column_name as col_name,
              c.data_type as col_type,
              pgcd.description as col_description,
              ordinal_position as col_sort_order
            FROM INFORMATION_SCHEMA.COLUMNS c
            INNER JOIN
              pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
            LEFT JOIN
              pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
            LEFT JOIN
              pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0

            UNION

            SELECT
              {cluster_source} as cluster,
              view_schema as schema,
              view_name as name,
              NULL as description,
              column_name as col_name,
              data_type as col_type,
              NULL as col_description,
              ordinal_position as col_sort_order
            FROM
                PG_GET_LATE_BINDING_VIEW_COLS()
                COLS(view_schema NAME, view_name NAME, column_name NAME, data_type VARCHAR, ordinal_position INT)
        )
        {where_clause_suffix}
        ORDER by cluster, schema, name, col_sort_order ;
        """.format(
            cluster_source=cluster_source,
            where_clause_suffix=where_clause_suffix,
        )

    def get_scope(self) -> str:
        """
        :return: the config scope under which this extractor's keys are nested
        """
        return 'extractor.redshift_metadata'
......@@ -20,12 +20,9 @@ class TestPostgresMetadataExtractor(unittest.TestCase):
config_dict = {
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
'TEST_CONNECTION',
'extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.CLUSTER_KEY):
'MY_CLUSTER',
'extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
False,
'extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.DATABASE_KEY):
'postgres'
PostgresMetadataExtractor.CLUSTER_KEY: 'MY_CLUSTER',
PostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME: False,
PostgresMetadataExtractor.DATABASE_KEY: 'postgres'
}
self.conf = ConfigFactory.from_dict(config_dict)
......@@ -50,7 +47,7 @@ class TestPostgresMetadataExtractor(unittest.TestCase):
'name': 'test_table',
'description': 'a table for testing',
'cluster':
self.conf['extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.CLUSTER_KEY)]
self.conf[PostgresMetadataExtractor.CLUSTER_KEY]
}
sql_execute.return_value = [
......@@ -110,21 +107,21 @@ class TestPostgresMetadataExtractor(unittest.TestCase):
'name': 'test_table1',
'description': 'test table 1',
'cluster':
self.conf['extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.CLUSTER_KEY)]
self.conf[PostgresMetadataExtractor.CLUSTER_KEY]
}
table1 = {'schema': 'test_schema1',
'name': 'test_table2',
'description': 'test table 2',
'cluster':
self.conf['extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.CLUSTER_KEY)]
self.conf[PostgresMetadataExtractor.CLUSTER_KEY]
}
table2 = {'schema': 'test_schema2',
'name': 'test_table3',
'description': 'test table 3',
'cluster':
self.conf['extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.CLUSTER_KEY)]
self.conf[PostgresMetadataExtractor.CLUSTER_KEY]
}
sql_execute.return_value = [
......@@ -184,8 +181,7 @@ class TestPostgresMetadataExtractor(unittest.TestCase):
extractor.init(self.conf)
expected = TableMetadata('postgres',
self.conf['extractor.postgres_metadata.{}'.format(
PostgresMetadataExtractor.CLUSTER_KEY)],
self.conf[PostgresMetadataExtractor.CLUSTER_KEY],
'test_schema1', 'test_table1', 'test table 1',
[ColumnMetadata('col_id1', 'description of col_id1', 'bigint', 0),
ColumnMetadata('col_id2', 'description of col_id2', 'bigint', 1),
......@@ -196,16 +192,14 @@ class TestPostgresMetadataExtractor(unittest.TestCase):
self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
expected = TableMetadata('postgres',
self.conf['extractor.postgres_metadata.{}'.format(
PostgresMetadataExtractor.CLUSTER_KEY)],
self.conf[PostgresMetadataExtractor.CLUSTER_KEY],
'test_schema1', 'test_table2', 'test table 2',
[ColumnMetadata('col_name', 'description of col_name', 'varchar', 0),
ColumnMetadata('col_name2', 'description of col_name2', 'varchar', 1)])
self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
expected = TableMetadata('postgres',
self.conf['extractor.postgres_metadata.{}'.format(
PostgresMetadataExtractor.CLUSTER_KEY)],
self.conf[PostgresMetadataExtractor.CLUSTER_KEY],
'test_schema2', 'test_table3', 'test table 3',
[ColumnMetadata('col_id3', 'description of col_id3', 'varchar', 0),
ColumnMetadata('col_name3', 'description of col_name3',
......
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
import unittest
from mock import patch, MagicMock
from pyhocon import ConfigFactory
from typing import Any, Dict
from databuilder.extractor.redshift_metadata_extractor import RedshiftMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
class TestRedshiftMetadataExtractor(unittest.TestCase):
def setUp(self) -> None:
logging.basicConfig(level=logging.INFO)
config_dict = {
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
'TEST_CONNECTION',
RedshiftMetadataExtractor.CLUSTER_KEY: 'MY_CLUSTER',
RedshiftMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME: False,
RedshiftMetadataExtractor.DATABASE_KEY: 'redshift'
}
self.conf = ConfigFactory.from_dict(config_dict)
def test_extraction_with_empty_query_result(self) -> None:
"""
Test Extraction with empty result from query
"""
with patch.object(SQLAlchemyExtractor, '_get_connection'):
extractor = RedshiftMetadataExtractor()
extractor.init(self.conf)
results = extractor.extract()
self.assertEqual(results, None)
def test_extraction_with_single_result(self) -> None:
with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection:
connection = MagicMock()
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
table = {'schema': 'test_schema',
'name': 'test_table',
'description': 'a table for testing',
'cluster':
self.conf[RedshiftMetadataExtractor.CLUSTER_KEY]
}
sql_execute.return_value = [
self._union(
{'col_name': 'col_id1',
'col_type': 'bigint',
'col_description': 'description of id1',
'col_sort_order': 0}, table),
self._union(
{'col_name': 'col_id2',
'col_type': 'bigint',
'col_description': 'description of id2',
'col_sort_order': 1}, table),
self._union(
{'col_name': 'is_active',
'col_type': 'boolean',
'col_description': None,
'col_sort_order': 2}, table),
self._union(
{'col_name': 'source',
'col_type': 'varchar',
'col_description': 'description of source',
'col_sort_order': 3}, table),
self._union(
{'col_name': 'etl_created_at',
'col_type': 'timestamp',
'col_description': 'description of etl_created_at',
'col_sort_order': 4}, table),
self._union(
{'col_name': 'ds',
'col_type': 'varchar',
'col_description': None,
'col_sort_order': 5}, table)
]
extractor = RedshiftMetadataExtractor()
extractor.init(self.conf)
actual = extractor.extract()
expected = TableMetadata('redshift', 'MY_CLUSTER', 'test_schema', 'test_table', 'a table for testing',
[ColumnMetadata('col_id1', 'description of id1', 'bigint', 0),
ColumnMetadata('col_id2', 'description of id2', 'bigint', 1),
ColumnMetadata('is_active', None, 'boolean', 2),
ColumnMetadata('source', 'description of source', 'varchar', 3),
ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
ColumnMetadata('ds', None, 'varchar', 5)])
self.assertEqual(expected.__repr__(), actual.__repr__())
self.assertIsNone(extractor.extract())
def _union(self,
target: Dict[Any, Any],
extra: Dict[Any, Any]) -> Dict[Any, Any]:
target.update(extra)
return target
class TestRedshiftMetadataExtractorWithWhereClause(unittest.TestCase):
    """Checks that a configured WHERE clause suffix is embedded in the generated SQL."""

    def setUp(self) -> None:
        logging.basicConfig(level=logging.INFO)
        self.where_clause_suffix = """
        where table_schema in ('public') and table_name = 'movies'
        """

        self.conf = ConfigFactory.from_dict({
            RedshiftMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY: self.where_clause_suffix,
            'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): 'TEST_CONNECTION',
        })

    def test_sql_statement(self) -> None:
        """
        Test Extraction with empty result from query
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection'):
            subject = RedshiftMetadataExtractor()
            subject.init(self.conf)
            self.assertTrue(self.where_clause_suffix in subject.sql_stmt)
# Allow running this test module directly (e.g. `python <this file>`).
if __name__ == '__main__':
    unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment