Unverified Commit b8db1626 authored by Jin Hyuk Chang's avatar Jin Hyuk Chang Committed by GitHub

feat: Ingest Mode Analytics user (#282)

* Ingest Mode Analytics user

* Flake8

* Update

* Added doc
parent 06c1dbef
...@@ -587,6 +587,27 @@ job = DefaultJob(conf=job_config, ...@@ -587,6 +587,27 @@ job = DefaultJob(conf=job_config,
job.launch() job.launch()
``` ```
#### [ModeDashboardUserExtractor](./databuilder/extractor/dashboard/mode_analytics/mode_dashboard_user_extractor.py)
A Extractor that extracts Mode user_id and then update User node.
You can create Databuilder job config like this. (configuration related to loader and publisher is omitted as it is mostly the same. Please take a look at this [example](#ModeDashboardExtractor) for the configuration that holds loader and publisher.
```python
extractor = ModeDashboardUserExtractor()
task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader())
job_config = ConfigFactory.from_dict({
'{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization,
'{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token,
'{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password,
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
```
#### [ModeDashboardUsageExtractor](./databuilder/extractor/dashboard/mode_analytics/mode_dashboard_usage_extractor.py) #### [ModeDashboardUsageExtractor](./databuilder/extractor/dashboard/mode_analytics/mode_dashboard_usage_extractor.py)
A Extractor that extracts Mode dashboard's accumulated view count. A Extractor that extracts Mode dashboard's accumulated view count.
...@@ -625,6 +646,9 @@ Transforms dictionary into model ...@@ -625,6 +646,9 @@ Transforms dictionary into model
#### [TimestampStringToEpoch](./databuilder/transformer/timestamp_string_to_epoch.py) #### [TimestampStringToEpoch](./databuilder/transformer/timestamp_string_to_epoch.py)
Transforms string timestamp into int epoch Transforms string timestamp into int epoch
#### [RemoveFieldTransformer](./databuilder/transformer/remove_field_transformer.py)
Remove fields from the Dict.
## List of loader ## List of loader
#### [FsNeo4jCSVLoader](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/loader/file_system_neo4j_csv_loader.py "FsNeo4jCSVLoader") #### [FsNeo4jCSVLoader](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/loader/file_system_neo4j_csv_loader.py "FsNeo4jCSVLoader")
......
import logging
from typing import Any # noqa: F401
from pyhocon import ConfigTree, ConfigFactory # noqa: F401
from requests.auth import HTTPBasicAuth
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.dashboard.mode_analytics.mode_dashboard_constants import ORGANIZATION, MODE_ACCESS_TOKEN, \
MODE_PASSWORD_TOKEN
from databuilder.extractor.dashboard.mode_analytics.mode_dashboard_utils import ModeDashboardUtils
from databuilder.rest_api.base_rest_api_query import RestApiQuerySeed
from databuilder.rest_api.rest_api_failure_handlers import HttpFailureSkipOnStatus
from databuilder.rest_api.rest_api_query import RestApiQuery
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS
from databuilder.transformer.remove_field_transformer import RemoveFieldTransformer, FIELD_NAMES
LOGGER = logging.getLogger(__name__)
class ModeDashboardUserExtractor(Extractor):
"""
An Extractor that extracts all Mode Dashboard user and add mode_user_id attribute to User model.
"""
def init(self, conf):
# type: (ConfigTree) -> None
self._conf = conf
restapi_query = self._build_restapi_query()
self._extractor = ModeDashboardUtils.create_mode_rest_api_extractor(
restapi_query=restapi_query,
conf=self._conf
)
# Remove all unnecessary fields because User model accepts all attributes and push it to Neo4j.
transformers = []
remove_fields_transformer = RemoveFieldTransformer()
remove_fields_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, remove_fields_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict(
{FIELD_NAMES: ['organization', 'mode_user_resource_path', 'product']})))
transformers.append(remove_fields_transformer)
dict_to_model_transformer = DictToModel()
dict_to_model_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, dict_to_model_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict(
{MODEL_CLASS: 'databuilder.models.user.User'})))
transformers.append(dict_to_model_transformer)
self._transformer = ChainedTransformer(transformers=transformers)
def extract(self):
# type: () -> Any
record = self._extractor.extract()
if not record:
return None
return self._transformer.transform(record=record)
def get_scope(self):
# type: () -> str
return 'extractor.mode_dashboard_owner'
def _build_restapi_query(self):
"""
Build REST API Query. To get Mode Dashboard owner, it needs to call three APIs (spaces API, reports
API, and user API) joining together.
:return: A RestApiQuery that provides Mode Dashboard owner
"""
# type: () -> RestApiQuery
# Seed query record for next query api to join with
seed_record = [{
'organization': self._conf.get_string(ORGANIZATION),
'is_active': None,
'updated_at': None,
'do_not_update_empty_attribute': True,
}]
seed_query = RestApiQuerySeed(seed_record=seed_record)
# memberships
# https://mode.com/developer/api-reference/management/organization-memberships/#listMemberships
memberships_url_template = 'https://app.mode.com/api/{organization}/memberships'
params = {'auth': HTTPBasicAuth(self._conf.get_string(MODE_ACCESS_TOKEN),
self._conf.get_string(MODE_PASSWORD_TOKEN))}
json_path = '(_embedded.memberships[*].member_username) | (_embedded.memberships[*]._links.user.href)'
field_names = ['mode_user_id', 'mode_user_resource_path']
mode_user_ids_query = RestApiQuery(query_to_join=seed_query, url=memberships_url_template, params=params,
json_path=json_path, field_names=field_names,
skip_no_result=True, json_path_contains_or=True)
# https://mode.com/developer/api-reference/management/users/
user_url_template = 'https://app.mode.com{mode_user_resource_path}'
json_path = 'email'
field_names = ['email']
failure_handler = HttpFailureSkipOnStatus(status_codes_to_skip={404})
mode_user_email_query = RestApiQuery(query_to_join=mode_user_ids_query, url=user_url_template,
params=params, json_path=json_path, field_names=field_names,
skip_no_result=True, can_skip_failure=failure_handler.can_skip_failure)
return mode_user_email_query
...@@ -43,6 +43,7 @@ class User(Neo4jCsvSerializable): ...@@ -43,6 +43,7 @@ class User(Neo4jCsvSerializable):
is_active=True, # type: bool is_active=True, # type: bool
updated_at=0, # type: int updated_at=0, # type: int
role_name='', # type: str role_name='', # type: str
do_not_update_empty_attribute=False, # type: bool
**kwargs # type: Dict **kwargs # type: Dict
): ):
# type: (...) -> None # type: (...) -> None
...@@ -62,6 +63,8 @@ class User(Neo4jCsvSerializable): ...@@ -62,6 +63,8 @@ class User(Neo4jCsvSerializable):
then we will have a cron job to update the ex-employee nodes based on then we will have a cron job to update the ex-employee nodes based on
the case if this timestamp hasn't been updated for two weeks. the case if this timestamp hasn't been updated for two weeks.
:param role_name: the role_name of the user (e.g swe) :param role_name: the role_name of the user (e.g swe)
:param do_not_update_empty_attribute: If False, all empty or not defined params will be overwritten with
empty string.
:param kwargs: Any K/V attributes we want to update the :param kwargs: Any K/V attributes we want to update the
""" """
self.first_name = first_name self.first_name = first_name
...@@ -79,6 +82,7 @@ class User(Neo4jCsvSerializable): ...@@ -79,6 +82,7 @@ class User(Neo4jCsvSerializable):
self.is_active = is_active self.is_active = is_active
self.updated_at = updated_at self.updated_at = updated_at
self.role_name = role_name self.role_name = role_name
self.do_not_update_empty_attribute = do_not_update_empty_attribute
self.attrs = None self.attrs = None
if kwargs: if kwargs:
self.attrs = copy.deepcopy(kwargs) self.attrs = copy.deepcopy(kwargs)
...@@ -132,14 +136,23 @@ class User(Neo4jCsvSerializable): ...@@ -132,14 +136,23 @@ class User(Neo4jCsvSerializable):
result_node[User.USER_NODE_TEAM] = self.team_name if self.team_name else '' result_node[User.USER_NODE_TEAM] = self.team_name if self.team_name else ''
result_node[User.USER_NODE_EMPLOYEE_TYPE] = self.employee_type if self.employee_type else '' result_node[User.USER_NODE_EMPLOYEE_TYPE] = self.employee_type if self.employee_type else ''
result_node[User.USER_NODE_SLACK_ID] = self.slack_id if self.slack_id else '' result_node[User.USER_NODE_SLACK_ID] = self.slack_id if self.slack_id else ''
result_node[User.USER_NODE_UPDATED_AT] = self.updated_at if self.updated_at else 0
result_node[User.USER_NODE_ROLE_NAME] = self.role_name if self.role_name else '' result_node[User.USER_NODE_ROLE_NAME] = self.role_name if self.role_name else ''
if self.updated_at:
result_node[User.USER_NODE_UPDATED_AT] = self.updated_at
elif not self.do_not_update_empty_attribute:
result_node[User.USER_NODE_UPDATED_AT] = 0
if self.attrs: if self.attrs:
for k, v in self.attrs.items(): for k, v in self.attrs.items():
if k not in result_node: if k not in result_node:
result_node[k] = v result_node[k] = v
if self.do_not_update_empty_attribute:
for k, v in list(result_node.items()):
if not v:
del result_node[k]
return [result_node] return [result_node]
def create_relation(self): def create_relation(self):
......
import logging
from typing import Any, Dict # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from databuilder.transformer.base_transformer import Transformer
FIELD_NAMES = 'field_names' # field name to be removed
LOGGER = logging.getLogger(__name__)
class RemoveFieldTransformer(Transformer):
"""
Remove field in Dict by specifying list of fields (keys).
"""
def init(self, conf):
# type: (ConfigTree) -> None
self._field_names = conf.get_list(FIELD_NAMES)
def transform(self, record):
# type: (Dict[str, Any]) -> Dict[str, Any]
for k in self._field_names:
if k in record:
del record[k]
return record
def get_scope(self):
# type: () -> str
return 'transformer.remove_field'
...@@ -57,3 +57,5 @@ unicodecsv==0.14.1,<1.0 ...@@ -57,3 +57,5 @@ unicodecsv==0.14.1,<1.0
httplib2>=0.18.0 httplib2>=0.18.0
unidecode unidecode
requests==2.23.0,<3.0
...@@ -2,7 +2,7 @@ import os ...@@ -2,7 +2,7 @@ import os
from setuptools import setup, find_packages from setuptools import setup, find_packages
__version__ = '2.6.0' __version__ = '2.6.1'
requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file: with open(requirements_path) as requirements_file:
......
...@@ -2,7 +2,6 @@ import unittest ...@@ -2,7 +2,6 @@ import unittest
from databuilder.models.neo4j_csv_serde import RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \ from databuilder.models.neo4j_csv_serde import RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.user import User from databuilder.models.user import User
...@@ -71,3 +70,22 @@ class TestUser(unittest.TestCase): ...@@ -71,3 +70,22 @@ class TestUser(unittest.TestCase):
} }
self.assertTrue(relation in relations) self.assertTrue(relation in relations)
def test_not_including_empty_attribute(self):
# type: () -> None
test_user = User(email='test@email.com',
foo='bar')
self.assertDictEqual(test_user.create_next_node(),
{'KEY': 'test@email.com', 'LABEL': 'User', 'email': 'test@email.com',
'is_active:UNQUOTED': True, 'first_name': '', 'last_name': '', 'full_name': '',
'github_username': '', 'team_name': '', 'employee_type': '', 'slack_id': '',
'role_name': '', 'updated_at': 0, 'foo': 'bar'})
test_user2 = User(email='test@email.com',
foo='bar',
is_active=None,
do_not_update_empty_attribute=True)
self.assertDictEqual(test_user2.create_next_node(),
{'KEY': 'test@email.com', 'LABEL': 'User', 'email': 'test@email.com', 'foo': 'bar'})
import unittest
from pyhocon import ConfigFactory
from databuilder.transformer.remove_field_transformer import RemoveFieldTransformer, FIELD_NAMES
class TestRemoveFieldTransformer(unittest.TestCase):
def test_conversion(self):
# type: () -> None
transformer = RemoveFieldTransformer()
config = ConfigFactory.from_dict({
FIELD_NAMES: ['foo', 'bar'],
})
transformer.init(conf=config)
actual = transformer.transform({
'foo': 'foo_val',
'bar': 'bar_val',
'baz': 'baz_val',
})
expected = {
'baz': 'baz_val'
}
self.assertDictEqual(expected, actual)
def test_conversion_missing_field(self):
# type: () -> None
transformer = RemoveFieldTransformer()
config = ConfigFactory.from_dict({
FIELD_NAMES: ['foo', 'bar'],
})
transformer.init(conf=config)
actual = transformer.transform({
'foo': 'foo_val',
'baz': 'baz_val',
'john': 'doe',
})
expected = {
'baz': 'baz_val',
'john': 'doe'
}
self.assertDictEqual(expected, actual)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment