Commit 142600c0 authored by Jakub Hettler's avatar Jakub Hettler Committed by Tao Feng

Add Dashboard and Metrics (#120)

* Load service location from ENV

* Add Dashboards and Metrics

* Fix default ES names

* Make tag names always lowercase

* Update sample_data_loader.py
parent 9e0fb3b6
......@@ -11,6 +11,7 @@ dist/
venv/
venv3/
.cache/
.env
.idea/
.vscode/
.coverage
......
import textwrap
from typing import Any # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.publisher.neo4j_csv_publisher import JOB_PUBLISH_TAG
class Neo4jDashboardSearchDataExtractor(Extractor):
    """
    Extractor that fetches the dashboard data needed to build the search
    index from a Neo4j graph database.

    It delegates the actual graph traversal to a nested Neo4jExtractor,
    whose cypher query either comes from configuration or defaults to
    DEFAULT_NEO4J_CYPHER_QUERY below.
    """
    CYPHER_QUERY_CONFIG_KEY = 'cypher_query'

    # The `dashboard` alias must stay in sync with _add_publish_tag_filter,
    # which renders a `WHERE dashboard.published_tag = ...` clause into the
    # {publish_tag_filter} placeholder. (Previously the query bound the
    # alias `d`, so the default query was invalid whenever a publish tag
    # was configured.)
    DEFAULT_NEO4J_CYPHER_QUERY = textwrap.dedent(
        """
        MATCH (dgroup:Dashboardgroup)<-[:DASHBOARD_OF]-(dashboard:Dashboard)
        {publish_tag_filter}
        OPTIONAL MATCH (dashboard)-[:DESCRIPTION]->(ddesc:Description)
        OPTIONAL MATCH (dashboard)-[:OWNER]->(owner:User)
        OPTIONAL MATCH (dashboard)-[:TAG]->(tags:Tag)
        OPTIONAL MATCH (dashboard)-[:LAST_RELOAD_TIME]->(lrt:Lastreloadtime)
        RETURN dgroup.name as dashboard_group, dashboard.name AS dashboard_name,
        ddesc.description AS description, owner.full_name AS user_name,
        owner.key AS user_id, left(lrt.value,16) as last_reload_time,
        COLLECT(DISTINCT lower(tags.key)) as tags
        ORDER BY dashboard.name;
        """
    )

    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        Initialize a nested Neo4jExtractor from configuration and use it
        for the actual extraction.

        :param conf: scoped configuration for this extractor.
        """
        self.conf = conf
        # extract cypher query from conf, if specified, else use default query
        if Neo4jDashboardSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY in conf:
            self.cypher_query = conf.get_string(Neo4jDashboardSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY)
        else:
            self.cypher_query = self._add_publish_tag_filter(
                conf.get_string(JOB_PUBLISH_TAG, ''),
                Neo4jDashboardSearchDataExtractor.DEFAULT_NEO4J_CYPHER_QUERY)
        self.neo4j_extractor = Neo4jExtractor()
        # write the cypher query in configs in Neo4jExtractor scope
        key = self.neo4j_extractor.get_scope() + '.' + Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY
        self.conf.put(key, self.cypher_query)
        # initialize neo4j_extractor from configs
        self.neo4j_extractor.init(Scoped.get_scoped_conf(self.conf, self.neo4j_extractor.get_scope()))

    def close(self):
        # type: () -> None
        """
        Use close() method specified by neo4j_extractor
        to close connection to neo4j cluster.
        """
        self.neo4j_extractor.close()

    def extract(self):
        # type: () -> Any
        """
        Invoke extract() method defined by neo4j_extractor.
        """
        return self.neo4j_extractor.extract()

    def get_scope(self):
        # type: () -> str
        return 'extractor.dashboard_search_data'

    def _add_publish_tag_filter(self, publish_tag, cypher_query):
        # type: (str, str) -> str
        """
        Adds publish tag filter into Cypher query.

        :param publish_tag: value of publish tag; an empty string disables the filter.
        :param cypher_query: query template containing a {publish_tag_filter} placeholder.
        :return: the query with the placeholder substituted.
        """
        if not publish_tag:
            publish_tag_filter = ''
        else:
            publish_tag_filter = """WHERE dashboard.published_tag = '{}'""".format(publish_tag)
        return cypher_query.format(publish_tag_filter=publish_tag_filter)
import textwrap
from typing import Any # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.publisher.neo4j_csv_publisher import JOB_PUBLISH_TAG
class Neo4jMetricSearchDataExtractor(Extractor):
    """
    Extractor that fetches the metric data needed to build the search
    index from a Neo4j graph database.

    It delegates the actual graph traversal to a nested Neo4jExtractor,
    whose cypher query either comes from configuration or defaults to
    DEFAULT_NEO4J_CYPHER_QUERY below.
    """
    CYPHER_QUERY_CONFIG_KEY = 'cypher_query'

    # The `metric` alias must stay in sync with _add_publish_tag_filter,
    # which renders a `WHERE metric.published_tag = ...` clause into the
    # {publish_tag_filter} placeholder. (Previously the query bound the
    # alias `m` and without a node label, so the default query was invalid
    # whenever a publish tag was configured.)
    DEFAULT_NEO4J_CYPHER_QUERY = textwrap.dedent(
        """
        MATCH (metric:Metric)-[:METRIC_OF]->(d:Dashboard)
        {publish_tag_filter}
        OPTIONAL MATCH (metric)-[:DESCRIPTION]->(mdesc:Description)
        OPTIONAL MATCH (metric)-[:METRIC_TYPE]->(mtype:Metrictype)
        OPTIONAL MATCH (d)-[:DASHBOARD_OF]->(dg:Dashboardgroup)
        OPTIONAL MATCH (metric)-[:TAG]->(tags:Tag)
        RETURN metric.name as name, mtype.name as type,
        mdesc.description as description,
        COLLECT(DISTINCT lower(tags.key)) as tags,
        COLLECT(DISTINCT dg.name+"://" +d.name) as dashboards
        """
    )

    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        Initialize a nested Neo4jExtractor from configuration and use it
        for the actual extraction.

        :param conf: scoped configuration for this extractor.
        """
        self.conf = conf
        # extract cypher query from conf, if specified, else use default query
        if Neo4jMetricSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY in conf:
            self.cypher_query = conf.get_string(Neo4jMetricSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY)
        else:
            self.cypher_query = self._add_publish_tag_filter(
                conf.get_string(JOB_PUBLISH_TAG, ''),
                Neo4jMetricSearchDataExtractor.DEFAULT_NEO4J_CYPHER_QUERY)
        self.neo4j_extractor = Neo4jExtractor()
        # write the cypher query in configs in Neo4jExtractor scope
        key = self.neo4j_extractor.get_scope() + '.' + Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY
        self.conf.put(key, self.cypher_query)
        # initialize neo4j_extractor from configs
        self.neo4j_extractor.init(Scoped.get_scoped_conf(self.conf, self.neo4j_extractor.get_scope()))

    def close(self):
        # type: () -> None
        """
        Use close() method specified by neo4j_extractor
        to close connection to neo4j cluster.
        """
        self.neo4j_extractor.close()

    def extract(self):
        # type: () -> Any
        """
        Invoke extract() method defined by neo4j_extractor.
        """
        return self.neo4j_extractor.extract()

    def get_scope(self):
        # type: () -> str
        # NOTE(review): this is the same scope string as the dashboard
        # extractor, and the sample scripts configure this extractor under
        # 'extractor.dashboard_search_data'. Kept for backward compatibility;
        # TODO: consider a dedicated 'extractor.metric_search_data' scope.
        return 'extractor.dashboard_search_data'

    def _add_publish_tag_filter(self, publish_tag, cypher_query):
        # type: (str, str) -> str
        """
        Adds publish tag filter into Cypher query.

        :param publish_tag: value of publish tag; an empty string disables the filter.
        :param cypher_query: query template containing a {publish_tag_filter} placeholder.
        :return: the query with the placeholder substituted.
        """
        if not publish_tag:
            publish_tag_filter = ''
        else:
            publish_tag_filter = """WHERE metric.published_tag = '{}'""".format(publish_tag)
        return cypher_query.format(publish_tag_filter=publish_tag_filter)
from typing import List, Optional # noqa: F401
from databuilder.models.elasticsearch_document import ElasticsearchDocument
class DashboardESDocument(ElasticsearchDocument):
    """
    Schema for the Search index document.

    Plain data container holding one dashboard record destined for the
    Elasticsearch "dashboard" index; serialization is inherited from the
    base class.
    """
    def __init__(self,
                 dashboard_group,  # type: str
                 dashboard_name,  # type: str
                 description,  # type: Optional[str]
                 last_reload_time,  # type: str
                 user_id,  # type: str
                 user_name,  # type: str
                 tags  # type: list
                 ):
        # type: (...) -> None
        self.dashboard_group = dashboard_group
        self.dashboard_name = dashboard_name
        self.description = description
        self.last_reload_time = last_reload_time
        self.user_id = user_id
        self.user_name = user_name
        self.tags = tags
This diff is collapsed.
from typing import List, Optional # noqa: F401
from databuilder.models.elasticsearch_document import ElasticsearchDocument
class MetricESDocument(ElasticsearchDocument):
    """
    Schema for the Search index document.

    Plain data container holding one metric record destined for the
    Elasticsearch "metric" index; serialization is inherited from the
    base class.
    """
    def __init__(self,
                 name,  # type: str
                 description,  # type: str
                 type,  # type: str  (shadows the builtin; kept for interface compatibility)
                 dashboards,  # type: List
                 tags,  # type: List
                 ):
        # type: (...) -> None
        self.name = name
        self.description = description
        self.type = type
        self.dashboards = dashboards
        self.tags = tags
from collections import namedtuple
from typing import Iterable, Any, Union, Iterator, Dict, Set # noqa: F401
# TODO: We could separate TagMetadata from table_metadata to own module
from databuilder.models.table_metadata import TagMetadata
from databuilder.models.neo4j_csv_serde import (
Neo4jCsvSerializable, NODE_LABEL, NODE_KEY, RELATION_START_KEY, RELATION_END_KEY, RELATION_START_LABEL,
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE)
# Lightweight hashable records used for de-duplicating shared graph nodes
# and relationships before serialization.
NodeTuple = namedtuple('KeyName', ['key', 'name', 'label'])
RelTuple = namedtuple('RelKeys', ['start_label', 'end_label', 'start_key', 'end_key', 'type', 'reverse_type'])
class MetricMetadata(Neo4jCsvSerializable):
    """
    Neo4j-serializable metadata for a dashboard metric.

    Implements Neo4jCsvSerializable to produce the Metric node together
    with its optional Description, Tag and Metrictype nodes, plus the
    relationships tying the metric to its dashboard, description, tags
    and type.

    Shared nodes (e.g. tags) would be very repetitive with low
    cardinality, so class-level de-dupe sets are kept so the publisher
    does not need to publish the same nodes/relationships twice.
    """
    DESCRIPTION_NODE_LABEL = 'Description'

    METRIC_NODE_LABEL = 'Metric'
    METRIC_KEY_FORMAT = 'metric://{name}'
    METRIC_NAME = 'name'
    METRIC_DESCRIPTION = 'description'

    METRIC_DESCRIPTION_FORMAT = 'metric://{name}/_description'
    METRIC_DESCRIPTION_RELATION_TYPE = 'DESCRIPTION'
    DESCRIPTION_METRIC_RELATION_TYPE = 'DESCRIPTION_OF'

    DASHBOARD_NODE_LABEL = 'Dashboard'
    DASHBOARD_KEY_FORMAT = '{dashboard_group}://{dashboard_name}'
    DASHBOARD_NAME = 'name'
    DASHBOARD_METRIC_RELATION_TYPE = 'METRIC'
    METRIC_DASHBOARD_RELATION_TYPE = 'METRIC_OF'

    METRIC_TYPE_NODE_LABEL = 'Metrictype'
    METRIC_TYPE_KEY_FORMAT = 'type://{type}'
    METRIC_METRIC_TYPE_RELATION_TYPE = 'METRIC_TYPE'
    METRIC_TYPE_METRIC_RELATION_TYPE = 'METRIC_TYPE_OF'

    # TODO: Idea, maybe move expression from attribute
    # to node or delete commented code below?
    # METRIC_EXPRESSION_NODE_LABEL = 'Metricexpression'
    # METRIC_METRIC_EXPRESSION_RELATION_TYPE = 'DEFINITION'
    # METRIC_EXPRESSION_METRIC_RELATION_TYPE = 'DEFINITION_OF'
    # Kept live (was commented out) because _get_metric_expression_key
    # references it and would otherwise raise AttributeError.
    METRIC_EXPRESSION_KEY_FORMAT = 'expression://{name}'
    METRIC_EXPRESSION_VALUE = 'expression'

    METRIC_TAG_RELATION_TYPE = 'TAG'
    TAG_METRIC_RELATION_TYPE = 'TAG_OF'

    # Class-level (shared across instances) de-dupe caches.
    serialized_nodes = set()  # type: Set[Any]
    serialized_rels = set()  # type: Set[Any]

    def __init__(self,
                 dashboard_group,  # type: str
                 dashboard_name,  # type: str
                 name,  # type: Union[str, None]
                 expression,  # type: str
                 description,  # type: str
                 type,  # type: str  (shadows the builtin; kept for interface compatibility)
                 tags,  # type: list
                 ):
        # type: (...) -> None
        self.dashboard_group = dashboard_group
        self.dashboard_name = dashboard_name
        self.name = name
        self.expression = expression
        self.description = description
        self.type = type
        self.tags = tags
        self._node_iterator = self._create_next_node()
        self._relation_iterator = self._create_next_relation()

    def __repr__(self):
        # type: () -> str
        # NOTE: closing paren was previously missing from the format string.
        return 'MetricMetadata({!r}, {!r}, {!r}, {!r}, {!r}, {!r}, {!r})'.format(
            self.dashboard_group,
            self.dashboard_name,
            self.name,
            self.expression,
            self.description,
            self.type,
            self.tags
        )

    def _get_metric_key(self):
        # type: () -> str
        return MetricMetadata.METRIC_KEY_FORMAT.format(name=self.name)

    def _get_metric_type_key(self):
        # type: () -> str
        return MetricMetadata.METRIC_TYPE_KEY_FORMAT.format(type=self.type)

    def _get_dashboard_key(self):
        # type: () -> str
        return MetricMetadata.DASHBOARD_KEY_FORMAT.format(dashboard_group=self.dashboard_group,
                                                          dashboard_name=self.dashboard_name)

    def _get_metric_description_key(self):
        # type: () -> str
        return MetricMetadata.METRIC_DESCRIPTION_FORMAT.format(name=self.name)

    def _get_metric_expression_key(self):
        # type: () -> str
        return MetricMetadata.METRIC_EXPRESSION_KEY_FORMAT.format(name=self.name)

    def create_next_node(self):
        # type: () -> Union[Dict[str, Any], None]
        """Return the next node dict to publish, or None when exhausted."""
        try:
            return next(self._node_iterator)
        except StopIteration:
            return None

    def _create_next_node(self):
        # type: () -> Iterator[Any]
        # Metric node (expression is stored as a plain attribute, see TODO above)
        yield {NODE_LABEL: MetricMetadata.METRIC_NODE_LABEL,
               NODE_KEY: self._get_metric_key(),
               MetricMetadata.METRIC_NAME: self.name,
               MetricMetadata.METRIC_EXPRESSION_VALUE: self.expression
               }

        # Description node
        if self.description:
            yield {NODE_LABEL: MetricMetadata.DESCRIPTION_NODE_LABEL,
                   NODE_KEY: self._get_metric_description_key(),
                   MetricMetadata.METRIC_DESCRIPTION: self.description}

        # Metric tag node
        if self.tags:
            for tag in self.tags:
                yield {NODE_LABEL: TagMetadata.TAG_NODE_LABEL,
                       NODE_KEY: TagMetadata.get_tag_key(tag),
                       TagMetadata.TAG_TYPE: 'metric'}

        # Metric type node
        if self.type:
            yield {NODE_LABEL: MetricMetadata.METRIC_TYPE_NODE_LABEL,
                   NODE_KEY: self._get_metric_type_key(),
                   'name': self.type}

        # Placeholder for shared de-duped nodes (mirrors the pattern used
        # in table_metadata); currently no shared nodes are produced.
        others = []
        for node_tuple in others:
            if node_tuple not in MetricMetadata.serialized_nodes:
                MetricMetadata.serialized_nodes.add(node_tuple)
                yield {
                    NODE_LABEL: node_tuple.label,
                    NODE_KEY: node_tuple.key,
                    'name': node_tuple.name
                }

    def create_next_relation(self):
        # type: () -> Union[Dict[str, Any], None]
        """Return the next relation dict to publish, or None when exhausted."""
        try:
            return next(self._relation_iterator)
        except StopIteration:
            return None

    def _create_next_relation(self):
        # type: () -> Iterator[Any]
        # Metric > Dashboard relation
        yield {
            RELATION_START_LABEL: MetricMetadata.METRIC_NODE_LABEL,
            RELATION_END_LABEL: MetricMetadata.DASHBOARD_NODE_LABEL,
            RELATION_START_KEY: self._get_metric_key(),
            RELATION_END_KEY: self._get_dashboard_key(),
            RELATION_TYPE: MetricMetadata.METRIC_DASHBOARD_RELATION_TYPE,
            RELATION_REVERSE_TYPE: MetricMetadata.DASHBOARD_METRIC_RELATION_TYPE
        }

        # Metric > Metric description relation
        if self.description:
            yield {
                RELATION_START_LABEL: MetricMetadata.METRIC_NODE_LABEL,
                RELATION_END_LABEL: MetricMetadata.DESCRIPTION_NODE_LABEL,
                RELATION_START_KEY: self._get_metric_key(),
                RELATION_END_KEY: self._get_metric_description_key(),
                RELATION_TYPE: MetricMetadata.METRIC_DESCRIPTION_RELATION_TYPE,
                RELATION_REVERSE_TYPE: MetricMetadata.DESCRIPTION_METRIC_RELATION_TYPE
            }

        # Metric > Metric tag relation
        if self.tags:
            for tag in self.tags:
                yield {
                    RELATION_START_LABEL: MetricMetadata.METRIC_NODE_LABEL,
                    RELATION_END_LABEL: TagMetadata.TAG_NODE_LABEL,
                    RELATION_START_KEY: self._get_metric_key(),
                    RELATION_END_KEY: TagMetadata.get_tag_key(tag),
                    RELATION_TYPE: MetricMetadata.METRIC_TAG_RELATION_TYPE,
                    RELATION_REVERSE_TYPE: MetricMetadata.TAG_METRIC_RELATION_TYPE
                }

        # Metric > Metric type relation
        if self.type:
            yield {
                RELATION_START_LABEL: MetricMetadata.METRIC_NODE_LABEL,
                RELATION_END_LABEL: MetricMetadata.METRIC_TYPE_NODE_LABEL,
                RELATION_START_KEY: self._get_metric_key(),
                RELATION_END_KEY: self._get_metric_type_key(),
                RELATION_TYPE: MetricMetadata.METRIC_METRIC_TYPE_RELATION_TYPE,
                RELATION_REVERSE_TYPE: MetricMetadata.METRIC_TYPE_METRIC_RELATION_TYPE
            }

        # Placeholder for shared de-duped relations (mirrors the pattern
        # used in table_metadata); currently no shared relations are produced.
        others = []
        for rel_tuple in others:
            if rel_tuple not in MetricMetadata.serialized_rels:
                MetricMetadata.serialized_rels.add(rel_tuple)
                yield {
                    RELATION_START_LABEL: rel_tuple.start_label,
                    RELATION_END_LABEL: rel_tuple.end_label,
                    RELATION_START_KEY: rel_tuple.start_key,
                    RELATION_END_KEY: rel_tuple.end_key,
                    RELATION_TYPE: rel_tuple.type,
                    RELATION_REVERSE_TYPE: rel_tuple.reverse_type
                }
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
"""
import logging
import os
from pyhocon import ConfigFactory
from databuilder.extractor.generic_extractor import GenericExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
neo4j_endpoint = NEO4J_ENDPOINT
# sample credentials for a local dev instance only
neo4j_user = 'neo4j'
neo4j_password = 'test'

# Input example
# NOTE(review): `input` shadows the builtin; kept as-is for compatibility.
input = [
    {'dashboard_name': 'Agent', 'dashboard_group': 'Product - Jobs.cz', 'description': 'description of Dash',
     'last_reload_time': '2019-05-30T07:03:35.580Z', 'user_id': 'roald.amundsen@example.org',
     'tags': ['test_tag', 'tag2']},
    {'dashboard_name': 'Atmoskop', 'dashboard_group': 'Product - Atmoskop', 'description': 'description of Dash2',
     'last_reload_time': '2019-05-30T07:07:42.326Z', 'user_id': 'buzz@example.org', 'tags': []},
    {'dashboard_name': 'Dohazovac', 'dashboard_group': 'Product - Jobs.cz', 'description': '',
     'last_reload_time': '2019-05-30T07:07:42.326Z', 'user_id': 'buzz@example.org',
     'tags': ['test_tag', 'tag3']},
    {'dashboard_name': 'PzR', 'dashboard_group': '', 'description': '',
     'last_reload_time': '2019-05-30T07:07:42.326Z', 'user_id': '',
     'tags': []}
]


def create_dashboard_neo4j_job(**kwargs):
    """
    Build a DefaultJob that loads the sample dashboard records above into
    Neo4j via intermediate CSV files under /var/tmp/amundsen.
    """
    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)

    job_config = ConfigFactory.from_dict({
        'extractor.generic.{}'.format(GenericExtractor.EXTRACTION_ITEMS):
            iter(input),
        'extractor.generic.{}'.format('model_class'):
            'databuilder.models.dashboard_metadata.DashboardMetadata',
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
            node_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
            node_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
            neo4j_endpoint,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
            neo4j_user,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
            neo4j_password,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
            'unique_tag',  # should use unique tag here like {ds}
    })

    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=GenericExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    return job


if __name__ == "__main__":
    job = create_dashboard_neo4j_job()
    job.launch()
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
"""
from pyhocon import ConfigFactory
import os
import random
import textwrap
from databuilder.extractor.neo4j_dashboard_search_data_extractor import Neo4jDashboardSearchDataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask
from elasticsearch import Elasticsearch
# set env ES_HOST to override localhost
es = Elasticsearch([
    {'host': os.getenv('ES_HOST', 'localhost')},
])

# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
neo4j_endpoint = NEO4J_ENDPOINT
# sample credentials for a local dev instance only
neo4j_user = 'neo4j'
neo4j_password = 'test'
# Elasticsearch index mapping for dashboard documents (doc type "dashboard").
# NOTE: the date format must use Joda/Java patterns: lowercase 'yyyy' is
# calendar year and 'dd' is day-of-month. The previous "YYYY-MM-DD" used
# week-based-year (YYYY) and day-of-year (DD), which mis-parses values such
# as '2019-05-30T07:03' produced by the extractor's left(lrt.value,16).
DASHBOARD_ES_MAP = textwrap.dedent(
    """
    {
    "mappings":{
        "dashboard":{
          "properties": {
            "dashboard_group": {
              "type":"text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "dashboard_name": {
              "type":"text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "description": {
              "type": "text",
              "analyzer": "simple"
            },
            "last_reload_time": {
              "type": "date",
              "format": "yyyy-MM-dd'T'HH:mm"
            },
            "user_id": {
              "type": "text",
              "analyzer": "simple"
            },
            "user_name": {
              "type": "text",
              "analyzer": "simple"
            },
            "tags": {
              "type": "keyword"
            }
          }
        }
    }
    }
    """
)
# todo: Add a second model
def create_neo4j_es_job():
    """
    Build a DefaultJob that extracts dashboard search data from Neo4j,
    writes it to a JSON file, and publishes it into a fresh Elasticsearch
    index behind the 'dashboard_search_index' alias.
    """
    tmp_folder = '/var/tmp/amundsen/dashboard/dashboards_search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jDashboardSearchDataExtractor())

    elasticsearch_client = es
    elasticsearch_new_index_key = 'dashboards'
    elasticsearch_new_index_key_type = 'dashboard'
    elasticsearch_index_alias = 'dashboard_search_index'
    # random suffix so each run publishes to a brand-new index
    rand = str(random.randint(0, 1000))

    job_config = ConfigFactory.from_dict({
        'extractor.dashboard_search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY):
            neo4j_endpoint,
        'extractor.dashboard_search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY):
            'databuilder.models.dashboard_elasticsearch_document.DashboardESDocument',
        'extractor.dashboard_search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
        'extractor.dashboard_search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): tmp_folder,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY): tmp_folder,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
            elasticsearch_new_index_key_type,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
            elasticsearch_client,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
            elasticsearch_new_index_key + str(rand),
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
            elasticsearch_index_alias,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY): DASHBOARD_ES_MAP
    })

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job


if __name__ == "__main__":
    job = create_neo4j_es_job()
    job.launch()
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
"""
import logging
import os
from pyhocon import ConfigFactory
from databuilder.extractor.generic_extractor import GenericExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
neo4j_endpoint = NEO4J_ENDPOINT
# sample credentials for a local dev instance only
neo4j_user = 'neo4j'
neo4j_password = 'test'

# Input example
# NOTE(review): `input` shadows the builtin; kept as-is for compatibility.
input = [
    {'dashboard_name': 'Agent', 'dashboard_group': 'Product - Jobs.cz', 'name': 'Metric 1',
     'expression': 'a/b*(2*x)', 'description': 'This is description of Metric 1',
     'type': 'MasterMetric', 'tags': ['Dummy Metric TAG']},
    {'dashboard_name': 'Agent', 'dashboard_group': 'Product - Jobs.cz', 'name': 'Metric 2',
     'expression': 'b/a*(2*x)', 'description': 'M2 This is description of Metric 2',
     'type': 'MasterMetric', 'tags': ['Dummy Metric TAG']},
    {'dashboard_name': 'Atmoskop', 'dashboard_group': 'Product - Atmoskop', 'name': 'Metric 1',
     'expression': 'a/b*(2*x)', 'description': 'This is description of Metric 1',
     'type': 'MasterMetric', 'tags': ['Dummy Metric TAG1']},
    {'dashboard_name': 'Atmoskop', 'dashboard_group': 'Product - Atmoskop', 'name': 'Metric 3',
     'expression': 'r=b/a*(2*x)', 'description': 'M3 This is description of Metric 3',
     'type': 'MasterMetric', 'tags': ['Non sense TAG']}
]


def create_metric_neo4j_job(**kwargs):
    """
    Build a DefaultJob that loads the sample metric records above into
    Neo4j via intermediate CSV files under /var/tmp/amundsen.
    """
    tmp_folder = '/var/tmp/amundsen/metric_metadata'
    node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)

    job_config = ConfigFactory.from_dict({
        'extractor.generic.{}'.format(GenericExtractor.EXTRACTION_ITEMS):
            iter(input),
        'extractor.generic.{}'.format('model_class'):
            'databuilder.models.metric_metadata.MetricMetadata',
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
            node_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
            relationship_files_folder,
        # recreate the CSV staging dirs on every run
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR): True,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.FORCE_CREATE_DIR): True,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
            node_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
            neo4j_endpoint,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
            neo4j_user,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
            neo4j_password,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
            'unique_tag',  # should use unique tag here like {ds}
    })

    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=GenericExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    return job


if __name__ == "__main__":
    job = create_metric_neo4j_job()
    job.launch()
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
"""
import os
import random
import textwrap
from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from databuilder.extractor.neo4j_metric_search_data_extractor import \
Neo4jMetricSearchDataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import \
FSElasticsearchJSONLoader
from databuilder.publisher.elasticsearch_publisher import \
ElasticsearchPublisher
from databuilder.task.task import DefaultTask
# set env ES_HOST to override localhost
es = Elasticsearch([
    {'host': os.getenv('ES_HOST', 'localhost')},
])

# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
neo4j_endpoint = NEO4J_ENDPOINT
# sample credentials for a local dev instance only
neo4j_user = 'neo4j'
neo4j_password = 'test'
METRIC_ES_MAP = textwrap.dedent(
"""
{
"mappings":{
"metric":{
"properties": {
"name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"description": {
"type": "text",
"analyzer": "simple"
},
"type": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"tags": {
"type": "keyword"
},
"dashboards": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
}
}
}
}
}
"""
)
# todo: Add a second model
def create_neo4j_es_job():
    """
    Build a DefaultJob that extracts metric search data from Neo4j,
    writes it to a JSON file, and publishes it into a fresh Elasticsearch
    index behind the 'metric_search_index' alias.
    """
    tmp_folder = '/var/tmp/amundsen/metric/metric_search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jMetricSearchDataExtractor())

    elasticsearch_client = es
    elasticsearch_new_index_key = 'metrics'
    elasticsearch_new_index_key_type = 'metric'
    elasticsearch_index_alias = 'metric_search_index'
    # random suffix so each run publishes to a brand-new index
    rand = str(random.randint(0, 1000))

    job_config = ConfigFactory.from_dict({
        # NOTE(review): the metric extractor's get_scope() returns
        # 'extractor.dashboard_search_data', so these keys must use that
        # prefix even though this job handles metrics.
        'extractor.dashboard_search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY):
            neo4j_endpoint,
        'extractor.dashboard_search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY):
            'databuilder.models.metric_elasticsearch_document.MetricESDocument',
        'extractor.dashboard_search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER):
            neo4j_user,
        'extractor.dashboard_search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW):
            neo4j_password,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
            tmp_folder,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY):
            'w',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
            tmp_folder,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY):
            'r',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
            elasticsearch_new_index_key_type,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
            elasticsearch_client,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
            elasticsearch_new_index_key + str(rand),
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
            elasticsearch_index_alias,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY):
            METRIC_ES_MAP
    })

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job


