Unverified Commit 63f239fd authored by Josh Howard and committed by GitHub

feat: Added Dremio extractor (#377)

Signed-off-by: Josh Howard <josh.t.howard@ey.com>
Co-authored-by: Josh Howard <josh.t.howard@ey.com>
parent 6802ab13
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from collections import namedtuple
from itertools import groupby
import logging
from typing import Iterator, Union, Dict, Any
from pyhocon import ConfigFactory, ConfigTree
from pyodbc import connect
from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
TableKey = namedtuple('TableKey', ['schema', 'table_name'])
LOGGER = logging.getLogger(__name__)
class DremioMetadataExtractor(Extractor):
    '''
    Extracts Dremio table and column metadata from the underlying INFORMATION_SCHEMA tables
    Requirements:
        pyodbc and the Dremio ODBC driver
    '''
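    # Minimal standalone usage sketch (host and credentials are placeholders and a
    # working Dremio ODBC DSN is assumed; inside a job the same keys are namespaced
    # under 'extractor.dremio.', as in the sample script further below):
    #
    #   from pyhocon import ConfigFactory
    #   extractor = DremioMetadataExtractor()
    #   extractor.init(ConfigFactory.from_dict({
    #       DremioMetadataExtractor.DREMIO_HOST_KEY: 'dremio.example.com',
    #       DremioMetadataExtractor.DREMIO_USER_KEY: 'analyst',
    #       DremioMetadataExtractor.DREMIO_PASSWORD_KEY: 'secret',
    #   }))
    #   record = extractor.extract()
    #   while record:
    #       print(record)
    #       record = extractor.extract()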
SQL_STATEMENT = '''
SELECT
nested_1.COLUMN_NAME AS col_name,
CAST(NULL AS VARCHAR) AS col_description,
nested_1.DATA_TYPE AS col_type,
nested_1.ORDINAL_POSITION AS col_sort_order,
nested_1.TABLE_CATALOG AS database,
'{cluster}' AS cluster,
nested_1.TABLE_SCHEMA AS schema,
nested_1.TABLE_NAME AS name,
CAST(NULL AS VARCHAR) AS description,
CASE WHEN nested_0.TABLE_TYPE='VIEW' THEN TRUE ELSE FALSE END AS is_view
FROM (
SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE
FROM INFORMATION_SCHEMA."TABLES"
) nested_0
RIGHT JOIN (
SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION
FROM INFORMATION_SCHEMA."COLUMNS"
) nested_1 ON nested_0.TABLE_NAME = nested_1.TABLE_NAME
AND nested_0.TABLE_SCHEMA = nested_1.TABLE_SCHEMA
AND nested_0.TABLE_CATALOG = nested_1.TABLE_CATALOG
{where_stmt}
'''
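    # {cluster} and {where_stmt} are substituted in init() below; where_stmt filters on
    # nested_0.TABLE_TYPE to optionally exclude system tables and physical datasets.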
# Config keys
DREMIO_USER_KEY = 'user_key'
DREMIO_PASSWORD_KEY = 'password_key'
DREMIO_HOST_KEY = 'host_key'
DREMIO_PORT_KEY = 'port_key'
DREMIO_DRIVER_KEY = 'driver_key'
DREMIO_CLUSTER_KEY = 'cluster_key'
DREMIO_EXCLUDE_SYS_TABLES_KEY = 'exclude_system_tables'
DREMIO_EXCLUDE_PDS_TABLES_KEY = 'exclude_pds_tables'
# Default values
DEFAULT_AUTH_USER = 'dremio_auth_user'
DEFAULT_AUTH_PW = 'dremio_auth_pw'
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = '31010'
DEFAULT_DRIVER = 'DSN=Dremio Connector'
DEFAULT_CLUSTER_NAME = 'Production'
DEFAULT_EXCLUDE_SYS_TABLES = True
DEFAULT_EXCLUDE_PDS_TABLES = False
# Default config
DEFAULT_CONFIG = ConfigFactory.from_dict({
DREMIO_USER_KEY: DEFAULT_AUTH_USER,
DREMIO_PASSWORD_KEY: DEFAULT_AUTH_PW,
DREMIO_HOST_KEY: DEFAULT_HOST,
DREMIO_PORT_KEY: DEFAULT_PORT,
DREMIO_DRIVER_KEY: DEFAULT_DRIVER,
DREMIO_CLUSTER_KEY: DEFAULT_CLUSTER_NAME,
DREMIO_EXCLUDE_SYS_TABLES_KEY: DEFAULT_EXCLUDE_SYS_TABLES,
DREMIO_EXCLUDE_PDS_TABLES_KEY: DEFAULT_EXCLUDE_PDS_TABLES
})
def init(self, conf: ConfigTree) -> None:
conf = conf.with_fallback(DremioMetadataExtractor.DEFAULT_CONFIG)
exclude_sys_tables = conf.get_bool(DremioMetadataExtractor.DREMIO_EXCLUDE_SYS_TABLES_KEY)
exclude_pds_tables = conf.get_bool(DremioMetadataExtractor.DREMIO_EXCLUDE_PDS_TABLES_KEY)
if exclude_sys_tables and exclude_pds_tables:
where_stmt = ("WHERE nested_0.TABLE_TYPE != 'SYSTEM_TABLE' AND "
"nested_0.TABLE_TYPE != 'TABLE';")
elif exclude_sys_tables:
where_stmt = "WHERE nested_0.TABLE_TYPE != 'SYSTEM_TABLE';"
elif exclude_pds_tables:
where_stmt = "WHERE nested_0.TABLE_TYPE != 'TABLE';"
else:
where_stmt = ';'
self._cluster = conf.get_string(DremioMetadataExtractor.DREMIO_CLUSTER_KEY)
self.sql_stmt = DremioMetadataExtractor.SQL_STATEMENT.format(
cluster=self._cluster,
where_stmt=where_stmt
)
LOGGER.info('SQL for Dremio metadata: {}'.format(self.sql_stmt))
self._pyodbc_cursor = connect(
conf.get_string(DremioMetadataExtractor.DREMIO_DRIVER_KEY),
uid=conf.get_string(DremioMetadataExtractor.DREMIO_USER_KEY),
pwd=conf.get_string(DremioMetadataExtractor.DREMIO_PASSWORD_KEY),
host=conf.get_string(DremioMetadataExtractor.DREMIO_HOST_KEY),
port=conf.get_string(DremioMetadataExtractor.DREMIO_PORT_KEY),
autocommit=True).cursor()
self._extract_iter: Union[None, Iterator] = None
def extract(self) -> Union[TableMetadata, None]:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self) -> str:
return 'extractor.dremio'
def _get_extract_iter(self) -> Iterator[TableMetadata]:
        '''
        Groups rows from the raw iterator by table using itertools.groupby and yields one TableMetadata per group
        :return:
        '''
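        # Note: itertools.groupby only merges adjacent rows, so rows belonging to the
        # same (schema, table) pair are expected to arrive contiguously from the query.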
for _, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []
for row in group:
last_row = row
columns.append(ColumnMetadata(
row['col_name'],
row['col_description'],
row['col_type'],
row['col_sort_order'])
)
yield TableMetadata(last_row['database'],
last_row['cluster'],
last_row['schema'],
last_row['name'],
last_row['description'],
columns,
last_row['is_view'] == 'true')
def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
        '''
        Provides an iterator of result rows from the pyodbc cursor
        :return:
        '''
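        # pyodbc exposes the selected column aliases via cursor.description; zipping
        # them with each row tuple yields one dict per row keyed by alias
        # (col_name, col_type, schema, name, is_view, ...).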
for row in self._pyodbc_cursor.execute(self.sql_stmt):
yield dict(zip([c[0] for c in self._pyodbc_cursor.description], row))
def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
'''
Table key consists of schema and table name
:param row:
:return:
'''
if row:
return TableKey(schema=row['schema'], table_name=row['name'])
return None
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
"""
import logging
import os
from pyhocon import ConfigFactory
import uuid
import sys
from databuilder.extractor.dremio_metadata_extractor import DremioMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from elasticsearch.client import Elasticsearch
from databuilder.transformer.base_transformer import NoopTransformer
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
# Dremio connection values; adjust these to override the extractor defaults
DREMIO_HOST = 'localhost'
DREMIO_USER = 'dremio'
DREMIO_PASSWORD = 'test'
# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
NEO4J_USER = 'neo4j'
NEO4J_PASSWORD = 'test'
es_host = None
neo_host = None
if len(sys.argv) > 1:
es_host = sys.argv[1]
if len(sys.argv) > 2:
neo_host = sys.argv[2]
es = Elasticsearch([
{'host': es_host if es_host else 'localhost'},
])
def create_sample_dremio_job():
tmp_folder = '/var/tmp/amundsen/{}'.format('tables')
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
extractor = DremioMetadataExtractor()
loader = FsNeo4jCSVLoader()
task = DefaultTask(extractor=extractor,
loader=loader)
job_config = ConfigFactory.from_dict({
'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_USER_KEY): DREMIO_USER,
'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_PASSWORD_KEY): DREMIO_PASSWORD,
'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_HOST_KEY): DREMIO_HOST,
'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_EXCLUDE_PDS_TABLES_KEY): True,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR): True,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.FORCE_CREATE_DIR): True,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): NEO4J_ENDPOINT,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): NEO4J_USER,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): NEO4J_PASSWORD,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): 'unique_tag'
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
return job
def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
cypher_query=None,
elasticsearch_mapping=None):
"""
    :param elasticsearch_index_alias: alias for the Elasticsearch index used in
           amundsensearchlibrary/search_service/config.py
    :param elasticsearch_doc_type_key: name the Elasticsearch index is prefixed with. Defaults to `table`,
           resulting in `table_search_index`
    :param model_name: the Databuilder model class used to transport data between the Extractor and the Loader
    :param cypher_query: query handed to the `Neo4jSearchDataExtractor` class; if None is given (default),
           it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class;
           if None is given (default), it uses the `Table` mapping baked into the Publisher
"""
# loader saves data to this location and publisher reads it from here
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=NoopTransformer())
# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): NEO4J_ENDPOINT,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): model_name,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): NEO4J_USER,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): NEO4J_PASSWORD,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
elasticsearch_doc_type_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias,
})
# only optionally add these keys, so need to dynamically `put` them
if cypher_query:
job_config.put('extractor.search_data.{}'.format(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY),
cypher_query)
if elasticsearch_mapping:
job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
elasticsearch_mapping)
job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())
return job
if __name__ == "__main__":
# Push code to Neo4j from Dremio
job = create_sample_dremio_job()
job.launch()
# Push data to Elasticsearch from Neo4j
job_es_table = create_es_publisher_sample_job(
elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
job_es_table.launch()
@@ -47,11 +47,16 @@ db2 = [
     'ibm-db-sa-py3==0.3.1-1'
 ]
 
+dremio = [
+    'pyodbc==4.0.30'
+]
+
 druid = [
     'pydruid'
 ]
 
-all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath + db2 + druid
+all_deps = requirements + kafka + cassandra + glue + snowflake + athena + \
+    bigquery + jsonpath + db2 + dremio + druid
 
 setup(
     name='amundsen-databuilder',
@@ -75,6 +80,7 @@ setup(
         'bigquery': bigquery,
         'jsonpath': jsonpath,
         'db2': db2,
+        'dremio': dremio,
         'druid': druid,
     },
     classifiers=[
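With the new dremio extra defined above, the pyodbc dependency can be installed together with databuilder (assuming installation from PyPI): pip install 'amundsen-databuilder[dremio]'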
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Dict, List, Any
import unittest
from unittest.mock import MagicMock, patch
from pyhocon import ConfigFactory
from databuilder.extractor.dremio_metadata_extractor import DremioMetadataExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
class TestDremioMetadataExtractor(unittest.TestCase):
def setUp(self) -> None:
logging.basicConfig(level=logging.INFO)
config_dict: Dict[str, str] = {}
self.conf = ConfigFactory.from_dict(config_dict)
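        # An empty config is sufficient here: init() falls back to DEFAULT_CONFIG for
        # every key, and connect() is patched in each test, so no real Dremio/ODBC
        # setup is needed.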
@patch('databuilder.extractor.dremio_metadata_extractor.connect')
def test_extraction_with_empty_query_result(self, mock_connect: MagicMock) -> None:
"""
Test Extraction with empty result from query
"""
mock_connection = MagicMock()
mock_connect.return_value = mock_connection
mock_cursor = MagicMock()
mock_connection.cursor.return_value = mock_cursor
extractor = DremioMetadataExtractor()
extractor.init(self.conf)
results = extractor.extract()
self.assertEqual(results, None)
@patch('databuilder.extractor.dremio_metadata_extractor.connect')
def test_extraction_with_single_result(self, mock_connect: MagicMock) -> None:
"""
Test Extraction with single table result from query
"""
mock_connection = MagicMock()
mock_connect.return_value = mock_connection
mock_cursor = MagicMock()
mock_connection.cursor.return_value = mock_cursor
mock_execute = MagicMock()
mock_cursor.execute = mock_execute
mock_cursor.description = [
['col_name'],
['col_description'],
['col_type'],
['col_sort_order'],
['database'],
['cluster'],
['schema'],
['name'],
['description'],
['is_view']
]
    # Explicit annotation avoids the type checker's "Unsupported operand types for +" error
table: List[Any] = [
'DREMIO',
'Production',
'test_schema',
'test_table',
'a table for testing',
'false'
]
    # Explicit annotation avoids the type checker's "Unsupported operand types for +" error
expected_input: List[List[Any]] = [
['col_id1', 'description of id1', 'number', 0] + table,
['col_id2', 'description of id2', 'number', 1] + table,
['is_active', None, 'boolean', 2] + table,
['source', 'description of source', 'varchar', 3] + table,
['etl_created_at', 'description of etl_created_at', 'timestamp_ltz', 4] + table,
['ds', None, 'varchar', 5] + table
]
mock_cursor.execute.return_value = expected_input
extractor = DremioMetadataExtractor()
extractor.init(self.conf)
actual = extractor.extract()
expected = TableMetadata('DREMIO', 'Production', 'test_schema', 'test_table', 'a table for testing',
[ColumnMetadata('col_id1', 'description of id1', 'number', 0),
ColumnMetadata('col_id2', 'description of id2', 'number', 1),
ColumnMetadata('is_active', None, 'boolean', 2),
ColumnMetadata('source', 'description of source', 'varchar', 3),
ColumnMetadata('etl_created_at', 'description of etl_created_at',
'timestamp_ltz', 4),
ColumnMetadata('ds', None, 'varchar', 5)])
self.assertEqual(expected.__repr__(), actual.__repr__())
self.assertIsNone(extractor.extract())
if __name__ == '__main__':
unittest.main()