Commit 04c69a37 authored by Pedro Gonçalves Rossi Rodrigues's avatar Pedro Gonçalves Rossi Rodrigues Committed by Tao Feng

Add an AWS Glue Extractor (#157)

* add aws glue extractor

* add sample code for glue extractor on README

* patch whole glue_extractor test to avoid boto3 error if not configured

* boto3 add on requirements
parent 2c884e2c
......@@ -108,6 +108,20 @@ job = DefaultJob(
job.launch()
```
#### [GlueExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/glue_extractor.py "GlueExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from AWS Glue metastore.
Before running make sure you have a working AWS profile configured and have access to search tables on Glue
```python
job_config = ConfigFactory.from_dict({})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=GlueExtractor(),
loader=AnyLoader()))
job.launch()
```
#### [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Postgres or Redshift database.
......
import boto3
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
class GlueExtractor(Extractor):
"""
Extracts tables and columns metadata from AWS Glue metastore
"""
CLUSTER_KEY = 'cluster'
DEFAULT_CONFIG = ConfigFactory.from_dict({CLUSTER_KEY: 'gold'})
def init(self, conf):
conf = conf.with_fallback(GlueExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(GlueExtractor.CLUSTER_KEY))
self._glue = boto3.client('glue')
self._extract_iter = None # type: Union[None, Iterator]
def extract(self):
# type: () -> Union[TableMetadata, None]
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self):
# type: () -> str
return 'extractor.glue_extractor'
def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
"""
It gets all tables and yields TableMetadata
:return:
"""
for row in self._get_raw_extract_iter():
columns = []
for i in range(len(row['StorageDescriptor']['Columns'])):
column = row['StorageDescriptor']['Columns'][i]
columns.append(ColumnMetadata(
column['Name'],
column['Comment'] if 'Comment' in column else None,
column['Type'],
i
))
yield TableMetadata(
'glue',
self._cluster,
row['DatabaseName'],
row['Name'],
row['Description'] if 'Description' in row else None,
columns
)
def _get_raw_extract_iter(self):
# type: () -> Iterator[Dict[str, Any]]
"""
Provides iterator of results row from glue client
:return:
"""
tables = self._search_tables()
return iter(tables)
def _search_tables(self):
tables = []
data = self._glue.search_tables()
tables += data['TableList']
while 'NextToken' in data:
token = data['NextToken']
data = self._glue.search_tables(NextToken=token)
tables += data['TableList']
return tables
......@@ -66,3 +66,4 @@ google-auth>=1.0.0, <2.0.0dev
httplib2~=0.9.2
confluent-kafka==1.0.0
unidecode
boto3==1.10.1
import logging
import unittest
from mock import patch
from pyhocon import ConfigFactory
from typing import Any, Dict # noqa: F401
from databuilder.extractor.glue_extractor import GlueExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
# patch whole class to avoid actually calling for boto3.client during tests
@patch('databuilder.extractor.glue_extractor.boto3.client', lambda x: None)
class TestGlueExtractor(unittest.TestCase):
def setUp(self):
# type: () -> None
logging.basicConfig(level=logging.INFO)
self.conf = ConfigFactory.from_dict({})
def test_extraction_with_empty_query_result(self):
# type: () -> None
"""
Test Extraction with empty result from query
"""
with patch.object(GlueExtractor, '_search_tables'):
extractor = GlueExtractor()
extractor.init(self.conf)
results = extractor.extract()
self.assertEqual(results, None)
def test_extraction_with_single_result(self):
# type: () -> None
with patch.object(GlueExtractor, '_search_tables') as mock_search:
mock_search.return_value = [
{
'Name': 'test_table',
'DatabaseName': 'test_schema',
'Description': 'a table for testing',
'StorageDescriptor': {
'Columns': [
{
'Name': 'col_id1',
'Type': 'bigint',
'Comment': 'description of id1'
},
{
'Name': 'col_id2',
'Type': 'bigint',
'Comment': 'description of id2'
},
{
'Name': 'is_active',
'Type': 'boolean'
},
{
'Name': 'source',
'Type': 'varchar',
'Comment': 'description of source'
},
{
'Name': 'etl_created_at',
'Type': 'timestamp',
'Comment': 'description of etl_created_at'
},
{
'Name': 'ds',
'Type': 'varchar'
}
]
}
}
]
extractor = GlueExtractor()
extractor.init(self.conf)
actual = extractor.extract()
expected = TableMetadata('glue', 'gold', 'test_schema', 'test_table', 'a table for testing',
[ColumnMetadata('col_id1', 'description of id1', 'bigint', 0),
ColumnMetadata('col_id2', 'description of id2', 'bigint', 1),
ColumnMetadata('is_active', None, 'boolean', 2),
ColumnMetadata('source', 'description of source', 'varchar', 3),
ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
ColumnMetadata('ds', None, 'varchar', 5)])
self.assertEqual(expected.__repr__(), actual.__repr__())
self.assertIsNone(extractor.extract())
def test_extraction_with_multiple_result(self):
# type: () -> None
with patch.object(GlueExtractor, '_search_tables') as mock_search:
mock_search.return_value = [
{
'Name': 'test_table1',
'DatabaseName': 'test_schema1',
'Description': 'test table 1',
'StorageDescriptor': {
'Columns': [
{
'Name': 'col_id1',
'Type': 'bigint',
'Comment': 'description of col_id1'
},
{
'Name': 'col_id2',
'Type': 'bigint',
'Comment': 'description of col_id2'
},
{
'Name': 'is_active',
'Type': 'boolean'
},
{
'Name': 'source',
'Type': 'varchar',
'Comment': 'description of source'
},
{
'Name': 'etl_created_at',
'Type': 'timestamp',
'Comment': 'description of etl_created_at'
},
{
'Name': 'ds',
'Type': 'varchar'
}
]
}
},
{
'Name': 'test_table2',
'DatabaseName': 'test_schema1',
'Description': 'test table 2',
'StorageDescriptor': {
'Columns': [
{
'Name': 'col_name',
'Type': 'varchar',
'Comment': 'description of col_name'
},
{
'Name': 'col_name2',
'Type': 'varchar',
'Comment': 'description of col_name2'
}
]
}
},
{
'Name': 'test_table3',
'DatabaseName': 'test_schema2',
'Description': 'test table 3',
'StorageDescriptor': {
'Columns': [
{
'Name': 'col_id3',
'Type': 'varchar',
'Comment': 'description of col_id3'
},
{
'Name': 'col_name3',
'Type': 'varchar',
'Comment': 'description of col_name3'
}
]
}
}
]
extractor = GlueExtractor()
extractor.init(self.conf)
expected = TableMetadata('glue', 'gold', 'test_schema1', 'test_table1', 'test table 1',
[ColumnMetadata('col_id1', 'description of col_id1', 'bigint', 0),
ColumnMetadata('col_id2', 'description of col_id2', 'bigint', 1),
ColumnMetadata('is_active', None, 'boolean', 2),
ColumnMetadata('source', 'description of source', 'varchar', 3),
ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
ColumnMetadata('ds', None, 'varchar', 5)])
self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
expected = TableMetadata('glue', 'gold', 'test_schema1', 'test_table2', 'test table 2',
[ColumnMetadata('col_name', 'description of col_name', 'varchar', 0),
ColumnMetadata('col_name2', 'description of col_name2', 'varchar', 1)])
self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
expected = TableMetadata('glue', 'gold', 'test_schema2', 'test_table3', 'test table 3',
[ColumnMetadata('col_id3', 'description of col_id3', 'varchar', 0),
ColumnMetadata('col_name3', 'description of col_name3',
'varchar', 1)])
self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
self.assertIsNone(extractor.extract())
self.assertIsNone(extractor.extract())
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment