Unverified Commit 8075a6c7 authored by Tao Feng's avatar Tao Feng Committed by GitHub

feat: allow hive sql to be provided as config (#312)

* feat: allow hive sql to be provided as config

* update test name

* update test name

* remove print

* update
parent 381a4170
......@@ -23,12 +23,13 @@ class HiveTableMetadataExtractor(Extractor):
"""
Extracts Hive table and column metadata from underlying meta store database using SQLAlchemyExtractor
"""
EXTRACT_SQL = 'extract_sql'
# SELECT statement from hive metastore database to extract table and column metadata
# Below SELECT statement uses UNION to combining two queries together.
# 1st query is retrieving partition columns
# 2nd query is retrieving columns
# Using UNION to combine above two statements and order by table & partition identifier.
SQL_STATEMENT = """
DEFAULT_SQL_STATEMENT = """
SELECT source.* FROM
(SELECT t.TBL_ID, d.NAME as `schema`, t.TBL_NAME name, t.TBL_TYPE, tp.PARAM_VALUE as description,
p.PKEY_NAME as col_name, p.INTEGER_IDX as col_sort_order,
......@@ -66,9 +67,11 @@ class HiveTableMetadataExtractor(Extractor):
conf = conf.with_fallback(HiveTableMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(HiveTableMetadataExtractor.CLUSTER_KEY))
self.sql_stmt = HiveTableMetadataExtractor.SQL_STATEMENT.format(
default_sql = HiveTableMetadataExtractor.DEFAULT_SQL_STATEMENT.format(
where_clause_suffix=conf.get_string(HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY))
self.sql_stmt = conf.get_string(HiveTableMetadataExtractor.EXTRACT_SQL, default=default_sql)
LOGGER.info('SQL for hive metastore: {}'.format(self.sql_stmt))
self._alchemy_extractor = SQLAlchemyExtractor()
......
......@@ -230,6 +230,25 @@ class TestHiveTableMetadataExtractorWithWhereClause(unittest.TestCase):
extractor.init(self.conf)
self.assertTrue(self.where_clause_suffix in extractor.sql_stmt)
def test_hive_sql_statement_with_custom_sql(self):
# type: () -> None
"""
Test Extraction by providing a custom sql
:return:
"""
with patch.object(SQLAlchemyExtractor, '_get_connection'):
config_dict = {
HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY: self.where_clause_suffix,
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
'TEST_CONNECTION',
HiveTableMetadataExtractor.EXTRACT_SQL:
'select sth for test {where_clause_suffix}'
}
conf = ConfigFactory.from_dict(config_dict)
extractor = HiveTableMetadataExtractor()
extractor.init(conf)
self.assertTrue('select sth for test' in extractor.sql_stmt)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment