Unverified Commit 0bac11bc authored by Alagappan's avatar Alagappan Committed by GitHub

feat: Add Snowflake table last updated timestamp extractor (#348)

Signed-off-by: Tao Feng <fengtao04@gmail.com>
parent 39a4316c
...@@ -314,6 +314,27 @@ job = DefaultJob( ...@@ -314,6 +314,27 @@ job = DefaultJob(
job.launch() job.launch()
``` ```
#### [SnowflakeTableLastUpdatedExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/snowflake_table_last_updated_extractor.py "SnowflakeTableLastUpdatedExtractor")
An extractor that extracts table last updated timestamp from a Snowflake database.
It uses the same configs as the `SnowflakeMetadataExtractor` described above.
The SQL query driving the extraction is defined [here](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/snowflake_table_last_updated_extractor.py)
```python
# Every extractor setting lives under the extractor's own scope,
# 'extractor.snowflake_table_last_updated'.
extractor_settings = {
    'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.SNOWFLAKE_DATABASE_KEY):
        'YourDbName',
    'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.WHERE_CLAUSE_SUFFIX_KEY):
        where_clause_suffix,
    'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME):
        True,
    'extractor.snowflake_table_last_updated.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
        connection_string(),
}
job_config = ConfigFactory.from_dict(extractor_settings)

# Pair the extractor with the loader of your choice and run the job.
task = DefaultTask(
    extractor=SnowflakeTableLastUpdatedExtractor(),
    loader=AnyLoader())
job = DefaultJob(conf=job_config, task=task)
job.launch()
```
#### [BigQueryMetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/bigquery_metadata_extractor.py "BigQuery Metadata Extractor") #### [BigQueryMetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/bigquery_metadata_extractor.py "BigQuery Metadata Extractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Bigquery database. An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Bigquery database.
......
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
from pyhocon import ConfigFactory, ConfigTree
from typing import Iterator, Union
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_last_updated import TableLastUpdated
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
class SnowflakeTableLastUpdatedExtractor(Extractor):
    """
    Extracts Snowflake table last update time from INFORMATION_SCHEMA metadata tables using SQLAlchemyExtractor.

    Requirements:
        snowflake-connector-python
        snowflake-sqlalchemy
    """
    # The 'last_altered' column of the 'TABLES' metadata view under 'INFORMATION_SCHEMA' contains the last time
    # the table was updated (both DML and DDL updates). The query below fetches that column for each table and
    # converts it to an epoch value with DATE_PART.
    SQL_STATEMENT = """
        SELECT
            lower({cluster_source}) AS cluster,
            lower(t.table_schema) AS schema,
            lower(t.table_name) AS table_name,
            DATE_PART(EPOCH, t.last_altered) AS last_updated_time
        FROM
            {database}.INFORMATION_SCHEMA.TABLES t
        {where_clause_suffix};
        """

    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    CLUSTER_KEY = 'cluster_key'
    USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
    # Database Key, used to identify the database type in the UI.
    DATABASE_KEY = 'database_key'
    # Snowflake Database Key, used to determine which Snowflake database to connect to.
    SNOWFLAKE_DATABASE_KEY = 'snowflake_database'

    # Default values
    DEFAULT_CLUSTER_NAME = 'master'

    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {WHERE_CLAUSE_SUFFIX_KEY: ' ',
         CLUSTER_KEY: DEFAULT_CLUSTER_NAME,
         USE_CATALOG_AS_CLUSTER_NAME: True,
         DATABASE_KEY: 'snowflake',
         SNOWFLAKE_DATABASE_KEY: 'prod'}
    )

    def init(self, conf: ConfigTree) -> None:
        """
        Build the extraction SQL from the given configuration and initialise the wrapped SQLAlchemyExtractor.

        :param conf: extractor configuration; any key it does not define is taken from DEFAULT_CONFIG
        """
        conf = conf.with_fallback(SnowflakeTableLastUpdatedExtractor.DEFAULT_CONFIG)

        # The cluster is either the table's catalog column or a quoted literal taken from the config.
        if conf.get_bool(SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME):
            cluster_source = "t.table_catalog"
        else:
            cluster_source = "'{}'".format(conf.get_string(SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY))

        self._database = conf.get_string(SnowflakeTableLastUpdatedExtractor.DATABASE_KEY)
        self._snowflake_database = conf.get_string(SnowflakeTableLastUpdatedExtractor.SNOWFLAKE_DATABASE_KEY)

        self.sql_stmt = SnowflakeTableLastUpdatedExtractor.SQL_STATEMENT.format(
            where_clause_suffix=conf.get_string(SnowflakeTableLastUpdatedExtractor.WHERE_CLAUSE_SUFFIX_KEY),
            cluster_source=cluster_source,
            database=self._snowflake_database
        )

        LOGGER.info('SQL for snowflake table last updated timestamp: {}'.format(self.sql_stmt))

        # use an sql_alchemy_extractor to execute sql; the generated statement is supplied as the
        # fallback value for its EXTRACT_SQL setting
        self._alchemy_extractor = SQLAlchemyExtractor()
        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope()) \
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
        self._alchemy_extractor.init(sql_alch_conf)

        # Created lazily on the first call to extract().
        self._extract_iter: Union[None, Iterator] = None

    def extract(self) -> Union[TableLastUpdated, None]:
        """
        Return the next TableLastUpdated record, or None once all rows have been consumed.
        """
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def get_scope(self) -> str:
        """
        Return the configuration scope of this extractor.
        """
        return 'extractor.snowflake_table_last_updated'

    def _get_extract_iter(self) -> Iterator[TableLastUpdated]:
        """
        Provides iterator of result row from SQLAlchemy extractor
        """
        tbl_last_updated_row = self._alchemy_extractor.extract()
        # The wrapped extractor signals exhaustion with a falsy value.
        while tbl_last_updated_row:
            yield TableLastUpdated(table_name=tbl_last_updated_row['table_name'],
                                   last_updated_time_epoch=tbl_last_updated_row['last_updated_time'],
                                   schema=tbl_last_updated_row['schema'],
                                   db=self._database,
                                   cluster=tbl_last_updated_row['cluster'])
            tbl_last_updated_row = self._alchemy_extractor.extract()
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import unittest
from mock import patch, MagicMock
from pyhocon import ConfigFactory
from databuilder.extractor.snowflake_table_last_updated_extractor import SnowflakeTableLastUpdatedExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_last_updated import TableLastUpdated
class TestSnowflakeTableLastUpdatedExtractor(unittest.TestCase):
    """
    Run the extractor against a patched SQLAlchemy connection and check the TableLastUpdated records it emits.
    """

    def setUp(self) -> None:
        # NOTE(review): the snowflake settings below are nested under the extractor scope, while the tests
        # hand the unscoped conf to init(); presumably the extractor therefore runs on its defaults and only
        # the mocked rows drive the assertions - confirm.
        scoped_key = 'extractor.snowflake_table_last_updated.{}'.format
        self.conf = ConfigFactory.from_dict({
            'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): 'TEST_CONNECTION',
            scoped_key(SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY): 'MY_CLUSTER',
            scoped_key(SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME): False,
            scoped_key(SnowflakeTableLastUpdatedExtractor.SNOWFLAKE_DATABASE_KEY): 'prod',
        })

    def _configured_cluster(self) -> str:
        """
        Return the cluster name stored in the test configuration.
        """
        return self.conf['extractor.snowflake_table_last_updated.{}'.format(
            SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY)]

    def test_extraction_with_empty_query_result(self) -> None:
        """
        Test Extraction with empty result from query
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection'):
            extractor = SnowflakeTableLastUpdatedExtractor()
            extractor.init(self.conf)

            self.assertIsNone(extractor.extract())

    def test_extraction_with_single_result(self) -> None:
        """
        Test Extraction with default cluster and database and with one table as result
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection:
            # The patched connection's execute() yields exactly one row.
            mock_connection.return_value.execute.return_value = [
                {'schema': 'test_schema',
                 'table_name': 'test_table',
                 'last_updated_time': 1000,
                 'cluster': self._configured_cluster(),
                 }
            ]

            extractor = SnowflakeTableLastUpdatedExtractor()
            extractor.init(self.conf)
            actual = extractor.extract()

            expected = TableLastUpdated(schema='test_schema', table_name='test_table',
                                        last_updated_time_epoch=1000,
                                        db='snowflake', cluster='MY_CLUSTER')
            self.assertEqual(repr(expected), repr(actual))
            self.assertIsNone(extractor.extract())

    def test_extraction_with_multiple_result(self) -> None:
        """
        Test Extraction with default cluster and database and with multiple tables as result
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection:
            default_cluster = self._configured_cluster()
            mock_connection.return_value.execute.return_value = [
                {'schema': 'test_schema1',
                 'table_name': 'test_table1',
                 'last_updated_time': 1000,
                 'cluster': default_cluster
                 },
                {'schema': 'test_schema1',
                 'table_name': 'test_table2',
                 'last_updated_time': 2000,
                 'cluster': default_cluster
                 },
                {'schema': 'test_schema2',
                 'table_name': 'test_table3',
                 'last_updated_time': 3000,
                 'cluster': default_cluster
                 },
            ]

            extractor = SnowflakeTableLastUpdatedExtractor()
            extractor.init(self.conf)

            # Records must come back in the same order as the query rows.
            expected_records = (
                ('test_schema1', 'test_table1', 1000),
                ('test_schema1', 'test_table2', 2000),
                ('test_schema2', 'test_table3', 3000),
            )
            for schema, table_name, epoch in expected_records:
                expected = TableLastUpdated(schema=schema, table_name=table_name,
                                            last_updated_time_epoch=epoch,
                                            db='snowflake', cluster='MY_CLUSTER')
                self.assertEqual(repr(expected), repr(extractor.extract()))

            self.assertIsNone(extractor.extract())
class TestSnowflakeTableLastUpdatedExtractorWithWhereClause(unittest.TestCase):
    """
    Test 'where_clause' config key in extractor
    """

    def setUp(self) -> None:
        # The suffix is passed at the top level of the conf, which is where init() looks it up.
        self.where_clause_suffix = "\nwhere table_schema in ('public') and table_name = 'movies'\n"

        config_dict = {
            SnowflakeTableLastUpdatedExtractor.WHERE_CLAUSE_SUFFIX_KEY: self.where_clause_suffix,
            'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
                'TEST_CONNECTION'
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_sql_statement(self) -> None:
        """
        test where clause in extractor sql statement
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection'):
            extractor = SnowflakeTableLastUpdatedExtractor()
            extractor.init(self.conf)
            # assertIn reports both operands on failure, unlike assertTrue(a in b).
            self.assertIn(self.where_clause_suffix, extractor.sql_stmt)
class TestSnowflakeTableLastUpdatedExtractorClusterKeyNoTableCatalog(unittest.TestCase):
    """
    Test with 'USE_CATALOG_AS_CLUSTER_NAME' is false and 'CLUSTER_KEY' is specified
    """

    def setUp(self) -> None:
        self.cluster_key = "not_master"

        config_dict = {
            SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY: self.cluster_key,
            'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
                'TEST_CONNECTION',
            SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME: False
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_sql_statement(self) -> None:
        """
        Test cluster_key in extractor sql stmt
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection'):
            extractor = SnowflakeTableLastUpdatedExtractor()
            extractor.init(self.conf)
            # assertIn reports both operands on failure, unlike assertTrue(a in b).
            self.assertIn(self.cluster_key, extractor.sql_stmt)
class TestSnowflakeTableLastUpdatedExtractorDefaultSnowflakeDatabaseKey(unittest.TestCase):
    """
    Test with SNOWFLAKE_DATABASE_KEY config specified
    """

    def setUp(self) -> None:
        self.snowflake_database_key = "not_prod"

        config_dict = {
            SnowflakeTableLastUpdatedExtractor.SNOWFLAKE_DATABASE_KEY: self.snowflake_database_key,
            'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
                'TEST_CONNECTION'
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_sql_statement(self) -> None:
        """
        Test SNOWFLAKE_DATABASE_KEY in extractor sql stmt
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection'):
            extractor = SnowflakeTableLastUpdatedExtractor()
            extractor.init(self.conf)
            # assertIn reports both operands on failure, unlike assertTrue(a in b).
            self.assertIn(self.snowflake_database_key, extractor.sql_stmt)
class TestSnowflakeTableLastUpdatedExtractorDefaultDatabaseKey(unittest.TestCase):
    """
    Test with DATABASE_KEY config specified
    """

    def setUp(self) -> None:
        self.database_key = 'not_snowflake'

        config_dict = {
            SnowflakeTableLastUpdatedExtractor.DATABASE_KEY: self.database_key,
            'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
                'TEST_CONNECTION'
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_sql_statement(self) -> None:
        """
        Test DATABASE_KEY in extractor sql stmt
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection'):
            extractor = SnowflakeTableLastUpdatedExtractor()
            extractor.init(self.conf)
            # DATABASE_KEY only labels the emitted records; it must not leak into the SQL.
            self.assertNotIn(self.database_key, extractor.sql_stmt)

    def test_extraction_with_database_specified(self) -> None:
        """
        Test DATABASE_KEY in extractor result
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection:
            connection = MagicMock()
            mock_connection.return_value = connection
            sql_execute = MagicMock()
            connection.execute = sql_execute

            sql_execute.return_value = [
                {'schema': 'test_schema',
                 'table_name': 'test_table',
                 'last_updated_time': 1000,
                 'cluster': 'MY_CLUSTER',
                 }
            ]

            extractor = SnowflakeTableLastUpdatedExtractor()
            extractor.init(self.conf)
            actual = extractor.extract()
            expected = TableLastUpdated(schema='test_schema', table_name='test_table',
                                        last_updated_time_epoch=1000,
                                        db=self.database_key, cluster='MY_CLUSTER')
            self.assertEqual(repr(expected), repr(actual))
            self.assertIsNone(extractor.extract())
class TestSnowflakeTableLastUpdatedExtractorNoClusterKeyNoTableCatalog(unittest.TestCase):
    """
    Test when USE_CATALOG_AS_CLUSTER_NAME is false and CLUSTER_KEY is NOT specified
    """

    def setUp(self) -> None:
        config_dict = {
            'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
                'TEST_CONNECTION',
            SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME: False
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_sql_statement(self) -> None:
        """
        Test cluster name in extract sql stmt
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection'):
            extractor = SnowflakeTableLastUpdatedExtractor()
            extractor.init(self.conf)
            # With no CLUSTER_KEY configured, the default cluster name is expected in the SQL.
            self.assertIn(SnowflakeTableLastUpdatedExtractor.DEFAULT_CLUSTER_NAME, extractor.sql_stmt)
class TestSnowflakeTableLastUpdatedExtractorTableCatalogEnabled(unittest.TestCase):
    """
    Test when USE_CATALOG_AS_CLUSTER_NAME is true (CLUSTER_KEY should be ignored)
    """

    def setUp(self) -> None:
        self.cluster_key = "not_master"

        config_dict = {
            SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY: self.cluster_key,
            'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
                'TEST_CONNECTION',
            SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME: True
        }
        self.conf = ConfigFactory.from_dict(config_dict)

    def test_sql_statement(self) -> None:
        """
        Ensure catalog is used as cluster in extract sql stmt
        """
        with patch.object(SQLAlchemyExtractor, '_get_connection'):
            extractor = SnowflakeTableLastUpdatedExtractor()
            extractor.init(self.conf)
            # assertIn / assertNotIn report both operands on failure.
            self.assertIn('table_catalog', extractor.sql_stmt)
            self.assertNotIn(self.cluster_key, extractor.sql_stmt)
# Run the test suite when this module is executed as a script.
if __name__ == '__main__':
    unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment