Unverified Commit 3b89a78f authored by Jin Hyuk Chang's avatar Jin Hyuk Chang Committed by GitHub

[DPTOOLS-1903] Remove stale data in ES index (#15)

parent 37c700d9
...@@ -63,6 +63,7 @@ class Neo4jExtractor(Extractor): ...@@ -63,6 +63,7 @@ class Neo4jExtractor(Extractor):
""" """
Create an iterator to execute sql. Create an iterator to execute sql.
""" """
LOGGER.info('Executing query {}'.format(self.cypher_query))
result = tx.run(self.cypher_query) result = tx.run(self.cypher_query)
return result return result
......
...@@ -6,6 +6,7 @@ from pyhocon import ConfigTree # noqa: F401 ...@@ -6,6 +6,7 @@ from pyhocon import ConfigTree # noqa: F401
from databuilder import Scoped from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.publisher.neo4j_csv_publisher import JOB_PUBLISH_TAG
class Neo4jSearchDataExtractor(Extractor): class Neo4jSearchDataExtractor(Extractor):
...@@ -18,6 +19,7 @@ class Neo4jSearchDataExtractor(Extractor): ...@@ -18,6 +19,7 @@ class Neo4jSearchDataExtractor(Extractor):
DEFAULT_NEO4J_CYPHER_QUERY = textwrap.dedent( DEFAULT_NEO4J_CYPHER_QUERY = textwrap.dedent(
""" """
MATCH (db:Database)<-[:CLUSTER_OF]-(cluster:Cluster)<-[:SCHEMA_OF]-(schema:Schema)<-[:TABLE_OF]-(table:Table) MATCH (db:Database)<-[:CLUSTER_OF]-(cluster:Cluster)<-[:SCHEMA_OF]-(schema:Schema)<-[:TABLE_OF]-(table:Table)
{publish_tag_filter}
OPTIONAL MATCH (table)-[:DESCRIPTION]->(table_description:Description) OPTIONAL MATCH (table)-[:DESCRIPTION]->(table_description:Description)
OPTIONAL MATCH (table)-[read:READ_BY]->(user:User) OPTIONAL MATCH (table)-[read:READ_BY]->(user:User)
OPTIONAL MATCH (table)-[:COLUMN]->(cols:Column) OPTIONAL MATCH (table)-[:COLUMN]->(cols:Column)
...@@ -44,8 +46,11 @@ class Neo4jSearchDataExtractor(Extractor): ...@@ -44,8 +46,11 @@ class Neo4jSearchDataExtractor(Extractor):
self.conf = conf self.conf = conf
# extract cypher query from conf, if specified, else use default query # extract cypher query from conf, if specified, else use default query
self.cypher_query = conf.get_string(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY, if Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY in conf:
Neo4jSearchDataExtractor.DEFAULT_NEO4J_CYPHER_QUERY) self.cypher_query = conf.get_string(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY)
else:
self.cypher_query = self._add_publish_tag_filter(conf.get_string(JOB_PUBLISH_TAG, ''),
Neo4jSearchDataExtractor.DEFAULT_NEO4J_CYPHER_QUERY)
self.neo4j_extractor = Neo4jExtractor() self.neo4j_extractor = Neo4jExtractor()
# write the cypher query in configs in Neo4jExtractor scope # write the cypher query in configs in Neo4jExtractor scope
...@@ -72,3 +77,18 @@ class Neo4jSearchDataExtractor(Extractor): ...@@ -72,3 +77,18 @@ class Neo4jSearchDataExtractor(Extractor):
def get_scope(self): def get_scope(self):
# type: () -> str # type: () -> str
return 'extractor.search_data' return 'extractor.search_data'
def _add_publish_tag_filter(self, publish_tag, cypher_query):
"""
Adds publish tag filter into Cypher query
:param publish_tag: value of publish tag.
:param cypher_query:
:return:
"""
# type: (str, str) -> str
if not publish_tag:
publish_tag_filter = ''
else:
publish_tag_filter = """WHERE table.published_tag = '{}'""".format(publish_tag)
return cypher_query.format(publish_tag_filter=publish_tag_filter)
...@@ -35,7 +35,6 @@ class TblColUsgAggExtractor(Extractor): ...@@ -35,7 +35,6 @@ class TblColUsgAggExtractor(Extractor):
def init(self, conf): def init(self, conf):
# type: (ConfigTree) -> None # type: (ConfigTree) -> None
self._extractor = conf.get(RAW_EXTRACTOR) # type: Extractor self._extractor = conf.get(RAW_EXTRACTOR) # type: Extractor
self._extractor.init(Scoped.get_scoped_conf(conf, self._extractor.get_scope())) self._extractor.init(Scoped.get_scoped_conf(conf, self._extractor.get_scope()))
......
from setuptools import setup, find_packages from setuptools import setup, find_packages
__version__ = '1.0.5' __version__ = '1.0.6'
setup( setup(
......
import unittest
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
class TestNeo4jExtractor(unittest.TestCase):
def test_adding_filter(self):
# type: (Any) -> None
extractor = Neo4jSearchDataExtractor()
actual = extractor._add_publish_tag_filter('foo', 'MATCH (table:Table) {publish_tag_filter} RETURN table')
self.assertEqual(actual, """MATCH (table:Table) WHERE table.published_tag = 'foo' RETURN table""")
def test_not_adding_filter(self):
# type: (Any) -> None
extractor = Neo4jSearchDataExtractor()
actual = extractor._add_publish_tag_filter('', 'MATCH (table:Table) {publish_tag_filter} RETURN table')
self.assertEqual(actual, """MATCH (table:Table) RETURN table""")
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment