Commit 2c2b843e authored by Tao feng

initial commit

*~
*.pyc
*.pyo
*.pyt
*.pytc
*.egg-info
.*.swp
.DS_Store
venv/
venv3/
.cache/
build/
.idea/
.coverage
.mypy_cache
.pytest_cache
**/coverage.xml
**/htmlcov/**
This project is governed by [Lyft's code of conduct](https://github.com/lyft/code-of-conduct).
All contributors and participants agree to abide by its terms.
clean:
find . -name \*.pyc -delete
find . -name __pycache__ -delete
rm -rf dist/
.PHONY: test_unit
test_unit: test_unit2
lint:
flake8 .
.PHONY: test
test: test_unit lint
.PHONY: test_unit2 test_unit3
test_unit2:
python -m pytest tests/unit
test_unit3:
python3 -bb -m pytest tests/unit
amundsendatabuilder
Copyright 2018-2019 Lyft Inc.
This product includes software developed at Lyft Inc.
import abc
import six
from pyhocon import ConfigTree, ConfigFactory # noqa: F401
@six.add_metaclass(abc.ABCMeta)
class Scoped(object):
_EMPTY_CONFIG = ConfigFactory.from_dict({})
"""
An interface for classes that work with scoped (nested) config.
https://github.com/chimpler/pyhocon
A scoped instance uses only the config within its scope. This is a way to
distribute configuration to each implementation instead of keeping it in
one central place.
This is very useful for Databuilder as it has different components
(extractor, transformer, loader, publisher) and each component can have
different implementations.
For example, these could be configurations for two different extractors:
"extractor.mysql.url" for MySQLExtractor
"extractor.filesystem.source_path" for FileSystemExtractor
If MySQLExtractor defines its scope as "extractor.mysql", the scoped
config reduces the full config to just the part that applies to MySQL:
config.get("extractor.mysql") provides all the config within
'extractor.mysql'. By removing the outer context from the config,
MySQLExtractor stays highly reusable.
"""
@abc.abstractmethod
def init(self, conf):
# type: (ConfigTree) -> None
"""
All scoped instances are expected to be lazily initialized, meaning that
__init__ should not perform any heavy operation such as a service call.
The reason is that Databuilder code doubles as configuration: you can
instantiate a scoped instance with all parameters already set and ready
to run, and the actual execution happens later when init() and the
execute step are invoked.
:param conf: Typesafe config instance
:return: None
"""
pass
@abc.abstractmethod
def get_scope(self):
# type: () -> str
"""
A scope for the config. Typesafe config supports nested config, and the
scope string is used to peel off the nested config for this instance.
:return: scope string
"""
return ''
def close(self):
# type: () -> None
"""
Anything that needs to be cleaned up after the use of the instance.
:return: None
"""
pass
@classmethod
def get_scoped_conf(cls, conf, scope):
# type: (ConfigTree, str) -> ConfigTree
"""
Convenience method that returns the config scoped to the given scope string.
:param conf: Type safe config instance
:param scope: scope string
:return: Type safe config instance
"""
if not scope:
return Scoped._EMPTY_CONFIG
return conf.get(scope, Scoped._EMPTY_CONFIG)
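# Usage sketch (illustrative keys and values, not from the original file): shows how
# Scoped.get_scoped_conf peels a nested pyhocon config down to one component's scope.
from pyhocon import ConfigFactory

example_conf = ConfigFactory.from_dict({
    'extractor.mysql.url': 'mysql://localhost:3306/metadata',        # hypothetical value
    'extractor.filesystem.source_path': '/tmp/source',               # hypothetical value
})
mysql_conf = Scoped.get_scoped_conf(example_conf, 'extractor.mysql')
mysql_conf.get_string('url')   # -> 'mysql://localhost:3306/metadata'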
import abc
from pyhocon import ConfigTree # noqa: F401
from typing import Any # noqa: F401
from databuilder import Scoped
class Extractor(Scoped):
"""
An extractor extracts record
"""
@abc.abstractmethod
def init(self, conf):
# type: (ConfigTree) -> None
pass
@abc.abstractmethod
def extract(self):
# type: () -> Any
"""
:return: Provides a record or None if no more to extract
"""
return None
def get_scope(self):
# type: () -> str
return 'extractor'
import importlib
import logging
from typing import Iterable, Any # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from databuilder.extractor.base_extractor import Extractor
LOGGER = logging.getLogger(__name__)
class DBAPIExtractor(Extractor):
"""
Generic DB API extractor.
"""
CONNECTION_CONFIG_KEY = 'connection'
SQL_CONFIG_KEY = 'sql'
def init(self, conf):
# type: (ConfigTree) -> None
"""
Receives a {connection} object and the {sql} to execute.
An optional model class can be passed, in which case each sql result
row is converted to an instance of that class before being returned
to the caller.
:param conf:
:return:
"""
self.conf = conf
self.connection = conf.get(DBAPIExtractor.CONNECTION_CONFIG_KEY) # type: Any
self.cursor = self.connection.cursor()
self.sql = conf.get(DBAPIExtractor.SQL_CONFIG_KEY)
model_class = conf.get('model_class', None)
if model_class:
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)
self.model_class = getattr(mod, class_name)
self._iter = iter(self._execute_query())
def _execute_query(self):
# type: () -> Iterable[Any]
"""
Use cursor to execute the {sql}
:return:
"""
self.cursor.execute(self.sql)
return self.cursor.fetchall()
def extract(self):
# type: () -> Any
"""
Fetch one sql result row, convert to {model_class} if specified before
returning.
:return:
"""
try:
result = next(self._iter)
except StopIteration:
return None
if hasattr(self, 'model_class'):
obj = self.model_class(*result[:len(result)])
return obj
else:
return result
def close(self):
# type: () -> None
"""
close cursor and connection handlers
:return:
"""
try:
self.cursor.close()
self.connection.close()
except Exception as e:
LOGGER.warning("Exception encountered while closing up connection handler! %s", e)
def get_scope(self):
# type: () -> str
return 'extractor.dbapi'
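# Usage sketch with a DB API 2.0 connection (sqlite3 here; the table and rows are
# illustrative). Any connection object exposing cursor()/execute()/fetchall() works.
import sqlite3
from pyhocon import ConfigFactory

_conn = sqlite3.connect(':memory:')
_conn.execute('CREATE TABLE users (id INTEGER, name TEXT)')
_conn.execute("INSERT INTO users VALUES (1, 'amundsen')")

dbapi_extractor = DBAPIExtractor()
dbapi_extractor.init(ConfigFactory.from_dict({
    DBAPIExtractor.CONNECTION_CONFIG_KEY: _conn,
    DBAPIExtractor.SQL_CONFIG_KEY: 'SELECT id, name FROM users',
}))
row = dbapi_extractor.extract()   # -> (1, 'amundsen'); returns None once exhausted
dbapi_extractor.close()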
import importlib
from typing import Iterable, Any # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from databuilder.extractor.base_extractor import Extractor
class GenericExtractor(Extractor):
"""
Extractor to extract arbitrary, user-provided values.
"""
EXTRACTION_ITEMS = 'extraction_items'
def init(self, conf):
# type: (ConfigTree) -> None
"""
Receives a list of dictionaries which is used for extraction
:param conf:
:return:
"""
self.conf = conf
self.values = conf.get(GenericExtractor.EXTRACTION_ITEMS) # type: Iterable[Any]
model_class = conf.get('model_class', None)
if model_class:
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)
self.model_class = getattr(mod, class_name)
results = [self.model_class(**result)
for result in self.values]
self._iter = iter(results)
else:
raise RuntimeError('model class needs to be provided!')
def extract(self):
# type: () -> Any
"""
Fetch one extraction item (already converted to {model_class}) and
return it, or None once exhausted.
:return:
"""
try:
result = next(self._iter)
return result
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.generic'
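# Usage sketch (illustrative values): extraction items are plain dicts, and model_class is
# resolved by dotted path via importlib. Neo4jESLastUpdated (a model elsewhere in this
# commit) is used here only because it accepts a single 'timestamp' keyword argument.
from pyhocon import ConfigFactory

generic_extractor = GenericExtractor()
generic_extractor.init(ConfigFactory.from_dict({
    GenericExtractor.EXTRACTION_ITEMS: [{'timestamp': 1546300800}],
    'model_class': 'databuilder.models.neo4j_es_last_updated.Neo4jESLastUpdated',
}))
record = generic_extractor.extract()   # -> Neo4jESLastUpdated instance, then None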
This diff is collapsed.
import logging
from collections import namedtuple
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
LOGGER = logging.getLogger(__name__)
class HiveTableMetadataExtractor(Extractor):
"""
Extracts Hive table and column metadata from underlying meta store database using SQLAlchemyExtractor
"""
# SELECT statement from hive metastore database to extract table and column metadata
# Below SELECT statement uses UNION to combine two queries:
# the 1st query retrieves partition columns,
# the 2nd query retrieves regular columns.
# The combined result is ordered by table id and the partition-column flag.
SQL_STATEMENT = """
SELECT source.* FROM
(SELECT t.TBL_ID, d.NAME as schema_name, t.TBL_NAME name, t.TBL_TYPE, tp.PARAM_VALUE as description,
p.PKEY_NAME as col_name, p.INTEGER_IDX as col_sort_order,
p.PKEY_TYPE as col_type, p.PKEY_COMMENT as col_description, 1 as "is_partition_col"
FROM TBLS t
JOIN DBS d ON t.DB_ID = d.DB_ID
JOIN PARTITION_KEYS p ON t.TBL_ID = p.TBL_ID
LEFT JOIN TABLE_PARAMS tp ON (t.TBL_ID = tp.TBL_ID AND tp.PARAM_KEY='comment')
{where_clause_suffix}
UNION
SELECT t.TBL_ID, d.NAME as schema_name, t.TBL_NAME name, t.TBL_TYPE, tp.PARAM_VALUE as description,
c.COLUMN_NAME as col_name, c.INTEGER_IDX as col_sort_order,
c.TYPE_NAME as col_type, c.COMMENT as col_description, 0 as "is_partition_col"
FROM TBLS t
JOIN DBS d ON t.DB_ID = d.DB_ID
JOIN SDS s ON t.SD_ID = s.SD_ID
JOIN COLUMNS_V2 c ON s.CD_ID = c.CD_ID
LEFT JOIN TABLE_PARAMS tp ON (t.TBL_ID = tp.TBL_ID AND tp.PARAM_KEY='comment')
{where_clause_suffix}
) source
ORDER by tbl_id, is_partition_col desc;
"""
# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster'
DEFAULT_CONFIG = ConfigFactory.from_dict({WHERE_CLAUSE_SUFFIX_KEY: ' ',
CLUSTER_KEY: 'gold'})
def init(self, conf):
# type: (ConfigTree) -> None
conf = conf.with_fallback(HiveTableMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(HiveTableMetadataExtractor.CLUSTER_KEY))
self.sql_stmt = HiveTableMetadataExtractor.SQL_STATEMENT.format(
where_clause_suffix=conf.get_string(HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY))
LOGGER.info('SQL for hive metastore: {}'.format(self.sql_stmt))
self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter = None # type: Union[None, Iterator]
def extract(self):
# type: () -> Union[TableMetadata, None]
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.hive_table_metadata'
def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
"""
Using itertools.groupby over the raw row iterator, it groups rows by table and yields TableMetadata
:return:
"""
for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []
for row in group:
last_row = row
columns.append(ColumnMetadata(row['col_name'], row['col_description'],
row['col_type'], row['col_sort_order']))
yield TableMetadata('hive', self._cluster,
last_row['schema_name'],
last_row['name'],
last_row['description'],
columns)
def _get_raw_extract_iter(self):
# type: () -> Iterator[Dict[str, Any]]
"""
Provides iterator of result row from SQLAlchemy extractor
:return:
"""
row = self._alchemy_extractor.extract()
while row:
yield row
row = self._alchemy_extractor.extract()
def _get_table_key(self, row):
# type: (Dict[str, Any]) -> Union[TableKey, None]
"""
Table key consists of schema and table name
:param row:
:return:
"""
if row:
return TableKey(schema_name=row['schema_name'], table_name=row['name'])
return None
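# Usage sketch (assumes a reachable Hive metastore database; the connection string below is
# a made-up example). Note that init() connects immediately via the nested
# SQLAlchemyExtractor, which reads its conn_string from its own 'extractor.sqlalchemy' scope.
hive_conf = ConfigFactory.from_dict({
    HiveTableMetadataExtractor.CLUSTER_KEY: 'gold',
    'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
        'mysql://user:password@metastore-host:3306/hive',
})
hive_extractor = HiveTableMetadataExtractor()
hive_extractor.init(hive_conf)
table_metadata = hive_extractor.extract()   # -> TableMetadata or None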
import importlib
import time
from typing import Iterable, Any # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from databuilder.extractor.generic_extractor import GenericExtractor
class Neo4jEsLastUpdatedExtractor(GenericExtractor):
"""
Extractor to extract the last updated timestamp for Neo4j and ES
"""
def init(self, conf):
# type: (ConfigTree) -> None
"""
Builds a single record holding the current epoch timestamp and converts it to {model_class}
:param conf:
:return:
"""
self.conf = conf
model_class = conf.get('model_class', None)
if model_class:
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)
self.model_class = getattr(mod, class_name)
last_updated_timestamp = int(time.time())
result = {'timestamp': last_updated_timestamp}
results = [self.model_class(**result)]
self._iter = iter(results)
else:
raise RuntimeError('model class needs to be provided!')
def extract(self):
# type: () -> Any
"""
Fetch the single timestamp record (already converted to {model_class})
and return it, or None once exhausted.
:return:
"""
try:
result = next(self._iter)
return result
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.neo4j_es_last_updated'
import importlib
import logging
from typing import Any, Iterator, Union # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from neo4j.v1 import GraphDatabase
from databuilder.extractor.base_extractor import Extractor
LOGGER = logging.getLogger(__name__)
class Neo4jExtractor(Extractor):
"""
Extractor to fetch records from Neo4j Graph database
"""
CYPHER_QUERY_CONFIG_KEY = 'cypher_query'
GRAPH_URL_CONFIG_KEY = 'graph_url'
MODEL_CLASS_CONFIG_KEY = 'model_class'
NEO4J_AUTH_USER = 'neo4j_auth_user'
NEO4J_AUTH_PW = 'neo4j_auth_pw'
def init(self, conf):
# type: (ConfigTree) -> None
"""
Establish connections and import data model class if provided
:param conf:
"""
self.conf = conf
self.graph_url = conf.get_string(Neo4jExtractor.GRAPH_URL_CONFIG_KEY)
self.cypher_query = conf.get_string(Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY)
self.driver = self._get_driver()
self._extract_iter = None # type: Union[None, Iterator]
model_class = conf.get(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY, None)
if model_class:
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)
self.model_class = getattr(mod, class_name)
def close(self):
# type: () -> None
"""
close connection to neo4j cluster
"""
try:
self.driver.close()
except Exception as e:
LOGGER.error("Exception encountered while closing the graph driver: %s", e)
def _get_driver(self):
# type: () -> Any
"""
Create a Neo4j connection to Database
"""
return GraphDatabase.driver(self.graph_url,
auth=(self.conf.get_string(Neo4jExtractor.NEO4J_AUTH_USER),
self.conf.get_string(Neo4jExtractor.NEO4J_AUTH_PW)))
def _execute_query(self, tx):
# type: (Any) -> Any
"""
Run the {cypher_query} within the given transaction.
"""
result = tx.run(self.cypher_query)
return result
def _get_extract_iter(self):
# type: () -> Iterator[Any]
"""
Execute {cypher_query} and yield result one at a time
"""
with self.driver.session() as session:
if not hasattr(self, 'results'):
self.results = session.read_transaction(self._execute_query)
for result in self.results:
if hasattr(self, 'model_class'):
obj = self.model_class(**result)
yield obj
else:
yield result
def extract(self):
# type: () -> Any
"""
Return {result} object as it is or convert to object of
{model_class}, if specified.
"""
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.neo4j'
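# Usage sketch (assumes a Neo4j instance reachable at the bolt URL; credentials and the
# Cypher query are illustrative).
from pyhocon import ConfigFactory

neo4j_extractor = Neo4jExtractor()
neo4j_extractor.init(ConfigFactory.from_dict({
    Neo4jExtractor.GRAPH_URL_CONFIG_KEY: 'bolt://localhost:7687',
    Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY: 'MATCH (t:Table) RETURN t.name AS name LIMIT 10',
    Neo4jExtractor.NEO4J_AUTH_USER: 'neo4j',
    Neo4jExtractor.NEO4J_AUTH_PW: 'neo4j',
}))
result = neo4j_extractor.extract()   # -> one result record at a time, None when exhausted
neo4j_extractor.close()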
import textwrap
from typing import Any # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
class Neo4jSearchDataExtractor(Extractor):
"""
Extractor to fetch the data required to support search from the Neo4j graph database.
It delegates the actual extraction to the Neo4jExtractor class.
"""
CYPHER_QUERY_CONFIG_KEY = 'cypher_query'
DEFAULT_NEO4J_CYPHER_QUERY = textwrap.dedent(
"""
MATCH (db:Database)<-[:CLUSTER_OF]-(cluster:Cluster)<-[:SCHEMA_OF]-(schema:Schema)<-[:TABLE_OF]-(table:Table)
OPTIONAL MATCH (table)-[:DESCRIPTION]->(table_description:Description)
OPTIONAL MATCH (table)-[read:READ_BY]->(user:User)
OPTIONAL MATCH (table)-[:COLUMN]->(cols:Column)
OPTIONAL MATCH (cols)-[:DESCRIPTION]->(col_description:Description)
OPTIONAL MATCH (table)-[:TAGGED_BY]->(tags:Tag)
OPTIONAL MATCH (table)-[:LAST_UPDATED_AT]->(time_stamp:Timestamp)
RETURN db.name as database, cluster.name AS cluster, schema.name AS schema_name,
table.name AS table_name, table.key AS table_key, table_description.description AS table_description,
time_stamp.last_updated_timestamp AS table_last_updated_epoch,
EXTRACT(c in COLLECT(DISTINCT cols)| c.name) AS column_names,
EXTRACT(cd IN COLLECT(DISTINCT col_description)| cd.description) AS column_descriptions,
REDUCE(sum_r = 0, r in COLLECT(DISTINCT read)| sum_r + r.read_count) AS total_usage,
COUNT(DISTINCT user.email) as unique_usage,
COLLECT(DISTINCT tags.key) as tag_names
ORDER BY table.name;
"""
)
def init(self, conf):
# type: (ConfigTree) -> None
"""
Initialize Neo4jExtractor object from configuration and use that for extraction
"""
self.conf = conf
# extract cypher query from conf, if specified, else use default query
self.cypher_query = conf.get_string(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY,
Neo4jSearchDataExtractor.DEFAULT_NEO4J_CYPHER_QUERY)
self.neo4j_extractor = Neo4jExtractor()
# write the cypher query in configs in Neo4jExtractor scope
key = self.neo4j_extractor.get_scope() + '.' + Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY
self.conf.put(key, self.cypher_query)
# initialize neo4j_extractor from configs
self.neo4j_extractor.init(Scoped.get_scoped_conf(self.conf, self.neo4j_extractor.get_scope()))
def close(self):
# type: () -> None
"""
Use close() method specified by neo4j_extractor
to close connection to neo4j cluster
"""
self.neo4j_extractor.close()
def extract(self):
# type: () -> Any
"""
Invoke extract() method defined by neo4j_extractor
"""
return self.neo4j_extractor.extract()
def get_scope(self):
# type: () -> str
return 'extractor.search_data'
import importlib
from sqlalchemy import create_engine
from pyhocon import ConfigTree # noqa: F401
from typing import Any, Iterator # noqa: F401
from databuilder.extractor.base_extractor import Extractor
class SQLAlchemyExtractor(Extractor):
# Config keys
CONN_STRING = 'conn_string'
EXTRACT_SQL = 'extract_sql'
"""
An Extractor that extracts records via SQLAlchemy. Any database supported by SQLAlchemy can use this extractor.
"""
def init(self, conf):
# type: (ConfigTree) -> None
"""
Establish connections and import data model class if provided
:param conf:
"""
self.conf = conf
self.conn_string = conf.get_string(SQLAlchemyExtractor.CONN_STRING)
self.connection = self._get_connection()
self.extract_sql = conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)
model_class = conf.get('model_class', None)
if model_class:
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)
self.model_class = getattr(mod, class_name)
self._execute_query()
def _get_connection(self):
# type: () -> Any
"""
Create a SQLAlchemy connection to Database
"""
engine = create_engine(self.conn_string)
conn = engine.connect()
return conn
def _execute_query(self):
# type: () -> None
"""
Execute {extract_sql} and build an iterator over the results.
"""
if not hasattr(self, 'results'):
self.results = self.connection.execute(self.extract_sql)
if hasattr(self, 'model_class'):
results = [self.model_class(**result)
for result in self.results]
else:
results = self.results
self.iter = iter(results)
def extract(self):
# type: () -> Any
"""
Return sql results one at a time; results are instances of
{model_class} if a model_class was provided.
"""
try:
return next(self.iter)
except StopIteration:
return None
except Exception as e:
raise e
def get_scope(self):
# type: () -> str
return 'extractor.sqlalchemy'
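# Usage sketch (uses an in-memory sqlite database; the query is illustrative, and raw SQL
# strings passed to Connection.execute() assume the SQLAlchemy 1.x style of this era).
from pyhocon import ConfigFactory

sqlalchemy_extractor = SQLAlchemyExtractor()
sqlalchemy_extractor.init(ConfigFactory.from_dict({
    SQLAlchemyExtractor.CONN_STRING: 'sqlite://',
    SQLAlchemyExtractor.EXTRACT_SQL: 'SELECT 1 AS one, 2 AS two',
}))
first_row = sqlalchemy_extractor.extract()   # -> first result row, then None once exhausted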
from collections import namedtuple
import logging
from pyhocon import ConfigTree # noqa: F401
from typing import Dict, List, Any, Optional # noqa: F401
from databuilder import Scoped
from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_column_usage import ColumnReader, TableColumnUsage
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.regex_str_replace_transformer import RegexStrReplaceTransformer
from databuilder.transformer.sql_to_table_col_usage_transformer import SqlToTblColUsageTransformer
TableColumnUsageTuple = namedtuple('TableColumnUsageTuple', ['database', 'cluster', 'schema',
'table', 'column', 'email'])
LOGGER = logging.getLogger(__name__)
# Config keys:
RAW_EXTRACTOR = 'raw_extractor'
class TblColUsgAggExtractor(Extractor):
"""
An aggregate extractor for table column usage.
It uses RegexStrReplaceTransformer to cleanse SQL statements and SqlToTblColUsageTransformer to derive table
column usage.
All usage is aggregated in memory, and on the last record it returns the aggregated TableColumnUsage.
Note that this extractor does all the transformation and aggregation itself, so no further transformation is
needed after it.
"""
def init(self, conf):
# type: (ConfigTree) -> None
self._extractor = conf.get(RAW_EXTRACTOR) # type: Extractor
self._extractor.init(Scoped.get_scoped_conf(conf, self._extractor.get_scope()))
regex_transformer = RegexStrReplaceTransformer() # type: Any
if conf.get(regex_transformer.get_scope(), None):
regex_transformer.init(Scoped.get_scoped_conf(conf, regex_transformer.get_scope()))
else:
LOGGER.info('{} is not defined. Not using it'.format(regex_transformer.get_scope()))
regex_transformer = NoopTransformer()
sql_to_usage_transformer = SqlToTblColUsageTransformer()
sql_to_usage_transformer.init(Scoped.get_scoped_conf(conf, sql_to_usage_transformer.get_scope()))
self._transformer = ChainedTransformer((regex_transformer, sql_to_usage_transformer))
def extract(self):
# type: () -> Optional[TableColumnUsage]
"""
It aggregates counts per table and user in memory. Table-level aggregation is not expected to occupy much
memory.
:return: Provides a record or None if no more to extract
"""
count_map = {} # type: Dict[TableColumnUsageTuple, int]
record = self._extractor.extract()
count = 0
while record:
count += 1
if count % 1000 == 0:
LOGGER.info('Aggregated {} records'.format(count))
tbl_col_usg = self._transformer.transform(record=record)
record = self._extractor.extract()
# filtered case
if not tbl_col_usg:
continue
for col_rdr in tbl_col_usg.col_readers:
key = TableColumnUsageTuple(database=col_rdr.database, cluster=col_rdr.cluster, schema=col_rdr.schema,
table=col_rdr.table, column=col_rdr.column, email=col_rdr.user_email)
new_count = count_map.get(key, 0) + col_rdr.read_count
count_map[key] = new_count
if not len(count_map):
return None
col_readers = [] # type: List[ColumnReader]
while len(count_map):
tbl_col_rdr_tuple, count = count_map.popitem()
col_readers.append(ColumnReader(database=tbl_col_rdr_tuple.database, cluster=tbl_col_rdr_tuple.cluster,
schema=tbl_col_rdr_tuple.schema, table=tbl_col_rdr_tuple.table,
column=tbl_col_rdr_tuple.column, user_email=tbl_col_rdr_tuple.email,
read_count=count))
return TableColumnUsage(col_readers=col_readers)
def get_scope(self):
# type: () -> str
return 'extractor.table_column_usage_aggregate'
def close(self):
# type: () -> None
self._transformer.close()
import logging
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from retrying import retry
from typing import List # noqa: F401
from databuilder import Scoped
from databuilder.filesystem.metadata import FileMetadata
LOGGER = logging.getLogger(__name__)
CLIENT_ERRORS = {'ClientError', 'FileNotFoundError', 'ParamValidationError'}
def is_client_side_error(e):
# type: (Exception) -> bool
"""
A method that determines whether the error is a client-side error within the FileSystem context
:param e:
:return:
"""
return e.__class__.__name__ in CLIENT_ERRORS
def is_retriable_error(e):
# type: (Exception) -> bool
"""
A method that determines whether the error is retriable within the FileSystem context
:param e:
:return:
"""
return not is_client_side_error(e)
class FileSystem(Scoped):
"""
A high-level file system that utilizes the Dask file system.
http://docs.dask.org/en/latest/remote-data-services.html
All remote calls are retried on failure. https://pypi.org/project/retrying/
"""
# METADATA KEYS
LAST_UPDATED = 'last_updated'
SIZE = 'size'
# CONFIG KEYS
DASK_FILE_SYSTEM = 'dask_file_system'
# File metadata is provided as a dictionary by the info(path) method of the Dask file system. As the
# dictionary keys are not guaranteed to be the same across implementations, the user can provide a key mapping.
FILE_METADATA_MAPPING_KEY = 'file_metadata_mapping'
default_metadata_mapping = {LAST_UPDATED: 'LastModified',
SIZE: 'Size'}
DEFAULT_CONFIG = ConfigFactory.from_dict({FILE_METADATA_MAPPING_KEY: default_metadata_mapping})
def init(self,
conf # type: ConfigTree
):
# type: (...) -> None
"""
Initialize FileSystem with a Dask file system instance.
Dask file systems support multiple remote storages such as S3, HDFS, Google Cloud Storage,
Azure Data Lake, etc.
http://docs.dask.org/en/latest/remote-data-services.html
https://github.com/dask/s3fs
https://github.com/dask/hdfs3
...
:param conf: hocon config
:return:
"""
self._conf = conf.with_fallback(FileSystem.DEFAULT_CONFIG)
self._dask_fs = self._conf.get(FileSystem.DASK_FILE_SYSTEM)
self._metadata_key_mapping = self._conf.get(FileSystem.FILE_METADATA_MAPPING_KEY).as_plain_ordered_dict()
@retry(retry_on_exception=is_retriable_error, stop_max_attempt_number=3, wait_exponential_multiplier=1000,
wait_exponential_max=5000)
def ls(self, path):
# type: (str) -> List[str]
"""
Lists files and directories under the given path.
:param path:
:return: list of paths
"""
return self._dask_fs.ls(path)
@retry(retry_on_exception=is_retriable_error, stop_max_attempt_number=3, wait_exponential_multiplier=1000,
wait_exponential_max=5000)
def is_file(self, path):
# type: (str) -> bool
contents = self._dask_fs.ls(path)
return len(contents) == 1 and contents[0] == path
@retry(retry_on_exception=is_retriable_error, stop_max_attempt_number=3, wait_exponential_multiplier=1000,
wait_exponential_max=5000)
def info(self, path):
# type: (str) -> FileMetadata
"""
Metadata about the file. It applies _metadata_key_mapping when fetching metadata so that it can
handle different key names across file system implementations.
:return:
"""
metadata_dict = self._dask_fs.info(path)
fm = FileMetadata(path=path,
last_updated=metadata_dict[self._metadata_key_mapping[FileSystem.LAST_UPDATED]],
size=metadata_dict[self._metadata_key_mapping[FileSystem.SIZE]])
return fm
def get_scope(self):
# type: () -> str
return 'filesystem'
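# Usage sketch (assumes the s3fs package is installed and AWS credentials are configured;
# the bucket and key are made-up examples). Any Dask-compatible file system can be passed.
import s3fs

fs = FileSystem()
fs.init(ConfigFactory.from_dict({FileSystem.DASK_FILE_SYSTEM: s3fs.S3FileSystem()}))
file_metadata = fs.info('my-bucket/path/to/data.csv')   # -> FileMetadata(path=..., last_updated=..., size=...)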
from datetime import datetime # noqa: F401
class FileMetadata(object):
def __init__(self,
path, # type: str
last_updated, # type: datetime
size # type: int
):
# type: (...) -> None
self.path = path
self.last_updated = last_updated
self.size = size
def __repr__(self):
# type: () -> str
return """FileMetadata(path={!r}, last_updated={!r}, size={!r})""" \
.format(self.path, self.last_updated, self.size)
import abc
from pyhocon import ConfigTree # noqa: F401
from databuilder import Scoped
from databuilder.utils.closer import Closer
class Job(Scoped):
closer = Closer()
"""
A Databuilder job that represents a single unit of work.
"""
@abc.abstractmethod
def init(self, conf):
# type: (ConfigTree) -> None
pass
@abc.abstractmethod
def launch(self):
# type: () -> None
"""
Launch a job
:return: None
"""
pass
def get_scope(self):
# type: () -> str
return 'job'
import logging
from pyhocon import ConfigTree # noqa: F401
from statsd import StatsClient
from databuilder import Scoped
from databuilder.job.base_job import Job
from databuilder.publisher.base_publisher import NoopPublisher
from databuilder.publisher.base_publisher import Publisher # noqa: F401
from databuilder.task.base_task import Task # noqa: F401
LOGGER = logging.getLogger(__name__)
class DefaultJob(Job):
# Config keys
IS_STATSD_ENABLED = 'is_statsd_enabled'
JOB_IDENTIFIER = 'identifier'
"""
Default job that expects a task and an optional publisher.
If configured, the job emits a success/fail metric counter through statsd with the prefix
amundsen.databuilder.job.[identifier] .
Note that job.identifier is part of the metrics prefix, so choose a unique and readable identifier for the job.
To configure statsd itself, use environment variables: https://statsd.readthedocs.io/en/v3.2.1/configure.html
"""
def __init__(self,
conf,
task,
publisher=NoopPublisher()):
# type: (ConfigTree, Task, Publisher) -> None
self.task = task
self.conf = conf
self.publisher = publisher
self.scoped_conf = Scoped.get_scoped_conf(self.conf,
self.get_scope())
if self.scoped_conf.get_bool(DefaultJob.IS_STATSD_ENABLED, False):
prefix = 'amundsen.databuilder.job.{}'.format(self.scoped_conf.get_string(DefaultJob.JOB_IDENTIFIER))
LOGGER.info('Setting statsd for job metrics with prefix: {}'.format(prefix))
self.statsd = StatsClient(prefix=prefix)
else:
self.statsd = None
def init(self, conf):
# type: (ConfigTree) -> None
pass
def _init(self):
# type: () -> None
self.task.init(self.conf)
def launch(self):
# type: () -> None
"""
Launch the job by initializing it, running the task, and publishing.
:return:
"""
logging.info('Launching a job')
# Use nested try/finally to make sure the task gets closed as soon as possible, and to guarantee that all
# registered closeables get closed.
try:
is_success = True
self._init()
try:
self.task.run()
finally:
self.task.close()
self.publisher.init(Scoped.get_scoped_conf(self.conf, self.publisher.get_scope()))
Job.closer.register(self.publisher.close)
self.publisher.publish()
except Exception as e:
is_success = False
raise e
finally:
# TODO: If more metrics are needed on different construct, such as task, consider abstracting this out
if self.statsd:
if is_success:
LOGGER.info('Publishing job metrics for success')
self.statsd.incr('success')
else:
LOGGER.info('Publishing job metrics for failure')
self.statsd.incr('fail')
Job.closer.close()
logging.info('Job completed')
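# Usage sketch wiring DefaultJob with DefaultTask (defined elsewhere in this commit); the
# SQLAlchemy/CSV config values are illustrative and the loader/task module paths are
# assumed from this commit's file layout.
from pyhocon import ConfigFactory

from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.loader.file_system_csv_loader import FileSystemCSVLoader
from databuilder.task.task import DefaultTask

job_conf = ConfigFactory.from_dict({
    'extractor.sqlalchemy.conn_string': 'sqlite://',
    'extractor.sqlalchemy.extract_sql': 'SELECT 1 AS one',
    'loader.filesystem.csv.file_path': '/tmp/extract.csv',
})
job = DefaultJob(conf=job_conf,
                 task=DefaultTask(extractor=SQLAlchemyExtractor(), loader=FileSystemCSVLoader()))
job.launch()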
import abc
from pyhocon import ConfigTree # noqa: F401
from databuilder import Scoped
from typing import Any # noqa: F401
class Loader(Scoped):
"""
A loader loads to the destination or to the staging area
"""
@abc.abstractmethod
def init(self, conf):
# type: (ConfigTree) -> None
pass
@abc.abstractmethod
def load(self, record):
# type: (Any) -> None
pass
def get_scope(self):
# type: () -> str
return 'loader'
import csv
import logging
from pyhocon import ConfigTree # noqa: F401
from typing import Any # noqa: F401
from databuilder.loader.base_loader import Loader
class FileSystemCSVLoader(Loader):
"""
Loader class to write csv files to Local FileSystem
"""
def init(self, conf):
# type: (ConfigTree) -> None
"""
Initialize file handlers from conf
:param conf:
"""
self.conf = conf
self.file_path = self.conf.get_string('file_path')
self.file_mode = self.conf.get_string('mode', 'w')
self.file_handler = open(self.file_path, self.file_mode)
def load(self, record):
# type: (Any) -> None
"""
Write record object as csv to file
:param record:
:return:
"""
if not record:
return
if not hasattr(self, 'writer'):
self.writer = csv.DictWriter(self.file_handler,
fieldnames=vars(record).keys())
self.writer.writeheader()
self.writer.writerow(vars(record))
self.file_handler.flush()
def close(self):
# type: () -> None
"""
Close file handlers
:return:
"""
try:
self.file_handler.close()
except Exception as e:
logging.warning("Failed trying to close a file handler! %s",
str(e))
def get_scope(self):
# type: () -> str
return "loader.filesystem.csv"
import os
from pyhocon import ConfigTree # noqa: F401
from databuilder.loader.base_loader import Loader
from databuilder.models.elasticsearch_document import ElasticsearchDocument
class FSElasticsearchJSONLoader(Loader):
"""
Loader class to produce Elasticsearch bulk load file to Local FileSystem
"""
FILE_PATH_CONFIG_KEY = 'file_path'
FILE_MODE_CONFIG_KEY = 'mode'
def init(self, conf):
# type: (ConfigTree) -> None
"""
:param conf:
:return:
"""
self.conf = conf
self.file_path = self.conf.get_string(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY)
self.file_mode = self.conf.get_string(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY, 'w')
file_dir = self.file_path.rsplit('/', 1)[0]
self._ensure_directory_exists(file_dir)
self.file_handler = open(self.file_path, self.file_mode)
def _ensure_directory_exists(self, path):
# type: (str) -> None
"""
Check to ensure file directory exists; create the directories otherwise
:param path:
:return: None
"""
if os.path.exists(path):
return # nothing to do here
os.makedirs(path)
def load(self, record):
# type: (ElasticsearchDocument) -> None
"""
Write a record in json format to file
:param record:
:return:
"""
if not record:
return
if not isinstance(record, ElasticsearchDocument):
raise Exception("Record not of type 'ElasticsearchDocument'!")
self.file_handler.write(record.to_json())
self.file_handler.flush()
def close(self):
# type: () -> None
"""
close the file handler
:return:
"""
self.file_handler.close()
def get_scope(self):
# type: () -> str
return 'loader.filesystem.elasticsearch'
import csv
import logging
import os
import shutil
from csv import DictWriter # noqa: F401
from pyhocon import ConfigTree, ConfigFactory # noqa: F401
from typing import Dict, Any # noqa: F401
from databuilder.job.base_job import Job
from databuilder.loader.base_loader import Loader
from databuilder.models.neo4j_csv_serde import NODE_LABEL, \
RELATION_START_LABEL, RELATION_END_LABEL, RELATION_TYPE
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable # noqa: F401
from databuilder.utils.closer import Closer
LOGGER = logging.getLogger(__name__)
class FsNeo4jCSVLoader(Loader):
"""
Write node and relationship CSV file(s) that can be consumed by
Neo4jCsvPublisher.
It assumes that each record it consumes is an instance of Neo4jCsvSerializable
"""
# Config keys
NODE_DIR_PATH = 'node_dir_path'
RELATION_DIR_PATH = 'relationship_dir_path'
SHOULD_DELETE_CREATED_DIR = 'delete_created_directories'
_DEFAULT_CONFIG = ConfigFactory.from_dict({
SHOULD_DELETE_CREATED_DIR: True
})
def __init__(self):
# type: () -> None
self._node_file_mapping = {} # type: Dict[Any, DictWriter]
self._relation_file_mapping = {} # type: Dict[Any, DictWriter]
self._closer = Closer()
def init(self, conf):
# type: (ConfigTree) -> None
"""
Initializes FsNeo4jCSVLoader by creating directories for node files
and relationship files. Note that the directories defined in the
configuration must not already exist.
:param conf:
:return:
"""
conf = conf.with_fallback(FsNeo4jCSVLoader._DEFAULT_CONFIG)
self._node_dir = conf.get_string(FsNeo4jCSVLoader.NODE_DIR_PATH)
self._relation_dir = \
conf.get_string(FsNeo4jCSVLoader.RELATION_DIR_PATH)
self._delete_created_dir = \
conf.get_bool(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR)
self._create_directory(self._node_dir)
self._create_directory(self._relation_dir)
def _create_directory(self, path):
# type: (str) -> None
"""
Validates that the directory does not exist, creates it, and registers a
function that deletes the created directory with Job.closer.
:param path:
:return:
"""
if os.path.exists(path):
raise RuntimeError(
'Directory should not exist: {}'.format(path))
os.makedirs(path)
def _delete_dir():
# type: () -> None
if not self._delete_created_dir:
LOGGER.warn('Skip Deleting directory {}'.format(path))
return
LOGGER.info('Deleting directory {}'.format(path))
shutil.rmtree(path)
# Directory should be deleted after publish is finished
Job.closer.register(_delete_dir)
def load(self, csv_serializable):
# type: (Neo4jCsvSerializable) -> None
"""
Writes a Neo4jCsvSerializable into CSV files.
This method writes multiple CSV files, because a record can produce
not only nodes and relationships but also different kinds of nodes
and relationships.
Common pattern for both nodes and relations:
1. retrieve a csv row (a dict where keys represent the header and
values represent the row)
2. use this dict to get the appropriate csv writer and write to it
3. repeat 1 and 2
:param csv_serializable:
:return:
"""
node_dict = csv_serializable.next_node()
while node_dict:
key = (node_dict[NODE_LABEL], len(node_dict))
file_suffix = '{}_{}'.format(*key)
node_writer = self._get_writer(node_dict,
self._node_file_mapping,
key,
self._node_dir,
file_suffix)
node_writer.writerow(node_dict)
node_dict = csv_serializable.next_node()
relation_dict = csv_serializable.next_relation()
while relation_dict:
key2 = (relation_dict[RELATION_START_LABEL],
relation_dict[RELATION_END_LABEL],
relation_dict[RELATION_TYPE],
len(relation_dict))
file_suffix = '{}_{}_{}'.format(key2[0], key2[1], key2[2])
relation_writer = self._get_writer(relation_dict,
self._relation_file_mapping,
key2,
self._relation_dir,
file_suffix)
relation_writer.writerow(relation_dict)
relation_dict = csv_serializable.next_relation()
def _get_writer(self,
csv_record_dict, # type: Dict[str, Any]
file_mapping, # type: Dict[Any, DictWriter]
key, # type: Any
dir_path, # type: str
file_suffix # type: str
):
# type: (...) -> DictWriter
"""
Finds a writer based on the csv record and key.
If the writer does not exist, it creates a csv writer and updates the
mapping.
:param csv_record_dict:
:param file_mapping:
:param key:
:param file_suffix:
:return:
"""
writer = file_mapping.get(key)
if writer:
return writer
LOGGER.info('Creating file for {}'.format(key))
file_out = open('{}/{}.csv'.format(dir_path, file_suffix), 'w')
def file_out_close():
# type: () -> None
LOGGER.info('Closing file IO {}'.format(file_out))
file_out.close()
self._closer.register(file_out_close)
writer = csv.DictWriter(file_out, fieldnames=csv_record_dict.keys(),
quoting=csv.QUOTE_NONNUMERIC)
writer.writeheader()
file_mapping[key] = writer
return writer
def close(self):
# type: () -> None
"""
Closes every closeable callable registered in _closer.
:return:
"""
self._closer.close()
def get_scope(self):
# type: () -> str
return "loader.filesystem_csv_neo4j"
from typing import Any, Dict, List, Union # noqa: F401
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.table_metadata import TableMetadata
class Application(Neo4jCsvSerializable):
"""
Application-table matching model (Airflow task and table)
"""
APPLICATION_LABEL = 'Application'
APPLICATION_KEY_FORMAT = 'application://{cluster}.airflow/{dag}/{task}'
APPLICATION_URL_NAME = 'application_url'
APPLICATION_NAME = 'name'
APPLICATION_ID = 'id'
APPLICATION_ID_FORMAT = '{dag_id}/{task_id}'
APPLICATION_DESCRIPTION = 'description'
APPLICATION_TYPE = 'Airflow'
APPLICATION_TABLE_RELATION_TYPE = 'GENERATES'
TABLE_APPLICATION_RELATION_TYPE = 'DERIVED_FROM'
def __init__(self,
task_id, # type: str
dag_id, # type: str,
application_url_template, # type: str
exec_date, # type: str
):
# type: (...) -> None
self.task = task_id
# todo: need to modify this hack
self.application_url = application_url_template.format(dag_id=dag_id)
self.database, self.schema, self.table = task_id.split('.')
self.dag = dag_id
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())
def create_next_node(self):
# type: (...) -> Union[Dict[str, Any], None]
# creates new node
try:
return next(self._node_iter)
except StopIteration:
return None
def create_next_relation(self):
# type: (...) -> Union[Dict[str, Any], None]
try:
return next(self._relation_iter)
except StopIteration:
return None
def get_table_model_key(self):
# type: (...) -> str
# returns formatted string for table name
return TableMetadata.TABLE_KEY_FORMAT.format(db=self.database,
schema=self.schema,
tbl=self.table,
cluster='gold')
def get_application_model_key(self):
# type: (...) -> str
# returns formatted string for the application key
return Application.APPLICATION_KEY_FORMAT.format(cluster='gold',
dag=self.dag,
task=self.task)
def create_nodes(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of Neo4j node records
:return:
"""
results = []
results.append({
NODE_KEY: self.get_application_model_key(),
NODE_LABEL: Application.APPLICATION_LABEL,
Application.APPLICATION_URL_NAME: self.application_url,
Application.APPLICATION_NAME: Application.APPLICATION_TYPE,
Application.APPLICATION_DESCRIPTION:
'{app_type} with id {id}'.format(app_type=Application.APPLICATION_TYPE,
id=Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag,
task_id=self.task)),
Application.APPLICATION_ID: Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag,
task_id=self.task)
})
return results
def create_relation(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of relations between the application record and the original table
:return:
"""
results = [{
RELATION_START_KEY: self.get_table_model_key(),
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_END_KEY: self.get_application_model_key(),
RELATION_END_LABEL: Application.APPLICATION_LABEL,
RELATION_TYPE: Application.TABLE_APPLICATION_RELATION_TYPE,
RELATION_REVERSE_TYPE: Application.APPLICATION_TABLE_RELATION_TYPE
}]
return results
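# Usage sketch (illustrative values): task_id is expected as '<database>.<schema>.<table>'
# and the Airflow URL template below is a made-up example.
app = Application(task_id='hive.test_schema.test_table',
                  dag_id='event_test',
                  application_url_template='https://airflow.example.com/admin/airflow/tree?dag_id={dag_id}',
                  exec_date='2019-01-01')
app.create_next_node()       # -> Application node dict
app.create_next_relation()   # -> DERIVED_FROM / GENERATES relation dict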
import json
from typing import List, Optional # noqa: F401
class ElasticsearchDocument:
"""
Schema for the Search index document
"""
def __init__(self,
elasticsearch_index, # type: str
elasticsearch_type, # type: str
database, # type: str
cluster, # type: str
schema_name, # type: str
table_name, # type: str
table_key, # type: str
table_description, # type: str
table_last_updated_epoch, # type: Optional[int]
column_names, # type: List[str]
column_descriptions, # type: List[str]
total_usage, # type: int
unique_usage, # type: int
tag_names, # type: List[str]
):
# type: (...) -> None
self.elasticsearch_index = elasticsearch_index
self.elasticsearch_type = elasticsearch_type
self.database = database
self.cluster = cluster
self.schema_name = schema_name
self.table_name = table_name
self.table_key = table_key
self.table_description = table_description
self.table_last_updated_epoch = table_last_updated_epoch
self.column_names = column_names
self.column_descriptions = column_descriptions
self.total_usage = total_usage
self.unique_usage = unique_usage
# todo: will include tag_type once we have better understanding from UI flow.
self.tag_names = tag_names
def to_json(self):
# type: () -> str
"""
Convert object to json for elasticsearch bulk upload
Bulk load JSON format is defined here:
https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
:return:
"""
index_row = dict(index=dict(_index=self.elasticsearch_index,
_type=self.elasticsearch_type))
data = json.dumps(index_row) + "\n"
# convert rest of the object
obj_dict = {k: v for k, v in sorted(self.__dict__.items())
if k not in ['elasticsearch_index', 'elasticsearch_type']}
data += json.dumps(obj_dict) + "\n"
return data
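# Usage sketch (illustrative field values): to_json() emits two newline-terminated JSON
# lines per document -- an action line, then the source line -- per the ES bulk format.
doc = ElasticsearchDocument(elasticsearch_index='table_search_index',
                            elasticsearch_type='table',
                            database='hive',
                            cluster='gold',
                            schema_name='test_schema',
                            table_name='test_table',
                            table_key='hive://gold.test_schema/test_table',
                            table_description='a table used for testing',
                            table_last_updated_epoch=1546300800,
                            column_names=['col1'],
                            column_descriptions=['first column'],
                            total_usage=10,
                            unique_usage=2,
                            tag_names=['test'])
print(doc.to_json())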
from typing import Any, Dict, List, Union # noqa: F401
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
class HiveWatermark(Neo4jCsvSerializable):
# type: (...) -> None
"""
Hive table watermark result model.
Each instance represents one row of hive watermark result.
"""
LABEL = 'Watermark'
KEY_FORMAT = 'hive://{cluster}.{schema}' \
'/{table}/{part_type}/'
WATERMARK_TABLE_RELATION_TYPE = 'BELONG_TO_TABLE'
TABLE_WATERMARK_RELATION_TYPE = 'WATERMARK'
def __init__(self,
create_time, # type: str
schema_name, # type: str
table_name, # type: str
part_name, # type: str
part_type='high_watermark', # type: str
cluster='gold', # type: str
):
# type: (...) -> None
self.create_time = create_time
self.schema = schema_name.lower()
self.table = table_name.lower()
self.parts = [] # type: list
if '=' not in part_name:
raise Exception('Only partition table has high watermark')
# currently we don't consider nested partitions
idx = part_name.find('=')
name, value = part_name.lower()[:idx], part_name.lower()[idx + 1:]
self.parts = [(name, value)]
self.part_type = part_type.lower()
self.cluster = cluster.lower()
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())
def create_next_node(self):
# type: (...) -> Union[Dict[str, Any], None]
# return the next node dict
try:
return next(self._node_iter)
except StopIteration:
return None
def create_next_relation(self):
# type: (...) -> Union[Dict[str, Any], None]
try:
return next(self._relation_iter)
except StopIteration:
return None
def get_watermark_model_key(self):
# type: (...) -> str
return HiveWatermark.KEY_FORMAT.format(cluster=self.cluster,
schema=self.schema,
table=self.table,
part_type=self.part_type)
def get_metadata_model_key(self):
# type: (...) -> str
return 'hive://{cluster}.{schema}/{table}'.format(cluster=self.cluster,
schema=self.schema,
table=self.table)
def create_nodes(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of Neo4j node records
:return:
"""
results = []
for part in self.parts:
results.append({
NODE_KEY: self.get_watermark_model_key(),
NODE_LABEL: HiveWatermark.LABEL,
'partition_key': part[0],
'partition_value': part[1],
'create_time': self.create_time
})
return results
def create_relation(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of relations between the watermark record and the original hive table
:return:
"""
results = [{
RELATION_START_KEY: self.get_watermark_model_key(),
RELATION_START_LABEL: HiveWatermark.LABEL,
RELATION_END_KEY: self.get_metadata_model_key(),
RELATION_END_LABEL: 'Table',
RELATION_TYPE: HiveWatermark.WATERMARK_TABLE_RELATION_TYPE,
RELATION_REVERSE_TYPE: HiveWatermark.TABLE_WATERMARK_RELATION_TYPE
}]
return results
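# Usage sketch (illustrative values): part_name must be a 'key=value' partition string.
watermark = HiveWatermark(create_time='2019-01-01T00:00:00',
                          schema_name='test_schema',
                          table_name='test_table',
                          part_name='ds=2019-01-01')
watermark.create_next_node()       # -> Watermark node dict
watermark.create_next_relation()   # -> BELONG_TO_TABLE / WATERMARK relation dict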
import abc
import six
from typing import Dict, Set, Any, Union # noqa: F401
NODE_KEY = 'KEY'
NODE_LABEL = 'LABEL'
NODE_REQUIRED_HEADERS = {NODE_LABEL, NODE_KEY}
RELATION_START_KEY = 'START_KEY'
RELATION_START_LABEL = 'START_LABEL'
RELATION_END_KEY = 'END_KEY'
RELATION_END_LABEL = 'END_LABEL'
RELATION_TYPE = 'TYPE'
RELATION_REVERSE_TYPE = 'REVERSE_TYPE'
RELATION_REQUIRED_HEADERS = {RELATION_START_KEY, RELATION_START_LABEL,
RELATION_END_KEY, RELATION_END_LABEL,
RELATION_TYPE, RELATION_REVERSE_TYPE}
LABELS = {NODE_LABEL, RELATION_START_LABEL, RELATION_END_LABEL}
TYPES = {RELATION_TYPE, RELATION_REVERSE_TYPE}
@six.add_metaclass(abc.ABCMeta)
class Neo4jCsvSerializable(object):
"""
An abstract serializable class that asks subclasses to provide the next node or
next relation in dict form so that it can be serialized to a CSV file.
Any model class that needs to be pushed to Neo4j should inherit this class.
"""
def __init__(self):
# type: () -> None
pass
@abc.abstractmethod
def create_next_node(self):
# type: () -> Union[Dict[str, Any], None]
"""
Creates a dict where keys represent the CSV header and values represent
a CSV row. If the class has different types of nodes that it needs to
serialize, it just needs to provide dicts with different headers -- the
consumer of this class figures that out and serializes them to
different files.
Node is Neo4j's term for a Vertex in a Graph. More information at
https://neo4j.com/docs/developer-manual/current/introduction/
graphdb-concepts/
:return: a dict or None if no more record to serialize
"""
raise NotImplementedError
@abc.abstractmethod
def create_next_relation(self):
# type: () -> Union[Dict[str, Any], None]
"""
Creates a dict where keys represent the CSV header and values represent
a CSV row. If the class has different types of relations that it needs
to serialize, it just needs to provide dicts with different headers --
the consumer of this class figures that out and serializes them to
different files.
Relationship is Neo4j's term for an Edge in a Graph. More information at
https://neo4j.com/docs/developer-manual/current/introduction/
graphdb-concepts/
:return: a dict or None if no more record to serialize
"""
raise NotImplementedError
def next_node(self):
# type: () -> Union[Dict[str, Any], None]
"""
Provides node(vertex) in dict form.
Note that a subsequent call can produce a different header (dict.keys()),
which implicitly means that it needs to be serialized into a different
CSV file (as a CSV has a fixed header)
:return: Non-nested dict where key is CSV header and each value
is a column
"""
node_dict = self.create_next_node()
if not node_dict:
return None
self._validate(NODE_REQUIRED_HEADERS, node_dict)
return node_dict
def next_relation(self):
# type: () -> Union[Dict[str, Any], None]
"""
Provides relation(edge) in dict form.
Note that a subsequent call can produce a different header (dict.keys()),
which implicitly means that it needs to be serialized into a different
CSV file (as a CSV has a fixed header)
:return: Non-nested dict where key is CSV header and each value
is a column
"""
relation_dict = self.create_next_relation()
if not relation_dict:
return None
self._validate(RELATION_REQUIRED_HEADERS, relation_dict)
return relation_dict
def _validate(self, required_set, val_dict):
# type: (Set[str], Dict[str, Any]) -> None
"""
Validates a dict that represents a CSV header and row.
- Checks that it has the required headers for either a Node or a Relation
- Checks that LABEL values are title-cased (only the first character is upper case)
- Checks that TYPE values are all upper case
:param required_set:
:param val_dict:
:return:
"""
required_count = 0
for header_col, val_col in \
((header_col, val_col) for header_col, val_col
in six.iteritems(val_dict) if header_col in required_set):
required_count += 1
if header_col in LABELS:
if not val_col.istitle():
raise RuntimeError(
'LABEL should only have upper case character on its '
'first one: {}'.format(val_col))
elif header_col in TYPES:
if not val_col == val_col.upper():
raise RuntimeError(
'TYPE needs to be upper case: {}'.format(val_col))
if required_count != len(required_set):
raise RuntimeError(
'Required header missing. Required: {} , Header: {}'.format(
required_set, val_dict.keys()))
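# Illustrative node and relation dicts (made-up keys) that pass _validate(): LABEL values
# are title-cased and TYPE values are upper-cased, matching the checks above.
example_node = {NODE_KEY: 'hive://gold.test_schema/test_table', NODE_LABEL: 'Table'}
example_relation = {
    RELATION_START_KEY: 'hive://gold.test_schema/test_table', RELATION_START_LABEL: 'Table',
    RELATION_END_KEY: 'user@example.com', RELATION_END_LABEL: 'User',
    RELATION_TYPE: 'OWNER', RELATION_REVERSE_TYPE: 'OWNER_OF',
}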
from typing import List, Optional # noqa: F401
class Neo4jDataResult:
"""
Neo4j graph data model.
Each instance corresponds to one row returned by the Cypher query.
"""
def __init__(self,
database, # type: str
cluster, # type: str
schema_name, # type: str
table_name, # type: str
table_key, # type: str
table_description, # type: str
table_last_updated_epoch, # type: Optional[int]
column_names, # type: List[str]
column_descriptions, # type: List[str]
total_usage, # type: int
unique_usage, # type: int
tag_names, # type: List[str]
):
# type: (...) -> None
self.database = database
self.cluster = cluster
self.schema_name = schema_name
self.table_name = table_name
self.table_key = table_key
self.table_description = table_description
self.table_last_updated_epoch = int(table_last_updated_epoch) if table_last_updated_epoch else None
self.column_names = column_names
self.column_descriptions = column_descriptions
self.total_usage = total_usage
self.unique_usage = unique_usage
self.tag_names = tag_names
from typing import Any, Dict, List, Union # noqa: F401
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, NODE_LABEL
class Neo4jESLastUpdated(Neo4jCsvSerializable):
# type: (...) -> None
"""
Data model to keep track of the last updated timestamp for
Neo4j and ES.
"""
LABEL = 'Updatedtimestamp'
KEY = 'amundsen_updated_timestamp'
LATEST_TIMESTAMP = 'latest_timestmap'
def __init__(self,
timestamp, # type: int
):
# type: (...) -> None
"""
:param timestamp: epoch of the latest updated timestamp for Neo4j and ES
"""
self.timestamp = timestamp
self._node_iter = iter(self.create_nodes())
self._rel_iter = iter(self.create_relation())
def create_next_node(self):
# type: (...) -> Union[Dict[str, Any], None]
"""
Will create an orphan node for last updated timestamp.
:return:
"""
try:
return next(self._node_iter)
except StopIteration:
return None
def create_nodes(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of Neo4j node records.
:return:
"""
return [{
NODE_KEY: Neo4jESLastUpdated.KEY,
NODE_LABEL: Neo4jESLastUpdated.LABEL,
Neo4jESLastUpdated.LATEST_TIMESTAMP: self.timestamp
}]
def create_next_relation(self):
# type: () -> Union[Dict[str, Any], None]
"""
:return:
"""
try:
return next(self._rel_iter)
except StopIteration:
return None
def create_relation(self):
# type: () -> List[Dict[str, Any]]
return []
class PrestoQueryLogs:
"""
Presto Query logs model.
Sql result has one row per presto query.
"""
def __init__(self,
user, # type: str
query_text, # type: str
occurred_at # type: str
):
# type: (...) -> None
self.user = user
self.query_text = query_text
self.occurred_at = occurred_at
from typing import Iterable, Union, Dict, Any, Iterator # noqa: F401
from databuilder.models.neo4j_csv_serde import (
Neo4jCsvSerializable, NODE_KEY, NODE_LABEL, RELATION_START_KEY, RELATION_END_KEY,
RELATION_START_LABEL, RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
)
from databuilder.models.table_metadata import TableMetadata
from databuilder.publisher.neo4j_csv_publisher import UNQUOTED_SUFFIX
class ColumnReader(object):
"""
A class representing a user's read action on a column. Implicitly assumes that the read count is one.
"""
def __init__(self,
database, # type: str
cluster, # type: str
schema, # type: str
table, # type: str
column, # type: str
user_email, # type: str
read_count=1 # type: int
):
# type: (...) -> None
self.database = database.lower()
self.cluster = cluster.lower()
self.schema = schema.lower()
self.table = table.lower()
self.column = column.lower()
self.user_email = user_email.lower()
self.read_count = read_count
def __repr__(self):
# type: () -> str
return """\
ColumnReader(database={!r}, cluster={!r}, schema={!r}, table={!r}, column={!r}, user_email={!r}, read_count={!r})"""\
.format(self.database, self.cluster, self.schema, self.table, self.column, self.user_email, self.read_count)
class TableColumnUsage(Neo4jCsvSerializable):
"""
A model that represents the user <--> column graph.
Currently it only supports serialization at table level.
"""
USER_NODE_LABEL = 'User'
USER_NODE_KEY_FORMAT = '{email}'
USER_NODE_EMAIL = 'email'
TABLE_NODE_LABEL = TableMetadata.TABLE_NODE_LABEL
TABLE_NODE_KEY_FORMAT = TableMetadata.TABLE_KEY_FORMAT
USER_TABLE_RELATION_TYPE = 'READ'
TABLE_USER_RELATION_TYPE = 'READ_BY'
# Property key for relationship read, readby relationship
READ_RELATION_COUNT = 'read_count{}'.format(UNQUOTED_SUFFIX)
def __init__(self,
col_readers, # type: Iterable[ColumnReader]
):
# type: (...) -> None
for col_reader in col_readers:
if col_reader.column != '*':
raise NotImplementedError('Column is not supported yet {}'.format(col_readers))
self.col_readers = col_readers
self._node_iterator = self._create_next_node()
self._rel_iter = self._create_rel_iterator()
def create_next_node(self):
# type: () -> Union[Dict[str, Any], None]
try:
return next(self._node_iterator)
except StopIteration:
return None
def _create_next_node(self):
# type: () -> Iterator[Any]
for col_reader in self.col_readers:
if col_reader.column != '*':
raise NotImplementedError('Column is not supported yet {}'.format(col_reader))
yield {
NODE_LABEL: TableColumnUsage.USER_NODE_LABEL,
NODE_KEY: self._get_user_key(col_reader.user_email),
TableColumnUsage.USER_NODE_EMAIL: col_reader.user_email
}
def create_next_relation(self):
# type: () -> Union[Dict[str, Any], None]
try:
return next(self._rel_iter)
except StopIteration:
return None
def _create_rel_iterator(self):
# type: () -> Iterator[Any]
for col_reader in self.col_readers:
yield {
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_END_LABEL: TableColumnUsage.USER_NODE_LABEL,
RELATION_START_KEY: self._get_table_key(col_reader),
RELATION_END_KEY: self._get_user_key(col_reader.user_email),
RELATION_TYPE: TableColumnUsage.TABLE_USER_RELATION_TYPE,
RELATION_REVERSE_TYPE: TableColumnUsage.USER_TABLE_RELATION_TYPE,
TableColumnUsage.READ_RELATION_COUNT: col_reader.read_count
}
def _get_table_key(self, col_reader):
# type: (ColumnReader) -> str
return TableMetadata.TABLE_KEY_FORMAT.format(db=col_reader.database,
cluster=col_reader.cluster,
schema=col_reader.schema,
tbl=col_reader.table)
def _get_user_key(self, email):
# type: (str) -> str
return TableColumnUsage.USER_NODE_KEY_FORMAT.format(email=email)
def __repr__(self):
# type: () -> str
return 'TableColumnUsage(col_readers={!r})'.format(self.col_readers)
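# Usage sketch (illustrative values): only table-level usage (column '*') is supported; it
# serializes to a User node plus READ / READ_BY relations against the Table node.
usage = TableColumnUsage(col_readers=[
    ColumnReader(database='hive', cluster='gold', schema='test_schema', table='test_table',
                 column='*', user_email='user@example.com', read_count=5),
])
usage.next_node()       # -> User node dict
usage.next_relation()   # -> READ_BY / READ relation dict with a read_count property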
from typing import Any, Dict, List, Union # noqa: F401
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.table_metadata import TableMetadata
class TableLastUpdated(Neo4jCsvSerializable):
# constants
LAST_UPDATED_NODE_LABEL = 'Timestamp'
LAST_UPDATED_KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}/timestamp'
TIMESTAMP_PROPERTY = 'last_updated_timestamp'
TIMESTAMP_NAME_PROPERTY = 'name'
TABLE_LASTUPDATED_RELATION_TYPE = 'LAST_UPDATED_AT'
LASTUPDATED_TABLE_RELATION_TYPE = 'LAST_UPDATED_TIME_OF'
def __init__(self,
table_name, # type: str
last_updated_time_epoch, # type: int
schema_name, # type: str
db='hive', # type: str
cluster='gold' # type: str
):
# type: (...) -> None
self.table_name = table_name
self.last_updated_time = int(last_updated_time_epoch)
self.schema = schema_name
self.db = db
self.cluster = cluster
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())
def __repr__(self):
# type: (...) -> str
return \
"""TableLastUpdated(table_name={!r}, last_updated_time={!r}, schema_name={!r}, db={!r}, cluster={!r})"""\
.format(self.table_name, self.last_updated_time, self.schema, self.db, self.cluster)
def create_next_node(self):
# type: (...) -> Union[Dict[str, Any], None]
# creates new node
try:
return next(self._node_iter)
except StopIteration:
return None
def create_next_relation(self):
# type: (...) -> Union[Dict[str, Any], None]
try:
return next(self._relation_iter)
except StopIteration:
return None
def get_table_model_key(self):
# type: (...) -> str
# returns formatted string for table name
return TableMetadata.TABLE_KEY_FORMAT.format(db=self.db,
cluster=self.cluster,
schema=self.schema,
tbl=self.table_name)
def get_last_updated_model_key(self):
# type: (...) -> str
# returns formatted string for last updated name
return TableLastUpdated.LAST_UPDATED_KEY_FORMAT.format(db=self.db,
cluster=self.cluster,
schema=self.schema,
tbl=self.table_name)
def create_nodes(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of Neo4j node records
:return:
"""
results = []
results.append({
NODE_KEY: self.get_last_updated_model_key(),
NODE_LABEL: TableLastUpdated.LAST_UPDATED_NODE_LABEL,
TableLastUpdated.TIMESTAMP_PROPERTY: self.last_updated_time,
TableLastUpdated.TIMESTAMP_NAME_PROPERTY: TableLastUpdated.TIMESTAMP_PROPERTY
})
return results
def create_relation(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of relations mapping last updated node with table node
:return:
"""
results = [{
RELATION_START_KEY: self.get_table_model_key(),
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_END_KEY: self.get_last_updated_model_key(),
RELATION_END_LABEL: TableLastUpdated.LAST_UPDATED_NODE_LABEL,
RELATION_TYPE: TableLastUpdated.TABLE_LASTUPDATED_RELATION_TYPE,
RELATION_REVERSE_TYPE: TableLastUpdated.LASTUPDATED_TABLE_RELATION_TYPE
}]
return results
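A short usage sketch for TableLastUpdated; the import path is an assumption based on the naming convention of the other model imports in this repo.

from databuilder.models.table_last_updated import TableLastUpdated  # assumed module path

last_updated = TableLastUpdated(table_name='test_table1',
                                last_updated_time_epoch=1570230473,
                                schema_name='test_schema')

last_updated.get_last_updated_model_key()
# -> 'hive://gold.test_schema/test_table1/timestamp'

# One Timestamp node plus one Table -> Timestamp relation are emitted.
node = last_updated.create_next_node()
relation = last_updated.create_next_relation()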
from typing import Any, Dict, List, Union # noqa: F401
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.table_column_usage import TableColumnUsage
class TableOwner(Neo4jCsvSerializable):
"""
Hive table owner model.
"""
OWNER_TABLE_RELATION_TYPE = 'OWNER_OF'
TABLE_OWNER_RELATION_TYPE = 'OWNER'
def __init__(self,
db_name, # type: str
schema_name, # type: str
table_name, # type: str
owners, # type: List
cluster='gold', # type: str
):
# type: (...) -> None
self.db = db_name.lower()
self.schema = schema_name.lower()
self.table = table_name.lower()
self.owners = owners
self.cluster = cluster.lower()
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())
def create_next_node(self):
# type: (...) -> Union[Dict[str, Any], None]
        # return the next node to be created, or None when exhausted
try:
return next(self._node_iter)
except StopIteration:
return None
def create_next_relation(self):
# type: (...) -> Union[Dict[str, Any], None]
try:
return next(self._relation_iter)
except StopIteration:
return None
def get_owner_model_key(self, owner # type: str
):
# type: (...) -> str
return TableColumnUsage.USER_NODE_KEY_FORMAT.format(email=owner)
def get_metadata_model_key(self):
# type: (...) -> str
return '{db}://{cluster}.{schema}/{table}'.format(db=self.db,
cluster=self.cluster,
schema=self.schema,
table=self.table)
def create_nodes(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of Neo4j node records
:return:
"""
results = []
for owner in self.owners:
if owner:
results.append({
NODE_KEY: self.get_owner_model_key(owner),
NODE_LABEL: TableColumnUsage.USER_NODE_LABEL,
TableColumnUsage.USER_NODE_EMAIL: owner
})
return results
def create_relation(self):
# type: () -> List[Dict[str, Any]]
"""
        Create a list of relations mapping each owner record to the original table
:return:
"""
        results = []
        for owner in self.owners:
            if not owner:
                # skip empty owner entries, consistent with create_nodes()
                continue
            results.append({
                RELATION_START_KEY: self.get_owner_model_key(owner),
                RELATION_START_LABEL: TableColumnUsage.USER_NODE_LABEL,
                RELATION_END_KEY: self.get_metadata_model_key(),
                RELATION_END_LABEL: 'Table',
                RELATION_TYPE: TableOwner.OWNER_TABLE_RELATION_TYPE,
                RELATION_REVERSE_TYPE: TableOwner.TABLE_OWNER_RELATION_TYPE
            })
return results
def __repr__(self):
# type: () -> str
return 'TableOwner({!r}, {!r}, {!r}, {!r}, {!r})'.format(self.db,
self.cluster,
self.schema,
self.table,
self.owners)
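A short usage sketch for TableOwner (assumed import path); empty owner strings are skipped.

from databuilder.models.table_owner import TableOwner  # assumed module path

table_owner = TableOwner(db_name='hive',
                         schema_name='test_schema',
                         table_name='test_table1',
                         owners=['owner1@example.com', 'owner2@example.com'])

table_owner.get_metadata_model_key()   # 'hive://gold.test_schema/test_table1'
len(table_owner.create_nodes())        # 2 -- one User node per owner
len(table_owner.create_relation())     # 2 -- one User -> Table ownership relation per owner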
from typing import Any, Dict, List, Union # noqa: F401
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.table_metadata import ColumnMetadata
class TableColumnStats(Neo4jCsvSerializable):
"""
Hive table stats model.
    Each instance represents one column stat record.
"""
LABEL = 'Stat'
KEY_FORMAT = '{db}://{cluster}.{schema}' \
'/{table}/{col}/{stat_name}/'
STAT_Column_RELATION_TYPE = 'STAT_OF'
Column_STAT_RELATION_TYPE = 'STAT'
def __init__(self,
table_name, # type: str
col_name, # type: str
stat_name, # type: str
stat_val, # type: str
start_epoch, # type: str
end_epoch, # type: str
db='hive', # type: str
cluster='gold', # type: str
):
# type: (...) -> None
self.db = db
self.schema, self.table = table_name.lower().split('.')
self.col_name = col_name.lower()
self.start_epoch = start_epoch
self.end_epoch = end_epoch
self.cluster = cluster
self.stat_name = stat_name
self.stat_val = stat_val
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())
def create_next_node(self):
# type: (...) -> Union[Dict[str, Any], None]
        # return the next node to be created, or None when exhausted
try:
return next(self._node_iter)
except StopIteration:
return None
def create_next_relation(self):
# type: (...) -> Union[Dict[str, Any], None]
try:
return next(self._relation_iter)
except StopIteration:
return None
def get_table_stat_model_key(self):
# type: (...) -> str
return TableColumnStats.KEY_FORMAT.format(db=self.db,
cluster=self.cluster,
schema=self.schema,
table=self.table,
col=self.col_name,
stat_name=self.stat_name)
def get_col_key(self):
# type: (...) -> str
# no cluster, schema info from the input
return ColumnMetadata.COLUMN_KEY_FORMAT.format(db=self.db,
cluster=self.cluster,
schema=self.schema,
tbl=self.table,
col=self.col_name)
def create_nodes(self):
# type: () -> List[Dict[str, Any]]
"""
Create a list of Neo4j node records
:return:
"""
results = [{
NODE_KEY: self.get_table_stat_model_key(),
NODE_LABEL: TableColumnStats.LABEL,
'stat_val:UNQUOTED': self.stat_val,
'stat_name': self.stat_name,
'start_epoch': self.start_epoch,
'end_epoch': self.end_epoch,
}]
return results
def create_relation(self):
# type: () -> List[Dict[str, Any]]
"""
        Create a list of relations mapping the table stat record to the original table column
:return:
"""
results = [{
RELATION_START_KEY: self.get_table_stat_model_key(),
RELATION_START_LABEL: TableColumnStats.LABEL,
RELATION_END_KEY: self.get_col_key(),
RELATION_END_LABEL: ColumnMetadata.COLUMN_NODE_LABEL,
RELATION_TYPE: TableColumnStats.STAT_Column_RELATION_TYPE,
RELATION_REVERSE_TYPE: TableColumnStats.Column_STAT_RELATION_TYPE
}]
return results
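A short usage sketch for TableColumnStats (assumed import path). Note that table_name must be passed as 'schema.table', since the constructor splits on the dot.

from databuilder.models.table_stats import TableColumnStats  # assumed module path

stat = TableColumnStats(table_name='test_schema.test_table1',
                        col_name='col1',
                        stat_name='max',
                        stat_val='100',
                        start_epoch='1570230473',
                        end_epoch='1570230473')

stat.get_table_stat_model_key()
# -> 'hive://gold.test_schema/test_table1/col1/max/'

# The stat value is written under a 'stat_val:UNQUOTED' key, presumably so the downstream
# CSV serializer emits it without quotes.
nodes = stat.create_nodes()
relations = stat.create_relation()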
import abc
from pyhocon import ConfigTree # noqa: F401
from databuilder import Scoped
class Publisher(Scoped):
"""
    A Publisher writes a dataset (not individual records) in an atomic manner
    where possible: either the whole publish succeeds or it fails, with no
    partial state.
    Use case: to use the Neo4j import utility or LOAD CSV utility, which take
    a CSV file and load it into the database, you first need to create that
    CSV file. The CSV file holds a number of records, and a loader can write
    multiple records to it. Once the loader finishes writing, you have a
    complete CSV file ready to publish to Neo4j. The Publisher takes the
    location of the CSV file and pushes it to Neo4j.
"""
@abc.abstractmethod
def init(self, conf):
# type: (ConfigTree) -> None
pass
@abc.abstractmethod
def publish(self):
# type: () -> None
pass
def get_scope(self):
# type: () -> str
return 'publisher'
class NoopPublisher(Publisher):
def __init__(self):
# type: () -> None
pass
def init(self, conf):
# type: (ConfigTree) -> None
pass
def publish(self):
# type: () -> None
pass
def get_scope(self):
# type: () -> str
return 'publisher.noop'
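To illustrate the interface above, here is a hypothetical publisher, not part of the library, that only logs the location of a prepared file:

import logging

from pyhocon import ConfigTree  # noqa: F401

from databuilder.publisher.base_publisher import Publisher


class LoggingPublisher(Publisher):
    """
    Hypothetical publisher that only logs the path of the dataset it would publish.
    """
    FILE_PATH_CONFIG_KEY = 'file_path'

    def init(self, conf):
        # type: (ConfigTree) -> None
        self.file_path = conf.get_string(LoggingPublisher.FILE_PATH_CONFIG_KEY)

    def publish(self):
        # type: () -> None
        logging.info('Would publish dataset from %s', self.file_path)

    def get_scope(self):
        # type: () -> str
        return 'publisher.logging'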
import json
import logging
import textwrap
from typing import List # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from elasticsearch.exceptions import NotFoundError
from databuilder.publisher.base_publisher import Publisher
LOGGER = logging.getLogger(__name__)
class ElasticsearchPublisher(Publisher):
"""
    Elasticsearch Publisher uses the Bulk API to load data from a JSON file.
    A new index is created and the data is uploaded into it. After the upload
    is complete, the index alias is swapped to point to the new index instead
    of the old one, so traffic is routed to the new index.
    The old index is deleted after the alias swap is complete.
"""
FILE_PATH_CONFIG_KEY = 'file_path'
FILE_MODE_CONFIG_KEY = 'mode'
ELASTICSEARCH_CLIENT_CONFIG_KEY = 'client'
ELASTICSEARCH_NEW_INDEX_CONFIG_KEY = 'new_index'
ELASTICSEARCH_ALIAS_CONFIG_KEY = 'alias'
ELASTICSEARCH_MAPPING_CONFIG_KEY = 'mapping'
# Specifying default mapping for elasticsearch index
# Documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html
# Setting type to "text" for all fields that would be used in search
# Using Simple Analyzer to convert all text into search terms
# https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-simple-analyzer.html
# Standard Analyzer is used for all text fields that don't explicitly specify an analyzer
# https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-standard-analyzer.html
DEFAULT_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"""
{
"mappings":{
"table":{
"properties": {
"table_name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"schema_name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"table_last_updated_epoch": {
"type": "date",
"format": "epoch_second"
},
"table_description": {
"type": "text",
"analyzer": "simple"
},
"column_names": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"column_descriptions": {
"type": "text",
"analyzer": "simple"
},
"tag_names": {
"type": "keyword"
},
"cluster": {
"type": "text"
},
"database": {
"type": "text"
},
"table_key": {
"type": "keyword"
},
"total_usage":{
"type": "long"
},
"unique_usage": {
"type": "long"
}
}
}
}
}
"""
)
def __init__(self):
# type: () -> None
pass
def init(self, conf):
# type: (ConfigTree) -> None
self.conf = conf
self.file_path = self.conf.get_string(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY)
self.file_mode = self.conf.get_string(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY, 'w')
self.elasticsearch_client = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY)
self.elasticsearch_new_index = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY)
self.elasticsearch_alias = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY)
self.elasticsearch_mapping = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY,
ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING)
self.file_handler = open(self.file_path, self.file_mode)
def _fetch_old_index(self):
# type: () -> List[str]
"""
Retrieve all indices that currently have {elasticsearch_alias} alias
:return: list of elasticsearch indices
"""
try:
indices = self.elasticsearch_client.indices.get_alias(self.elasticsearch_alias).keys()
return indices
        except NotFoundError:
            LOGGER.warning("Received index not found error from Elasticsearch", exc_info=True)
# return empty list on exception
return []
def publish(self):
# type: () -> None
"""
        Use the Elasticsearch Bulk API to load data from the file into {new_index}.
        After the upload, swap the alias from {old_index} to {new_index} in an atomic
        operation to route traffic to {new_index}.
"""
        actions = [json.loads(line) for line in self.file_handler.readlines()]
# ensure new data exists
if not actions:
LOGGER.warning("received no data to upload to Elasticsearch!")
return
# create new index with mapping
self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping)
# bulk upload data
self.elasticsearch_client.bulk(actions)
# fetch indices that have {elasticsearch_alias} as alias
elasticsearch_old_indices = self._fetch_old_index()
# update alias to point to the new index
actions = [{"add": {"index": self.elasticsearch_new_index, "alias": self.elasticsearch_alias}}]
# delete old indices
delete_actions = [{"remove_index": {"index": index}} for index in elasticsearch_old_indices]
actions.extend(delete_actions)
update_action = {"actions": actions}
# perform alias update and index delete in single atomic operation
self.elasticsearch_client.indices.update_aliases(update_action)
def get_scope(self):
# type: () -> str
return 'publisher.elasticsearch'
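A configuration sketch for wiring the publisher above with pyhocon. The ElasticsearchPublisher import path is an assumption, and the file at file_path is expected to already contain the bulk-API actions written by a loader.

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory

from databuilder import Scoped
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher  # assumed module path

es_client = Elasticsearch()  # points at a local cluster in this sketch

conf = ConfigFactory.from_dict({
    'publisher.elasticsearch.file_path': '/tmp/es_bulk_data.json',
    'publisher.elasticsearch.mode': 'r',  # publish() reads the file, so open it for reading
    'publisher.elasticsearch.client': es_client,
    'publisher.elasticsearch.new_index': 'tables_v2',
    'publisher.elasticsearch.alias': 'table_search_index',
})

publisher = ElasticsearchPublisher()
publisher.init(Scoped.get_scoped_conf(conf, publisher.get_scope()))
publisher.publish()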
import copy
import logging
import six
from typing import Union, Iterable, Optional # noqa: F401
LOGGER = logging.getLogger(__name__)
def remove_double_quotes(val):
# type: (Union[str, None]) -> Union[str, None]
"""
    Column names, table names and other identifiers often come double-quoted. This method removes the
    surrounding double quotes.
:param val:
:return:
"""
if not val:
return val
if six.PY2 and isinstance(val, six.text_type):
val = val.encode('utf-8', 'ignore')
if val.startswith('"') and val.endswith('"'):
return val[1:-1]
return val
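# A few illustrative calls (the behaviour follows directly from the function above):
#   remove_double_quotes('"foo"')  -> 'foo'
#   remove_double_quotes('foo')    -> 'foo'   (no surrounding quotes, returned unchanged)
#   remove_double_quotes(None)     -> None    (falsy values are passed through)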
class Column(object):
"""
Column for usage.
"""
def __init__(self, name, table=None, col_alias=None):
# type: (str, Union[Table, None], Union[str, None]) -> None
self.col_name = remove_double_quotes(name)
self.table = table
self.col_alias = remove_double_quotes(col_alias)
def __repr__(self):
# type: () -> str
return 'Column(name={!r}, table={!r}, col_alias={!r})'.format(self.col_name, self.table, self.col_alias)
def resolve_col_name(self, col_name):
# type: (Union[str, None]) -> Union[str, None]
"""
        Resolve the given column name against this column.
        e.g: SELECT bar FROM (SELECT foo as bar FROM foobar)
        In the case above, bar resolves against column foo, which has the alias bar. It therefore resolves
        to foo, as that is the actual column name.
:param col_name:
:return:
"""
if self.col_name == '*':
return col_name
if col_name == self.col_alias or col_name == self.col_name:
return self.col_name
return None
@staticmethod
def resolve(select_col, from_cols):
# type: (Column, Iterable[Column]) -> Iterable[Column]
"""
        Resolve the processing column against the processed columns. The processing column is the one from
        the SELECT clause: it carries no table information, although it may have a table alias in front of
        the column name (foo.bar). Processed columns are already resolved, i.e. each one carries its table.
        Resolving means having the processing column choose the correct processed column(s), so that the
        result is a proper column name together with its table.
e.g1: SELECT foo from foobar
- processing column: foo
- processed column: all columns from foobar
--> result: column 'foo' from 'foobar' table
e.g2: SELECT foo from (SELECT foo, bar FROM foobar)
- processing column: foo
- processed column: foo and bar columns from foobar
--> result: column 'foo' from 'foobar' table
:param select_col: column from select clause
:param from_cols: column from 'from' clause
:return: List of columns
"""
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug('select_col: {}'.format(select_col))
LOGGER.debug('from_cols: {}'.format(from_cols))
if select_col.col_name != '*':
return Column.resolve_named_column(select_col, from_cols)
return Column._resolve_all_column(select_col, from_cols)
@staticmethod
def resolve_named_column(select_col, from_cols):
# type: (Column, Iterable[Column]) -> Iterable[Column]
"""
SELECT clause where column has name (not *)
e.g: SELECT foo, bar FROM foobar.
:param select_col: column from select clause
:param from_cols: column from 'from' clause
:return:
"""
# column name is defined and has table alias. (SELECT foo.bar)
if select_col.table:
for processed_col in from_cols:
# resolve column name as processing column name can be alias
col_name = processed_col.resolve_col_name(select_col.col_name)
if col_name and processed_col.table:
# resolve table name using alias
table = processed_col.table.resolve_table(select_col.table.name)
if table:
alias = Column.get_column_alias(select_col=select_col, from_col=processed_col)
col = Column(col_name, table=table, col_alias=alias)
return [col]
            raise Exception('Cannot find case 1. select_col: {} , from_cols: {}'
.format(select_col, from_cols))
# col name defined but no table. (SELECT foo)
else:
sub_result = []
            # Use the column name alone to find a match among the processed columns.
            # Note that a single column name can match multiple tables. This is the case where the SQL engine
            # resolves the ambiguity by looking at the actual columns in each table. Here we use OrTable so
            # that it can be disambiguated later on.
for processed_col in from_cols:
col_name = processed_col.resolve_col_name(select_col.col_name)
if col_name:
col = copy.deepcopy(processed_col)
col.col_name = col_name
alias = Column.get_column_alias(select_col=select_col, from_col=processed_col)
col.col_alias = alias
sub_result.append(col)
if not sub_result:
raise Exception('Cannot find case 2. select_col: {} , from_cols: {}'
.format(select_col, from_cols))
if len(sub_result) == 1:
return sub_result
tables = []
for col in sub_result:
tables.append(copy.deepcopy(col.table))
col = sub_result[0]
col.table = OrTable(tables)
return [col]
@staticmethod
def get_column_alias(select_col, from_col):
# type: (Column, Column) -> str
"""
Use processing column alias if not null.
:param select_col: column from select clause
:param from_col: column from 'from' clause
:return:
"""
return select_col.col_alias if select_col.col_alias else from_col.col_alias
@staticmethod
def _resolve_all_column(processing_col, processed_cols):
# type: (Column, Iterable[Column]) -> Iterable[Column]
"""
SELECT statement where column is '*'
e.g: SELECT * FROM foobar;
:param processed_cols:
:return:
"""
if processing_col.table:
result = []
            # Select everything we have in the processed columns; it just needs to match the table
for processed_col in processed_cols:
if processed_col.table:
table = processed_col.table.resolve_table(processing_col.table.name)
if table:
col = copy.deepcopy(processed_col)
col.table = table
result.append(col)
if not result:
raise Exception('Cannot find case 3. select_col: {} , from_cols: {}'
.format(processing_col, processed_cols))
return result
# SELECT * case
else:
result = []
for processed_col in processed_cols:
result.append(copy.deepcopy(processed_col))
if not result:
raise Exception('Cannot find case 4. select_col: {} , from_cols: {}'
.format(processing_col, processed_cols))
return result
class Table(object):
"""
Table class for usage
"""
def __init__(self, name, schema=None, alias=None):
# type: (str, Union[str, None], Union[str, None]) -> None
self.name = remove_double_quotes(name)
self.schema = remove_double_quotes(schema)
self.alias = remove_double_quotes(alias)
def resolve_table(self, select_table_name):
# type: (Union[str, None]) -> Union[Table, None]
"""
        If select_table_name matches this table's name or alias, return this table instance
:param select_table_name: table in select clause
:return:
"""
if select_table_name == self.alias or select_table_name == self.name:
return self
return None
def __repr__(self):
# type: () -> str
return 'Table(name={!r}, schema={!r}, alias={!r})'.format(self.name, self.schema, self.alias)
class OrTable(Table):
"""
    Table that holds multiple tables. This covers the ambiguous case.
    For example, the statement "SELECT a, b FROM foo JOIN bar USING c" does not tell whether column a comes
    from foo or bar. Column a is either from table foo or from table bar, and this class represents that
    ambiguity.
"""
def __init__(self, tables):
# type: (Iterable[Optional[Table]]) -> None
self.tables = tables
def resolve_table(self, select_table_name):
# type: (str) -> Optional[Table]
"""
        If any of the held tables matches select_table_name, return it
:param select_table_name:
:return:
"""
for table in self.tables:
if isinstance(table, OrTable):
result = table.resolve_table(select_table_name)
if result:
return result
continue
if select_table_name == table.alias or select_table_name == table.name:
return table
return None
def __repr__(self):
# type: () -> str
return 'OrTable(tables={!r})'.format(self.tables)
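A small worked example of the resolution logic above, assuming it runs in the context of this module (so no imports are shown):

# Unambiguous case: SELECT foo FROM foobar
from_cols = [Column('foo', table=Table('foobar', schema='test_schema')),
             Column('bar', table=Table('foobar', schema='test_schema'))]
resolved = Column.resolve(Column('foo'), from_cols)
# -> one Column named 'foo' attached to Table 'foobar'

# Ambiguous case: SELECT a FROM foo JOIN bar USING (c)
from_cols = [Column('*', table=Table('foo')), Column('*', table=Table('bar'))]
resolved = Column.resolve(Column('a'), from_cols)
# -> one Column named 'a' whose table is OrTable([Table 'foo', Table 'bar'])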
How to generate Antlr runtime targets:
1. Install Antlr4. https://github.com/antlr/antlr4/blob/master/doc/getting-started.md
2. Download grammar. Presto: https://github.com/prestodb/presto/blob/master/presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4
3. Generate Antlr runtime target https://github.com/antlr/antlr4/blob/master/doc/python-target.md
4. More on Antlr: https://github.com/antlr/antlr4/blob/master/doc/index.md
import abc
from pyhocon import ConfigTree # noqa: F401
from databuilder import Scoped
class Task(Scoped):
"""
    An abstract task; concrete implementations define how the task runs.
"""
@abc.abstractmethod
def init(self, conf):
# type: (ConfigTree) -> None
pass
@abc.abstractmethod
def run(self):
# type: () -> None
"""
Runs a task
:return:
"""
pass
def get_scope(self):
# type: () -> str
return 'task'
import logging
from pyhocon import ConfigTree # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor # noqa: F401
from databuilder.loader.base_loader import Loader # noqa: F401
from databuilder.task.base_task import Task # noqa: F401
from databuilder.transformer.base_transformer import Transformer # noqa: F401
from databuilder.transformer.base_transformer \
import NoopTransformer # noqa: F401
from databuilder.utils.closer import Closer
class DefaultTask(Task):
"""
    A default task that extracts, transforms and loads records.
"""
def __init__(self,
extractor,
loader,
transformer=NoopTransformer()):
# type: (Extractor, Loader, Transformer) -> None
self.extractor = extractor
self.transformer = transformer
self.loader = loader
self._closer = Closer()
self._closer.register(self.extractor.close)
self._closer.register(self.transformer.close)
self._closer.register(self.loader.close)
def init(self, conf):
# type: (ConfigTree) -> None
self.extractor.init(Scoped.get_scoped_conf(conf, self.extractor.get_scope()))
self.transformer.init(Scoped.get_scoped_conf(conf, self.transformer.get_scope()))
self.loader.init(Scoped.get_scoped_conf(conf, self.loader.get_scope()))
def run(self):
# type: () -> None
"""
Runs a task
:return:
"""
logging.info('Running a task')
try:
record = self.extractor.extract()
while record:
record = self.transformer.transform(record)
if not record:
continue
self.loader.load(record)
record = self.extractor.extract()
finally:
self._closer.close()
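A minimal end-to-end sketch of DefaultTask with toy components. The DefaultTask import path is an assumption, and the extractor and loader below exist only for illustration.

import logging

from pyhocon import ConfigFactory, ConfigTree  # noqa: F401

from databuilder.extractor.base_extractor import Extractor
from databuilder.loader.base_loader import Loader
from databuilder.task.task import DefaultTask  # assumed module path


class ListExtractor(Extractor):
    """Toy extractor yielding records from an in-memory list."""
    def init(self, conf):
        # type: (ConfigTree) -> None
        self._records = iter(['record_1', 'record_2'])

    def extract(self):
        return next(self._records, None)

    def get_scope(self):
        return 'extractor.list'


class LoggingLoader(Loader):
    """Toy loader that logs every record it receives."""
    def init(self, conf):
        # type: (ConfigTree) -> None
        pass

    def load(self, record):
        logging.info('Loaded record: %s', record)

    def get_scope(self):
        return 'loader.logging'


task = DefaultTask(extractor=ListExtractor(), loader=LoggingLoader())
task.init(ConfigFactory.from_dict({}))
task.run()  # extracts, (noop-)transforms and loads both records, then closes all components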
name,description,col_type,sort_order,database,cluster,schema_name,table_name,table_desc
col1,"col1","string",1,hive,gold,test_schema,test_table1,"1st test table"
col2,"col2","string",2,hive,gold,test_schema,test_table1,"1st test table"
col3,"col3","string",3,hive,gold,test_schema,test_table1,"1st test table"
col4,"col4","string",4,hive,gold,test_schema,test_table1,"1st test table"
col1,"col1","string",1,dynamo,gold,test_schema,test_table2,"2nd test table"
col2,"col2","string",2,dynamo,gold,test_schema,test_table2,"2nd test table"
col3,"col3","string",3,dynamo,gold,test_schema,test_table2,"2nd test table"
col4,"col4","string",4,dynamo,gold,test_schema,test_table2,"2nd test table"
database,cluster,schema_name,table_name,table_desc
hive,gold,test_schema,test_table1,"1st test table"
dynamo,gold,test_schema,test_table2,"2nd test table"