Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
A
AmendsenProject
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Shaik Janipasha
AmendsenProject
Commits
cf7a28ce
Unverified
Commit
cf7a28ce
authored
Mar 11, 2020
by
Tao Feng
Committed by
GitHub
Mar 11, 2020
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Remove neo4j dashboard/metric extractor (#219)
parent
aedd87fb
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
0 additions
and
640 deletions
+0
-640
neo4j_dashboard_search_data_extractor.py
...uilder/extractor/neo4j_dashboard_search_data_extractor.py
+0
-88
neo4j_metric_search_data_extractor.py
databuilder/extractor/neo4j_metric_search_data_extractor.py
+0
-86
dashboard_data_loader.py
example/scripts/dashboard_data_loader.py
+0
-77
dashboard_es_publisher.py
example/scripts/dashboard_es_publisher.py
+0
-126
metric_data_loader.py
example/scripts/metric_data_loader.py
+0
-82
metric_es_publisher.py
example/scripts/metric_es_publisher.py
+0
-132
test_neo4j_dashboard_search_data_extractor.py
...t/extractor/test_neo4j_dashboard_search_data_extractor.py
+0
-26
test_neo4j_metric_search_data_extractor.py
...unit/extractor/test_neo4j_metric_search_data_extractor.py
+0
-23
No files found.
databuilder/extractor/neo4j_dashboard_search_data_extractor.py
deleted
100644 → 0
View file @
aedd87fb
import
textwrap
from
typing
import
Any
# noqa: F401
from
pyhocon
import
ConfigTree
# noqa: F401
from
databuilder
import
Scoped
from
databuilder.extractor.base_extractor
import
Extractor
from
databuilder.extractor.neo4j_extractor
import
Neo4jExtractor
from
databuilder.publisher.neo4j_csv_publisher
import
JOB_PUBLISH_TAG
class
Neo4jDashboardSearchDataExtractor
(
Extractor
):
"""
Extractor to fetch data required to support search from Neo4j graph database
Use Neo4jExtractor extractor class
"""
CYPHER_QUERY_CONFIG_KEY
=
'cypher_query'
DEFAULT_NEO4J_CYPHER_QUERY
=
textwrap
.
dedent
(
"""
MATCH (dgroup:Dashboardgroup)<-[:DASHBOARD_OF]-(d:Dashboard)
{publish_tag_filter}
OPTIONAL MATCH (d)-[:DESCRIPTION]->(ddesc:Description)
OPTIONAL MATCH (d)-[:OWNER]->(owner:User)
OPTIONAL MATCH (d)-[:TAG]->(tags:Tag)
OPTIONAL MATCH (d)-[:LAST_RELOAD_TIME]->(lrt:Lastreloadtime)
RETURN dgroup.name as dashboard_group, d.name AS dashboard_name,
ddesc.description AS description, owner.full_name AS user_name,
owner.key AS user_id, left(lrt.value,16) as last_reload_time,
COLLECT(DISTINCT lower(tags.key)) as tags
ORDER BY d.name;
"""
)
def
init
(
self
,
conf
):
# type: (ConfigTree) -> None
"""
Initialize Neo4jExtractor object from configuration and use that for extraction
"""
self
.
conf
=
conf
# extract cypher query from conf, if specified, else use default query
if
Neo4jDashboardSearchDataExtractor
.
CYPHER_QUERY_CONFIG_KEY
in
conf
:
self
.
cypher_query
=
conf
.
get_string
(
Neo4jDashboardSearchDataExtractor
.
CYPHER_QUERY_CONFIG_KEY
)
else
:
self
.
cypher_query
=
self
.
_add_publish_tag_filter
(
conf
.
get_string
(
JOB_PUBLISH_TAG
,
''
),
Neo4jDashboardSearchDataExtractor
.
DEFAULT_NEO4J_CYPHER_QUERY
)
self
.
neo4j_extractor
=
Neo4jExtractor
()
# write the cypher query in configs in Neo4jExtractor scope
key
=
self
.
neo4j_extractor
.
get_scope
()
+
'.'
+
Neo4jExtractor
.
CYPHER_QUERY_CONFIG_KEY
self
.
conf
.
put
(
key
,
self
.
cypher_query
)
# initialize neo4j_extractor from configs
self
.
neo4j_extractor
.
init
(
Scoped
.
get_scoped_conf
(
self
.
conf
,
self
.
neo4j_extractor
.
get_scope
()))
def
close
(
self
):
# type: () -> None
"""
Use close() method specified by neo4j_extractor
to close connection to neo4j cluster
"""
self
.
neo4j_extractor
.
close
()
def
extract
(
self
):
# type: () -> Any
"""
Invoke extract() method defined by neo4j_extractor
"""
return
self
.
neo4j_extractor
.
extract
()
def
get_scope
(
self
):
# type: () -> str
return
'extractor.dashboard_search_data'
def
_add_publish_tag_filter
(
self
,
publish_tag
,
cypher_query
):
"""
Adds publish tag filter into Cypher query
:param publish_tag: value of publish tag.
:param cypher_query:
:return:
"""
if
not
publish_tag
:
publish_tag_filter
=
''
else
:
publish_tag_filter
=
"""WHERE dashboard.published_tag = '{}'"""
.
format
(
publish_tag
)
return
cypher_query
.
format
(
publish_tag_filter
=
publish_tag_filter
)
databuilder/extractor/neo4j_metric_search_data_extractor.py
deleted
100644 → 0
View file @
aedd87fb
import
textwrap
from
typing
import
Any
# noqa: F401
from
pyhocon
import
ConfigTree
# noqa: F401
from
databuilder
import
Scoped
from
databuilder.extractor.base_extractor
import
Extractor
from
databuilder.extractor.neo4j_extractor
import
Neo4jExtractor
from
databuilder.publisher.neo4j_csv_publisher
import
JOB_PUBLISH_TAG
class
Neo4jMetricSearchDataExtractor
(
Extractor
):
"""
Extractor to fetch data required to support search from Neo4j graph database
Use Neo4jExtractor extractor class
"""
CYPHER_QUERY_CONFIG_KEY
=
'cypher_query'
DEFAULT_NEO4J_CYPHER_QUERY
=
textwrap
.
dedent
(
"""
MATCH (m)-[:METRIC_OF]->(d:Dashboard)
{publish_tag_filter}
OPTIONAL MATCH (m)-[:DESCRIPTION]->(mdesc:Description)
OPTIONAL MATCH (m)-[:METRIC_TYPE]->(mtype:Metrictype)
OPTIONAL MATCH (d)-[:DASHBOARD_OF]->(dg:Dashboardgroup)
OPTIONAL MATCH (m)-[:TAG]->(tags:Tag)
RETURN m.name as name, mtype.name as type,
mdesc.description as description,
COLLECT(DISTINCT lower(tags.key)) as tags,
COLLECT(DISTINCT dg.name+"://" +d.name) as dashboards
"""
)
def
init
(
self
,
conf
):
# type: (ConfigTree) -> None
"""
Initialize Neo4jExtractor object from configuration and use that for extraction
"""
self
.
conf
=
conf
# extract cypher query from conf, if specified, else use default query
if
Neo4jMetricSearchDataExtractor
.
CYPHER_QUERY_CONFIG_KEY
in
conf
:
self
.
cypher_query
=
conf
.
get_string
(
Neo4jMetricSearchDataExtractor
.
CYPHER_QUERY_CONFIG_KEY
)
else
:
self
.
cypher_query
=
self
.
_add_publish_tag_filter
(
conf
.
get_string
(
JOB_PUBLISH_TAG
,
''
),
Neo4jMetricSearchDataExtractor
.
DEFAULT_NEO4J_CYPHER_QUERY
)
self
.
neo4j_extractor
=
Neo4jExtractor
()
# write the cypher query in configs in Neo4jExtractor scope
key
=
self
.
neo4j_extractor
.
get_scope
()
+
'.'
+
Neo4jExtractor
.
CYPHER_QUERY_CONFIG_KEY
self
.
conf
.
put
(
key
,
self
.
cypher_query
)
# initialize neo4j_extractor from configs
self
.
neo4j_extractor
.
init
(
Scoped
.
get_scoped_conf
(
self
.
conf
,
self
.
neo4j_extractor
.
get_scope
()))
def
close
(
self
):
# type: () -> None
"""
Use close() method specified by neo4j_extractor
to close connection to neo4j cluster
"""
self
.
neo4j_extractor
.
close
()
def
extract
(
self
):
# type: () -> Any
"""
Invoke extract() method defined by neo4j_extractor
"""
return
self
.
neo4j_extractor
.
extract
()
def
get_scope
(
self
):
# type: () -> str
return
'extractor.dashboard_search_data'
def
_add_publish_tag_filter
(
self
,
publish_tag
,
cypher_query
):
"""
Adds publish tag filter into Cypher query
:param publish_tag: value of publish tag.
:param cypher_query:
:return:
"""
if
not
publish_tag
:
publish_tag_filter
=
''
else
:
publish_tag_filter
=
"""WHERE metric.published_tag = '{}'"""
.
format
(
publish_tag
)
return
cypher_query
.
format
(
publish_tag_filter
=
publish_tag_filter
)
example/scripts/dashboard_data_loader.py
deleted
100644 → 0
View file @
aedd87fb
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
"""
import
logging
import
os
from
pyhocon
import
ConfigFactory
from
databuilder.extractor.generic_extractor
import
GenericExtractor
from
databuilder.job.job
import
DefaultJob
from
databuilder.loader.file_system_neo4j_csv_loader
import
FsNeo4jCSVLoader
from
databuilder.publisher
import
neo4j_csv_publisher
from
databuilder.publisher.neo4j_csv_publisher
import
Neo4jCsvPublisher
from
databuilder.task.task
import
DefaultTask
LOGGER
=
logging
.
getLogger
(
__name__
)
LOGGER
.
setLevel
(
logging
.
INFO
)
# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT
=
'bolt://{}:7687'
.
format
(
os
.
getenv
(
'NEO4J_HOST'
,
'localhost'
))
neo4j_endpoint
=
NEO4J_ENDPOINT
neo4j_user
=
'neo4j'
neo4j_password
=
'test'
# Input example
input
=
[
{
'dashboard_name'
:
'Agent'
,
'dashboard_group'
:
'Product - Jobs.cz'
,
'description'
:
'description of Dash'
,
'last_reload_time'
:
'2019-05-30T07:03:35.580Z'
,
'user_id'
:
'roald.amundsen@example.org'
,
'tags'
:
[
'test_tag'
,
'tag2'
]},
{
'dashboard_name'
:
'Atmoskop'
,
'dashboard_group'
:
'Product - Atmoskop'
,
'description'
:
'description of Dash2'
,
'last_reload_time'
:
'2019-05-30T07:07:42.326Z'
,
'user_id'
:
'buzz@example.org'
,
'tags'
:
[]},
{
'dashboard_name'
:
'Dohazovac'
,
'dashboard_group'
:
'Product - Jobs.cz'
,
'description'
:
''
,
'last_reload_time'
:
'2019-05-30T07:07:42.326Z'
,
'user_id'
:
'buzz@example.org'
,
'tags'
:
[
'test_tag'
,
'tag3'
]},
{
'dashboard_name'
:
'PzR'
,
'dashboard_group'
:
''
,
'description'
:
''
,
'last_reload_time'
:
'2019-05-30T07:07:42.326Z'
,
'user_id'
:
''
,
'tags'
:
[]}
]
def
create_dashboard_neo4j_job
(
**
kwargs
):
tmp_folder
=
'/var/tmp/amundsen/table_metadata'
node_files_folder
=
'{tmp_folder}/nodes/'
.
format
(
tmp_folder
=
tmp_folder
)
relationship_files_folder
=
'{tmp_folder}/relationships/'
.
format
(
tmp_folder
=
tmp_folder
)
job_config
=
ConfigFactory
.
from_dict
({
'extractor.generic.{}'
.
format
(
GenericExtractor
.
EXTRACTION_ITEMS
):
iter
(
input
),
'extractor.generic.{}'
.
format
(
'model_class'
):
'databuilder.models.dashboard_metadata.DashboardMetadata'
,
'loader.filesystem_csv_neo4j.{}'
.
format
(
FsNeo4jCSVLoader
.
NODE_DIR_PATH
):
node_files_folder
,
'loader.filesystem_csv_neo4j.{}'
.
format
(
FsNeo4jCSVLoader
.
RELATION_DIR_PATH
):
relationship_files_folder
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
NODE_FILES_DIR
):
node_files_folder
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
RELATION_FILES_DIR
):
relationship_files_folder
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
NEO4J_END_POINT_KEY
):
neo4j_endpoint
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
NEO4J_USER
):
neo4j_user
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
NEO4J_PASSWORD
):
neo4j_password
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
JOB_PUBLISH_TAG
):
'unique_tag'
,
# should use unique tag here like {ds}
})
job
=
DefaultJob
(
conf
=
job_config
,
task
=
DefaultTask
(
extractor
=
GenericExtractor
(),
loader
=
FsNeo4jCSVLoader
()),
publisher
=
Neo4jCsvPublisher
())
return
job
if
__name__
==
"__main__"
:
job
=
create_dashboard_neo4j_job
()
job
.
launch
()
example/scripts/dashboard_es_publisher.py
deleted
100644 → 0
View file @
aedd87fb
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
"""
from
pyhocon
import
ConfigFactory
import
os
import
random
import
textwrap
from
databuilder.extractor.neo4j_dashboard_search_data_extractor
import
Neo4jDashboardSearchDataExtractor
from
databuilder.job.job
import
DefaultJob
from
databuilder.loader.file_system_elasticsearch_json_loader
import
FSElasticsearchJSONLoader
from
databuilder.extractor.neo4j_extractor
import
Neo4jExtractor
from
databuilder.publisher.elasticsearch_publisher
import
ElasticsearchPublisher
from
databuilder.task.task
import
DefaultTask
from
elasticsearch
import
Elasticsearch
# set env ES_HOST to override localhost
es
=
Elasticsearch
([
{
'host'
:
os
.
getenv
(
'ES_HOST'
,
'localhost'
)},
])
# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT
=
'bolt://{}:7687'
.
format
(
os
.
getenv
(
'NEO4J_HOST'
,
'localhost'
))
neo4j_endpoint
=
NEO4J_ENDPOINT
neo4j_user
=
'neo4j'
neo4j_password
=
'test'
DASHBOARD_ES_MAP
=
textwrap
.
dedent
(
"""
{
"mappings":{
"dashboard":{
"properties": {
"dashboard_group": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"dashboard_name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"description": {
"type": "text",
"analyzer": "simple"
},
"last_reload_time": {
"type": "date",
"format": "YYYY-MM-DD'T'HH:mm"
},
"user_id": {
"type": "text",
"analyzer": "simple"
},
"user_name": {
"type": "text",
"analyzer": "simple"
},
"tags": {
"type": "keyword"
}
}
}
}
}
"""
)
# todo: Add a second model
def
create_neo4j_es_job
():
tmp_folder
=
'/var/tmp/amundsen/dashboard/dashboards_search_data.json'
task
=
DefaultTask
(
loader
=
FSElasticsearchJSONLoader
(),
extractor
=
Neo4jDashboardSearchDataExtractor
())
elasticsearch_client
=
es
elasticsearch_new_index_key
=
'dashboards'
elasticsearch_new_index_key_type
=
'dashboard'
elasticsearch_index_alias
=
'dashboard_search_index'
rand
=
str
(
random
.
randint
(
0
,
1000
))
job_config
=
ConfigFactory
.
from_dict
({
'extractor.dashboard_search_data.extractor.neo4j.{}'
.
format
(
Neo4jExtractor
.
GRAPH_URL_CONFIG_KEY
):
neo4j_endpoint
,
'extractor.dashboard_search_data.extractor.neo4j.{}'
.
format
(
Neo4jExtractor
.
MODEL_CLASS_CONFIG_KEY
):
'databuilder.models.dashboard_elasticsearch_document.DashboardESDocument'
,
'extractor.dashboard_search_data.extractor.neo4j.{}'
.
format
(
Neo4jExtractor
.
NEO4J_AUTH_USER
):
neo4j_user
,
'extractor.dashboard_search_data.extractor.neo4j.{}'
.
format
(
Neo4jExtractor
.
NEO4J_AUTH_PW
):
neo4j_password
,
'loader.filesystem.elasticsearch.{}'
.
format
(
FSElasticsearchJSONLoader
.
FILE_PATH_CONFIG_KEY
):
tmp_folder
,
'loader.filesystem.elasticsearch.{}'
.
format
(
FSElasticsearchJSONLoader
.
FILE_MODE_CONFIG_KEY
):
'w'
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
FILE_PATH_CONFIG_KEY
):
tmp_folder
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
FILE_MODE_CONFIG_KEY
):
'r'
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
ELASTICSEARCH_DOC_TYPE_CONFIG_KEY
):
elasticsearch_new_index_key_type
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
ELASTICSEARCH_CLIENT_CONFIG_KEY
):
elasticsearch_client
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
ELASTICSEARCH_NEW_INDEX_CONFIG_KEY
):
elasticsearch_new_index_key
+
str
(
rand
),
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
ELASTICSEARCH_ALIAS_CONFIG_KEY
):
elasticsearch_index_alias
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
ELASTICSEARCH_MAPPING_CONFIG_KEY
):
DASHBOARD_ES_MAP
})
job
=
DefaultJob
(
conf
=
job_config
,
task
=
task
,
publisher
=
ElasticsearchPublisher
())
return
job
if
__name__
==
"__main__"
:
job
=
create_neo4j_es_job
()
job
.
launch
()
example/scripts/metric_data_loader.py
deleted
100644 → 0
View file @
aedd87fb
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
"""
import
logging
import
os
from
pyhocon
import
ConfigFactory
from
databuilder.extractor.generic_extractor
import
GenericExtractor
from
databuilder.job.job
import
DefaultJob
from
databuilder.loader.file_system_neo4j_csv_loader
import
FsNeo4jCSVLoader
from
databuilder.publisher
import
neo4j_csv_publisher
from
databuilder.publisher.neo4j_csv_publisher
import
Neo4jCsvPublisher
from
databuilder.task.task
import
DefaultTask
LOGGER
=
logging
.
getLogger
(
__name__
)
LOGGER
.
setLevel
(
logging
.
INFO
)
# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT
=
'bolt://{}:7687'
.
format
(
os
.
getenv
(
'NEO4J_HOST'
,
'localhost'
))
neo4j_endpoint
=
NEO4J_ENDPOINT
neo4j_user
=
'neo4j'
neo4j_password
=
'test'
# Input example
input
=
[
{
'dashboard_name'
:
'Agent'
,
'dashboard_group'
:
'Product - Jobs.cz'
,
'name'
:
'Metric 1'
,
'expression'
:
'a/b*(2*x)'
,
'description'
:
'This is description of Metric 1'
,
'type'
:
'MasterMetric'
,
'tags'
:
[
'Dummy Metric TAG'
]},
{
'dashboard_name'
:
'Agent'
,
'dashboard_group'
:
'Product - Jobs.cz'
,
'name'
:
'Metric 2'
,
'expression'
:
'b/a*(2*x)'
,
'description'
:
'M2 This is description of Metric 2'
,
'type'
:
'MasterMetric'
,
'tags'
:
[
'Dummy Metric TAG'
]},
{
'dashboard_name'
:
'Atmoskop'
,
'dashboard_group'
:
'Product - Atmoskop'
,
'name'
:
'Metric 1'
,
'expression'
:
'a/b*(2*x)'
,
'description'
:
'This is description of Metric 1'
,
'type'
:
'MasterMetric'
,
'tags'
:
[
'Dummy Metric TAG1'
]},
{
'dashboard_name'
:
'Atmoskop'
,
'dashboard_group'
:
'Product - Atmoskop'
,
'name'
:
'Metric 3'
,
'expression'
:
'r=b/a*(2*x)'
,
'description'
:
'M3 This is description of Metric 3'
,
'type'
:
'MasterMetric'
,
'tags'
:
[
'Non sense TAG'
]}
]
def
create_metric_neo4j_job
(
**
kwargs
):
tmp_folder
=
'/var/tmp/amundsen/metric_metadata'
node_files_folder
=
'{tmp_folder}/nodes/'
.
format
(
tmp_folder
=
tmp_folder
)
relationship_files_folder
=
'{tmp_folder}/relationships/'
.
format
(
tmp_folder
=
tmp_folder
)
job_config
=
ConfigFactory
.
from_dict
({
'extractor.generic.{}'
.
format
(
GenericExtractor
.
EXTRACTION_ITEMS
):
iter
(
input
),
'extractor.generic.{}'
.
format
(
'model_class'
):
'databuilder.models.metric_metadata.MetricMetadata'
,
'loader.filesystem_csv_neo4j.{}'
.
format
(
FsNeo4jCSVLoader
.
NODE_DIR_PATH
):
node_files_folder
,
'loader.filesystem_csv_neo4j.{}'
.
format
(
FsNeo4jCSVLoader
.
RELATION_DIR_PATH
):
relationship_files_folder
,
'loader.filesystem_csv_neo4j.{}'
.
format
(
FsNeo4jCSVLoader
.
SHOULD_DELETE_CREATED_DIR
):
True
,
'loader.filesystem_csv_neo4j.{}'
.
format
(
FsNeo4jCSVLoader
.
FORCE_CREATE_DIR
):
True
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
NODE_FILES_DIR
):
node_files_folder
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
RELATION_FILES_DIR
):
relationship_files_folder
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
NEO4J_END_POINT_KEY
):
neo4j_endpoint
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
NEO4J_USER
):
neo4j_user
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
NEO4J_PASSWORD
):
neo4j_password
,
'publisher.neo4j.{}'
.
format
(
neo4j_csv_publisher
.
JOB_PUBLISH_TAG
):
'unique_tag'
,
# should use unique tag here like {ds}
})
job
=
DefaultJob
(
conf
=
job_config
,
task
=
DefaultTask
(
extractor
=
GenericExtractor
(),
loader
=
FsNeo4jCSVLoader
()),
publisher
=
Neo4jCsvPublisher
())
return
job
if
__name__
==
"__main__"
:
job
=
create_metric_neo4j_job
()
job
.
launch
()
example/scripts/metric_es_publisher.py
deleted
100644 → 0
View file @
aedd87fb
"""
This is a example script which demo how to load data into neo4j without using Airflow DAG.
"""
import
os
import
random
import
textwrap
from
elasticsearch
import
Elasticsearch
from
pyhocon
import
ConfigFactory
from
databuilder.extractor.neo4j_metric_search_data_extractor
import
\
Neo4jMetricSearchDataExtractor
from
databuilder.extractor.neo4j_extractor
import
Neo4jExtractor
from
databuilder.job.job
import
DefaultJob
from
databuilder.loader.file_system_elasticsearch_json_loader
import
\
FSElasticsearchJSONLoader
from
databuilder.publisher.elasticsearch_publisher
import
\
ElasticsearchPublisher
from
databuilder.task.task
import
DefaultTask
# set env ES_HOST to override localhost
es
=
Elasticsearch
([
{
'host'
:
os
.
getenv
(
'ES_HOST'
,
'localhost'
)},
])
# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT
=
'bolt://{}:7687'
.
format
(
os
.
getenv
(
'NEO4J_HOST'
,
'localhost'
))
neo4j_endpoint
=
NEO4J_ENDPOINT
neo4j_user
=
'neo4j'
neo4j_password
=
'test'
METRIC_ES_MAP
=
textwrap
.
dedent
(
"""
{
"mappings":{
"metric":{
"properties": {
"name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"description": {
"type": "text",
"analyzer": "simple"
},
"type": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"tags": {
"type": "keyword"
},
"dashboards": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
}
}
}
}
}
"""
)
# todo: Add a second model
def
create_neo4j_es_job
():
tmp_folder
=
'/var/tmp/amundsen/metric/metric_search_data.json'
task
=
DefaultTask
(
loader
=
FSElasticsearchJSONLoader
(),
extractor
=
Neo4jMetricSearchDataExtractor
())
elasticsearch_client
=
es
elasticsearch_new_index_key
=
'metrics'
elasticsearch_new_index_key_type
=
'metric'
elasticsearch_index_alias
=
'metric_search_index'
rand
=
str
(
random
.
randint
(
0
,
1000
))
job_config
=
ConfigFactory
.
from_dict
({
'extractor.dashboard_search_data.extractor.neo4j.{}'
.
format
(
Neo4jExtractor
.
GRAPH_URL_CONFIG_KEY
):
neo4j_endpoint
,
'extractor.dashboard_search_data.extractor.neo4j.{}'
.
format
(
Neo4jExtractor
.
MODEL_CLASS_CONFIG_KEY
):
'databuilder.models.metric_elasticsearch_document.MetricESDocument'
,
'extractor.dashboard_search_data.extractor.neo4j.{}'
.
format
(
Neo4jExtractor
.
NEO4J_AUTH_USER
):
neo4j_user
,
'extractor.dashboard_search_data.extractor.neo4j.{}'
.
format
(
Neo4jExtractor
.
NEO4J_AUTH_PW
):
neo4j_password
,
'loader.filesystem.elasticsearch.{}'
.
format
(
FSElasticsearchJSONLoader
.
FILE_PATH_CONFIG_KEY
):
tmp_folder
,
'loader.filesystem.elasticsearch.{}'
.
format
(
FSElasticsearchJSONLoader
.
FILE_MODE_CONFIG_KEY
):
'w'
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
FILE_PATH_CONFIG_KEY
):
tmp_folder
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
FILE_MODE_CONFIG_KEY
):
'r'
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
ELASTICSEARCH_DOC_TYPE_CONFIG_KEY
):
elasticsearch_new_index_key_type
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
ELASTICSEARCH_CLIENT_CONFIG_KEY
):
elasticsearch_client
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
ELASTICSEARCH_NEW_INDEX_CONFIG_KEY
):
elasticsearch_new_index_key
+
str
(
rand
),
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
ELASTICSEARCH_ALIAS_CONFIG_KEY
):
elasticsearch_index_alias
,
'publisher.elasticsearch.{}'
.
format
(
ElasticsearchPublisher
.
ELASTICSEARCH_MAPPING_CONFIG_KEY
):
METRIC_ES_MAP
})
job
=
DefaultJob
(
conf
=
job_config
,
task
=
task
,
publisher
=
ElasticsearchPublisher
())
return
job
if
__name__
==
"__main__"
:
job
=
create_neo4j_es_job
()
job
.
launch
()
tests/unit/extractor/test_neo4j_dashboard_search_data_extractor.py
deleted
100644 → 0
View file @
aedd87fb
import
unittest
from
databuilder.extractor.neo4j_dashboard_search_data_extractor
import
Neo4jDashboardSearchDataExtractor
class
TestNeo4jDashboardExtractor
(
unittest
.
TestCase
):
def
test_adding_filter
(
self
):
# type: (Any) -> None
extractor
=
Neo4jDashboardSearchDataExtractor
()
actual
=
extractor
.
_add_publish_tag_filter
(
'foo'
,
'MATCH (dashboard:Dashboard) {publish_tag_filter} RETURN dashboard'
)
self
.
assertEqual
(
actual
,
"""MATCH (dashboard:Dashboard) WHERE dashboard.published_tag = 'foo' RETURN dashboard"""
)
def
test_not_adding_filter
(
self
):
# type: (Any) -> None
extractor
=
Neo4jDashboardSearchDataExtractor
()
actual
=
extractor
.
_add_publish_tag_filter
(
''
,
'MATCH (dashboard:Dashboard) {publish_tag_filter} RETURN dashboard'
)
self
.
assertEqual
(
actual
,
"""MATCH (dashboard:Dashboard) RETURN dashboard"""
)
if
__name__
==
'__main__'
:
unittest
.
main
()
tests/unit/extractor/test_neo4j_metric_search_data_extractor.py
deleted
100644 → 0
View file @
aedd87fb
import
unittest
from
databuilder.extractor.neo4j_metric_search_data_extractor
import
Neo4jMetricSearchDataExtractor
class
TestNeo4jMetricExtractor
(
unittest
.
TestCase
):
def
test_adding_filter
(
self
):
# type: (Any) -> None
extractor
=
Neo4jMetricSearchDataExtractor
()
actual
=
extractor
.
_add_publish_tag_filter
(
'foo'
,
'MATCH (metric:Metric) {publish_tag_filter} RETURN metric'
)
self
.
assertEqual
(
actual
,
"""MATCH (metric:Metric) WHERE metric.published_tag = 'foo' RETURN metric"""
)
def
test_not_adding_filter
(
self
):
# type: (Any) -> None
extractor
=
Neo4jMetricSearchDataExtractor
()
actual
=
extractor
.
_add_publish_tag_filter
(
''
,
'MATCH (metric:Metric) {publish_tag_filter} RETURN metric'
)
self
.
assertEqual
(
actual
,
"""MATCH (metric:Metric) RETURN metric"""
)
if
__name__
==
'__main__'
:
unittest
.
main
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment