Commit 4656f236 authored by Itay Levy, committed by Tao Feng

Added Extractor for Athena metadata (#105)

* Added Extractor for Athena metadata
Added Athena sample dag that uses Athena metadata extractor
Added Unit test for Athena metadata extractor

* Fixed flake8 errors

* Fixed flake8 error in the unit test

* Fixed typo: changed Postgres to Athena
parent 2fa0a29d
import logging
from collections import namedtuple
from itertools import groupby

from pyhocon import ConfigFactory, ConfigTree  # noqa: F401
from typing import Iterator, Union, Dict, Any  # noqa: F401

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata

TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])

LOGGER = logging.getLogger(__name__)


class AthenaMetadataExtractor(Extractor):
    """
    Extracts Athena table and column metadata from the underlying metastore database using SQLAlchemyExtractor.
    """

    SQL_STATEMENT = """
    SELECT
        {catalog_source} as cluster, table_schema as schema_name, table_name as name, column_name as col_name,
        data_type as col_type, ordinal_position as col_sort_order,
        comment as col_description, extra_info as extras
    FROM information_schema.columns
    {where_clause_suffix}
    ORDER BY cluster, schema_name, name, col_sort_order;
    """

    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    CATALOG_KEY = 'catalog_source'

    # Default values
    DEFAULT_CLUSTER_NAME = 'master'

    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {WHERE_CLAUSE_SUFFIX_KEY: ' ', CATALOG_KEY: DEFAULT_CLUSTER_NAME}
    )

    def init(self, conf):
        # type: (ConfigTree) -> None
        conf = conf.with_fallback(AthenaMetadataExtractor.DEFAULT_CONFIG)
        self._cluster = '{}'.format(conf.get_string(AthenaMetadataExtractor.CATALOG_KEY))

        self.sql_stmt = AthenaMetadataExtractor.SQL_STATEMENT.format(
            where_clause_suffix=conf.get_string(AthenaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
            catalog_source=self._cluster
        )

        LOGGER.info('SQL for Athena metadata: {}'.format(self.sql_stmt))

        self._alchemy_extractor = SQLAlchemyExtractor()
        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope()) \
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

        self._alchemy_extractor.init(sql_alch_conf)
        self._extract_iter = None  # type: Union[None, Iterator]

    def extract(self):
        # type: () -> Union[TableMetadata, None]
        if not self._extract_iter:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def get_scope(self):
        # type: () -> str
        return 'extractor.athena_metadata'

    def _get_extract_iter(self):
        # type: () -> Iterator[TableMetadata]
        """
        Groups the raw row iterator by table using itertools.groupby and yields
        one TableMetadata per table.
        :return:
        """
        for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
            columns = []
            for row in group:
                last_row = row
                columns.append(ColumnMetadata(row['col_name'],
                                              row['extras'] if row['extras'] is not None else row['col_description'],
                                              row['col_type'], row['col_sort_order']))

            yield TableMetadata('athena', last_row['cluster'],
                                last_row['schema_name'],
                                last_row['name'],
                                '',
                                columns)

    def _get_raw_extract_iter(self):
        # type: () -> Iterator[Dict[str, Any]]
        """
        Provides an iterator of result rows from the SQLAlchemy extractor.
        :return:
        """
        row = self._alchemy_extractor.extract()
        while row:
            yield row
            row = self._alchemy_extractor.extract()

    def _get_table_key(self, row):
        # type: (Dict[str, Any]) -> Union[TableKey, None]
        """
        Table key consists of schema and table name
        :param row:
        :return:
        """
        if row:
            return TableKey(schema_name=row['schema_name'], table_name=row['name'])

        return None
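
A minimal standalone sketch of driving the extractor without Airflow (illustrative, not part of this commit; the connection string is a placeholder to be replaced with real credentials):

from pyhocon import ConfigFactory

from databuilder import Scoped
from databuilder.extractor.athena_metadata_extractor import AthenaMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor

conf = ConfigFactory.from_dict({
    # Hypothetical PyAthena URL; see connection_string() in the sample DAG below.
    'extractor.athena_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
        'awsathena+rest://KEY:SECRET@athena.us-east-1.amazonaws.com:443/?s3_staging_dir=s3://bucket/',
    'extractor.athena_metadata.{}'.format(AthenaMetadataExtractor.CATALOG_KEY): "'AwsDataCatalog'",
})

extractor = AthenaMetadataExtractor()
extractor.init(Scoped.get_scoped_conf(conf, extractor.get_scope()))

# extract() returns one TableMetadata per call and None once exhausted.
result = extractor.extract()
while result:
    print(result.name)
    result = extractor.extract()
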

import textwrap
import uuid
from datetime import datetime, timedelta

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory

from airflow import DAG  # noqa
from airflow import macros  # noqa
from airflow.operators.python_operator import PythonOperator  # noqa

from databuilder.extractor.athena_metadata_extractor import AthenaMetadataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

dag_args = {
    'concurrency': 10,
    # One dagrun at a time
    'max_active_runs': 1,
    # Daily at 11:00 UTC (03:00 PST)
    'schedule_interval': '0 11 * * *',
    'catchup': False
}

default_args = {
    'owner': 'amundsen',
    'start_date': datetime(2018, 6, 18),
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'priority_weight': 10,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(minutes=120)
}

# NEO4J cluster endpoints
NEO4J_ENDPOINT = 'bolt://127.0.0.1:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'

es = Elasticsearch([
    {'host': '127.0.0.1'},
])

# TODO: user provides a list of schemas for indexing
SUPPORTED_SCHEMAS = ['sampledb']
# String format - ('schema1', 'schema2', ..., 'schemaN')
SUPPORTED_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_SCHEMAS))
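# With SUPPORTED_SCHEMAS above this renders as "('sampledb')".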
OPTIONAL_TABLE_NAMES = ''
AWS_ACCESS = 'YOUR_ACCESS_KEY'
AWS_SECRET = 'YOUR_SECRET_KEY'


def connection_string():
    access_key = AWS_ACCESS
    secret = AWS_SECRET
    host = 'athena.us-east-1.amazonaws.com'
    extras = 's3_staging_dir=s3://aws-athena-query-results-032106861074-us-east-1/'
    return "awsathena+rest://%s:%s@%s:443/?%s" % (access_key, secret, host, extras)


def create_table_extract_job(**kwargs):
    where_clause_suffix = textwrap.dedent("""
        where table_schema in {schemas}
    """).format(schemas=SUPPORTED_SCHEMA_SQL_IN_CLAUSE)

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)

    job_config = ConfigFactory.from_dict({
        'extractor.athena_metadata.{}'.format(AthenaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY):
            where_clause_suffix,
        'extractor.athena_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
            connection_string(),
        'extractor.athena_metadata.{}'.format(AthenaMetadataExtractor.CATALOG_KEY): "'AwsDataCatalog'",
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
            node_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
            node_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
            neo4j_endpoint,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
            neo4j_user,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
            neo4j_password,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
            'unique_tag',  # should use unique tag here like {ds}
    })

    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=AthenaMetadataExtractor(), loader=FsNeo4jCSVLoader(),
                                      transformer=NoopTransformer()),
                     publisher=Neo4jCsvPublisher())
    job.launch()


def create_es_publisher_sample_job():
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # Elasticsearch client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
    # related to mapping type from databuilder/publisher/elasticsearch_publisher.py#L38
    elasticsearch_new_index_key_type = 'table'
    # alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
    elasticsearch_index_alias = 'table_search_index'

    job_config = ConfigFactory.from_dict({
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY):
            'databuilder.models.table_elasticsearch_document.TableESDocument',
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
            extracted_search_data_path,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
            extracted_search_data_path,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
            elasticsearch_client,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
            elasticsearch_new_index_key,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
            elasticsearch_new_index_key_type,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
            elasticsearch_index_alias
    })

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    job.launch()


with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag:
    create_table_extract_job()
    # create_table_extract_job = PythonOperator(
    #     task_id='create_table_extract_job',
    #     python_callable=create_table_extract_job
    # )

    create_es_index_job = PythonOperator(
        task_id='create_es_publisher_sample_job',
        python_callable=create_es_publisher_sample_job
    )

    create_es_publisher_sample_job()

@@ -54,6 +54,7 @@ pytz==2018.4
antlr4-python2-runtime==4.7.1
statsd==3.2.1
retrying==1.3.3
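# Athena SQLAlchemy driver, used by AthenaMetadataExtractor through SQLAlchemyExtractor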
PyAthena[SQLAlchemy]
# Python API client for google
# License: Apache Software License