import os from pyhocon import ConfigTree # noqa: F401 from databuilder.loader.base_loader import Loader from databuilder.models.elasticsearch_document import ElasticsearchDocument class FSElasticsearchJSONLoader(Loader): """ Loader class to produce Elasticsearch bulk load file to Local FileSystem """ FILE_PATH_CONFIG_KEY = 'file_path' FILE_MODE_CONFIG_KEY = 'mode' def init(self, conf): # type: (ConfigTree) -> None """ :param conf: :return: """ self.conf = conf self.file_path = self.conf.get_string(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY) self.file_mode = self.conf.get_string(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY, 'w') file_dir = self.file_path.rsplit('/', 1)[0] self._ensure_directory_exists(file_dir) self.file_handler = open(self.file_path, self.file_mode) def _ensure_directory_exists(self, path): # type: (str) -> None """ Check to ensure file directory exists; create the directories otherwise :param path: :return: None """ if os.path.exists(path): return # nothing to do here os.makedirs(path) def load(self, record): # type: (ElasticsearchDocument) -> None """ Write a record in json format to file :param record: :return: """ if not record: return if not isinstance(record, ElasticsearchDocument): raise Exception("Record not of type 'ElasticsearchDocument'!") self.file_handler.write(record.to_json()) self.file_handler.flush() def close(self): # type: () -> None """ close the file handler :return: """ self.file_handler.close() def get_scope(self): # type: () -> str return 'loader.filesystem.elasticsearch'