Commit 9afc1c28 authored by Pedro Gonçalves Rossi Rodrigues's avatar Pedro Gonçalves Rossi Rodrigues Committed by Tao Feng

add cassandra extractors, tests and docs (#169)

* add cassandra extractors, tests and docs

* add cassandra extractors, tests and docs

* code refactor based on pr and code linted
parent 8b8177b4
...@@ -108,6 +108,42 @@ job = DefaultJob( ...@@ -108,6 +108,42 @@ job = DefaultJob(
job.launch() job.launch()
``` ```
#### [CassandraExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/cassandra_extractor.py "CassandraExtractor")
An extractor that extracts table and column metadata including keyspace, table name, column name and column type from Apache Cassandra databases
```python
job_config = ConfigFactory.from_dict({
'extractor.cassandra.{}'.format(CassandraExtractor.CLUSTER_KEY): cluster_identifier_string,
'extractor.cassandra.{}'.format(CassandraExtractor.IPS_KEY): [127.0.0.1],
'extractor.cassandra.{}'.format(CassandraExtractor.KWARGS_KEY): {},
'extractor.cassandra.{}'.format(CassandraExtractor.FILTER_FUNCTION_KEY): my_filter_function,
})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=CassandraExtractor(),
loader=AnyLoader()))
job.launch()
```
If using the function filter options here is the function description
```python
def filter(keytab, table):
# return False if you don't want to add that table and True if you want to add
return True
```
If needed to define more args on the cassandra cluster you can pass through kwargs args
```python
config = ConfigFactory.from_dict({
'extractor.cassandra.{}'.format(CassandraExtractor.IPS_KEY): [127.0.0.1],
'extractor.cassandra.{}'.format(CassandraExtractor.KWARGS_KEY): {'port': 9042}
})
# it will call the cluster constructor like this
Cluster([127.0.0.1], **kwargs)
```
#### [GlueExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/glue_extractor.py "GlueExtractor") #### [GlueExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/glue_extractor.py "GlueExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from AWS Glue metastore. An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from AWS Glue metastore.
......
from cassandra.cluster import Cluster
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
class CassandraExtractor(Extractor):
"""
Extracts tables and columns metadata from Apacha Cassandra
"""
CLUSTER_KEY = 'cluster'
# Key to define clusters ips, it should be List[str]
IPS_KEY = 'ips'
# Key to define extra kwargs to pass on cluster constructor,
# it should be Dict[Any]
KWARGS_KEY = 'kwargs'
# Key to define custom filter function based on keyspace and table
# since the cluster metadata doesn't support native filters,
# it should be like def filter(keyspace, table) and return False if
# going to skip that table and True if not
FILTER_FUNCTION_KEY = 'filter'
# Default values
DEFAULT_CONFIG = ConfigFactory.from_dict({
CLUSTER_KEY: 'gold',
IPS_KEY: [],
KWARGS_KEY: {},
FILTER_FUNCTION_KEY: None
})
def init(self, conf):
conf = conf.with_fallback(CassandraExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(CassandraExtractor.CLUSTER_KEY))
self._filter = conf.get(CassandraExtractor.FILTER_FUNCTION_KEY)
ips = conf.get_list(CassandraExtractor.IPS_KEY)
kwargs = conf.get(CassandraExtractor.KWARGS_KEY)
self._client = Cluster(ips, **kwargs)
self._client.connect()
self._extract_iter = None # type: Union[None, Iterator]
def extract(self):
# type: () -> Union[TableMetadata, None]
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.cassandra'
def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
"""
It gets all tables and yields TableMetadata
:return:
"""
keyspaces = self._get_keyspaces()
for keyspace in keyspaces:
# system keyspaces
if keyspace.startswith('system'):
continue
for table in self._get_tables(keyspace):
if self._filter and not self._filter(keyspace, table):
continue
columns = []
columns_dict = self._get_columns(keyspace, table)
for idx, (column_name, column) in enumerate(columns_dict.items()):
columns.append(ColumnMetadata(
column_name,
None,
column.cql_type,
idx
))
yield TableMetadata(
'cassandra',
self._cluster,
keyspace,
table,
None,
columns
)
def _get_keyspaces(self):
return self._client.metadata.keyspaces
def _get_tables(self, keyspace):
return self._client.metadata.keyspaces[keyspace].tables
def _get_columns(self, keyspace, table):
return self._client.metadata.keyspaces[keyspace].tables[table].columns
...@@ -67,3 +67,4 @@ httplib2~=0.9.2 ...@@ -67,3 +67,4 @@ httplib2~=0.9.2
confluent-kafka==1.0.0 confluent-kafka==1.0.0
unidecode unidecode
boto3==1.10.1 boto3==1.10.1
cassandra-driver==3.20.1
import logging
import unittest
from collections import OrderedDict
from mock import patch
from pyhocon import ConfigFactory
from typing import Any, Dict # noqa: F401
from cassandra.metadata import ColumnMetadata as CassandraColumnMetadata
from databuilder.extractor.cassandra_extractor import CassandraExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
# patch whole class to avoid actually calling for boto3.client during tests
@patch('cassandra.cluster.Cluster.connect', lambda x: None)
class TestCassandraExtractor(unittest.TestCase):
def setUp(self):
# type: () -> None
logging.basicConfig(level=logging.INFO)
self.default_conf = ConfigFactory.from_dict({})
def test_extraction_with_empty_query_result(self):
# type: () -> None
"""
Test Extraction with empty result from query
"""
extractor = CassandraExtractor()
extractor.init(self.default_conf)
results = extractor.extract()
self.assertEqual(results, None)
@patch('databuilder.extractor.cassandra_extractor.CassandraExtractor._get_keyspaces')
@patch('databuilder.extractor.cassandra_extractor.CassandraExtractor._get_tables')
@patch('databuilder.extractor.cassandra_extractor.CassandraExtractor._get_columns')
def test_extraction_with_default_conf(self, mock_columns, mock_tables, mock_keyspaces):
# type: () -> None
mock_keyspaces.return_value = {'test_schema': None}
mock_tables.return_value = {'test_table': None}
columns_dict = OrderedDict()
columns_dict['id'] = CassandraColumnMetadata(None, 'id', 'int')
columns_dict['txt'] = CassandraColumnMetadata(None, 'txt', 'text')
mock_columns.return_value = columns_dict
extractor = CassandraExtractor()
extractor.init(self.default_conf)
actual = extractor.extract()
expected = TableMetadata('cassandra', 'gold', 'test_schema', 'test_table', None,
[ColumnMetadata('id', None, 'int', 0),
ColumnMetadata('txt', None, 'text', 1)])
self.assertEqual(expected.__repr__(), actual.__repr__())
self.assertIsNone(extractor.extract())
@patch('databuilder.extractor.cassandra_extractor.CassandraExtractor._get_keyspaces')
@patch('databuilder.extractor.cassandra_extractor.CassandraExtractor._get_tables')
@patch('databuilder.extractor.cassandra_extractor.CassandraExtractor._get_columns')
def test_extraction_with_filter_conf(self, mock_columns, mock_tables, mock_keyspaces):
# type: () -> None
mock_keyspaces.return_value = {'test_schema': None}
mock_tables.return_value = {'test_table': None}
columns_dict = OrderedDict()
columns_dict['id'] = CassandraColumnMetadata(None, 'id', 'int')
columns_dict['txt'] = CassandraColumnMetadata(None, 'txt', 'text')
mock_columns.return_value = columns_dict
def filter_function(k, t):
return False if 'test' in k or 'test' in t else False
conf = ConfigFactory.from_dict({
CassandraExtractor.FILTER_FUNCTION_KEY: filter_function
})
extractor = CassandraExtractor()
extractor.init(conf)
self.assertIsNone(extractor.extract())
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment