Unverified Commit 6c4bd0ba authored by Jin Hyuk Chang, committed by GitHub

Mode Dashboard execution timestamp and state (#197)

* Initial commit

* Update

* Update

* Update

* flake8

* Update docstring

* Update

* Update

* Add mode_dashboard_constants and mode_dashboard_utils

* Add more to Util

* Added back model class to ModeDashboardExtractor

* flake8

* Address PR comments
parent dac8110d
ORGANIZATION = 'organization'
MODE_ACCESS_TOKEN = 'mode_user_token'
MODE_PASSWORD_TOKEN = 'mode_password_token'
import logging
from pyhocon import ConfigTree, ConfigFactory # noqa: F401
from typing import Any # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.dashboard.mode_analytics.mode_dashboard_utils import ModeDashboardUtils
from databuilder.rest_api.rest_api_query import RestApiQuery
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS
from databuilder.transformer.timestamp_string_to_epoch import TimestampStringToEpoch, FIELD_NAME
LOGGER = logging.getLogger(__name__)
class ModeDashboardExecutionsExtractor(Extractor):
"""
An Extractor that extracts a Mode Dashboard's run (execution) state and timestamp.
"""
def init(self, conf):
# type: (ConfigTree) -> None
self._conf = conf
restapi_query = self._build_restapi_query()
self._extractor = ModeDashboardUtils.create_mode_rest_api_extractor(
restapi_query=restapi_query,
conf=self._conf
)
# The payload from RestApiQuery carries an ISO8601 timestamp. TimestampStringToEpoch transforms it into an
# epoch value, and DictToModel then converts the dictionary into a model (DashboardExecution).
transformers = []
timestamp_str_to_epoch_transformer = TimestampStringToEpoch()
timestamp_str_to_epoch_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, timestamp_str_to_epoch_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict({FIELD_NAME: 'execution_timestamp', })))
transformers.append(timestamp_str_to_epoch_transformer)
dict_to_model_transformer = DictToModel()
dict_to_model_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, dict_to_model_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict(
{MODEL_CLASS: 'databuilder.models.dashboard.dashboard_execution.DashboardExecution'})))
transformers.append(dict_to_model_transformer)
self._transformer = ChainedTransformer(transformers=transformers)
def extract(self):
# type: () -> Any
record = self._extractor.extract()
if not record:
return None
return self._transformer.transform(record=record)
def get_scope(self):
# type: () -> str
return 'extractor.mode_dashboard_execution'
def _build_restapi_query(self):
"""
Build the REST API query. To get a Mode Dashboard's last execution, it chains three APIs (spaces API, reports
API, and report run API) and joins their results together.
:return: A RestApiQuery that provides Mode Dashboard execution (run) records
"""
# type: () -> RestApiQuery
spaces_query = ModeDashboardUtils.get_spaces_query_api(conf=self._conf)
params = ModeDashboardUtils.get_auth_params(conf=self._conf)
# Reports
# https://mode.com/developer/api-reference/analytics/reports/#listReportsInSpace
url = 'https://app.mode.com/api/{organization}/spaces/{dashboard_group_id}/reports'
json_path = '(_embedded.reports[*].token) | (_embedded.reports[*]._links.last_run.href)'
field_names = ['dashboard_id', 'last_run_resource_path']
last_run_resource_path_query = RestApiQuery(query_to_join=spaces_query, url=url, params=params,
json_path=json_path, field_names=field_names, skip_no_result=True,
json_path_contains_or=True)
# https://mode.com/developer/api-reference/analytics/report-runs/#getReportRun
url = 'https://app.mode.com{last_run_resource_path}'
json_path = '[state,completed_at]'
field_names = ['execution_state', 'execution_timestamp']
last_run_state_query = RestApiQuery(query_to_join=last_run_resource_path_query, url=url, params=params,
json_path=json_path, field_names=field_names, skip_no_result=True)
return last_run_state_query
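For reference, a minimal usage sketch for this extractor. The config keys (organization, mode_user_token, mode_password_token) come from mode_dashboard_constants; the import path and the placeholder values are assumptions for illustration only.
from pyhocon import ConfigFactory
from databuilder import Scoped
# Assumed module path for the class defined above.
from databuilder.extractor.dashboard.mode_analytics.mode_dashboard_executions_extractor import \
    ModeDashboardExecutionsExtractor

extractor = ModeDashboardExecutionsExtractor()
# Keys are resolved under the extractor's scope ('extractor.mode_dashboard_execution').
conf = ConfigFactory.from_dict({
    'extractor.mode_dashboard_execution.organization': 'example_org',            # placeholder
    'extractor.mode_dashboard_execution.mode_user_token': 'example_token',       # placeholder
    'extractor.mode_dashboard_execution.mode_password_token': 'example_secret',  # placeholder
})
extractor.init(conf=Scoped.get_scoped_conf(conf, extractor.get_scope()))
record = extractor.extract()  # a DashboardExecution model, or None when there is nothing left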
import logging
from pyhocon import ConfigTree, ConfigFactory # noqa: F401
from requests.auth import HTTPBasicAuth
from typing import Any # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.restapi.rest_api_extractor import RestAPIExtractor, REST_API_QUERY, MODEL_CLASS, \
STATIC_RECORD_DICT
from databuilder.rest_api.base_rest_api_query import RestApiQuerySeed
from databuilder.extractor.dashboard.mode_analytics.mode_dashboard_utils import ModeDashboardUtils
from databuilder.extractor.restapi.rest_api_extractor import MODEL_CLASS
from databuilder.rest_api.rest_api_query import RestApiQuery
# CONFIG KEYS
@@ -39,19 +36,15 @@ class ModeDashboardExtractor(Extractor):
self._conf = conf
restapi_query = self._build_restapi_query()
self._extractor = RestAPIExtractor()
rest_api_extractor_conf = Scoped.get_scoped_conf(conf, self._extractor.get_scope()).with_fallback(
ConfigFactory.from_dict(
{
REST_API_QUERY: restapi_query,
MODEL_CLASS: 'databuilder.models.dashboard_metadata.DashboardMetadata',
STATIC_RECORD_DICT: {'product': 'mode'}
}
self._extractor = ModeDashboardUtils.create_mode_rest_api_extractor(
restapi_query=restapi_query,
conf=self._conf.with_fallback(
ConfigFactory.from_dict(
{MODEL_CLASS: 'databuilder.models.dashboard_metadata.DashboardMetadata', }
)
)
)
self._extractor.init(conf=rest_api_extractor_conf)
def extract(self):
# type: () -> Any
@@ -70,23 +63,11 @@ class ModeDashboardExtractor(Extractor):
"""
# type: () -> RestApiQuery
spaces_url_template = 'https://app.mode.com/api/{organization}/spaces?filter=all'
# https://mode.com/developer/api-reference/analytics/reports/#listReportsInSpace
reports_url_template = 'https://app.mode.com/api/{organization}/spaces/{dashboard_group_id}/reports'
# Seed query record for next query api to join with
seed_record = [{'organization': self._conf.get_string(ORGANIZATION)}]
seed_query = RestApiQuerySeed(seed_record=seed_record)
params = {'auth': HTTPBasicAuth(self._conf.get_string(MODE_ACCESS_TOKEN),
self._conf.get_string(MODE_PASSWORD_TOKEN))}
# Spaces
# JSONPATH expression: go into the array located at _embedded.spaces and extract token, name,
# and description
json_path = '_embedded.spaces[*].[token,name,description]'
field_names = ['dashboard_group_id', 'dashboard_group', 'dashboard_group_description']
spaces_query = RestApiQuery(query_to_join=seed_query, url=spaces_url_template, params=params,
json_path=json_path, field_names=field_names)
spaces_query = ModeDashboardUtils.get_spaces_query_api(conf=self._conf)
params = ModeDashboardUtils.get_auth_params(conf=self._conf)
# Reports
# JSONPATH expression. it goes into array which is located in _embedded.reports and then extracts token, name,
......
from pyhocon import ConfigTree, ConfigFactory # noqa: F401
from requests.auth import HTTPBasicAuth
from databuilder import Scoped
from databuilder.extractor.dashboard.mode_analytics.mode_dashboard_constants import ORGANIZATION, MODE_ACCESS_TOKEN, \
MODE_PASSWORD_TOKEN
from databuilder.extractor.restapi.rest_api_extractor import RestAPIExtractor, REST_API_QUERY, STATIC_RECORD_DICT
from databuilder.rest_api.base_rest_api_query import BaseRestApiQuery # noqa: F401
from databuilder.rest_api.base_rest_api_query import RestApiQuerySeed
from databuilder.rest_api.rest_api_query import RestApiQuery # noqa: F401
class ModeDashboardUtils(object):
@staticmethod
def get_spaces_query_api(conf, # type: ConfigTree
):
"""
Provides a RestApiQuery (built on a RestApiQuerySeed) that yields an iterator of dictionaries as records,
where the dictionary keys are organization, dashboard_group_id, dashboard_group and dashboard_group_description
:param conf:
:return:
"""
# type: (...) -> BaseRestApiQuery
# https://mode.com/developer/api-reference/management/spaces/#listSpaces
spaces_url_template = 'https://app.mode.com/api/{organization}/spaces?filter=all'
# Seed query record for next query api to join with
seed_record = [{'organization': conf.get_string(ORGANIZATION)}]
seed_query = RestApiQuerySeed(seed_record=seed_record)
# Spaces
params = {'auth': HTTPBasicAuth(conf.get_string(MODE_ACCESS_TOKEN),
conf.get_string(MODE_PASSWORD_TOKEN))}
json_path = '_embedded.spaces[*].[token,name,description]'
field_names = ['dashboard_group_id', 'dashboard_group', 'dashboard_group_description']
spaces_query = RestApiQuery(query_to_join=seed_query, url=spaces_url_template, params=params,
json_path=json_path, field_names=field_names)
return spaces_query
@staticmethod
def get_auth_params(conf, # type: ConfigTree
):
params = {'auth': HTTPBasicAuth(conf.get_string(MODE_ACCESS_TOKEN),
conf.get_string(MODE_PASSWORD_TOKEN)
)
}
return params
@staticmethod
def create_mode_rest_api_extractor(restapi_query, # type: BaseRestApiQuery
conf, # type: ConfigTree
):
"""
Creates a RestAPIExtractor for the given RestApiQuery. Note that the returned RestAPIExtractor is already
initialized (its init method has been called).
:param restapi_query:
:param conf:
:return: an initialized RestAPIExtractor
"""
extractor = RestAPIExtractor()
rest_api_extractor_conf = \
Scoped.get_scoped_conf(conf, extractor.get_scope())\
.with_fallback(conf)\
.with_fallback(ConfigFactory.from_dict({REST_API_QUERY: restapi_query,
STATIC_RECORD_DICT: {'product': 'mode'}
}
)
)
extractor.init(conf=rest_api_extractor_conf)
return extractor
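A small sketch of how these helpers combine, using ModeDashboardUtils from the module above; the organization/token keys match mode_dashboard_constants, while the values shown are placeholders.
from pyhocon import ConfigFactory

conf = ConfigFactory.from_dict({
    'organization': 'example_org',            # placeholder
    'mode_user_token': 'example_token',       # placeholder
    'mode_password_token': 'example_secret',  # placeholder
})
spaces_query = ModeDashboardUtils.get_spaces_query_api(conf=conf)
params = ModeDashboardUtils.get_auth_params(conf=conf)
# params holds {'auth': HTTPBasicAuth('example_token', 'example_secret')}; RestApiQuery later
# unpacks this dict directly into requests.get(url, **params).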
import logging
from typing import Optional, Dict, Any, Union, Iterator # noqa: F401
from databuilder.models.dashboard_metadata import DashboardMetadata
from databuilder.models.neo4j_csv_serde import (
Neo4jCsvSerializable, NODE_LABEL, NODE_KEY, RELATION_START_KEY, RELATION_END_KEY, RELATION_START_LABEL,
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE)
LOGGER = logging.getLogger(__name__)
class DashboardExecution(Neo4jCsvSerializable):
"""
A model that encapsulates a Dashboard's execution timestamp (in epoch) and execution state
"""
DASHBOARD_EXECUTION_LABEL = 'Execution'
DASHBOARD_EXECUTION_KEY_FORMAT = '{product}_dashboard://{cluster}.{dashboard_group_id}/' \
'{dashboard_id}/execution/{execution_id}'
DASHBOARD_EXECUTION_RELATION_TYPE = 'LAST_EXECUTED'
EXECUTION_DASHBOARD_RELATION_TYPE = 'LAST_EXECUTION_OF'
def __init__(self,
dashboard_group_id, # type: Optional[str]
dashboard_id, # type: Optional[str]
execution_timestamp, # type: int
execution_state, # type: str
execution_id='_last_execution', # type: str
product='', # type: Optional[str]
cluster='gold', # type: str
**kwargs
):
self._dashboard_group_id = dashboard_group_id
self._dashboard_id = dashboard_id
self._execution_timestamp = execution_timestamp
self._execution_state = execution_state
self._execution_id = execution_id
self._product = product
self._cluster = cluster
self._node_iterator = self._create_node_iterator()
self._relation_iterator = self._create_relation_iterator()
def create_next_node(self):
# type: () -> Union[Dict[str, Any], None]
try:
return next(self._node_iterator)
except StopIteration:
return None
def _create_node_iterator(self): # noqa: C901
# type: () -> Iterator[Dict[str, Any]]
yield {
NODE_LABEL: DashboardExecution.DASHBOARD_EXECUTION_LABEL,
NODE_KEY: self._get_last_execution_node_key(),
'time_stamp': self._execution_timestamp,
'state': self._execution_state
}
def create_next_relation(self):
# type: () -> Union[Dict[str, Any], None]
try:
return next(self._relation_iterator)
except StopIteration:
return None
def _create_relation_iterator(self):
# type: () -> Iterator[Dict[str, Any]]
yield {
RELATION_START_LABEL: DashboardMetadata.DASHBOARD_NODE_LABEL,
RELATION_END_LABEL: DashboardExecution.DASHBOARD_EXECUTION_LABEL,
RELATION_START_KEY: DashboardMetadata.DASHBOARD_KEY_FORMAT.format(
product=self._product,
cluster=self._cluster,
dashboard_group=self._dashboard_group_id,
dashboard_name=self._dashboard_id
),
RELATION_END_KEY: self._get_last_execution_node_key(),
RELATION_TYPE: DashboardExecution.DASHBOARD_EXECUTION_RELATION_TYPE,
RELATION_REVERSE_TYPE: DashboardExecution.EXECUTION_DASHBOARD_RELATION_TYPE
}
def _get_last_execution_node_key(self):
return DashboardExecution.DASHBOARD_EXECUTION_KEY_FORMAT.format(
product=self._product,
cluster=self._cluster,
dashboard_group_id=self._dashboard_group_id,
dashboard_id=self._dashboard_id,
execution_id=self._execution_id
)
def __repr__(self):
return 'DashboardExecution({!r}, {!r}, {!r}, {!r}, {!r}, {!r}, {!r})'.format(
self._dashboard_group_id,
self._dashboard_id,
self._execution_timestamp,
self._execution_state,
self._execution_id,
self._product,
self._cluster
)
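To make the key format concrete, a short worked example of the node key this model produces; all values below are illustrative, and the private helper is called only for demonstration.
execution = DashboardExecution(
    dashboard_group_id='example_group',    # illustrative
    dashboard_id='example_dashboard',      # illustrative
    execution_timestamp=1582141953,
    execution_state='succeeded',
    product='mode',
    cluster='gold',
)
# DASHBOARD_EXECUTION_KEY_FORMAT yields:
#   'mode_dashboard://gold.example_group/example_dashboard/execution/_last_execution'
print(execution._get_last_execution_node_key())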
@@ -55,6 +55,7 @@ class RestApiQuery(BaseRestApiQuery):
field_names, # type: List[str]
fail_no_result=False, # type: bool
skip_no_result=False, # type: bool
json_path_contains_or=False, # type: bool
):
# type: (...) -> None
"""
@@ -92,20 +93,38 @@ class RestApiQuery(BaseRestApiQuery):
:param fail_no_result: If there's no result from the query it will make it fail.
:param skip_no_result: If there's no result from the query, it will skip this record.
:param json_path_contains_or: The JSON Path expression contains the | (OR) operator, mostly used to extract
values at different levels. In this case, JSON Path extracts all values matching the first expression, then
the second, and so forth.
Example:
JSON result:
[{"report_id": "1", "report_name": "first report", "foo": {"bar": "baz"}},
{"report_id": "2", "report_name": "second report", "foo": {"bar": "box"}}]
JSON PATH:
([*].report_id) | ([*].(foo.bar))
["1", "2", "baz", "box"]
"""
self._inner_rest_api_query = query_to_join
self._url = url
self._params = params
self._json_path = json_path
if ',' in json_path and '|' in json_path:
raise Exception('RestApiQuery does not support "and (,)" and "or (|)" at the same time')
self._jsonpath_expr = parse(self._json_path)
self._fail_no_result = fail_no_result
self._skip_no_result = skip_no_result
self._field_names = field_names
self._json_path_contains_or = json_path_contains_or
self._more_pages = False
def execute(self):
# type: () -> Iterator[Dict[str, Any]]
self._authenticate()
for record_dict in self._inner_rest_api_query.execute():
@@ -135,10 +154,14 @@ class RestApiQuery(BaseRestApiQuery):
yield copy.deepcopy(record_dict)
while result_list:
sub_records = RestApiQuery._compute_sub_records(result_list=result_list,
field_names=self._field_names,
json_path_contains_or=self._json_path_contains_or)
for sub_record in sub_records:
record_dict = copy.deepcopy(record_dict)
for field_name in self._field_names:
record_dict[field_name] = result_list.pop(0)
record_dict[field_name] = sub_record.pop(0)
yield record_dict
self._post_process(response)
@@ -152,7 +175,6 @@ class RestApiQuery(BaseRestApiQuery):
:param record:
:return: a URL that is ready to be called.
"""
return self._url.format(**record)
@retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000, wait_exponential_max=10000)
@@ -165,12 +187,57 @@ class RestApiQuery(BaseRestApiQuery):
:param url:
:return:
"""
LOGGER.info('Calling URL {}'.format(url))
response = requests.get(url, **self._params)
response.raise_for_status()
return response
@classmethod
def _compute_sub_records(cls,
result_list, # type: List
field_names, # type: List[str]
json_path_contains_or=False, # type: bool
):
"""
The behavior of JSONPath differs when extracting multiple fields using AND(,) vs OR(|).
With AND(,), the first n values (n = number of fields) make up the first record. With OR(|), the result lists
the first field of all records, then the second field of all records, and so on.
For example, when extracting 3 fields using AND(,) in JSONPath:
Result from JSONPath:
['1', 'a', 'x', '2', 'b', 'y', '3', 'c', 'z']
Resulting 3 records (meaning the original JSON has an array of size 3):
['1', 'a', 'x'], ['2', 'b', 'y'], ['3', 'c', 'z']
When extracting two fields using OR(|) in JSONPath, the result is as follows:
Result from JSONPath:
['1', '2', '3', 'a', 'b', 'c']
Resulting 3 records (meaning the original JSON has an array of size 3):
['1', 'a'], ['2', 'b'], ['3', 'c']
:param result_list:
:param field_names:
:param json_path_contains_or:
:return:
"""
# type: (...) -> List[List[Any]]
if not field_names:
raise Exception('Field names should not be empty')
if not json_path_contains_or:
return [result_list[i:i + len(field_names)] for i in range(0, len(result_list), len(field_names))]
result = []
num_subresult = int(len(result_list) / len(field_names))
for i in range(num_subresult):
sub_result = [result_list[j] for j in range(i, len(result_list), num_subresult)]
result.append(sub_result)
return result
def _post_process(self,
response, # type: requests.Response
):
......
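To make the AND(,) vs OR(|) ordering handled by _compute_sub_records above concrete, here is a small standalone sketch of the OR-case regrouping; the values are illustrative and the logic mirrors the method.
# OR(|) results arrive column-major: first field of every record, then the second field, etc.
flat = ['1', '2', '3', 'a', 'b', 'c']
field_names = ['foo', 'bar']
num_records = len(flat) // len(field_names)  # 3 records
records = [[flat[j] for j in range(i, len(flat), num_records)] for i in range(num_records)]
assert records == [['1', 'a'], ['2', 'b'], ['3', 'c']]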
import importlib
import logging
from pyhocon import ConfigTree # noqa: F401
from typing import Any, Dict # noqa: F401
from databuilder.transformer.base_transformer import Transformer
MODEL_CLASS = 'model_class'
LOGGER = logging.getLogger(__name__)
class DictToModel(Transformer):
"""
Transforms dictionary into model
"""
def init(self, conf):
# type: (ConfigTree) -> None
model_class = conf.get_string(MODEL_CLASS)
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)
self._model_class = getattr(mod, class_name)
def transform(self, record):
# type: (Dict[str, Any]) -> Dict[str, Any]
return self._model_class(**record)
def get_scope(self):
# type: () -> str
return 'transformer.dict_to_model'
import logging
from datetime import datetime
from pyhocon import ConfigFactory # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from typing import Any, Dict # noqa: F401
from databuilder.transformer.base_transformer import Transformer
TIMESTAMP_FORMAT = 'timestamp_format'
FIELD_NAME = 'field_name'
LOGGER = logging.getLogger(__name__)
DEFAULT_CONFIG = ConfigFactory.from_dict({TIMESTAMP_FORMAT: '%Y-%m-%dT%H:%M:%S.%fZ'})
class TimestampStringToEpoch(Transformer):
"""
Transforms string timestamp into epoch
"""
def init(self, conf):
# type: (ConfigTree) -> None
self._conf = conf.with_fallback(DEFAULT_CONFIG)
self._timestamp_format = self._conf.get_string(TIMESTAMP_FORMAT)
self._field_name = self._conf.get_string(FIELD_NAME)
def transform(self, record):
# type: (Dict[str, Any]) -> Dict[str, Any]
timestamp_str = record.get(self._field_name, '')
if not timestamp_str:
return record
utc_dt = datetime.strptime(timestamp_str, self._timestamp_format)
record[self._field_name] = int((utc_dt - datetime(1970, 1, 1)).total_seconds())
return record
def get_scope(self):
# type: () -> str
return 'transformer.timestamp_str_to_epoch'
@@ -71,3 +71,64 @@ class TestRestApiQuery(unittest.TestCase):
for actual in query.execute():
self.assertDictEqual(expected.pop(0), actual)
def test_compute_subresult_single_field(self):
sub_records = RestApiQuery._compute_sub_records(result_list=['1', '2', '3'], field_names=['foo'])
expected_records = [
['1'], ['2'], ['3']
]
assert expected_records == sub_records
sub_records = RestApiQuery._compute_sub_records(result_list=['1', '2', '3'], field_names=['foo'],
json_path_contains_or=True)
assert expected_records == sub_records
def test_compute_subresult_multiple_fields_json_path_and_expression(self):
sub_records = RestApiQuery._compute_sub_records(
result_list=['1', 'a', '2', 'b', '3', 'c'], field_names=['foo', 'bar'])
expected_records = [
['1', 'a'], ['2', 'b'], ['3', 'c']
]
assert expected_records == sub_records
sub_records = RestApiQuery._compute_sub_records(
result_list=['1', 'a', 'x', '2', 'b', 'y', '3', 'c', 'z'], field_names=['foo', 'bar', 'baz'])
expected_records = [
['1', 'a', 'x'], ['2', 'b', 'y'], ['3', 'c', 'z']
]
assert expected_records == sub_records
def test_compute_subresult_multiple_fields_json_path_or_expression(self):
sub_records = RestApiQuery._compute_sub_records(
result_list=['1', '2', '3', 'a', 'b', 'c'],
field_names=['foo', 'bar'],
json_path_contains_or=True
)
expected_records = [
['1', 'a'], ['2', 'b'], ['3', 'c']
]
self.assertEqual(expected_records, sub_records)
sub_records = RestApiQuery._compute_sub_records(
result_list=['1', '2', '3', 'a', 'b', 'c', 'x', 'y', 'z'],
field_names=['foo', 'bar', 'baz'],
json_path_contains_or=True)
expected_records = [
['1', 'a', 'x'], ['2', 'b', 'y'], ['3', 'c', 'z']
]
self.assertEqual(expected_records, sub_records)
if __name__ == '__main__':
unittest.main()
import unittest
from pyhocon import ConfigFactory
from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS
from databuilder.models.dashboard.dashboard_execution import DashboardExecution
class TestDictToModel(unittest.TestCase):
def test_conversion(self):
# type: () -> None
transformer = DictToModel()
config = ConfigFactory.from_dict({
MODEL_CLASS: 'databuilder.models.dashboard.dashboard_execution.DashboardExecution',
})
transformer.init(conf=config)
actual = transformer.transform(
{
'dashboard_group_id': 'foo',
'dashboard_id': 'bar',
'execution_timestamp': 123456789,
'execution_state': 'succeed',
'product': 'mode',
'cluster': 'gold'
}
)
self.assertTrue(isinstance(actual, DashboardExecution))
self.assertEqual(actual.__repr__(), DashboardExecution(
dashboard_group_id='foo',
dashboard_id='bar',
execution_timestamp=123456789,
execution_state='succeed',
product='mode',
cluster='gold'
).__repr__())
import unittest
from pyhocon import ConfigFactory
from databuilder.transformer.timestamp_string_to_epoch import TimestampStringToEpoch, FIELD_NAME, TIMESTAMP_FORMAT
class TestTimestampStrToEpoch(unittest.TestCase):
def test_conversion(self):
# type: () -> None
transformer = TimestampStringToEpoch()
config = ConfigFactory.from_dict({
FIELD_NAME: 'foo',
})
transformer.init(conf=config)
actual = transformer.transform({'foo': '2020-02-19T19:52:33.1Z'})
self.assertDictEqual({'foo': 1582141953}, actual)
def test_conversion_with_format(self):
# type: () -> None
transformer = TimestampStringToEpoch()
config = ConfigFactory.from_dict({
FIELD_NAME: 'foo',
TIMESTAMP_FORMAT: '%Y-%m-%dT%H:%M:%SZ'
})
transformer.init(conf=config)
actual = transformer.transform({'foo': '2020-02-19T19:52:33Z'})
self.assertDictEqual({'foo': 1582141953}, actual)
if __name__ == '__main__':
unittest.main()