Commit db04aa0b authored by Junda Yang's avatar Junda Yang Committed by Tao Feng

index presto views (#25)

* index presto views

* fix test

* address comment

* remove alphabetica sorting

* bump up version
parent c7d6da1f
import base64
import json
import logging
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
LOGGER = logging.getLogger(__name__)
class PrestoViewMetadataExtractor(Extractor):
"""
Extracts Presto View and column metadata from underlying meta store database using SQLAlchemyExtractor
PrestoViewMetadataExtractor does not require a separate table model but just reuse the existing TableMetadata
"""
# SQL statement to extract View metadata
# {where_clause_suffix} could be used to filter schemas
SQL_STATEMENT = """
SELECT t.TBL_ID, d.NAME as schema_name, t.TBL_NAME name, t.TBL_TYPE, t.VIEW_ORIGINAL_TEXT as view_original_text
FROM TBLS t
JOIN DBS d ON t.DB_ID = d.DB_ID
WHERE t.VIEW_EXPANDED_TEXT = '/* Presto View */'
{where_clause_suffix}
ORDER BY t.TBL_ID desc;
"""
# Presto View data prefix and suffix definition:
# https://github.com/prestodb/presto/blob/43bd519052ba4c56ff1f4fc807075637ab5f4f10/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L153-L154
PRESTO_VIEW_PREFIX = '/* Presto View: '
PRESTO_VIEW_SUFFIX = ' */'
# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster'
DEFAULT_CONFIG = ConfigFactory.from_dict({WHERE_CLAUSE_SUFFIX_KEY: ' ',
CLUSTER_KEY: 'gold'})
def init(self, conf):
# type: (ConfigTree) -> None
conf = conf.with_fallback(PrestoViewMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(PrestoViewMetadataExtractor.CLUSTER_KEY))
self.sql_stmt = PrestoViewMetadataExtractor.SQL_STATEMENT.format(
where_clause_suffix=conf.get_string(PrestoViewMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY))
LOGGER.info('SQL for hive metastore: {}'.format(self.sql_stmt))
self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter = None # type: Union[None, Iterator]
def extract(self):
# type: () -> Union[TableMetadata, None]
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.presto_view_metadata'
def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
"""
Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
:return:
"""
row = self._alchemy_extractor.extract()
while row:
columns = self._get_column_metadata(row['view_original_text'])
yield TableMetadata(database='presto',
cluster=self._cluster,
schema_name=row['schema_name'],
name=row['name'],
description=None,
columns=columns,
is_view=True)
row = self._alchemy_extractor.extract()
def _get_column_metadata(self, view_original_text):
# type: (str) -> List[ColumnMetadata]
"""
Get Column Metadata from VIEW_ORIGINAL_TEXT from TBLS table for Presto Views.
Columns are sorted the same way as they appear in Presto Create View SQL.
:param view_original_text:
:return:
"""
# remove encoded Presto View data prefix and suffix
encoded_view_info = (
view_original_text.
split(PrestoViewMetadataExtractor.PRESTO_VIEW_PREFIX, 1)[-1].
rsplit(PrestoViewMetadataExtractor.PRESTO_VIEW_SUFFIX, 1)[0]
)
# view_original_text is b64 encoded:
# https://github.com/prestodb/presto/blob/43bd519052ba4c56ff1f4fc807075637ab5f4f10/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L602-L605
decoded_view_info = base64.b64decode(encoded_view_info)
columns = json.loads(decoded_view_info).get('columns')
return [ColumnMetadata(name=column['name'],
description=None,
col_type=column['type'],
sort_order=i) for i, column in enumerate(columns)]
......@@ -62,10 +62,13 @@ class TableMetadata(Neo4jCsvSerializable):
These are being created here as it does not make much sense to have different extraction to produce this. As
database, cluster, schema would be very repititive with low cardinality, it will perform de-dupe so that publisher
won't need to publish same nodes, relationships.
This class can be used for both table and view metadata. If it is a View, is_view=True should be passed in.
"""
TABLE_NODE_LABEL = 'Table'
TABLE_KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}'
TABLE_NAME = 'name'
IS_VIEW = 'is_view'
TABLE_DESCRIPTION = 'description'
TABLE_DESCRIPTION_FORMAT = '{db}://{cluster}.{schema}/{tbl}/_description'
......@@ -100,7 +103,8 @@ class TableMetadata(Neo4jCsvSerializable):
schema_name, # type: str
name, # type: str
description, # type: Union[str, None]
columns=None # type: Iterable[ColumnMetadata]
columns=None, # type: Iterable[ColumnMetadata]
is_view=False, # type: bool
):
# type: (...) -> None
"""
......@@ -118,17 +122,19 @@ class TableMetadata(Neo4jCsvSerializable):
self.name = name
self.description = description
self.columns = columns if columns else []
self.is_view = is_view
self._node_iterator = self._create_next_node()
self._relation_iterator = self._create_next_relation()
def __repr__(self):
# type: () -> str
return 'TableMetadata({!r}, {!r}, {!r}, {!r}, {!r}, {!r})'.format(self.database,
self.cluster,
self.schema_name,
self.name,
self.description,
self.columns)
return 'TableMetadata({!r}, {!r}, {!r}, {!r}, {!r}, {!r}, {!r})'.format(self.database,
self.cluster,
self.schema_name,
self.name,
self.description,
self.columns,
self.is_view)
def _get_table_key(self):
# type: () -> str
......@@ -186,7 +192,8 @@ class TableMetadata(Neo4jCsvSerializable):
# type: () -> Iterator[Any]
yield {NODE_LABEL: TableMetadata.TABLE_NODE_LABEL,
NODE_KEY: self._get_table_key(),
TableMetadata.TABLE_NAME: self.name}
TableMetadata.TABLE_NAME: self.name,
TableMetadata.IS_VIEW: self.is_view}
if self.description:
yield {NODE_LABEL: DESCRIPTION_NODE_LABEL,
......
from setuptools import setup, find_packages
__version__ = '1.0.9'
__version__ = '1.0.10'
setup(
......
import base64
import json
import logging
import unittest
from mock import patch, MagicMock
from pyhocon import ConfigFactory
from databuilder.extractor.presto_view_metadata_extractor import PrestoViewMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
class TestPrestoViewMetadataExtractor(unittest.TestCase):
def setUp(self):
# type: () -> None
logging.basicConfig(level=logging.INFO)
config_dict = {
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
'TEST_CONNECTION'
}
self.conf = ConfigFactory.from_dict(config_dict)
def test_extraction_with_empty_result(self):
# type: () -> None
"""
Test Extraction with empty result from query
"""
with patch.object(SQLAlchemyExtractor, '_get_connection'):
extractor = PrestoViewMetadataExtractor()
extractor.init(self.conf)
results = extractor.extract()
self.assertEqual(results, None)
def test_extraction_with_multiple_views(self):
# type: () -> None
with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection:
connection = MagicMock()
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute
columns1 = {'columns': [{'name': 'xyz', 'type': 'varchar'},
{'name': 'xyy', 'type': 'double'},
{'name': 'aaa', 'type': 'int'},
{'name': 'ab', 'type': 'varchar'}]}
columns2 = {'columns': [{'name': 'xyy', 'type': 'varchar'},
{'name': 'ab', 'type': 'double'},
{'name': 'aaa', 'type': 'int'},
{'name': 'xyz', 'type': 'varchar'}]}
sql_execute.return_value = [
{'tbl_id': 2,
'schema_name': 'test_schema2',
'name': 'test_view2',
'tbl_type': 'virtual_view',
'view_original_text': base64.b64encode(json.dumps(columns2).encode()).decode("utf-8")},
{'tbl_id': 1,
'schema_name': 'test_schema1',
'name': 'test_view1',
'tbl_type': 'virtual_view',
'view_original_text': base64.b64encode(json.dumps(columns1).encode()).decode("utf-8")},
]
extractor = PrestoViewMetadataExtractor()
extractor.init(self.conf)
actual_first_view = extractor.extract()
expected_first_view = TableMetadata('presto', 'gold', 'test_schema2', 'test_view2', None,
[ColumnMetadata(u'xyy', None, u'varchar', 0),
ColumnMetadata(u'ab', None, u'double', 1),
ColumnMetadata(u'aaa', None, u'int', 2),
ColumnMetadata(u'xyz', None, u'varchar', 3)],
True)
self.assertEqual(expected_first_view.__repr__(), actual_first_view.__repr__())
actual_second_view = extractor.extract()
expected_second_view = TableMetadata('presto', 'gold', 'test_schema1', 'test_view1', None,
[ColumnMetadata(u'xyz', None, u'varchar', 0),
ColumnMetadata(u'xyy', None, u'double', 1),
ColumnMetadata(u'aaa', None, u'int', 2),
ColumnMetadata(u'ab', None, u'varchar', 3)],
True)
self.assertEqual(expected_second_view.__repr__(), actual_second_view.__repr__())
self.assertIsNone(extractor.extract())
if __name__ == '__main__':
unittest.main()
......@@ -24,7 +24,7 @@ class TestTableMetadata(unittest.TestCase):
ColumnMetadata('ds', None, 'varchar', 5)])
self.expected_nodes_deduped = [
{'name': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1', 'LABEL': 'Table'},
{'name': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1', 'LABEL': 'Table', 'is_view': False},
{'description': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1/_description',
'LABEL': 'Description'},
{'sort_order': 0, 'type': 'bigint', 'name': 'test_id1',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment