Unverified Commit 0ab3f467 authored by Tao Feng's avatar Tao Feng Committed by GitHub

Provide configuration to control steps for ES publisher (#256)

* Provide configuration on control steps for ES publisher

* change name

* address comment
parent 54ef0ff5
...@@ -29,6 +29,9 @@ class ElasticsearchPublisher(Publisher): ...@@ -29,6 +29,9 @@ class ElasticsearchPublisher(Publisher):
ELASTICSEARCH_ALIAS_CONFIG_KEY = 'alias' ELASTICSEARCH_ALIAS_CONFIG_KEY = 'alias'
ELASTICSEARCH_MAPPING_CONFIG_KEY = 'mapping' ELASTICSEARCH_MAPPING_CONFIG_KEY = 'mapping'
# config to control how many max documents to publish at a time
ELASTICSEARCH_PUBLISHER_BATCH_SIZE = 'batch_size'
# Specifying default mapping for elasticsearch index # Specifying default mapping for elasticsearch index
# Documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html # Documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html
# Setting type to "text" for all fields that would be used in search # Setting type to "text" for all fields that would be used in search
...@@ -137,7 +140,8 @@ class ElasticsearchPublisher(Publisher): ...@@ -137,7 +140,8 @@ class ElasticsearchPublisher(Publisher):
self.elasticsearch_mapping = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY, self.elasticsearch_mapping = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY,
ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING) ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING)
self.elasticsearch_batch_size = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_PUBLISHER_BATCH_SIZE,
10000)
self.file_handler = open(self.file_path, self.file_mode) self.file_handler = open(self.file_path, self.file_mode)
def _fetch_old_index(self): def _fetch_old_index(self):
...@@ -172,17 +176,25 @@ class ElasticsearchPublisher(Publisher): ...@@ -172,17 +176,25 @@ class ElasticsearchPublisher(Publisher):
# Bulk load JSON format is defined here: # Bulk load JSON format is defined here:
# https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html # https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
bulk_actions = [] bulk_actions = []
cnt = 0
# create new index with mapping
self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping)
for action in actions: for action in actions:
index_row = dict(index=dict(_index=self.elasticsearch_new_index, index_row = dict(index=dict(_index=self.elasticsearch_new_index,
_type=self.elasticsearch_type)) _type=self.elasticsearch_type))
bulk_actions.append(index_row) bulk_actions.append(index_row)
bulk_actions.append(action) bulk_actions.append(action)
cnt += 1
# create new index with mapping if cnt == self.elasticsearch_batch_size:
self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping) self.elasticsearch_client.bulk(bulk_actions)
LOGGER.info('Publish {} of records to ES'.format(str(cnt)))
# bulk upload data cnt = 0
self.elasticsearch_client.bulk(bulk_actions) bulk_actions = []
# Do the final bulk actions
if bulk_actions:
self.elasticsearch_client.bulk(bulk_actions)
# fetch indices that have {elasticsearch_alias} as alias # fetch indices that have {elasticsearch_alias} as alias
elasticsearch_old_indices = self._fetch_old_index() elasticsearch_old_indices = self._fetch_old_index()
......
...@@ -2,7 +2,7 @@ import os ...@@ -2,7 +2,7 @@ import os
from setuptools import setup, find_packages from setuptools import setup, find_packages
__version__ = '2.5.9' __version__ = '2.5.10'
requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file: with open(requirements_path) as requirements_file:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment