Unverified Commit 0c6db356 authored by Tao Feng, committed by GitHub

Remove py27 ANTLR Presto SQL usage extractor (#208)

* Remove py27 ANTLR Presto SQL usage extractor

* remove more

* remove pytest macro
parent 97277402
@@ -272,10 +272,6 @@ job = DefaultJob(
job.launch()
```
-#### [TblColUsgAggExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/table_column_usage_aggregate_extractor.py "TblColUsgAggExtractor")
-An extractor that extracts table usage from SQL statements. It accepts any extractor that emits rows from a source holding a SQL audit log. Once a SQL statement is extracted, it uses [ANTLR](https://www.antlr.org/ "ANTLR") to parse it and determine which tables and columns the statement reads from. It also aggregates usage by table and user. (Column-level aggregation is not supported yet.)
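For reference, a minimal wiring sketch of the extractor removed here. This is not from the original README: the `MagicMock` stand-ins and the omitted Hive metastore config are assumptions, while the config keys and scopes come from the extractor and transformer sources later in this commit.

```python
from mock import MagicMock
from pyhocon import ConfigFactory

from databuilder.extractor.table_column_usage_aggregate_extractor import TblColUsgAggExtractor
from databuilder.job.job import DefaultJob
from databuilder.task.task import DefaultTask

# Stand-ins for illustration only: a real pipeline would use an extractor that reads
# a SQL audit log and a real loader such as FsNeo4jCSVLoader.
raw_extractor = MagicMock()
raw_extractor.extract.return_value = None  # nothing to read in this sketch
raw_extractor.get_scope.return_value = 'extractor.my_audit_log'  # hypothetical scope
loader = MagicMock()

job_config = ConfigFactory.from_dict({
    # 'raw_extractor' key under this extractor's scope (see RAW_EXTRACTOR in the source below)
    'extractor.table_column_usage_aggregate.raw_extractor': raw_extractor,
    # SqlToTblColUsageTransformer config nests one scope deeper; its embedded Hive
    # metastore extractor config (also required for a real run) is omitted here.
    'extractor.table_column_usage_aggregate.transformer.sql_to_tbl_col_usage.database': 'hive',
    'extractor.table_column_usage_aggregate.transformer.sql_to_tbl_col_usage.sql_stmt_attribute_name': 'statement',
    'extractor.table_column_usage_aggregate.transformer.sql_to_tbl_col_usage.user_email_attribute_name': 'email',
})

job = DefaultJob(conf=job_config,
                 task=DefaultTask(extractor=TblColUsgAggExtractor(), loader=loader))
job.launch()
```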
## List of transformers
#### [ChainedTransformer](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/transformer/base_transformer.py#L41 "ChainedTransformer")
A chained transformer that can take multiple transformers.
@@ -296,9 +292,6 @@ job = DefaultJob(
job.launch()
```
-#### [SqlToTblColUsageTransformer](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/transformer/sql_to_table_col_usage_transformer.py "SqlToTblColUsageTransformer")
-A SQL-to-usage transformer that produces ColumnReader records carrying column, user, and count. Currently it collects at the table level, so columns from the same table are de-duped. In many cases the "from" clause does not contain a schema; the schema is looked up via a table name -> schema name mapping obtained from the metadata extractor.
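A standalone sketch of the transformer being removed, mirroring its unit test included later in this commit. `Query` is a hypothetical record type; the Hive metastore round-trip is patched out the same way the test does it, since a real run would need the HiveTableMetadataExtractor config under its own scope.

```python
from mock import patch
from pyhocon import ConfigFactory

from databuilder.extractor.hive_table_metadata_extractor import HiveTableMetadataExtractor
from databuilder.transformer.sql_to_table_col_usage_transformer import SqlToTblColUsageTransformer


class Query(object):
    """Hypothetical record type; any object exposing the configured attributes works."""
    def __init__(self, email, statement):
        self.email = email
        self.statement = statement


config = ConfigFactory.from_dict({
    SqlToTblColUsageTransformer.DATABASE_NAME: 'database',
    SqlToTblColUsageTransformer.SQL_STATEMENT_ATTRIBUTE_NAME: 'statement',
    SqlToTblColUsageTransformer.USER_EMAIL_ATTRIBUTE_NAME: 'email',
})

# Patch out the Hive metastore round-trip, as the unit test below does.
with patch.object(HiveTableMetadataExtractor, 'extract', return_value=None), \
        patch.object(HiveTableMetadataExtractor, 'init'):
    transformer = SqlToTblColUsageTransformer()
    transformer.init(config)

usage = transformer.transform(Query(email='john@example.com',
                                    statement='SELECT foo FROM bar.foobar'))
```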
## List of loaders
#### [FsNeo4jCSVLoader](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/loader/file_system_neo4j_csv_loader.py "FsNeo4jCSVLoader")
Writes node and relationship CSV file(s) that can be consumed by Neo4jCsvPublisher. It assumes that each record it consumes is an instance of Neo4jCsvSerializable.
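For context, a brief configuration sketch of this loader. The scope and key names here are assumptions based on the loader linked above; check its source for the authoritative config keys.

```python
from pyhocon import ConfigFactory

from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader

# 'loader.filesystem_csv_neo4j' scope and key names are assumptions.
job_config = ConfigFactory.from_dict({
    'loader.filesystem_csv_neo4j.node_dir_path': '/tmp/amundsen/nodes',
    'loader.filesystem_csv_neo4j.relationship_dir_path': '/tmp/amundsen/relationships',
})
loader = FsNeo4jCSVLoader()
```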
...
from collections import namedtuple
import logging

from pyhocon import ConfigTree  # noqa: F401
from typing import Dict, List, Any, Optional  # noqa: F401

from databuilder import Scoped
from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_column_usage import ColumnReader, TableColumnUsage
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.regex_str_replace_transformer import RegexStrReplaceTransformer
from databuilder.transformer.sql_to_table_col_usage_transformer import SqlToTblColUsageTransformer

TableColumnUsageTuple = namedtuple('TableColumnUsageTuple', ['database', 'cluster', 'schema',
                                                             'table', 'column', 'email'])

LOGGER = logging.getLogger(__name__)

# Config keys:
RAW_EXTRACTOR = 'raw_extractor'


class TblColUsgAggExtractor(Extractor):
    """
    An aggregate extractor for table column usage.
    It uses RegexStrReplaceTransformer to cleanse the SQL statement and SqlToTblColUsageTransformer to get
    table column usage.
    All usage is aggregated in memory, and on the last record the aggregated TableColumnUsage is returned.
    Note that this extractor performs all transformation and aggregation itself, so no further transformation
    is needed after it.
    """

    def init(self, conf):
        # type: (ConfigTree) -> None
        self._extractor = conf.get(RAW_EXTRACTOR)  # type: Extractor
        self._extractor.init(Scoped.get_scoped_conf(conf, self._extractor.get_scope()))

        regex_transformer = RegexStrReplaceTransformer()  # type: Any
        if conf.get(regex_transformer.get_scope(), None):
            regex_transformer.init(Scoped.get_scoped_conf(conf, regex_transformer.get_scope()))
        else:
            LOGGER.info('{} is not defined. Not using it'.format(regex_transformer.get_scope()))
            regex_transformer = NoopTransformer()

        sql_to_usage_transformer = SqlToTblColUsageTransformer()
        sql_to_usage_transformer.init(Scoped.get_scoped_conf(conf, sql_to_usage_transformer.get_scope()))

        self._transformer = ChainedTransformer((regex_transformer, sql_to_usage_transformer))

    def extract(self):
        # type: () -> Optional[TableColumnUsage]
        """
        Aggregates all counts per table and user in memory. Table-level aggregation is not expected to occupy
        much memory.
        :return: Provides a record or None if there is nothing more to extract
        """
        count_map = {}  # type: Dict[TableColumnUsageTuple, int]
        record = self._extractor.extract()

        count = 0
        while record:
            count += 1
            if count % 1000 == 0:
                LOGGER.info('Aggregated {} records'.format(count))

            tbl_col_usg = self._transformer.transform(record=record)
            record = self._extractor.extract()

            # filtered case
            if not tbl_col_usg:
                continue

            for col_rdr in tbl_col_usg.col_readers:
                key = TableColumnUsageTuple(database=col_rdr.database, cluster=col_rdr.cluster,
                                            schema=col_rdr.schema, table=col_rdr.table,
                                            column=col_rdr.column, email=col_rdr.user_email)
                new_count = count_map.get(key, 0) + col_rdr.read_count
                count_map[key] = new_count

        if not len(count_map):
            return None

        col_readers = []  # type: List[ColumnReader]
        while len(count_map):
            tbl_col_rdr_tuple, count = count_map.popitem()
            col_readers.append(ColumnReader(database=tbl_col_rdr_tuple.database, cluster=tbl_col_rdr_tuple.cluster,
                                            schema=tbl_col_rdr_tuple.schema, table=tbl_col_rdr_tuple.table,
                                            column=tbl_col_rdr_tuple.column, user_email=tbl_col_rdr_tuple.email,
                                            read_count=count))

        return TableColumnUsage(col_readers=col_readers)

    def get_scope(self):
        # type: () -> str
        return 'extractor.table_column_usage_aggregate'

    def close(self):
        # type: () -> None
        self._transformer.close()
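A compact illustration of the aggregation semantics above, mirroring the unit test later in this commit: two usages with the same (table, column, user) key merge their read counts.

```python
from databuilder.extractor.table_column_usage_aggregate_extractor import TableColumnUsageTuple

count_map = {}
key = TableColumnUsageTuple(database='database', cluster='gold', schema='test_schema1',
                            table='test_table1', column='*', email='john@example.com')

# e.g. two statements touching the same table by the same user, with read counts 1 and 2
for read_count in (1, 2):
    count_map[key] = count_map.get(key, 0) + read_count

assert count_map[key] == 3  # matches the aggregated read_count in the unit test below
```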
import copy
import logging

import six
from typing import Union, Iterable, Optional  # noqa: F401

LOGGER = logging.getLogger(__name__)


def remove_double_quotes(val):
    # type: (Union[str, None]) -> Union[str, None]
    """
    Often a column name, table name, or other identifier comes double-quoted. This method removes the
    double quotes.
    :param val:
    :return:
    """
    if not val:
        return val

    if six.PY2 and isinstance(val, six.text_type):
        val = val.encode('utf-8', 'ignore')

    if val.startswith('"') and val.endswith('"'):
        return val[1:-1]
    return val


class Column(object):
    """
    Column for usage.
    """
    def __init__(self, name, table=None, col_alias=None):
        # type: (str, Union[Table, None], Union[str, None]) -> None
        self.col_name = remove_double_quotes(name)
        self.table = table
        self.col_alias = remove_double_quotes(col_alias)

    def __repr__(self):
        # type: () -> str
        return 'Column(name={!r}, table={!r}, col_alias={!r})'.format(self.col_name, self.table, self.col_alias)

    def resolve_col_name(self, col_name):
        # type: (Union[str, None]) -> Union[str, None]
        """
        Resolve the column name for the currently processing column.
        e.g: SELECT bar from (SELECT foo as bar FROM foobar)
        In the case above, bar resolves against column foo, which has alias bar. It therefore resolves to foo,
        as that is the actual column name.
        :param col_name:
        :return:
        """
        if self.col_name == '*':
            return col_name

        if col_name == self.col_alias or col_name == self.col_name:
            return self.col_name

        return None

    @staticmethod
    def resolve(select_col, from_cols):
        # type: (Column, Iterable[Column]) -> Iterable[Column]
        """
        Resolve the processing column against processed columns. The processing column is the one from the
        SELECT clause; it does not have table information, though it can optionally have a table alias in front
        of the column (foo.bar).
        Processed columns are already-resolved columns that carry their table with them.
        Resolving processing columns against processed columns means having each processing column choose the
        correct processed column. The result is a proper column name with a table name.
        e.g1: SELECT foo from foobar
         - processing column: foo
         - processed column: all columns from foobar
         --> result: column 'foo' from 'foobar' table
        e.g2: SELECT foo from (SELECT foo, bar FROM foobar)
         - processing column: foo
         - processed column: foo and bar columns from foobar
         --> result: column 'foo' from 'foobar' table
        :param select_col: column from select clause
        :param from_cols: column from 'from' clause
        :return: List of columns
        """
        if LOGGER.isEnabledFor(logging.DEBUG):
            LOGGER.debug('select_col: {}'.format(select_col))
            LOGGER.debug('from_cols: {}'.format(from_cols))

        if select_col.col_name != '*':
            return Column.resolve_named_column(select_col, from_cols)

        return Column._resolve_all_column(select_col, from_cols)

    @staticmethod
    def resolve_named_column(select_col, from_cols):
        # type: (Column, Iterable[Column]) -> Iterable[Column]
        """
        SELECT clause where the column has a name (not *)
        e.g: SELECT foo, bar FROM foobar.
        :param select_col: column from select clause
        :param from_cols: column from 'from' clause
        :return:
        """
        # column name is defined and has table alias. (SELECT foo.bar)
        if select_col.table:
            for processed_col in from_cols:
                # resolve column name, as the processing column name can be an alias
                col_name = processed_col.resolve_col_name(select_col.col_name)
                if col_name and processed_col.table:
                    # resolve table name using alias
                    table = processed_col.table.resolve_table(select_col.table.name)
                    if table:
                        alias = Column.get_column_alias(select_col=select_col, from_col=processed_col)
                        col = Column(col_name, table=table, col_alias=alias)
                        return [col]

            raise Exception('Cannot find case 1. select_col: {} , from_cols: {}'
                            .format(select_col, from_cols))

        # col name defined but no table. (SELECT foo)
        else:
            sub_result = []
            # Using the column name only to find a match among processed columns.
            # Note that we can end up with one column name matching multiple tables. This is the case where the
            # SQL engine resolves ambiguity by looking at the actual columns in the table. Here we use OrTable
            # so that it can be disambiguated later on.
            for processed_col in from_cols:
                col_name = processed_col.resolve_col_name(select_col.col_name)
                if col_name:
                    col = copy.deepcopy(processed_col)
                    col.col_name = col_name
                    alias = Column.get_column_alias(select_col=select_col, from_col=processed_col)
                    col.col_alias = alias
                    sub_result.append(col)

            if not sub_result:
                raise Exception('Cannot find case 2. select_col: {} , from_cols: {}'
                                .format(select_col, from_cols))

            if len(sub_result) == 1:
                return sub_result

            tables = []
            for col in sub_result:
                tables.append(copy.deepcopy(col.table))

            col = sub_result[0]
            col.table = OrTable(tables)
            return [col]

    @staticmethod
    def get_column_alias(select_col, from_col):
        # type: (Column, Column) -> str
        """
        Use the processing column alias if it is not null.
        :param select_col: column from select clause
        :param from_col: column from 'from' clause
        :return:
        """
        return select_col.col_alias if select_col.col_alias else from_col.col_alias

    @staticmethod
    def _resolve_all_column(processing_col, processed_cols):
        # type: (Column, Iterable[Column]) -> Iterable[Column]
        """
        SELECT statement where the column is '*'
        e.g: SELECT * FROM foobar;
        :param processed_cols:
        :return:
        """
        if processing_col.table:
            result = []
            # Select whatever we have in processed columns; it just needs to match the table
            for processed_col in processed_cols:
                if processed_col.table:
                    table = processed_col.table.resolve_table(processing_col.table.name)
                    if table:
                        col = copy.deepcopy(processed_col)
                        col.table = table
                        result.append(col)

            if not result:
                raise Exception('Cannot find case 3. select_col: {} , from_cols: {}'
                                .format(processing_col, processed_cols))
            return result

        # SELECT * case
        else:
            result = []
            for processed_col in processed_cols:
                result.append(copy.deepcopy(processed_col))

            if not result:
                raise Exception('Cannot find case 4. select_col: {} , from_cols: {}'
                                .format(processing_col, processed_cols))
            return result


class Table(object):
    """
    Table class for usage
    """
    def __init__(self, name, schema=None, alias=None):
        # type: (str, Union[str, None], Union[str, None]) -> None
        self.name = remove_double_quotes(name)
        self.schema = remove_double_quotes(schema)
        self.alias = remove_double_quotes(alias)

    def resolve_table(self, select_table_name):
        # type: (Union[str, None]) -> Union[Table, None]
        """
        If select_table_name matches this table, return the table instance
        :param select_table_name: table in select clause
        :return:
        """
        if select_table_name == self.alias or select_table_name == self.name:
            return self

        return None

    def __repr__(self):
        # type: () -> str
        return 'Table(name={!r}, schema={!r}, alias={!r})'.format(self.name, self.schema, self.alias)


class OrTable(Table):
    """
    Table that holds multiple tables. This is for the ambiguous case.
    For example, the statement "SELECT a, b FROM foo JOIN bar USING c" does not tell whether column a is from
    foo or bar. Thus, column a is from either table foo or bar, and this class represents that problem.
    """
    def __init__(self, tables):
        # type: (Iterable[Optional[Table]]) -> None
        self.tables = tables

    def resolve_table(self, select_table_name):
        # type: (str) -> Optional[Table]
        """
        If any of the member tables matches, return it
        :param select_table_name:
        :return:
        """
        for table in self.tables:
            if isinstance(table, OrTable):
                result = table.resolve_table(select_table_name)
                if result:
                    return result
                continue

            if select_table_name == table.alias or select_table_name == table.name:
                return table

        return None

    def __repr__(self):
        # type: () -> str
        return 'OrTable(tables={!r})'.format(self.tables)
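To make the resolution flow concrete, a small illustrative snippet using the classes above (hypothetical; not part of the original module):

```python
from databuilder.sql_parser.usage.column import Column, Table

# SELECT foo FROM foobar: the processing column 'foo' carries no table info yet, while
# the FROM clause contributed a processed '*' column bound to table 'foobar'.
select_col = Column('foo')
from_cols = [Column('*', table=Table('foobar'))]

resolved = Column.resolve(select_col, from_cols)
print(resolved)
# [Column(name='foo', table=Table(name='foobar', schema=None, alias=None), col_alias=None)]
```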
T__0=1
T__1=2
T__2=3
T__3=4
T__4=5
T__5=6
T__6=7
T__7=8
T__8=9
ADD=10
ALL=11
ALTER=12
ANALYZE=13
AND=14
ANY=15
ARRAY=16
AS=17
ASC=18
AT=19
BERNOULLI=20
BETWEEN=21
BY=22
CALL=23
CASCADE=24
CASE=25
CAST=26
CATALOGS=27
COALESCE=28
COLUMN=29
COLUMNS=30
COMMENT=31
COMMIT=32
COMMITTED=33
CONSTRAINT=34
CREATE=35
CROSS=36
CUBE=37
CURRENT=38
CURRENT_DATE=39
CURRENT_TIME=40
CURRENT_TIMESTAMP=41
CURRENT_USER=42
DATA=43
DATE=44
DAY=45
DEALLOCATE=46
DELETE=47
DESC=48
DESCRIBE=49
DISTINCT=50
DISTRIBUTED=51
DROP=52
ELSE=53
END=54
ESCAPE=55
EXCEPT=56
EXCLUDING=57
EXECUTE=58
EXISTS=59
EXPLAIN=60
EXTRACT=61
FALSE=62
FILTER=63
FIRST=64
FOLLOWING=65
FOR=66
FORMAT=67
FROM=68
FULL=69
FUNCTIONS=70
GRANT=71
GRANTS=72
GRAPHVIZ=73
GROUP=74
GROUPING=75
HAVING=76
HOUR=77
IF=78
IN=79
INCLUDING=80
INNER=81
INPUT=82
INSERT=83
INTEGER=84
INTERSECT=85
INTERVAL=86
INTO=87
IS=88
ISOLATION=89
JOIN=90
LAST=91
LATERAL=92
LEFT=93
LEVEL=94
LIKE=95
LIMIT=96
LOCALTIME=97
LOCALTIMESTAMP=98
LOGICAL=99
MAP=100
MINUTE=101
MONTH=102
NATURAL=103
NFC=104
NFD=105
NFKC=106
NFKD=107
NO=108
NORMALIZE=109
NOT=110
NULL=111
NULLIF=112
NULLS=113
ON=114
ONLY=115
OPTION=116
OR=117
ORDER=118
ORDINALITY=119
OUTER=120
OUTPUT=121
OVER=122
PARTITION=123
PARTITIONS=124
POSITION=125
PRECEDING=126
PREPARE=127
PRIVILEGES=128
PROPERTIES=129
PUBLIC=130
RANGE=131
READ=132
RECURSIVE=133
RENAME=134
REPEATABLE=135
REPLACE=136
RESET=137
RESTRICT=138
REVOKE=139
RIGHT=140
ROLLBACK=141
ROLLUP=142
ROW=143
ROWS=144
SCHEMA=145
SCHEMAS=146
SECOND=147
SELECT=148
SERIALIZABLE=149
SESSION=150
SET=151
SETS=152
SHOW=153
SMALLINT=154
SOME=155
START=156
STATS=157
SUBSTRING=158
SYSTEM=159
TABLE=160
TABLES=161
TABLESAMPLE=162
TEXT=163
THEN=164
TIME=165
TIMESTAMP=166
TINYINT=167
TO=168
TRANSACTION=169
TRUE=170
TRY_CAST=171
TYPE=172
UESCAPE=173
UNBOUNDED=174
UNCOMMITTED=175
UNION=176
UNNEST=177
USE=178
USING=179
VALIDATE=180
VALUES=181
VERBOSE=182
VIEW=183
WHEN=184
WHERE=185
WITH=186
WORK=187
WRITE=188
YEAR=189
ZONE=190
EQ=191
NEQ=192
LT=193
LTE=194
GT=195
GTE=196
PLUS=197
MINUS=198
ASTERISK=199
SLASH=200
PERCENT=201
CONCAT=202
STRING=203
UNICODE_STRING=204
BINARY_LITERAL=205
INTEGER_VALUE=206
DECIMAL_VALUE=207
DOUBLE_VALUE=208
IDENTIFIER=209
DIGIT_IDENTIFIER=210
QUOTED_IDENTIFIER=211
BACKQUOTED_IDENTIFIER=212
TIME_WITH_TIME_ZONE=213
TIMESTAMP_WITH_TIME_ZONE=214
DOUBLE_PRECISION=215
SIMPLE_COMMENT=216
BRACKETED_COMMENT=217
WS=218
UNRECOGNIZED=219
DELIMITER=220
'.'=1
'('=2
')'=3
','=4
'?'=5
'->'=6
'['=7
']'=8
'=>'=9
'ADD'=10
'ALL'=11
'ALTER'=12
'ANALYZE'=13
'AND'=14
'ANY'=15
'ARRAY'=16
'AS'=17
'ASC'=18
'AT'=19
'BERNOULLI'=20
'BETWEEN'=21
'BY'=22
'CALL'=23
'CASCADE'=24
'CASE'=25
'CAST'=26
'CATALOGS'=27
'COALESCE'=28
'COLUMN'=29
'COLUMNS'=30
'COMMENT'=31
'COMMIT'=32
'COMMITTED'=33
'CONSTRAINT'=34
'CREATE'=35
'CROSS'=36
'CUBE'=37
'CURRENT'=38
'CURRENT_DATE'=39
'CURRENT_TIME'=40
'CURRENT_TIMESTAMP'=41
'CURRENT_USER'=42
'DATA'=43
'DATE'=44
'DAY'=45
'DEALLOCATE'=46
'DELETE'=47
'DESC'=48
'DESCRIBE'=49
'DISTINCT'=50
'DISTRIBUTED'=51
'DROP'=52
'ELSE'=53
'END'=54
'ESCAPE'=55
'EXCEPT'=56
'EXCLUDING'=57
'EXECUTE'=58
'EXISTS'=59
'EXPLAIN'=60
'EXTRACT'=61
'FALSE'=62
'FILTER'=63
'FIRST'=64
'FOLLOWING'=65
'FOR'=66
'FORMAT'=67
'FROM'=68
'FULL'=69
'FUNCTIONS'=70
'GRANT'=71
'GRANTS'=72
'GRAPHVIZ'=73
'GROUP'=74
'GROUPING'=75
'HAVING'=76
'HOUR'=77
'IF'=78
'IN'=79
'INCLUDING'=80
'INNER'=81
'INPUT'=82
'INSERT'=83
'INTEGER'=84
'INTERSECT'=85
'INTERVAL'=86
'INTO'=87
'IS'=88
'ISOLATION'=89
'JOIN'=90
'LAST'=91
'LATERAL'=92
'LEFT'=93
'LEVEL'=94
'LIKE'=95
'LIMIT'=96
'LOCALTIME'=97
'LOCALTIMESTAMP'=98
'LOGICAL'=99
'MAP'=100
'MINUTE'=101
'MONTH'=102
'NATURAL'=103
'NFC'=104
'NFD'=105
'NFKC'=106
'NFKD'=107
'NO'=108
'NORMALIZE'=109
'NOT'=110
'NULL'=111
'NULLIF'=112
'NULLS'=113
'ON'=114
'ONLY'=115
'OPTION'=116
'OR'=117
'ORDER'=118
'ORDINALITY'=119
'OUTER'=120
'OUTPUT'=121
'OVER'=122
'PARTITION'=123
'PARTITIONS'=124
'POSITION'=125
'PRECEDING'=126
'PREPARE'=127
'PRIVILEGES'=128
'PROPERTIES'=129
'PUBLIC'=130
'RANGE'=131
'READ'=132
'RECURSIVE'=133
'RENAME'=134
'REPEATABLE'=135
'REPLACE'=136
'RESET'=137
'RESTRICT'=138
'REVOKE'=139
'RIGHT'=140
'ROLLBACK'=141
'ROLLUP'=142
'ROW'=143
'ROWS'=144
'SCHEMA'=145
'SCHEMAS'=146
'SECOND'=147
'SELECT'=148
'SERIALIZABLE'=149
'SESSION'=150
'SET'=151
'SETS'=152
'SHOW'=153
'SMALLINT'=154
'SOME'=155
'START'=156
'STATS'=157
'SUBSTRING'=158
'SYSTEM'=159
'TABLE'=160
'TABLES'=161
'TABLESAMPLE'=162
'TEXT'=163
'THEN'=164
'TIME'=165
'TIMESTAMP'=166
'TINYINT'=167
'TO'=168
'TRANSACTION'=169
'TRUE'=170
'TRY_CAST'=171
'TYPE'=172
'UESCAPE'=173
'UNBOUNDED'=174
'UNCOMMITTED'=175
'UNION'=176
'UNNEST'=177
'USE'=178
'USING'=179
'VALIDATE'=180
'VALUES'=181
'VERBOSE'=182
'VIEW'=183
'WHEN'=184
'WHERE'=185
'WITH'=186
'WORK'=187
'WRITE'=188
'YEAR'=189
'ZONE'=190
'='=191
'<'=193
'<='=194
'>'=195
'>='=196
'+'=197
'-'=198
'*'=199
'/'=200
'%'=201
'||'=202
T__0=1
T__1=2
T__2=3
T__3=4
T__4=5
T__5=6
T__6=7
T__7=8
T__8=9
ADD=10
ALL=11
ALTER=12
ANALYZE=13
AND=14
ANY=15
ARRAY=16
AS=17
ASC=18
AT=19
BERNOULLI=20
BETWEEN=21
BY=22
CALL=23
CASCADE=24
CASE=25
CAST=26
CATALOGS=27
COALESCE=28
COLUMN=29
COLUMNS=30
COMMENT=31
COMMIT=32
COMMITTED=33
CONSTRAINT=34
CREATE=35
CROSS=36
CUBE=37
CURRENT=38
CURRENT_DATE=39
CURRENT_TIME=40
CURRENT_TIMESTAMP=41
CURRENT_USER=42
DATA=43
DATE=44
DAY=45
DEALLOCATE=46
DELETE=47
DESC=48
DESCRIBE=49
DISTINCT=50
DISTRIBUTED=51
DROP=52
ELSE=53
END=54
ESCAPE=55
EXCEPT=56
EXCLUDING=57
EXECUTE=58
EXISTS=59
EXPLAIN=60
EXTRACT=61
FALSE=62
FILTER=63
FIRST=64
FOLLOWING=65
FOR=66
FORMAT=67
FROM=68
FULL=69
FUNCTIONS=70
GRANT=71
GRANTS=72
GRAPHVIZ=73
GROUP=74
GROUPING=75
HAVING=76
HOUR=77
IF=78
IN=79
INCLUDING=80
INNER=81
INPUT=82
INSERT=83
INTEGER=84
INTERSECT=85
INTERVAL=86
INTO=87
IS=88
ISOLATION=89
JOIN=90
LAST=91
LATERAL=92
LEFT=93
LEVEL=94
LIKE=95
LIMIT=96
LOCALTIME=97
LOCALTIMESTAMP=98
LOGICAL=99
MAP=100
MINUTE=101
MONTH=102
NATURAL=103
NFC=104
NFD=105
NFKC=106
NFKD=107
NO=108
NORMALIZE=109
NOT=110
NULL=111
NULLIF=112
NULLS=113
ON=114
ONLY=115
OPTION=116
OR=117
ORDER=118
ORDINALITY=119
OUTER=120
OUTPUT=121
OVER=122
PARTITION=123
PARTITIONS=124
POSITION=125
PRECEDING=126
PREPARE=127
PRIVILEGES=128
PROPERTIES=129
PUBLIC=130
RANGE=131
READ=132
RECURSIVE=133
RENAME=134
REPEATABLE=135
REPLACE=136
RESET=137
RESTRICT=138
REVOKE=139
RIGHT=140
ROLLBACK=141
ROLLUP=142
ROW=143
ROWS=144
SCHEMA=145
SCHEMAS=146
SECOND=147
SELECT=148
SERIALIZABLE=149
SESSION=150
SET=151
SETS=152
SHOW=153
SMALLINT=154
SOME=155
START=156
STATS=157
SUBSTRING=158
SYSTEM=159
TABLE=160
TABLES=161
TABLESAMPLE=162
TEXT=163
THEN=164
TIME=165
TIMESTAMP=166
TINYINT=167
TO=168
TRANSACTION=169
TRUE=170
TRY_CAST=171
TYPE=172
UESCAPE=173
UNBOUNDED=174
UNCOMMITTED=175
UNION=176
UNNEST=177
USE=178
USING=179
VALIDATE=180
VALUES=181
VERBOSE=182
VIEW=183
WHEN=184
WHERE=185
WITH=186
WORK=187
WRITE=188
YEAR=189
ZONE=190
EQ=191
NEQ=192
LT=193
LTE=194
GT=195
GTE=196
PLUS=197
MINUS=198
ASTERISK=199
SLASH=200
PERCENT=201
CONCAT=202
STRING=203
UNICODE_STRING=204
BINARY_LITERAL=205
INTEGER_VALUE=206
DECIMAL_VALUE=207
DOUBLE_VALUE=208
IDENTIFIER=209
DIGIT_IDENTIFIER=210
QUOTED_IDENTIFIER=211
BACKQUOTED_IDENTIFIER=212
TIME_WITH_TIME_ZONE=213
TIMESTAMP_WITH_TIME_ZONE=214
DOUBLE_PRECISION=215
SIMPLE_COMMENT=216
BRACKETED_COMMENT=217
WS=218
UNRECOGNIZED=219
'.'=1
'('=2
')'=3
','=4
'?'=5
'->'=6
'['=7
']'=8
'=>'=9
'ADD'=10
'ALL'=11
'ALTER'=12
'ANALYZE'=13
'AND'=14
'ANY'=15
'ARRAY'=16
'AS'=17
'ASC'=18
'AT'=19
'BERNOULLI'=20
'BETWEEN'=21
'BY'=22
'CALL'=23
'CASCADE'=24
'CASE'=25
'CAST'=26
'CATALOGS'=27
'COALESCE'=28
'COLUMN'=29
'COLUMNS'=30
'COMMENT'=31
'COMMIT'=32
'COMMITTED'=33
'CONSTRAINT'=34
'CREATE'=35
'CROSS'=36
'CUBE'=37
'CURRENT'=38
'CURRENT_DATE'=39
'CURRENT_TIME'=40
'CURRENT_TIMESTAMP'=41
'CURRENT_USER'=42
'DATA'=43
'DATE'=44
'DAY'=45
'DEALLOCATE'=46
'DELETE'=47
'DESC'=48
'DESCRIBE'=49
'DISTINCT'=50
'DISTRIBUTED'=51
'DROP'=52
'ELSE'=53
'END'=54
'ESCAPE'=55
'EXCEPT'=56
'EXCLUDING'=57
'EXECUTE'=58
'EXISTS'=59
'EXPLAIN'=60
'EXTRACT'=61
'FALSE'=62
'FILTER'=63
'FIRST'=64
'FOLLOWING'=65
'FOR'=66
'FORMAT'=67
'FROM'=68
'FULL'=69
'FUNCTIONS'=70
'GRANT'=71
'GRANTS'=72
'GRAPHVIZ'=73
'GROUP'=74
'GROUPING'=75
'HAVING'=76
'HOUR'=77
'IF'=78
'IN'=79
'INCLUDING'=80
'INNER'=81
'INPUT'=82
'INSERT'=83
'INTEGER'=84
'INTERSECT'=85
'INTERVAL'=86
'INTO'=87
'IS'=88
'ISOLATION'=89
'JOIN'=90
'LAST'=91
'LATERAL'=92
'LEFT'=93
'LEVEL'=94
'LIKE'=95
'LIMIT'=96
'LOCALTIME'=97
'LOCALTIMESTAMP'=98
'LOGICAL'=99
'MAP'=100
'MINUTE'=101
'MONTH'=102
'NATURAL'=103
'NFC'=104
'NFD'=105
'NFKC'=106
'NFKD'=107
'NO'=108
'NORMALIZE'=109
'NOT'=110
'NULL'=111
'NULLIF'=112
'NULLS'=113
'ON'=114
'ONLY'=115
'OPTION'=116
'OR'=117
'ORDER'=118
'ORDINALITY'=119
'OUTER'=120
'OUTPUT'=121
'OVER'=122
'PARTITION'=123
'PARTITIONS'=124
'POSITION'=125
'PRECEDING'=126
'PREPARE'=127
'PRIVILEGES'=128
'PROPERTIES'=129
'PUBLIC'=130
'RANGE'=131
'READ'=132
'RECURSIVE'=133
'RENAME'=134
'REPEATABLE'=135
'REPLACE'=136
'RESET'=137
'RESTRICT'=138
'REVOKE'=139
'RIGHT'=140
'ROLLBACK'=141
'ROLLUP'=142
'ROW'=143
'ROWS'=144
'SCHEMA'=145
'SCHEMAS'=146
'SECOND'=147
'SELECT'=148
'SERIALIZABLE'=149
'SESSION'=150
'SET'=151
'SETS'=152
'SHOW'=153
'SMALLINT'=154
'SOME'=155
'START'=156
'STATS'=157
'SUBSTRING'=158
'SYSTEM'=159
'TABLE'=160
'TABLES'=161
'TABLESAMPLE'=162
'TEXT'=163
'THEN'=164
'TIME'=165
'TIMESTAMP'=166
'TINYINT'=167
'TO'=168
'TRANSACTION'=169
'TRUE'=170
'TRY_CAST'=171
'TYPE'=172
'UESCAPE'=173
'UNBOUNDED'=174
'UNCOMMITTED'=175
'UNION'=176
'UNNEST'=177
'USE'=178
'USING'=179
'VALIDATE'=180
'VALUES'=181
'VERBOSE'=182
'VIEW'=183
'WHEN'=184
'WHERE'=185
'WITH'=186
'WORK'=187
'WRITE'=188
'YEAR'=189
'ZONE'=190
'='=191
'<'=193
'<='=194
'>'=195
'>='=196
'+'=197
'-'=198
'*'=199
'/'=200
'%'=201
'||'=202
How to generate ANTLR runtime targets:
1. Install ANTLR 4: https://github.com/antlr/antlr4/blob/master/doc/getting-started.md
1. Download the grammar. Presto: https://github.com/prestodb/presto/blob/master/presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4
1. Generate the ANTLR runtime target (typically `antlr4 -Dlanguage=Python2 SqlBase.g4` for the Python 2 target used here): https://github.com/antlr/antlr4/blob/master/doc/python-target.md
1. More on ANTLR: https://github.com/antlr/antlr4/blob/master/doc/index.md
import logging

from antlr4 import InputStream, CommonTokenStream, ParseTreeWalker
from typing import Iterable, List  # noqa: F401

from databuilder.sql_parser.usage.column import Column, Table, remove_double_quotes
from databuilder.sql_parser.usage.presto.antlr_generated.SqlBaseLexer import SqlBaseLexer
from databuilder.sql_parser.usage.presto.antlr_generated.SqlBaseListener import SqlBaseListener
from databuilder.sql_parser.usage.presto.antlr_generated.SqlBaseParser import SqlBaseParser

LOGGER = logging.getLogger(__name__)


class ColumnUsageListener(SqlBaseListener):
    """
    ColumnUsageListener inherits the ANTLR-generated SqlBaseListener so that it can extract column and table
    usage while it walks the parse tree.
    (Method names come from the ANTLR-generated SqlBaseListener and do not follow Python convention.)
    The basic idea of column extraction is to look at a SELECT statement as two parts:
     1. processing columns: columns in the SELECT clause. (SELECT foo, bar )
     2. processed columns: columns in the FROM clause. (FROM foobar or FROM (SELECT foo, bar from foobar) )
    Overall, we'd like to retrieve the processing columns. Thus, the problem to solve is essentially: based on
    the processed columns, finalize each processing column by getting the necessary info (such as table and
    schema) from the processed columns.
    """
    def __init__(self):
        # type: () -> None
        self.processed_cols = []  # type: List[Column]
        self._processing_cols = []  # type: List[Column]
        self._current_col = None  # type: Column
        self._stack = []  # type: List[Column]

    def exitColumnReference(self,
                            ctx  # type: SqlBaseParser.ColumnReferenceContext
                            ):
        # type: (...) -> None
        """
        Callback method for a column that does not have a table indicator
        :param ctx:
        :return:
        """
        self._current_col = Column(ctx.getText())

    def exitDereference(self,
                        ctx  # type: SqlBaseParser.DereferenceContext
                        ):
        # type: (...) -> None
        """
        Callback method for a column with a table indicator e.g: foo.bar
        :param ctx:
        :return:
        """
        self._current_col = Column(ctx.identifier().getText(),
                                   table=Table(ctx.base.getText()))

    def exitSelectSingle(self,
                         ctx  # type: SqlBaseParser.SelectSingleContext
                         ):
        # type: (...) -> None
        """
        Callback method for selecting a single column. This is to distinguish
        between columns for the SELECT statement and columns for something else,
        such as a JOIN statement
        :param ctx:
        :return:
        """
        if not self._current_col:
            return

        if ctx.identifier():
            self._current_col.col_alias = remove_double_quotes(ctx.identifier().getText())

        self._processing_cols.append(self._current_col)
        self._current_col = None

    def exitSelectAll(self,
                      ctx  # type: SqlBaseParser.SelectAllContext
                      ):
        # type: (...) -> None
        """
        Callback method for a SELECT ALL (*) column.
        :param ctx:
        :return:
        """
        self._current_col = Column('*')
        if ctx.qualifiedName():
            self._current_col.table = Table(ctx.qualifiedName().getText())

        self._processing_cols.append(self._current_col)
        self._current_col = None

    def exitTableName(self,
                      ctx  # type: SqlBaseParser.TableNameContext
                      ):
        # type: (...) -> None
        """
        Callback method for a table name
        :param ctx:
        :return:
        """
        table_name = ctx.getText()
        table = Table(table_name)
        if '.' in table_name:
            db_tbl = table_name.split('.')
            table = Table(db_tbl[len(db_tbl) - 1],
                          schema=db_tbl[len(db_tbl) - 2])

        self._current_col = Column('*', table=table)

    def exitAliasedRelation(self,
                            ctx  # type: SqlBaseParser.AliasedRelationContext
                            ):
        # type: (...) -> None
        """
        Callback method for a table alias
        :param ctx:
        :return:
        """
        if not ctx.identifier():
            return

        # Table alias for column
        if self._current_col and self._current_col.table:
            self._current_col.table.alias = remove_double_quotes(ctx.identifier().getText())
            return

        # Table alias for inner SQL
        for col in self.processed_cols:
            col.table.alias = remove_double_quotes(ctx.identifier().getText())

    def exitRelationDefault(self,
                            ctx  # type: SqlBaseParser.RelationDefaultContext
                            ):
        # type: (...) -> None
        """
        Callback method when exiting the FROM clause. Here we move the current column to the processed columns
        :param ctx:
        :return:
        """
        if not self._current_col:
            return

        self.processed_cols.append(self._current_col)
        self._current_col = None

    def enterQuerySpecification(self,
                                ctx  # type: SqlBaseParser.QuerySpecificationContext
                                ):
        # type: (...) -> None
        """
        Callback method for a query specification. For a nested SELECT
        statement, it stores the previous processing columns on the stack.
        :param ctx:
        :return:
        """
        if not self._processing_cols:
            return

        self._stack.append(self._processing_cols)
        self._processing_cols = []

    def exitQuerySpecification(self,
                               ctx  # type: SqlBaseParser.QuerySpecificationContext
                               ):
        # type: (...) -> None
        """
        Callback method for a query specification. It merges processing
        columns with processed columns
        :param ctx:
        :return:
        """
        if LOGGER.isEnabledFor(logging.DEBUG):
            LOGGER.debug('processing_cols: {}'.format(self._processing_cols))
            LOGGER.debug('processed_cols: {}'.format(self.processed_cols))

        result = []
        for col in self._processing_cols:
            for resolved in Column.resolve(col, self.processed_cols):
                result.append(resolved)

        self.processed_cols = result
        self._processing_cols = []
        if self._stack:
            self._processing_cols = self._stack.pop()

        self._current_col = None
        if LOGGER.isEnabledFor(logging.DEBUG):
            LOGGER.debug('done processing_cols: {}'.format(self._processing_cols))
            LOGGER.debug('done processed_cols: {}'.format(self.processed_cols))


class ColumnUsageProvider(object):
    def __init__(self):
        # type: () -> None
        pass

    @classmethod
    def get_columns(cls, query):
        # type: (str) -> Iterable[Column]
        """
        Using the Presto grammar, instantiate a parse tree, attach ColumnUsageListener to the tree, and walk
        it. Once the walk is finished, the listener holds the selected columns, which are returned.
        :param query:
        :return:
        """
        query = query.rstrip(';').upper() + "\n"
        lexer = SqlBaseLexer(InputStream(query))
        parser = SqlBaseParser(CommonTokenStream(lexer))
        parse_tree = parser.singleStatement()

        listener = ColumnUsageListener()
        walker = ParseTreeWalker()
        walker.walk(listener, parse_tree)

        return listener.processed_cols
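A quick usage sketch, mirroring the unit tests later in this commit (note that `get_columns` upper-cases the query before parsing):

```python
from databuilder.sql_parser.usage.presto.column_usage_provider import ColumnUsageProvider

columns = ColumnUsageProvider.get_columns('SELECT foo, bar FROM scm.foobar;')
print(columns)
# [Column(name='FOO', table=Table(name='FOOBAR', schema='SCM', alias=None), col_alias=None),
#  Column(name='BAR', table=Table(name='FOOBAR', schema='SCM', alias=None), col_alias=None)]
```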
try:
    import copy_reg
except Exception:
    import copyreg as copy_reg
import logging
import types

from multiprocessing.pool import Pool, TimeoutError
from pyhocon import ConfigTree  # noqa: F401
from typing import Any, Optional, List, Iterable  # noqa: F401

from databuilder import Scoped
from databuilder.extractor.hive_table_metadata_extractor import HiveTableMetadataExtractor
from databuilder.models.table_column_usage import TableColumnUsage, ColumnReader
from databuilder.sql_parser.usage.column import OrTable, Table  # noqa: F401
from databuilder.sql_parser.usage.presto.column_usage_provider import ColumnUsageProvider
from databuilder.transformer.base_transformer import Transformer

LOGGER = logging.getLogger(__name__)


def _pickle_method(m):
    """
    Pool needs to pickle a method in order to pass it to a separate process. This function defines how to
    pickle a method (bound methods are not picklable by default in Python 2).
    """
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)


copy_reg.pickle(types.MethodType, _pickle_method)


class SqlToTblColUsageTransformer(Transformer):
    """
    Note that it currently only supports Presto SQL.
    A SQL-to-usage transformer that produces a TableColumnUsage whose ColumnReaders carry column, user, and
    count. Currently it collects at the table level, so columns on the same table are de-duped.
    In many cases the "from" clause does not contain a schema; the schema is fetched via a table name -> schema
    name mapping obtained from the Hive metastore. (Naming collisions are disregarded, as column-level
    information would be needed to disambiguate.)
    Currently, ColumnUsageProvider can hang on certain SQL statements; as a short-term solution, processing of
    a statement times out after 10 seconds.
    """
    # Config key
    DATABASE_NAME = 'database'
    CLUSTER_NAME = 'cluster'
    SQL_STATEMENT_ATTRIBUTE_NAME = 'sql_stmt_attribute_name'
    USER_EMAIL_ATTRIBUTE_NAME = 'user_email_attribute_name'
    COLUMN_EXTRACTION_TIMEOUT_SEC = 'column_extraction_timeout_seconds'
    LOG_ALL_EXTRACTION_FAILURES = 'log_all_extraction_failures'

    total_counts = 0
    failure_counts = 0

    def init(self, conf):
        # type: (ConfigTree) -> None
        self._conf = conf
        self._database = conf.get_string(SqlToTblColUsageTransformer.DATABASE_NAME)
        self._cluster = conf.get_string(SqlToTblColUsageTransformer.CLUSTER_NAME, 'gold')
        self._sql_stmt_attr = conf.get_string(SqlToTblColUsageTransformer.SQL_STATEMENT_ATTRIBUTE_NAME)
        self._user_email_attr = conf.get_string(SqlToTblColUsageTransformer.USER_EMAIL_ATTRIBUTE_NAME)
        self._tbl_to_schema_mapping = self._create_schema_by_table_mapping()
        self._worker_pool = Pool(processes=1)
        self._time_out_sec = conf.get_int(SqlToTblColUsageTransformer.COLUMN_EXTRACTION_TIMEOUT_SEC, 10)
        LOGGER.info('Column extraction timeout: {} seconds'.format(self._time_out_sec))
        self._log_all_extraction_failures = conf.get_bool(SqlToTblColUsageTransformer.LOG_ALL_EXTRACTION_FAILURES,
                                                          False)

    def transform(self, record):
        # type: (Any) -> Optional[TableColumnUsage]
        SqlToTblColUsageTransformer.total_counts += 1

        stmt = getattr(record, self._sql_stmt_attr)
        email = getattr(record, self._user_email_attr)

        result = []  # type: List[ColumnReader]
        try:
            columns = self._worker_pool.apply_async(ColumnUsageProvider.get_columns, (stmt,)).get(self._time_out_sec)
            # LOGGER.info('Statement: {} ---> columns: {}'.format(stmt, columns))
        except TimeoutError:
            SqlToTblColUsageTransformer.failure_counts += 1
            LOGGER.exception('Timed out while getting column usage from query: {}'.format(stmt))
            LOGGER.info('Killing the thread.')
            self._worker_pool.terminate()
            self._worker_pool = Pool(processes=1)
            LOGGER.info('Killed the thread.')
            return None
        except Exception:
            SqlToTblColUsageTransformer.failure_counts += 1
            if self._log_all_extraction_failures:
                LOGGER.exception('Failed to get column usage from query: {}'.format(stmt))
            return None

        # Dedupe is needed to make it table level. TODO: Remove this once we are at column level
        dedupe_tuples = set()  # type: set
        for col in columns:
            sub_result = self._get_col_readers(table=col.table,
                                               stmt=stmt,
                                               email=email,
                                               dedupe_tuples=dedupe_tuples)
            result.extend(sub_result)

        if not result:
            return None

        return TableColumnUsage(col_readers=result)

    def _get_col_readers(self,
                         table,  # type: Table
                         stmt,  # type: str
                         email,  # type: str
                         dedupe_tuples  # type: set
                         ):
        # type: (...) -> Iterable[ColumnReader]
        """
        Using table information, produce column readers, de-duping on the same table since they come from the
        same statement
        :param table:
        :param stmt:
        :param email:
        :param dedupe_tuples:
        :return:
        """
        result = []  # type: List[ColumnReader]
        self._get_col_readers_helper(table=table,
                                     stmt=stmt,
                                     email=email,
                                     dedupe_tuples=dedupe_tuples,
                                     result=result)
        return result

    def _get_col_readers_helper(self,
                                table,  # type: Table
                                stmt,  # type: str
                                email,  # type: str
                                dedupe_tuples,  # type: set
                                result  # type: List[ColumnReader]
                                ):
        # type: (...) -> None
        if not table:
            LOGGER.warning('table does not exist. statement: {}'.format(stmt))
            return

        if isinstance(table, OrTable):
            for table in table.tables:
                self._get_col_readers_helper(table=table,
                                             stmt=stmt,
                                             email=email,
                                             dedupe_tuples=dedupe_tuples,
                                             result=result)
            return

        if self._is_duplicate(table=table, email=email, dedupe_tuples=dedupe_tuples):
            return

        schema = self._get_schema(table)
        if not schema:
            return

        result.append(ColumnReader(database=self._database,
                                   cluster=self._cluster,
                                   schema=schema,
                                   table=table.name,
                                   column='*',
                                   user_email=email))

    def _is_duplicate(self, table, email, dedupe_tuples):
        # type: (Table, str, set) -> bool
        """
        This is to produce only table-level usage. TODO: Remove this
        :param table:
        :param email:
        :param dedupe_tuples:
        :return:
        """
        dedupe_tuple = (self._database, self._get_schema(table), table.name, email)
        if dedupe_tuple in dedupe_tuples:
            return True

        dedupe_tuples.add(dedupe_tuple)
        return False

    def get_scope(self):
        # type: () -> str
        return 'transformer.sql_to_tbl_col_usage'

    def _create_schema_by_table_mapping(self):
        # type: () -> dict
        # TODO: Make extractor generic
        table_metadata_extractor = HiveTableMetadataExtractor()
        table_metadata_extractor.init(Scoped.get_scoped_conf(self._conf, table_metadata_extractor.get_scope()))

        table_to_schema = {}
        table_metadata = table_metadata_extractor.extract()
        while table_metadata:
            # TODO: deal with collision
            table_to_schema[table_metadata.name.lower()] = table_metadata.schema.lower()
            table_metadata = table_metadata_extractor.extract()
        return table_to_schema

    def _get_schema(self, table):
        # type: (Table) -> Optional[str]
        if table.schema:
            return table.schema.lower()

        return self._tbl_to_schema_mapping.get(table.name.lower())

    def close(self):
        # type: () -> None
        LOGGER.info('Column usage stats: failure: {fail_count} out of total {total_count}'
                    .format(fail_count=SqlToTblColUsageTransformer.failure_counts,
                            total_count=SqlToTblColUsageTransformer.total_counts))
@@ -2,7 +2,7 @@ import os
from setuptools import setup, find_packages
-__version__ = '2.1.0'
+__version__ = '2.2.0'
requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
...
@@ -2,8 +2,6 @@ from mock import patch, Mock
import base64
import tempfile
import unittest
-import six
-import pytest
from pyhocon import ConfigFactory
...
@@ -188,10 +186,6 @@ class MockLoggingClient():
        return self.b

-@pytest.mark.skipif(
-    six.PY3,
-    reason="Deactivated for Python3 because of issue [#40](https://github.com/lyft/amundsen/issues/40) (ANTLR for py3)"
-)
class TestBigqueryUsageExtractor(unittest.TestCase):
    @patch('databuilder.extractor.base_bigquery_extractor.build')
...
import unittest

from mock import patch, MagicMock  # noqa: F401
from pyhocon import ConfigFactory
import pytest
import six

if six.PY2:
    from databuilder.extractor.table_column_usage_aggregate_extractor import TblColUsgAggExtractor, RAW_EXTRACTOR
from databuilder.models.table_column_usage import TableColumnUsage, ColumnReader
from databuilder.transformer.regex_str_replace_transformer import RegexStrReplaceTransformer
from databuilder.transformer.sql_to_table_col_usage_transformer import SqlToTblColUsageTransformer


@pytest.mark.skipif(
    six.PY3,
    reason="Deactivated for Python3 because of issue [#40](https://github.com/lyft/amundsen/issues/40) (ANTLR for py3)"
)
class TestTblColUsgAggExtractor(unittest.TestCase):

    def test_aggregate(self):
        # type: () -> None
        with patch.object(RegexStrReplaceTransformer, 'init'),\
                patch.object(SqlToTblColUsageTransformer, 'init'),\
                patch.object(RegexStrReplaceTransformer, 'transform'),\
                patch.object(SqlToTblColUsageTransformer, 'transform') as mock_sql_transform:
            raw_extractor = MagicMock()
            mock_raw_extractor = MagicMock()
            raw_extractor.extract = mock_raw_extractor
            raw_extractor.get_scope.return_value = 'foo'

            # Just to iterate 5 times
            mock_raw_extractor.side_effect = ['foo', 'bar', 'foo', 'bar', None]

            conf = ConfigFactory.from_dict(
                {RAW_EXTRACTOR: raw_extractor}
            )
            mock_sql_transform.side_effect = [
                TableColumnUsage(col_readers=[ColumnReader(database='database', cluster='gold', schema='test_schema1',
                                                           table='test_table1', column='*',
                                                           user_email='john@example.com')]),
                TableColumnUsage(col_readers=[ColumnReader(database='database', cluster='gold', schema='test_schema1',
                                                           table='test_table1', column='*',
                                                           user_email='john@example.com', read_count=2)]),
                TableColumnUsage(col_readers=[ColumnReader(database='database', cluster='gold', schema='test_schema1',
                                                           table='test_table2', column='*',
                                                           user_email='john@example.com', read_count=5)]),
                None]

            extractor = TblColUsgAggExtractor()
            extractor.init(conf)
            actual = extractor.extract()

            expected = TableColumnUsage(
                col_readers=[
                    ColumnReader(database='database', cluster='gold', schema='test_schema1', table='test_table1',
                                 column='*', user_email='john@example.com', read_count=3),
                    ColumnReader(database='database', cluster='gold', schema='test_schema1', table='test_table2',
                                 column='*', user_email='john@example.com', read_count=5)])

            self.assertEqual(expected.__repr__(), actual.__repr__())


if __name__ == '__main__':
    unittest.main()
import pytest
import six
import unittest

from typing import no_type_check
from mock import patch
from pyhocon import ConfigFactory

from databuilder.extractor.hive_table_metadata_extractor import HiveTableMetadataExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
if six.PY2:
    from databuilder.models.table_column_usage import TableColumnUsage, ColumnReader
    from databuilder.transformer.sql_to_table_col_usage_transformer import SqlToTblColUsageTransformer


@pytest.mark.skipif(
    six.PY3,
    reason="Deactivated for Python3 because of issue [#40](https://github.com/lyft/amundsen/issues/40) (ANTLR for py3)"
)
class TestSqlToTblColUsageTransformer(unittest.TestCase):

    @no_type_check
    def test(self):
        # type: () -> None
        config = ConfigFactory.from_dict({
            SqlToTblColUsageTransformer.DATABASE_NAME: 'database',
            SqlToTblColUsageTransformer.USER_EMAIL_ATTRIBUTE_NAME: 'email',
            SqlToTblColUsageTransformer.SQL_STATEMENT_ATTRIBUTE_NAME: 'statement'
        })

        with patch.object(HiveTableMetadataExtractor, 'extract') as mock_extract,\
                patch.object(HiveTableMetadataExtractor, 'init'):
            mock_extract.side_effect = [
                TableMetadata('hive', 'gold', 'test_schema1', 'test_table1', 'test_table1', [
                    ColumnMetadata('test_id1', 'description of test_table1', 'bigint', 0),
                    ColumnMetadata('test_id2', 'description of test_id2', 'bigint', 1),
                    ColumnMetadata('is_active', None, 'boolean', 2),
                    ColumnMetadata('source', 'description of source', 'varchar', 3),
                    ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
                    ColumnMetadata('ds', None, 'varchar', 5)]), None]

            transformer = SqlToTblColUsageTransformer()
            transformer.init(config)

            foo = Foo(email='john@example.com', statement='SELECT foo, bar FROM test_table1')

            actual = transformer.transform(foo)
            expected = TableColumnUsage(col_readers=[ColumnReader(database=u'database', cluster=u'gold',
                                                                  schema='test_schema1',
                                                                  table='test_table1', column='*',
                                                                  user_email='john@example.com')])
            self.assertEqual(expected.__repr__(), actual.__repr__())


class Foo(object):
    def __init__(self, email, statement):
        # type: (str, str) -> None
        self.email = email
        self.statement = statement


if __name__ == '__main__':
    unittest.main()
import logging

import pytest
import six
import unittest

from databuilder.sql_parser.usage.column import Column, Table, OrTable, remove_double_quotes
if six.PY2:
    from databuilder.sql_parser.usage.presto.column_usage_provider import ColumnUsageProvider


@pytest.mark.skipif(
    six.PY3,
    reason="Deactivated for Python3 because of issue [#40](https://github.com/lyft/amundsen/issues/40) (ANTLR for py3)"
)
class TestColumnUsage(unittest.TestCase):

    def setUp(self):
        # type: () -> None
        logging.basicConfig(level=logging.INFO)

    def test_column_usage(self):
        # type: () -> None
        query = 'SELECT foo, bar FROM foobar;'
        actual = ColumnUsageProvider.get_columns(query)
        expected = [Column(name='FOO', table=Table(name='FOOBAR', schema=None, alias=None), col_alias=None),
                    Column(name='BAR', table=Table(name='FOOBAR', schema=None, alias=None), col_alias=None)]
        self.assertEqual(expected.__repr__(), actual.__repr__())

    def test_with_schema(self):
        # type: () -> None
        query = 'SELECT foo, bar FROM scm.foobar;'
        actual = ColumnUsageProvider.get_columns(query)
        expected = [Column(name='FOO', table=Table(name='FOOBAR', schema='SCM', alias=None), col_alias=None),
                    Column(name='BAR', table=Table(name='FOOBAR', schema='SCM', alias=None), col_alias=None)]
        self.assertEqual(expected.__repr__(), actual.__repr__())

    def test_join(self):
        # type: () -> None
        query = 'SELECT A, B FROM scm.FOO JOIN BAR ON FOO.A = BAR.B'
        actual = ColumnUsageProvider.get_columns(query)
        expected = [Column(name='A', table=OrTable(tables=[Table(name='FOO', schema='SCM', alias=None),
                                                           Table(name='BAR', schema=None, alias=None)]),
                           col_alias=None),
                    Column(name='B', table=OrTable(tables=[Table(name='FOO', schema='SCM', alias=None),
                                                           Table(name='BAR', schema=None, alias=None)]),
                           col_alias=None)]
        self.assertEqual(expected.__repr__(), actual.__repr__())

    def test_join_with_alias(self):
        # type: () -> None
        query = 'SELECT FOO.A, BAR.B FROM FOOTABLE AS FOO JOIN BARTABLE AS BAR ON FOO.A = BAR.A'
        actual = ColumnUsageProvider.get_columns(query)
        expected = [Column(name='A', table=Table(name='FOOTABLE', schema=None, alias='FOO'), col_alias=None),
                    Column(name='B', table=Table(name='BARTABLE', schema=None, alias='BAR'), col_alias=None)]
        self.assertEqual(expected.__repr__(), actual.__repr__())

    def test_inner_sql(self):
        # type: () -> None
        query = 'SELECT TMP1.A, B FROM (SELECT * FROM FOOBAR) AS TMP1'
        actual = ColumnUsageProvider.get_columns(query)
        expected = [Column(name='A', table=Table(name='FOOBAR', schema=None, alias='TMP1'), col_alias=None),
                    Column(name='B', table=Table(name='FOOBAR', schema=None, alias='TMP1'), col_alias=None)]
        self.assertEqual(expected.__repr__(), actual.__repr__())

    def test_inner_sql_col_alias(self):
        # type: () -> None
        query = 'SELECT TMP1.A, F FROM (SELECT A, B AS F, C FROM FOOBAR) AS TMP1'
        actual = ColumnUsageProvider.get_columns(query)
        expected = [Column(name='A', table=Table(name='FOOBAR', schema=None, alias='TMP1'), col_alias=None),
                    Column(name='B', table=Table(name='FOOBAR', schema=None, alias='TMP1'), col_alias='F')]
        self.assertEqual(expected.__repr__(), actual.__repr__())

    def test_table_alias(self):
        # type: () -> None
        query = """
        SELECT A.* FROM FACT_RIDES A LEFT JOIN DIM_VEHICLES B ON A.VEHICLE_KEY = B.VEHICLE_KEY
        WHERE B.RENTAL_PROVIDER_ID IS NOT NULL LIMIT 100
        """
        actual = ColumnUsageProvider.get_columns(query)
        expected = [Column(name='*', table=Table(name='FACT_RIDES', schema=None, alias='A'), col_alias=None)]
        self.assertEqual(expected.__repr__(), actual.__repr__())

    def test_inner_sql_table_alias(self):
        # type: () -> None
        query = """
        SELECT col1, temp.col2 FROM (SELECT col1, col2, col3 FROM foobar) as temp
        """
        actual = ColumnUsageProvider.get_columns(query)
        expected = [Column(name='COL1', table=Table(name='FOOBAR', schema=None, alias='TEMP'), col_alias=None),
                    Column(name='COL2', table=Table(name='FOOBAR', schema=None, alias='TEMP'), col_alias=None)]
        self.assertEqual(expected.__repr__(), actual.__repr__())

    def test_alias_double_inner(self):
        # type: () -> None
        query = """\
        SELECT cluster AS cluster,
               date_trunc('day', CAST(ds AS TIMESTAMP)) AS __timestamp,
               sum(p90_time) AS sum__p90_time
        FROM
          (select ds,
                  cluster,
                  approx_percentile(latency_mins, .50) as p50_time,
                  approx_percentile(latency_mins, .60) as p60_time,
                  approx_percentile(latency_mins, .70) as p70_time,
                  approx_percentile(latency_mins, .75) as p75_time,
                  approx_percentile(latency_mins, .80) as p80_time,
                  approx_percentile(latency_mins, .90) as p90_time,
                  approx_percentile(latency_mins, .95) as p95_time
           from
             (SELECT ds,
                     cluster_name as cluster,
                     query_id,
                     date_diff('second',query_starttime,query_endtime)/60.0 as latency_mins
              FROM etl.hive_query_logs
              WHERE date(ds) > date_add('day', -60, current_date)
                AND environment = 'production'
                AND operation_name = 'QUERY' )
           group by ds,
                    cluster
           order by ds) AS expr_qry
        WHERE ds >= '2018-03-30 00:00:00'
          AND ds <= '2018-05-29 23:29:57'
        GROUP BY cluster,
                 date_trunc('day', CAST(ds AS TIMESTAMP))
        ORDER BY sum__p90_time DESC
        LIMIT 5000
        """
        actual = ColumnUsageProvider.get_columns(query)
        expected = [Column(name='CLUSTER_NAME', table=Table(name='HIVE_QUERY_LOGS', schema='ETL', alias='EXPR_QRY'),
                           col_alias='CLUSTER'),
                    Column(name='DS', table=OrTable(tables=[Table(name='HIVE_QUERY_LOGS', schema='ETL',
                                                                  alias='EXPR_QRY'), None]), col_alias='__TIMESTAMP'),
                    Column(name='QUERY_ENDTIME', table=Table(name='HIVE_QUERY_LOGS', schema='ETL', alias='EXPR_QRY'),
                           col_alias='SUM__P90_TIME')]
        self.assertEqual(expected.__repr__(), actual.__repr__())

    def test_remove_double_quotes(self):
        # type: () -> None
        val = '"foo"'
        actual = remove_double_quotes(val)
        expected = 'foo'
        self.assertEqual(expected, actual)

        val = '"foo'
        actual = remove_double_quotes(val)
        expected = '"foo'
        self.assertEqual(expected, actual)

        val = 'foo'
        actual = remove_double_quotes(val)
        expected = 'foo'
        self.assertEqual(expected, actual)


if __name__ == '__main__':
    unittest.main()