if __name__ == "__main__":
    job = create_neo4j_es_job()
    job.launch()
......@@ -3,6 +3,7 @@ This is a example script for extracting BigQuery usage results
"""
import logging
import os
from pyhocon import ConfigFactory
import sqlite3
......@@ -16,9 +17,8 @@ from databuilder.transformer.base_transformer import NoopTransformer
logging.basicConfig(level=logging.INFO)
# replace localhost with docker host ip
# todo: get the ip from input argument
NEO4J_ENDPOINT = 'bolt://localhost:7687'
# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
......
......@@ -3,6 +3,7 @@ This is a example script for extracting BigQuery usage results
"""
import logging
import os
from pyhocon import ConfigFactory
import sqlite3
......@@ -16,9 +17,8 @@ from databuilder.transformer.bigquery_usage_transformer import BigqueryUsageTran
logging.basicConfig(level=logging.INFO)
# replace localhost with docker host ip
# todo: get the ip from input argument
NEO4J_ENDPOINT = 'bolt://localhost:7687'
# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
......
......@@ -43,6 +43,7 @@ SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(neo_host if neo_host else 'localhost')
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
......
......@@ -3,6 +3,7 @@ This is a example script which demo how to load data into neo4j without using Ai
"""
import logging
import os
from pyhocon import ConfigFactory
from urllib import unquote_plus
......@@ -21,8 +22,8 @@ logging.getLogger("snowflake.connector.network").disabled = True
SNOWFLAKE_CONN_STRING = 'snowflake://username:%s@account' % unquote_plus('password')
# replace localhost with docker host ip
NEO4J_ENDPOINT = 'bolt://localhost:7687'
# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
......
import unittest
from databuilder.extractor.neo4j_dashboard_search_data_extractor import Neo4jDashboardSearchDataExtractor
class TestNeo4jDashboardExtractor(unittest.TestCase):
    def test_adding_filter(self):
        # type: () -> None
        """A non-empty publish tag renders a WHERE clause into the query."""
        extractor = Neo4jDashboardSearchDataExtractor()
        actual = extractor._add_publish_tag_filter(
            'foo', 'MATCH (dashboard:Dashboard) {publish_tag_filter} RETURN dashboard')

        self.assertEqual(
            actual, """MATCH (dashboard:Dashboard) WHERE dashboard.published_tag = 'foo' RETURN dashboard""")

    def test_not_adding_filter(self):
        # type: () -> None
        """An empty publish tag leaves the query unfiltered."""
        extractor = Neo4jDashboardSearchDataExtractor()
        actual = extractor._add_publish_tag_filter(
            '', 'MATCH (dashboard:Dashboard) {publish_tag_filter} RETURN dashboard')

        self.assertEqual(actual, """MATCH (dashboard:Dashboard) RETURN dashboard""")


if __name__ == '__main__':
    unittest.main()
import unittest
from databuilder.extractor.neo4j_metric_search_data_extractor import Neo4jMetricSearchDataExtractor
class TestNeo4jMetricExtractor(unittest.TestCase):
    def test_adding_filter(self):
        # type: () -> None
        """A non-empty publish tag renders a WHERE clause into the query."""
        extractor = Neo4jMetricSearchDataExtractor()
        actual = extractor._add_publish_tag_filter('foo', 'MATCH (metric:Metric) {publish_tag_filter} RETURN metric')

        self.assertEqual(actual, """MATCH (metric:Metric) WHERE metric.published_tag = 'foo' RETURN metric""")

    def test_not_adding_filter(self):
        # type: () -> None
        """An empty publish tag leaves the query unfiltered."""
        extractor = Neo4jMetricSearchDataExtractor()
        actual = extractor._add_publish_tag_filter('', 'MATCH (metric:Metric) {publish_tag_filter} RETURN metric')

        self.assertEqual(actual, """MATCH (metric:Metric) RETURN metric""")


if __name__ == '__main__':
    unittest.main()
import json
import unittest
from databuilder.models.dashboard_elasticsearch_document import DashboardESDocument
class TestDashboardElasticsearchDocument(unittest.TestCase):

    def test_to_json(self):
        # type: () -> None
        """
        Test string generated from to_json method.

        The serialized output is expected to be one JSON object followed
        by a trailing newline (bulk-load friendly format).
        """
        test_obj = DashboardESDocument(dashboard_group='test_dashboard_group',
                                       dashboard_name='test_dashboard_name',
                                       description='test_description',
                                       last_reload_time='test_last_reload_time',
                                       user_id='test_user_id',
                                       user_name='test_user_name',
                                       tags=['test'])

        expected_document_dict = {"dashboard_group": "test_dashboard_group",
                                  "dashboard_name": "test_dashboard_name",
                                  "description": "test_description",
                                  "last_reload_time": "test_last_reload_time",
                                  "user_id": "test_user_id",
                                  "user_name": "test_user_name",
                                  "tags": ["test"]
                                  }

        result = test_obj.to_json()
        results = result.split("\n")

        # verify two new line characters in result
        self.assertEqual(len(results), 2, "Result from to_json() function doesn't have a newline!")
        self.assertDictEqual(json.loads(results[0]), expected_document_dict)
import copy
import unittest
from databuilder.models.dashboard_metadata import DashboardMetadata
class TestDashboardMetadata(unittest.TestCase):
    """Tests serialization of DashboardMetadata into Neo4j node/relation dicts."""

    def setUp(self):
        # type: () -> None
        # Full example
        self.dashboard_metadata = DashboardMetadata('Product - Jobs.cz',
                                                    'Agent',
                                                    'Agent dashboard description',
                                                    '2019-05-30T07:03:35.580Z',
                                                    'roald.amundsen@example.org',
                                                    ['test_tag', 'tag2']
                                                    )
        # Without tags
        self.dashboard_metadata2 = DashboardMetadata('Product - Atmoskop',
                                                     'Atmoskop',
                                                     'Atmoskop dashboard description',
                                                     '2019-05-30T07:07:42.326Z',
                                                     'buzz@example.org',
                                                     []
                                                     )
        # One common tag with dashboard_metadata, no description
        self.dashboard_metadata3 = DashboardMetadata('Product - Jobs.cz',
                                                     'Dohazovac',
                                                     '',
                                                     '2019-05-30T07:07:42.326Z',
                                                     'buzz@example.org',
                                                     ['test_tag', 'tag3']
                                                     )
        # Necessary minimum -- NOT USED
        self.dashboard_metadata4 = DashboardMetadata('',
                                                     'PzR',
                                                     '',
                                                     '2019-05-30T07:07:42.326Z',
                                                     '',
                                                     []
                                                     )
        self.expected_nodes_deduped = [
            {'name': 'Agent', 'KEY': 'Product - Jobs.cz://Agent', 'LABEL': 'Dashboard'},
            {'name': 'Product - Jobs.cz', 'KEY': 'dashboardgroup://Product - Jobs.cz', 'LABEL': 'Dashboardgroup'},
            {'description': 'Agent dashboard description', 'KEY': 'Product - Jobs.cz://Agent/_description',
             'LABEL': 'Description'},
            {'value': '2019-05-30T07:03:35.580Z',
             'KEY': 'Product - Jobs.cz://Agent/_lastreloadtime', 'LABEL': 'Lastreloadtime'},
            {'tag_type': 'dashboard', 'KEY': 'test_tag', 'LABEL': 'Tag'},
            {'tag_type': 'dashboard', 'KEY': 'tag2', 'LABEL': 'Tag'}
        ]
        self.expected_nodes = copy.deepcopy(self.expected_nodes_deduped)
        self.expected_rels_deduped = [
            {'END_KEY': 'dashboardgroup://Product - Jobs.cz', 'START_LABEL': 'Dashboard',
             'END_LABEL': 'Dashboardgroup',
             'START_KEY': 'Product - Jobs.cz://Agent', 'TYPE': 'DASHBOARD_OF', 'REVERSE_TYPE': 'DASHBOARD'},
            {'END_KEY': 'Product - Jobs.cz://Agent/_description', 'START_LABEL': 'Dashboard',
             'END_LABEL': 'Description',
             'START_KEY': 'Product - Jobs.cz://Agent', 'TYPE': 'DESCRIPTION', 'REVERSE_TYPE': 'DESCRIPTION_OF'},
            {'END_KEY': 'Product - Jobs.cz://Agent/_lastreloadtime', 'START_LABEL': 'Dashboard',
             'END_LABEL': 'Lastreloadtime', 'START_KEY': 'Product - Jobs.cz://Agent',
             'TYPE': 'LAST_RELOAD_TIME', 'REVERSE_TYPE': 'LAST_RELOAD_TIME_OF'},
            {'END_KEY': 'test_tag', 'START_LABEL': 'Dashboard', 'END_LABEL': 'Tag',
             'START_KEY': 'Product - Jobs.cz://Agent', 'TYPE': 'TAG', 'REVERSE_TYPE': 'TAG_OF'},
            {'END_KEY': 'tag2', 'START_LABEL': 'Dashboard', 'END_LABEL': 'Tag',
             'START_KEY': 'Product - Jobs.cz://Agent', 'TYPE': 'TAG', 'REVERSE_TYPE': 'TAG_OF'},
            {'END_KEY': 'roald.amundsen@example.org', 'START_LABEL': 'Dashboard', 'END_LABEL': 'User',
             'START_KEY': 'Product - Jobs.cz://Agent', 'TYPE': 'OWNER', 'REVERSE_TYPE': 'OWNER_OF'}
        ]
        self.expected_rels = copy.deepcopy(self.expected_rels_deduped)
        self.expected_nodes_deduped2 = [
            {'name': 'Atmoskop', 'KEY': 'Product - Atmoskop://Atmoskop', 'LABEL': 'Dashboard'},
            {'name': 'Product - Atmoskop', 'KEY': 'dashboardgroup://Product - Atmoskop', 'LABEL': 'Dashboardgroup'},
            {'description': 'Atmoskop dashboard description', 'KEY': 'Product - Atmoskop://Atmoskop/_description',
             'LABEL': 'Description'},
            {'value': '2019-05-30T07:07:42.326Z', 'KEY': 'Product - Atmoskop://Atmoskop/_lastreloadtime',
             'LABEL': 'Lastreloadtime'}
        ]
        self.expected_nodes2 = copy.deepcopy(self.expected_nodes_deduped2)
        self.expected_rels_deduped2 = [
            {'END_KEY': 'dashboardgroup://Product - Atmoskop', 'START_LABEL': 'Dashboard',
             'END_LABEL': 'Dashboardgroup',
             'START_KEY': 'Product - Atmoskop://Atmoskop', 'TYPE': 'DASHBOARD_OF', 'REVERSE_TYPE': 'DASHBOARD'},
            {'END_KEY': 'Product - Atmoskop://Atmoskop/_description', 'START_LABEL': 'Dashboard',
             'END_LABEL': 'Description',
             'START_KEY': 'Product - Atmoskop://Atmoskop', 'TYPE': 'DESCRIPTION', 'REVERSE_TYPE': 'DESCRIPTION_OF'},
            {'END_KEY': 'Product - Atmoskop://Atmoskop/_lastreloadtime', 'START_LABEL': 'Dashboard',
             'END_LABEL': 'Lastreloadtime', 'START_KEY': 'Product - Atmoskop://Atmoskop',
             'TYPE': 'LAST_RELOAD_TIME', 'REVERSE_TYPE': 'LAST_RELOAD_TIME_OF'},
            {'END_KEY': 'buzz@example.org', 'START_LABEL': 'Dashboard', 'END_LABEL': 'User',
             'START_KEY': 'Product - Atmoskop://Atmoskop', 'TYPE': 'OWNER', 'REVERSE_TYPE': 'OWNER_OF'}
        ]
        self.expected_rels2 = copy.deepcopy(self.expected_rels_deduped2)
        self.expected_nodes_deduped3 = [
            {'name': 'Dohazovac', 'KEY': 'Product - Jobs.cz://Dohazovac', 'LABEL': 'Dashboard'},
            {'name': 'Product - Jobs.cz', 'KEY': 'dashboardgroup://Product - Jobs.cz', 'LABEL': 'Dashboardgroup'},
            {'value': '2019-05-30T07:07:42.326Z',
             'KEY': 'Product - Jobs.cz://Dohazovac/_lastreloadtime', 'LABEL': 'Lastreloadtime'},
            {'tag_type': 'dashboard', 'KEY': 'test_tag', 'LABEL': 'Tag'},
            {'tag_type': 'dashboard', 'KEY': 'tag3', 'LABEL': 'Tag'}
        ]
        self.expected_nodes3 = copy.deepcopy(self.expected_nodes_deduped3)
        self.expected_rels_deduped3 = [
            {'END_KEY': 'dashboardgroup://Product - Jobs.cz', 'START_LABEL': 'Dashboard',
             'END_LABEL': 'Dashboardgroup',
             'START_KEY': 'Product - Jobs.cz://Dohazovac', 'TYPE': 'DASHBOARD_OF', 'REVERSE_TYPE': 'DASHBOARD'},
            {'END_KEY': 'Product - Jobs.cz://Dohazovac/_lastreloadtime', 'START_LABEL': 'Dashboard',
             'END_LABEL': 'Lastreloadtime', 'START_KEY': 'Product - Jobs.cz://Dohazovac',
             'TYPE': 'LAST_RELOAD_TIME', 'REVERSE_TYPE': 'LAST_RELOAD_TIME_OF'},
            {'END_KEY': 'test_tag', 'START_LABEL': 'Dashboard', 'END_LABEL': 'Tag',
             'START_KEY': 'Product - Jobs.cz://Dohazovac', 'TYPE': 'TAG', 'REVERSE_TYPE': 'TAG_OF'},
            {'END_KEY': 'tag3', 'START_LABEL': 'Dashboard', 'END_LABEL': 'Tag',
             'START_KEY': 'Product - Jobs.cz://Dohazovac', 'TYPE': 'TAG', 'REVERSE_TYPE': 'TAG_OF'},
            {'END_KEY': 'buzz@example.org', 'START_LABEL': 'Dashboard', 'END_LABEL': 'User',
             'START_KEY': 'Product - Jobs.cz://Dohazovac', 'TYPE': 'OWNER', 'REVERSE_TYPE': 'OWNER_OF'},
        ]
        self.expected_rels3 = copy.deepcopy(self.expected_rels_deduped3)

    @staticmethod
    def _drain(fetch_next):
        # type: (Any) -> list
        """Exhaust a next_node()/next_relation()-style callable into a list.

        The callable is invoked repeatedly until it returns a falsy value
        (the models return None when exhausted).
        """
        rows = []
        row = fetch_next()
        while row:
            rows.append(row)
            row = fetch_next()
        return rows

    def test_serialize(self):
        # type: () -> None
        # First test: full example with description, owner and tags
        self.assertEqual(self.expected_nodes, self._drain(self.dashboard_metadata.next_node))
        self.assertEqual(self.expected_rels, self._drain(self.dashboard_metadata.next_relation))
        # Second test: dashboard without tags
        self.assertEqual(self.expected_nodes_deduped2, self._drain(self.dashboard_metadata2.next_node))
        self.assertEqual(self.expected_rels_deduped2, self._drain(self.dashboard_metadata2.next_relation))
        # Third test: no description, tag shared with the first dashboard
        self.assertEqual(self.expected_nodes_deduped3, self._drain(self.dashboard_metadata3.next_node))
        self.assertEqual(self.expected_rels_deduped3, self._drain(self.dashboard_metadata3.next_relation))


if __name__ == '__main__':
    unittest.main()
import json
import unittest
from databuilder.models.metric_elasticsearch_document import MetricESDocument
class TestMetricElasticsearchDocument(unittest.TestCase):
    def test_to_json(self):
        # type: () -> None
        """
        to_json() must emit a single JSON object carrying every attribute,
        terminated by a trailing newline.
        """
        document = MetricESDocument(name='test_metric_name',
                                    description='test_metric_description',
                                    type='test_metric_type',
                                    dashboards=['test_dashboard_1', 'test_dashboard_2'],
                                    tags=['test_metric_group'])
        expected = {'name': 'test_metric_name',
                    'description': 'test_metric_description',
                    'type': 'test_metric_type',
                    'dashboards': ['test_dashboard_1', 'test_dashboard_2'],
                    'tags': ['test_metric_group']}
        serialized = document.to_json()
        pieces = serialized.split("\n")
        # verify two new line characters in result
        self.assertEqual(len(pieces), 2, "Result from to_json() function doesn't have a newline!")
        self.assertDictEqual(json.loads(pieces[0]), expected)
import copy
import unittest
from databuilder.models.metric_metadata import MetricMetadata
class TestMetricMetadata(unittest.TestCase):
    """Tests serialization of MetricMetadata into Neo4j node/relation dicts."""

    def setUp(self):
        # type: () -> None
        # Full example: description, type and multiple tags
        self.metric_metadata = MetricMetadata('Product - Jobs.cz',
                                              'Agent',
                                              'Metric 1',
                                              'a/b*(2*x)',
                                              'This is description of Metric 1',
                                              'MasterMetric',
                                              ['Dummy Metric TAG', 'TAG2'])
        # Single tag
        self.metric_metadata2 = MetricMetadata('Product - Jobs.cz',
                                               'Agent',
                                               'Metric 2',
                                               'b/a*(2*x)',
                                               'M2 This is description of Metric 2',
                                               'MasterMetric',
                                               ['Dummy Metric TAG'])
        # No description, no type, no tags
        self.metric_metadata3 = MetricMetadata('Product - Atmoskop',
                                               'Atmoskop',
                                               'Metric 3',
                                               'x*x*x',
                                               '',
                                               '',
                                               [])
        self.expected_nodes_deduped = [
            {'name': 'Metric 1', 'KEY': 'metric://Metric 1', 'LABEL': 'Metric', 'expression': 'a/b*(2*x)'},
            {'description': 'This is description of Metric 1', 'KEY': 'metric://Metric 1/_description',
             'LABEL': 'Description'},
            {'tag_type': 'metric', 'KEY': 'Dummy Metric TAG', 'LABEL': 'Tag'},
            {'tag_type': 'metric', 'KEY': 'TAG2', 'LABEL': 'Tag'},
            {'name': 'MasterMetric', 'KEY': 'type://MasterMetric', 'LABEL': 'Metrictype'}
        ]
        self.expected_nodes = copy.deepcopy(self.expected_nodes_deduped)
        self.expected_rels_deduped = [
            {'END_KEY': 'Product - Jobs.cz://Agent', 'START_LABEL': 'Metric',
             'END_LABEL': 'Dashboard',
             'START_KEY': 'metric://Metric 1', 'TYPE': 'METRIC_OF', 'REVERSE_TYPE': 'METRIC'},
            {'END_KEY': 'metric://Metric 1/_description', 'START_LABEL': 'Metric',
             'END_LABEL': 'Description',
             'START_KEY': 'metric://Metric 1', 'TYPE': 'DESCRIPTION', 'REVERSE_TYPE': 'DESCRIPTION_OF'},
            {'END_KEY': 'Dummy Metric TAG', 'START_LABEL': 'Metric', 'END_LABEL': 'Tag',
             'START_KEY': 'metric://Metric 1', 'TYPE': 'TAG', 'REVERSE_TYPE': 'TAG_OF'},
            {'END_KEY': 'TAG2', 'START_LABEL': 'Metric', 'END_LABEL': 'Tag',
             'START_KEY': 'metric://Metric 1', 'TYPE': 'TAG', 'REVERSE_TYPE': 'TAG_OF'},
            {'END_KEY': 'type://MasterMetric', 'START_LABEL': 'Metric',
             'END_LABEL': 'Metrictype', 'START_KEY': 'metric://Metric 1',
             'TYPE': 'METRIC_TYPE', 'REVERSE_TYPE': 'METRIC_TYPE_OF'}
        ]
        self.expected_rels = copy.deepcopy(self.expected_rels_deduped)
        self.expected_nodes_deduped2 = [
            {'name': 'Metric 2', 'KEY': 'metric://Metric 2', 'LABEL': 'Metric', 'expression': 'b/a*(2*x)'},
            {'description': 'M2 This is description of Metric 2', 'KEY': 'metric://Metric 2/_description',
             'LABEL': 'Description'},
            {'tag_type': 'metric', 'KEY': 'Dummy Metric TAG', 'LABEL': 'Tag'},
            {'name': 'MasterMetric', 'KEY': 'type://MasterMetric', 'LABEL': 'Metrictype'}
        ]
        self.expected_nodes2 = copy.deepcopy(self.expected_nodes_deduped2)
        self.expected_rels_deduped2 = [
            {'END_KEY': 'Product - Jobs.cz://Agent', 'START_LABEL': 'Metric',
             'END_LABEL': 'Dashboard',
             'START_KEY': 'metric://Metric 2', 'TYPE': 'METRIC_OF', 'REVERSE_TYPE': 'METRIC'},
            {'END_KEY': 'metric://Metric 2/_description', 'START_LABEL': 'Metric',
             'END_LABEL': 'Description',
             'START_KEY': 'metric://Metric 2', 'TYPE': 'DESCRIPTION', 'REVERSE_TYPE': 'DESCRIPTION_OF'},
            {'END_KEY': 'Dummy Metric TAG', 'START_LABEL': 'Metric', 'END_LABEL': 'Tag',
             'START_KEY': 'metric://Metric 2', 'TYPE': 'TAG', 'REVERSE_TYPE': 'TAG_OF'},
            {'START_LABEL': 'Metric', 'END_KEY': 'type://MasterMetric',
             'END_LABEL': 'Metrictype', 'START_KEY': 'metric://Metric 2',
             'TYPE': 'METRIC_TYPE', 'REVERSE_TYPE': 'METRIC_TYPE_OF'}
        ]
        self.expected_rels2 = copy.deepcopy(self.expected_rels_deduped2)
        self.expected_nodes_deduped3 = [
            {'name': 'Metric 3', 'KEY': 'metric://Metric 3', 'LABEL': 'Metric', 'expression': 'x*x*x'}
        ]
        self.expected_nodes3 = copy.deepcopy(self.expected_nodes_deduped3)
        self.expected_rels_deduped3 = [
            {'END_KEY': 'Product - Atmoskop://Atmoskop', 'START_LABEL': 'Metric',
             'END_LABEL': 'Dashboard',
             'START_KEY': 'metric://Metric 3', 'TYPE': 'METRIC_OF', 'REVERSE_TYPE': 'METRIC'}
        ]
        self.expected_rels3 = copy.deepcopy(self.expected_rels_deduped3)

    @staticmethod
    def _drain(fetch_next):
        # type: (Any) -> list
        """Exhaust a next_node()/next_relation()-style callable into a list.

        The callable is invoked repeatedly until it returns a falsy value
        (the models return None when exhausted).
        """
        rows = []
        row = fetch_next()
        while row:
            rows.append(row)
            row = fetch_next()
        return rows

    def test_serialize(self):
        # type: () -> None
        # First test: metric with description, type and two tags
        self.assertEqual(self.expected_nodes, self._drain(self.metric_metadata.next_node))
        self.assertEqual(self.expected_rels, self._drain(self.metric_metadata.next_relation))
        # Second test: metric with a single tag
        self.assertEqual(self.expected_nodes_deduped2, self._drain(self.metric_metadata2.next_node))
        self.assertEqual(self.expected_rels_deduped2, self._drain(self.metric_metadata2.next_relation))
        # Third test: bare metric without description/type/tags
        self.assertEqual(self.expected_nodes_deduped3, self._drain(self.metric_metadata3.next_node))
        self.assertEqual(self.expected_rels_deduped3, self._drain(self.metric_metadata3.next_relation))


if __name__ == '__main__':
    unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment