Unverified Commit 95635d5b authored by Jin Hyuk Chang's avatar Jin Hyuk Chang Committed by GitHub

[DPTOOLS-2299] Separate Index creation step from Node publish (#56)

* [DPTOOLS-2299] Separate Index creation step from Node publish

* Update
parent bfb69f03
......@@ -68,7 +68,6 @@ RELATION_REQUIRED_KEYS = {RELATION_START_LABEL, RELATION_START_KEY,
RELATION_END_LABEL, RELATION_END_KEY,
RELATION_TYPE, RELATION_REVERSE_TYPE}
DEFAULT_CONFIG = ConfigFactory.from_dict({NEO4J_TRANSCATION_SIZE: 500,
NEO4J_RELATIONSHIP_CREATION_CONFIRM: False,
NEO4J_MAX_CONN_LIFE_TIME_SEC: 50})
......@@ -99,6 +98,7 @@ class Neo4jCsvPublisher(Publisher):
#TODO User UNWIND batch operation for better performance
"""
def __init__(self):
# type: () -> None
pass
......@@ -155,6 +155,10 @@ class Neo4jCsvPublisher(Publisher):
start = time.time()
LOGGER.info('Creating indices using Node files: {}'.format(self._node_files))
for node_file in self._node_files:
self._create_indices(node_file=node_file)
LOGGER.info('Publishing Node files: {}'.format(self._node_files))
while True:
try:
......@@ -178,6 +182,24 @@ class Neo4jCsvPublisher(Publisher):
# type: () -> str
return 'publisher.neo4j'
def _create_indices(self, node_file):
"""
Go over the node file and try creating unique index
:param node_file:
:return:
"""
# type: (str) -> None
LOGGER.info('Creating indices. (Existing indices will be ignored)')
with open(node_file, 'r') as node_csv:
for node_record in csv.DictReader(node_csv):
label = node_record[NODE_LABEL_KEY]
if label not in self.labels:
self._try_create_index(label)
self.labels.add(label)
LOGGER.info('Indices have been created.')
def _publish_node(self, node_file):
# type: (str) -> None
"""
......@@ -199,16 +221,6 @@ class Neo4jCsvPublisher(Publisher):
tx = self._session.begin_transaction()
with open(node_file, 'r') as node_csv:
for count, node_record in enumerate(csv.DictReader(node_csv)):
label = node_record[NODE_LABEL_KEY]
# If label is seen for the first time, try creating unique index
if label not in self.labels:
tx.commit() # Transaction needs to be committed as index update will make transaction to abort.
LOGGER.info('Committed {} records'.format(count + 1))
self._try_create_index(label)
self.labels.add(label)
tx = self._session.begin_transaction()
stmt = self.create_node_merge_statement(node_record=node_record)
tx = self._execute_statement(stmt, tx, count)
......@@ -358,7 +370,7 @@ ON MATCH SET {update_prop_body}""".format(create_prop_body=create_prop_body,
LOGGER.debug('Executing statement: {}'.format(stmt))
if six.PY2:
result = tx.run(unicode(stmt, errors='ignore')) # noqa
result = tx.run(unicode(stmt, errors='ignore')) # noqa
else:
result = tx.run(str(stmt).encode('utf-8', 'ignore'))
if expect_result and not result.single():
......
......@@ -51,8 +51,8 @@ class TestPublish(unittest.TestCase):
self.assertEqual(mock_run.call_count, 6)
# 2 node files, 1 relation file, and 2 more commits before index creation
self.assertEqual(mock_commit.call_count, 5)
# 2 node files, 1 relation file
self.assertEqual(mock_commit.call_count, 3)
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment