Unverified Commit 01a0f962 authored by Jin Hyuk Chang's avatar Jin Hyuk Chang Committed by GitHub

Mode Dashboard extractor with Generic REST API Query (#194)

* Initial check in on REST API Query

* Working version

* docstring

* Update

* Update

* Make unit test happy

* Update docstring

* Update

* Update

* Update

* Adding unit tests

* Updated README.md

* jsonpath_rw to extra_requires
parent 9556b186
......@@ -395,3 +395,28 @@ Callback interface is built upon a [Observer pattern](https://en.wikipedia.org/w
Publisher is the first one adopting Callback where registered Callback will be called either when publish succeeded or when publish failed. In order to register callback, Publisher provides [register_call_back](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/publisher/base_publisher.py#L50 "register_call_back") method.
One use case is for Extractor that needs to commit when job is finished (e.g: Kafka). Having Extractor register a callback to Publisher to commit when publish is successful, extractor can safely commit by implementing commit logic into [on_success](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/callback/call_back.py#L18 "on_success") method.
### REST API Query
Databuilder now has a generic REST API Query capability that can be joined each other.
Most of the cases of extraction is currently from Database or Datawarehouse that is queryable via SQL. However, not all metadata sources provide our access to its Database and they mostly provide REST API to consume their metadata.
The challenges come with REST API is that:
1. there's no explicit standard in REST API. Here, we need to conform to majority of cases (HTTP call with JSON payload & response) but open for extension for different authentication scheme, and different way of pagination, etc.
2. It is hardly the case that you would get what you want from one REST API call. It is usually the case that you need to snitch (JOIN) multiple REST API calls together to get the information you want.
To solve this challenges, we introduce [RestApiQuery](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/rest_api/rest_api_query.py)
RestAPIQuery is:
1. Assuming that REST API is using HTTP(S) call with GET method -- RestAPIQuery intention's is **read**, not write -- where basic HTTP auth is supported out of the box. There's extension point on other authentication scheme such as Oauth, and pagination, etc.
2. Usually, you want the subset of the response you get from the REST API call -- value extraction. To extract the value you want, RestApiQuery uses [JSONPath](https://goessner.net/articles/JsonPath/) which is similar product as XPATH of XML.
3. You can JOIN multiple RestApiQuery together.
More detail on JOIN operation in RestApiQuery:
1. It joins multiple RestApiQuery together by accepting prior RestApiQuery as a constructor -- a [Decorator pattern](https://en.wikipedia.org/wiki/Decorator_pattern)
2. In REST API, URL is the one that locates the resource we want. Here, JOIN simply means we need to find resource **based on the identifier that other query's result has**. In other words, when RestApiQuery forms URL, it uses previous query's result to compute the URL `e.g: Previous record: {"dashboard_id": "foo"}, URL before: http://foo.bar/dashboard/{dashboard_id} URL after compute: http://foo.bar/dashboard/foo`
With this pattern RestApiQuery supports 1:1 and 1:N JOIN relationship.
(GROUP BY or any other aggregation, sub-query join is not supported)
To see in action, take a peek at [ModeDashboardExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/dashboard/mode_dashboard_extractor.py)
import logging
from pyhocon import ConfigTree, ConfigFactory # noqa: F401
from requests.auth import HTTPBasicAuth
from typing import Any # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.restapi.rest_api_extractor import RestAPIExtractor, REST_API_QUERY, MODEL_CLASS, \
STATIC_RECORD_DICT
from databuilder.rest_api.base_rest_api_query import RestApiQuerySeed
from databuilder.rest_api.rest_api_query import RestApiQuery
# CONFIG KEYS
ORGANIZATION = 'organization'
MODE_ACCESS_TOKEN = 'mode_user_token'
MODE_PASSWORD_TOKEN = 'mode_password_token'
LOGGER = logging.getLogger(__name__)
class ModeDashboardExtractor(Extractor):
"""
A Extractor that extracts core metadata on Mode dashboard. https://app.mode.com/
It extracts list of reports that consists of:
Dashboard group name (Space name)
Dashboard group id (Space token)
Dashboard group description (Space description)
Dashboard name (Report name)
Dashboard id (Report token)
Dashboard description (Report description)
Other information such as report run, owner, chart name, query name is in separate extractor.
"""
def init(self, conf):
# type: (ConfigTree) -> None
self._conf = conf
restapi_query = self._build_restapi_query()
self._extractor = RestAPIExtractor()
rest_api_extractor_conf = Scoped.get_scoped_conf(conf, self._extractor.get_scope()).with_fallback(
ConfigFactory.from_dict(
{
REST_API_QUERY: restapi_query,
MODEL_CLASS: 'databuilder.models.dashboard_metadata.DashboardMetadata',
STATIC_RECORD_DICT: {'product': 'mode'}
}
)
)
self._extractor.init(conf=rest_api_extractor_conf)
def extract(self):
# type: () -> Any
return self._extractor.extract()
def get_scope(self):
# type: () -> str
return 'extractor.mode_dashboard'
def _build_restapi_query(self):
"""
Build REST API Query. To get Mode Dashboard metadata, it needs to call two APIs (spaces API and reports
API) joining together.
:return: A RestApiQuery that provides Mode Dashboard metadata
"""
# type: () -> RestApiQuery
spaces_url_template = 'https://app.mode.com/api/{organization}/spaces?filter=all'
reports_url_template = 'https://app.mode.com/api/{organization}/spaces/{dashboard_group_id}/reports'
# Seed query record for next query api to join with
seed_record = [{'organization': self._conf.get_string(ORGANIZATION)}]
seed_query = RestApiQuerySeed(seed_record=seed_record)
params = {'auth': HTTPBasicAuth(self._conf.get_string(MODE_ACCESS_TOKEN),
self._conf.get_string(MODE_PASSWORD_TOKEN))}
# Spaces
# JSONPATH expression. it goes into array which is located in _embedded.spaces and then extracts token, name,
# and description
json_path = '_embedded.spaces[*].[token,name,description]'
field_names = ['dashboard_group_id', 'dashboard_group', 'dashboard_group_description']
spaces_query = RestApiQuery(query_to_join=seed_query, url=spaces_url_template, params=params,
json_path=json_path, field_names=field_names)
# Reports
# JSONPATH expression. it goes into array which is located in _embedded.reports and then extracts token, name,
# and description
json_path = '_embedded.reports[*].[token,name,description]'
field_names = ['dashboard_id', 'dashboard_name', 'description']
reports_query = RestApiQuery(query_to_join=spaces_query, url=reports_url_template, params=params,
json_path=json_path, field_names=field_names, skip_no_result=True)
return reports_query
import logging
import importlib
from typing import Iterator, Any # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from databuilder.extractor.base_extractor import Extractor
from databuilder.rest_api.base_rest_api_query import BaseRestApiQuery # noqa: F401
REST_API_QUERY = 'restapi_query'
MODEL_CLASS = 'model_class'
# Static record that will be added into extracted record
# For example, DashboardMetadata requires product name (static name) of Dashboard and REST api does not provide
# it. and you can add {'product': 'mode'} so that it will be included in the record.
STATIC_RECORD_DICT = 'static_record_dict'
LOGGER = logging.getLogger(__name__)
class RestAPIExtractor(Extractor):
"""
An Extractor that calls one or more REST API to extract the data.
This extractor almost entirely depends on RestApiQuery.
"""
def init(self, conf):
# type: (ConfigTree) -> None
self._restapi_query = conf.get(REST_API_QUERY) # type: BaseRestApiQuery
self._iterator = None # type: Iterator[Dict[str, Any]]
self._static_dict = conf.get(STATIC_RECORD_DICT, dict())
LOGGER.info('static record: {}'.format(self._static_dict))
model_class = conf.get(MODEL_CLASS, None)
if model_class:
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)
self.model_class = getattr(mod, class_name)
def extract(self):
# type: () -> Any
"""
Fetch one result row from RestApiQuery, convert to {model_class} if specified before
returning.
:return:
"""
if not self._iterator:
self._iterator = self._restapi_query.execute()
try:
record = next(self._iterator)
except StopIteration:
return None
if self._static_dict:
record.update(self._static_dict)
if hasattr(self, 'model_class'):
return self.model_class(**record)
return record
def get_scope(self):
# type: () -> str
return 'extractor.restapi'
This diff is collapsed.
import abc
import logging
import six
from typing import Iterable, Any, Dict, Iterator # noqa: F401
LOGGER = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class BaseRestApiQuery(object):
@abc.abstractmethod
def execute(self):
"""
Provides iterator of the records. It uses iterator so that it can stream the result.
:return:
"""
# type: () -> Iterator[Dict[str, Any]]
return iter([dict()])
class RestApiQuerySeed(BaseRestApiQuery):
"""
A seed RestApiQuery.
RestApiQuery is using decorator pattern where it needs to have a seed to begin with. RestApiQuerySeed is for
RestApiQuery to start with.
Example: see ModeDashboardExtractor._build_restapi_query
"""
def __init__(self,
seed_record # type: Iterable[Dict[str, Any]]
):
# type: (...) -> None
self._seed_record = seed_record
def execute(self):
# type: () -> Iterator[Dict[str, Any]]
return iter(self._seed_record)
import copy
import logging
import requests
from jsonpath_rw import parse
from retrying import retry
from typing import List, Dict, Any, Union # noqa: F401
from databuilder.rest_api.base_rest_api_query import BaseRestApiQuery
LOGGER = logging.getLogger(__name__)
class RestApiQuery(BaseRestApiQuery):
"""
A generic REST API Query that can be joined with other REST API query.
Major feature of RestApiQuery is the fact that it is joinable with other RestApiQuery.
Two main problems RestAPIQuery is trying to solve is:
1. How to retrieve values that I want from the REST API's result. (value extraction)
2. Most of the cases, one call is not enough. How to join the queries together to get the result that I want?
On "1" value extraction: RestApiQuery uses JSONPath which is similar product as XPATH of XML.
https://goessner.net/articles/JsonPath/
On "2" join: The idea on joining REST API is the fact that previous query's result is used to query subsequent
query.
To bring this into implementation:
1. It accepts prior RestApiQuery as a constructor -- Decorator pattern
2. When URL is formed, it uses previous query's result to compute the URL
e.g: Previous record: {"dashboard_id": "foo"},
URL before: http://foo.bar/dashboard/{dashboard_id}
URL after compute: http://foo.bar/dashboard/foo
With this pattern RestApiQuery supports 1:1 and 1:N JOIN relationship.
(GROUP BY or any other aggregation, sub-query join is not supported)
Supports basic HTTP authentication.
Extension point is available for other authentication scheme such as Oauth.
Extension point is available for pagination.
All extension point is designed for subclass because there's no exact standard on Oauth and pagination.
(How it would work with Tableau/Looker is described in docstring of _authenticate method)
"""
def __init__(self,
query_to_join, # type: BaseRestApiQuery
url, # type: str
params, # type: Dict[str, Any]
json_path, # type: str
field_names, # type: List[str]
fail_no_result=False, # type: bool
skip_no_result=False, # type: bool
):
# type: (...) -> None
"""
:param query_to_join: Previous query to JOIN. RestApiQuerySeed can be used for the first query
:param url: URL string. It will use <str>.format operation using record that comes from previous query to
substitute any variable that URL has.
e.g: Previous record: {"dashboard_id": "foo"},
URL before: http://foo.bar/dashboard/{dashboard_id}
URL after compute: http://foo.bar/dashboard/foo
:param params: A keyword arguments that pass into requests.get function.
https://requests.readthedocs.io/en/master/user/quickstart/#make-a-request
:param json_path: A JSONPath expression. https://github.com/kennknowles/python-jsonpath-rw
Example:
JSON result:
[{"report_id": "1", "report_name": "first report", "foo": "bar"},
{"report_id": "2", "report_name": "second report"}]
JSON PATH:
[*].[report_id,report_name]
["1", "first report", "2", "second report"]
:param field_names: Field names to be used on the result. Result is dictionary where field_name will be the key
and the values extracted via JSON PATH will be the value.
JSON Path result:
["1", "first report", "2", "second report"]
field_names:
["dashboard_id", "dashboard_description"]
{"dashboard_id": "1", "dashboard_description": "first report"}
{"dashboard_id": "2", "dashboard_description": "second report"}
:param fail_no_result: If there's no result from the query it will make it fail.
:param skip_no_result: If there's no result from the query, it will skip this record.
"""
self._inner_rest_api_query = query_to_join
self._url = url
self._params = params
self._json_path = json_path
self._jsonpath_expr = parse(self._json_path)
self._fail_no_result = fail_no_result
self._skip_no_result = skip_no_result
self._field_names = field_names
self._more_pages = False
def execute(self):
# type: () -> Iterator[Dict[str, Any]]
self._authenticate()
for record_dict in self._inner_rest_api_query.execute():
first_try = True # To control pagination. Always pass the while loop on the first try
while first_try or self._more_pages:
first_try = False
url = self._preprocess_url(record=record_dict)
response = self._send_request(url=url)
response_json = response.json() # type: Union[List[Any], Dict[str, Any]]
# value extraction via JSON Path
result_list = [match.value for match in self._jsonpath_expr.find(response_json)] # type: List[Any]
if not result_list:
log_msg = 'No result from URL: {url} , JSONPATH: {json_path} , response payload: {response}' \
.format(url=self._url, json_path=self._json_path, response=response_json)
LOGGER.info(log_msg)
if self._fail_no_result:
raise Exception(log_msg)
if self._skip_no_result:
continue
yield copy.deepcopy(record_dict)
while result_list:
record_dict = copy.deepcopy(record_dict)
for field_name in self._field_names:
record_dict[field_name] = result_list.pop(0)
yield record_dict
self._post_process(response)
def _preprocess_url(self,
record, # type: Dict[str, Any]
):
# type: (...) -> str
"""
Performs variable substitution using a dict comes as a record from previous query.
:param record:
:return: a URL that is ready to be called.
"""
return self._url.format(**record)
@retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000, wait_exponential_max=10000)
def _send_request(self,
url # type: str
):
# type: (...) -> requests.Response
"""
Performs HTTP GET operation with retry on failure.
:param url:
:return:
"""
LOGGER.info('Calling URL {}'.format(url))
response = requests.get(url, **self._params)
response.raise_for_status()
return response
def _post_process(self,
response, # type: requests.Response
):
# type: (...) -> None
"""
Extension point for post-processing such thing as pagination
:return:
"""
pass
def _authenticate(self):
# type: (...) -> None
"""
Extension point to support other authentication mechanism such as Oauth.
Subclass this class and implement authentication process.
This assumes that most of authentication process can work with updating member variable such as url and params
For example, Tableau's authentication pattern is that of Oauth where you need to call end point with JSON
payload via POST method. This call will return one-time token along with LUID. On following calls,
one time token needs to be added on header, and LUID needs to be used to form URL to fetch information.
This is why url and params is part of RestApiQuery's member variable and above operation can be done by
mutating these two values.
Another Dashboard product Looker uses Oauth for authentication, and it can be done in similar way as Tableau.
:return: None
"""
pass
......@@ -59,3 +59,4 @@ retrying==1.3.3
httplib2~=0.9.2
unidecode
......@@ -31,7 +31,9 @@ bigquery = [
'google-auth>=1.0.0, <2.0.0dev'
]
all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery
jsonpath = ['jsonpath_rw==1.4.0']
all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath
setup(
name='amundsen-databuilder',
......@@ -51,6 +53,7 @@ setup(
'glue': glue,
'snowflake': snowflake,
'athena': athena,
'bigquery': bigquery
'bigquery': bigquery,
'jsonpath': jsonpath
},
)
import unittest
from pyhocon import ConfigFactory # noqa: F401
from databuilder.extractor.restapi.rest_api_extractor import RestAPIExtractor, REST_API_QUERY, MODEL_CLASS, \
STATIC_RECORD_DICT
from databuilder.models.dashboard_metadata import DashboardMetadata
from databuilder.rest_api.base_rest_api_query import RestApiQuerySeed
class TestRestAPIExtractor(unittest.TestCase):
def test_static_data(self):
# type: (...) -> None
conf = ConfigFactory.from_dict(
{
REST_API_QUERY: RestApiQuerySeed(seed_record=[{'foo': 'bar'}]),
STATIC_RECORD_DICT: {'john': 'doe'}
}
)
extractor = RestAPIExtractor()
extractor.init(conf=conf)
record = extractor.extract()
expected = {'foo': 'bar', 'john': 'doe'}
self.assertDictEqual(expected, record)
def test_model_construction(self):
conf = ConfigFactory.from_dict(
{
REST_API_QUERY: RestApiQuerySeed(
seed_record=[{'dashboard_group': 'foo',
'dashboard_name': 'bar',
'description': 'john',
'dashboard_group_description': 'doe'}]),
MODEL_CLASS: 'databuilder.models.dashboard_metadata.DashboardMetadata',
}
)
extractor = RestAPIExtractor()
extractor.init(conf=conf)
record = extractor.extract()
expected = DashboardMetadata(dashboard_group='foo', dashboard_name='bar', description='john',
dashboard_group_description='doe')
self.assertEqual(expected.__repr__(), record.__repr__())
import unittest
from mock import patch
from databuilder.rest_api.base_rest_api_query import RestApiQuerySeed
from databuilder.rest_api.rest_api_query import RestApiQuery
class TestRestApiQuery(unittest.TestCase):
def test_rest_api_query_seed(self):
rest_api_query = RestApiQuerySeed(seed_record=[
{'foo': 'bar'},
{'john': 'doe'}
])
result = [v for v in rest_api_query.execute()]
expected = [
{'foo': 'bar'},
{'john': 'doe'}
]
self.assertListEqual(expected, result)
def test_rest_api_query(self):
seed_record = [{'foo1': 'bar1'},
{'foo2': 'bar2'}]
seed_query = RestApiQuerySeed(seed_record=seed_record)
with patch('databuilder.rest_api.rest_api_query.requests.get') as mock_get:
json_path = 'foo.name'
field_names = ['name_field']
mock_get.return_value.json.side_effect = [
{'foo': {'name': 'john'}},
{'foo': {'name': 'doe'}},
]
query = RestApiQuery(query_to_join=seed_query, url='foobar', params={},
json_path=json_path, field_names=field_names)
expected = [
{'name_field': 'john', 'foo1': 'bar1'},
{'name_field': 'doe', 'foo2': 'bar2'}
]
for actual in query.execute():
self.assertDictEqual(expected.pop(0), actual)
def test_rest_api_query_multiple_fields(self):
seed_record = [{'foo1': 'bar1'},
{'foo2': 'bar2'}]
seed_query = RestApiQuerySeed(seed_record=seed_record)
with patch('databuilder.rest_api.rest_api_query.requests.get') as mock_get:
json_path = 'foo.[name,hobby]'
field_names = ['name_field', 'hobby']
mock_get.return_value.json.side_effect = [
{'foo': {'name': 'john', 'hobby': 'skiing'}},
{'foo': {'name': 'doe', 'hobby': 'snowboarding'}},
]
query = RestApiQuery(query_to_join=seed_query, url='foobar', params={},
json_path=json_path, field_names=field_names)
expected = [
{'name_field': 'john', 'hobby': 'skiing', 'foo1': 'bar1'},
{'name_field': 'doe', 'hobby': 'snowboarding', 'foo2': 'bar2'}
]
for actual in query.execute():
self.assertDictEqual(expected.pop(0), actual)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment