Commit 3c24cac3 authored by Jakub Hettler, committed by Jin Hyuk Chang

Add Snowflake Extractor (#119)

* Add Snowflake Extractor

* Move default value

* Add Snowflake extractor
parent e0f753d2
@@ -129,6 +129,33 @@ job = DefaultJob(
job.launch()
```
#### [SnowflakeMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/snowflake_metadata_extractor.py "SnowflakeMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Snowflake database.
By default, the Snowflake database name is used as the cluster name. To override this, set `USE_CATALOG_AS_CLUSTER_NAME`
to `False`, and `CLUSTER_KEY` to what you wish to use as the cluster name.
By default, the Snowflake database is set to `prod`. To override this, set `DATABASE_KEY`
to `WhateverNameOfYourDb`.
The `where_clause_suffix` below should define which schemas you'd like to query (see [the sample dag](https://github.com/lyft/amundsendatabuilder/blob/master/example/scripts/sample_snowflake_data_loader.py) for an example).
The SQL query driving the extraction is defined [here](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/snowflake_metadata_extractor.py).
```python
job_config = ConfigFactory.from_dict({
    'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.DATABASE_KEY): 'YourDbName',
    'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
    'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True,
    'extractor.snowflake.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=SnowflakeMetadataExtractor(),
loader=AnyLoader()))
job.launch()
```
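To use a fixed cluster name instead of the Snowflake catalog, combine the overrides described above. This is a minimal sketch; `'warehouse_1'`, `'YourDbName'`, and `connection_string()` are placeholder values:
```python
job_config = ConfigFactory.from_dict({
    # Placeholder values -- substitute your own cluster and database names.
    'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): False,
    'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.CLUSTER_KEY): 'warehouse_1',
    'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.DATABASE_KEY): 'YourDbName',
    'extractor.snowflake.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
```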
#### [Neo4jEsLastUpdatedExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/neo4j_es_last_updated_extractor.py "Neo4jEsLastUpdatedExtractor")
An extractor that gets the current timestamp and passes it to GenericExtractor. It is used to create the timestamp for "Amundsen was last indexed on ..." in the Amundsen web page's footer.
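A minimal sketch of wiring it into a job; the scope string and `model_class` key below are assumptions inferred from the extractor's naming pattern, so verify them against the linked source:
```python
job_config = ConfigFactory.from_dict({
    # Assumed scope and key name -- verify against neo4j_es_last_updated_extractor.py.
    'extractor.neo4j_es_last_updated.model_class':
        'databuilder.models.neo4j_es_last_updated.Neo4jESLastUpdated'})
```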
databuilder/extractor/snowflake_metadata_extractor.py
import logging
from collections import namedtuple
from itertools import groupby

from pyhocon import ConfigFactory, ConfigTree  # noqa: F401
from typing import Iterator, Union, Dict, Any  # noqa: F401
from unidecode import unidecode

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata

TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])

LOGGER = logging.getLogger(__name__)


class SnowflakeMetadataExtractor(Extractor):
"""
Extracts Snowflake table and column metadata from underlying meta store database using SQLAlchemyExtractor.
Requirements:
snowflake-connector-python
snowflake-sqlalchemy
"""
# SELECT statement from snowflake information_schema to extract table and column metadata
SQL_STATEMENT = """
SELECT
lower(c.column_name) AS col_name,
c.comment AS col_description,
lower(c.data_type) AS col_type,
lower(c.ordinal_position) AS col_sort_order,
lower(c.table_catalog) AS database,
lower({cluster_source}) AS cluster,
lower(c.table_schema) AS schema_name,
lower(c.table_name) AS name,
t.comment AS description,
decode(lower(t.table_type), 'view', 'true', 'false') AS is_view
FROM
{database}.INFORMATION_SCHEMA.COLUMNS AS c
LEFT JOIN
{database}.INFORMATION_SCHEMA.TABLES t
ON c.TABLE_NAME = t.TABLE_NAME
AND c.TABLE_SCHEMA = t.TABLE_SCHEMA
{where_clause_suffix};
"""
# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster_key'
USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
DATABASE_KEY = 'database_key'
# Default values
DEFAULT_CLUSTER_NAME = 'master'
DEFAULT_CONFIG = ConfigFactory.from_dict(
{WHERE_CLAUSE_SUFFIX_KEY: ' ',
CLUSTER_KEY: DEFAULT_CLUSTER_NAME,
USE_CATALOG_AS_CLUSTER_NAME: True,
DATABASE_KEY: 'prod'}
)
def init(self, conf):
# type: (ConfigTree) -> None
conf = conf.with_fallback(SnowflakeMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(SnowflakeMetadataExtractor.CLUSTER_KEY))
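        # Either take the cluster name from the Snowflake catalog column or
        # use the statically configured cluster value.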
if conf.get_bool(SnowflakeMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
cluster_source = "c.table_catalog"
else:
cluster_source = "'{}'".format(self._cluster)
self._database = conf.get_string(SnowflakeMetadataExtractor.DATABASE_KEY).encode('utf-8', 'ignore')
self.sql_stmt = SnowflakeMetadataExtractor.SQL_STATEMENT.format(
where_clause_suffix=conf.get_string(SnowflakeMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
cluster_source=cluster_source,
database=self._database
)
LOGGER.info('SQL for snowflake metadata: {}'.format(self.sql_stmt))
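        # Delegate query execution to SQLAlchemyExtractor, scoped under this
        # extractor's config namespace and seeded with the SQL built above.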
self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter = None # type: Union[None, Iterator]
def extract(self):
# type: () -> Union[TableMetadata, None]
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.snowflake'
def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
"""
Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
:return:
"""
for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []
for row in group:
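                # Table-level fields (cluster, schema_name, name, description)
                # repeat on every row, so any row of the group can supply them.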
last_row = row
columns.append(ColumnMetadata(
row['col_name'],
unidecode(row['col_description']) if row['col_description'] else None,
row['col_type'],
row['col_sort_order'])
)
yield TableMetadata(self._database, last_row['cluster'],
last_row['schema_name'],
last_row['name'],
unidecode(last_row['description']) if last_row['description'] else None,
columns)
def _get_raw_extract_iter(self):
# type: () -> Iterator[Dict[str, Any]]
"""
Provides iterator of result row from SQLAlchemy extractor
:return:
"""
row = self._alchemy_extractor.extract()
while row:
yield row
row = self._alchemy_extractor.extract()
def _get_table_key(self, row):
# type: (Dict[str, Any]) -> Union[TableKey, None]
"""
Table key consists of schema and table name
:param row:
:return:
"""
if row:
return TableKey(schema_name=row['schema_name'], table_name=row['name'])
return None
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
"""
import logging

from pyhocon import ConfigFactory
from urllib import quote_plus

from databuilder.extractor.snowflake_metadata_extractor import SnowflakeMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
# Disable snowflake logging
logging.getLogger("snowflake.connector.network").disabled = True
# quote_plus URL-encodes the password so special characters survive in the connection URI.
SNOWFLAKE_CONN_STRING = 'snowflake://username:%s@account' % quote_plus('password')
# replace localhost with docker host ip
NEO4J_ENDPOINT = 'bolt://localhost:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
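# Schema names are pre-quoted so they land in the SQL IN (...) list as string literals.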
IGNORED_SCHEMAS = ['\'DVCORE\'', '\'INFORMATION_SCHEMA\'', '\'STAGE_ORACLE\'']
def create_sample_snowflake_job():
where_clause = "WHERE c.TABLE_SCHEMA not in ({0}) \
AND c.TABLE_SCHEMA not like 'STAGE_%' \
AND c.TABLE_SCHEMA not like 'HIST_%' \
AND c.TABLE_SCHEMA not like 'SNAP_%' \
AND lower(c.COLUMN_NAME) not like 'dw_%';".format(','.join(IGNORED_SCHEMAS))
    tmp_folder = '/var/tmp/amundsen/tables'
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
sql_extractor = SnowflakeMetadataExtractor()
csv_loader = FsNeo4jCSVLoader()
task = DefaultTask(extractor=sql_extractor,
loader=csv_loader)
job_config = ConfigFactory.from_dict({
'extractor.snowflake.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): SNOWFLAKE_CONN_STRING,
'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.DATABASE_KEY): 'YourSnowflakeDbName',
'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR): True,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.FORCE_CREATE_DIR): True,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): 'unique_tag'
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
return job
if __name__ == "__main__":
job = create_sample_snowflake_job()
job.launch()
requirements.txt
@@ -65,3 +65,4 @@ google-auth-httplib2>=0.0.1
google-auth>=1.0.0, <2.0.0dev
httplib2~=0.9.2
confluent-kafka==1.0.0
+unidecode
setup.py
from setuptools import setup, find_packages
-__version__ = '1.3.9'
+__version__ = '1.4.0'
setup(