Unverified commit 319ebcc1, authored by Jin Hyuk Chang and committed by GitHub

Replace ThreadPool with Pool in SqlToTblColUsageTransformer (#99)

* Replace ThreadPool with Pool

* Update

* Update
parent 739c6383
try:
import copy_reg
except Exception:
import copyreg as copy_reg
import logging
from multiprocessing.pool import ThreadPool, TimeoutError
import types
from multiprocessing.pool import Pool, TimeoutError
from pyhocon import ConfigTree # noqa: F401
from typing import Any, Optional, List, Iterable # noqa: F401
......@@ -14,6 +20,19 @@ from databuilder.transformer.base_transformer import Transformer
LOGGER = logging.getLogger(__name__)
def _pickle_method(m):
"""
Pool needs to pickle method in order to pass it to separate process. This method is to define how to pickle method.
"""
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
# Register _pickle_method as the reduction function pickle uses for method objects.
copy_reg.pickle(types.MethodType, _pickle_method)
class SqlToTblColUsageTransformer(Transformer):
"""
Note that it currently only supports Presto SQL.
......@@ -44,7 +63,7 @@ class SqlToTblColUsageTransformer(Transformer):
self._sql_stmt_attr = conf.get_string(SqlToTblColUsageTransformer.SQL_STATEMENT_ATTRIBUTE_NAME)
self._user_email_attr = conf.get_string(SqlToTblColUsageTransformer.USER_EMAIL_ATTRIBUTE_NAME)
self._tbl_to_schema_mapping = self._create_schema_by_table_mapping()
self._worker_pool = ThreadPool(processes=1)
self._worker_pool = Pool(processes=1)
self._time_out_sec = conf.get_int(SqlToTblColUsageTransformer.COLUMN_EXTRACTION_TIMEOUT_SEC, 10)
LOGGER.info('Column extraction timeout: {} seconds'.format(self._time_out_sec))
self._log_all_extraction_failures = conf.get_bool(SqlToTblColUsageTransformer.LOG_ALL_EXTRACTION_FAILURES,
......@@ -66,7 +85,7 @@ class SqlToTblColUsageTransformer(Transformer):
LOGGER.exception('Timed out while getting column usage from query: {}'.format(stmt))
LOGGER.info('Killing the thread.')
self._worker_pool.terminate()
self._worker_pool = ThreadPool(processes=1)
self._worker_pool = Pool(processes=1)
LOGGER.info('Killed the thread.')
return None
except Exception:
......
from setuptools import setup, find_packages
__version__ = '1.3.5'
__version__ = '1.3.6'
setup(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment