Unverified Commit 500f6252 authored by Jin Hyuk Chang's avatar Jin Hyuk Chang Committed by GitHub

[DPTOOLS-2312] Add callback registration in Publisher (#57)

* [DPTOOLS-2312] Add callback registration in Publisher

* Increment version
parent 95635d5b
......@@ -274,3 +274,10 @@ job = DefaultJob(
publisher=ElasticsearchPublisher())
job.launch()
```
#### [Callback](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/callback/call_back.py "Callback")
The Callback interface is built upon the [Observer pattern](https://en.wikipedia.org/wiki/Observer_pattern "Observer pattern"), where a participant wants to take an action when the target's state changes.
Publisher is the first component to adopt Callback: registered callbacks are called either when publish succeeds or when publish fails. To register a callback, Publisher provides the [register_call_back](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/publisher/base_publisher.py#L50 "register_call_back") method.
One use case is an Extractor that needs to commit when the job is finished (e.g., Kafka). By registering a callback with the Publisher, the Extractor can safely commit once publish is successful by implementing its commit logic in the [on_success](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/callback/call_back.py#L18 "on_success") method.
\ No newline at end of file
import abc
import logging
import six
from typing import List, Optional # noqa: F401
LOGGER = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Callback(object):
    """
    Interface for objects that want to be told about the outcome of an operation.

    "on_success" is expected to be fired when the operation succeeded, "on_failure" when it failed.
    """

    @abc.abstractmethod
    def on_success(self):
        # type: () -> None
        """
        Hook that is called when the operation was successful.
        :return: None
        """

    @abc.abstractmethod
    def on_failure(self):
        # type: () -> None
        """
        Hook that is called when the operation failed.
        :return: None
        """
def notify_callbacks(callbacks, is_success):
    # type: (List[Callback], bool) -> None
    """
    Notify every callback about the outcome of an operation.

    Each callback gets "on_success" when is_success is true, otherwise "on_failure". A callback that raises does
    not prevent the remaining callbacks from being notified: the failure is logged and, after all callbacks have
    been visited, the last exception that was seen is raised.

    :param callbacks: callbacks to notify; when empty (or None) nothing is notified
    :param is_success: True to fire "on_success", False to fire "on_failure"
    :return: None
    :raises Exception: the last exception raised by a callback, if any callback failed
    """
    if not callbacks:
        LOGGER.info('No callbacks to notify')
        return

    LOGGER.info('Notifying callbacks')

    last_exception = None  # type: Optional[Exception]
    for callback in callbacks:
        try:
            if is_success:
                callback.on_success()
            else:
                callback.on_failure()
        except Exception as e:
            # Keep going so that one misbehaving callback cannot starve the others.
            LOGGER.exception('Failed while notifying callback')
            last_exception = e

    # Explicit None check: an exception type may define __len__/__bool__ and evaluate as falsy.
    if last_exception is not None:
        raise last_exception
......@@ -25,6 +25,7 @@ class DefaultJob(Job):
To configure statsd itself, use environment variable: https://statsd.readthedocs.io/en/v3.2.1/configure.html
"""
def __init__(self,
conf,
task,
......@@ -53,7 +54,7 @@ class DefaultJob(Job):
def launch(self):
# type: () -> None
"""
Launch a job by initializing job, run task and publish
Launch a job by initializing job, run task and publish.
:return:
"""
......@@ -71,6 +72,7 @@ class DefaultJob(Job):
self.publisher.init(Scoped.get_scoped_conf(self.conf, self.publisher.get_scope()))
Job.closer.register(self.publisher.close)
self.publisher.publish()
except Exception as e:
is_success = False
raise e
......
......@@ -3,6 +3,8 @@ import abc
from pyhocon import ConfigTree # noqa: F401
from databuilder import Scoped
from databuilder.callback import call_back
from databuilder.callback.call_back import Callback # noqa: F401
class Publisher(Scoped):
......@@ -18,16 +20,43 @@ class Publisher(Scoped):
and push to Neo4j.
"""
def __init__(self):
    # type: () -> None
    # Callbacks added through register_call_back; publish() hands this list to notify_callbacks.
    self.call_backs = list()  # type: List[Callback]
@abc.abstractmethod
def init(self, conf):
    # type: (ConfigTree) -> None
    """
    Initialize the publisher from its configuration. Declared abstract: concrete publishers are expected to
    provide their own implementation.
    :param conf: configuration for this publisher
    :return: None
    """
def publish(self):
    # type: () -> None
    """
    Run the publish logic of the subclass and notify the registered callbacks about the outcome.

    This method is concrete on purpose (not abstract): subclasses implement publish_impl and inherit this
    wrapper, so the callback notification cannot be skipped by accident.

    :return: None
    :raises Exception: whatever publish_impl raised, after the callbacks have been notified of the failure;
        or the exception raised by notify_callbacks if a callback itself failed
    """
    try:
        self.publish_impl()
    except Exception:
        call_back.notify_callbacks(self.call_backs, is_success=False)
        # Bare raise re-raises the publish failure with its original traceback.
        raise
    call_back.notify_callbacks(self.call_backs, is_success=True)
@abc.abstractmethod
def publish_impl(self):
    # type: () -> None
    """
    The actual publish logic, called by publish(). Subclasses of Publisher are expected to put their publish
    logic here by overriding this method.
    :return: None
    """
def register_call_back(self, callback):
    # type: (Callback) -> None
    """
    Add a callback to the list of callbacks that are notified when the publisher either published
    successfully or failed to publish.
    :param callback: callback to add
    :return: None
    """
    registered = self.call_backs
    registered.extend((callback,))
def get_scope(self):
    # type: () -> str
    """
    :return: the scope name of the publisher, 'publisher'
    """
    scope = 'publisher'
    return scope
......@@ -36,13 +65,13 @@ class Publisher(Scoped):
class NoopPublisher(Publisher):
    """
    A publisher whose init and publish_impl do nothing.
    """

    def __init__(self):
        # type: () -> None
        # Run Publisher.__init__ so that the callback list exists.
        super(NoopPublisher, self).__init__()

    def init(self, conf):
        # type: (ConfigTree) -> None
        pass

    def publish_impl(self):
        # type: () -> None
        # Publisher.publish() calls publish_impl, so the no-op goes here rather than in publish().
        pass
......
......@@ -107,7 +107,7 @@ class ElasticsearchPublisher(Publisher):
def __init__(self):
    # type: () -> None
    # Run Publisher.__init__ so that the callback list exists before any callback is registered.
    super(ElasticsearchPublisher, self).__init__()
def init(self, conf):
# type: (ConfigTree) -> None
......@@ -139,7 +139,7 @@ class ElasticsearchPublisher(Publisher):
# return empty list on exception
return []
def publish(self):
def publish_impl(self):
# type: () -> None
"""
Use Elasticsearch Bulk API to load data from file to a {new_index}.
......
......@@ -101,7 +101,7 @@ class Neo4jCsvPublisher(Publisher):
def __init__(self):
    # type: () -> None
    # Run Publisher.__init__ so that the callback list exists before any callback is registered.
    super(Neo4jCsvPublisher, self).__init__()
def init(self, conf):
# type: (ConfigTree) -> None
......@@ -146,7 +146,7 @@ class Neo4jCsvPublisher(Publisher):
path = conf.get_string(path_key)
return [join(path, f) for f in listdir(path) if isfile(join(path, f))]
def publish(self):
def publish_impl(self):
# type: () -> None
"""
Publishes Nodes first and then Relations
......
from setuptools import setup, find_packages
__version__ = '1.1.0'
__version__ = '1.2.0'
setup(
......
import unittest
from mock import MagicMock
from databuilder.callback import call_back
class TestCallBack(unittest.TestCase):
    """
    Tests for call_back.notify_callbacks.
    """

    def test_success_notify(self):
        # type: () -> None
        """Only on_success is fired on every callback when is_success=True."""
        callback1 = MagicMock()
        callback2 = MagicMock()
        callbacks = [callback1, callback2]

        call_back.notify_callbacks(callbacks, is_success=True)

        self.assertTrue(callback1.on_success.called)
        self.assertFalse(callback1.on_failure.called)
        self.assertTrue(callback2.on_success.called)
        self.assertFalse(callback2.on_failure.called)

    def test_failure_notify(self):
        # type: () -> None
        """Only on_failure is fired on every callback when is_success=False."""
        callback1 = MagicMock()
        callback2 = MagicMock()
        callbacks = [callback1, callback2]

        call_back.notify_callbacks(callbacks, is_success=False)

        self.assertFalse(callback1.on_success.called)
        self.assertTrue(callback1.on_failure.called)
        self.assertFalse(callback2.on_success.called)
        self.assertTrue(callback2.on_failure.called)

    def test_notify_failure(self):
        # type: () -> None
        """A raising callback does not stop the others, and the exception is raised at the end."""
        callback1 = MagicMock()
        callback2 = MagicMock()
        callback2.on_success.side_effect = Exception('Boom')
        callback3 = MagicMock()
        callbacks = [callback1, callback2, callback3]

        # assertRaises (rather than try/except Exception) so that a missing exception really fails the test:
        # an AssertionError raised inside a try block would itself be caught by "except Exception".
        self.assertRaises(Exception, call_back.notify_callbacks, callbacks, is_success=True)

        self.assertTrue(callback1.on_success.called)
        self.assertTrue(callback2.on_success.called)
        self.assertTrue(callback3.on_success.called)
if __name__ == '__main__':
unittest.main()
import unittest
from mock import MagicMock
from databuilder.publisher.base_publisher import Publisher, NoopPublisher
class TestPublisher(unittest.TestCase):
    """
    Tests that Publisher.publish notifies registered callbacks.
    """

    def testCallback(self):
        # type: () -> None
        """A successful publish fires on_success and not on_failure."""
        publisher = NoopPublisher()
        callback = MagicMock()
        publisher.register_call_back(callback)

        publisher.publish()

        self.assertTrue(callback.on_success.called)
        self.assertFalse(callback.on_failure.called)

    def testFailureCallback(self):
        # type: () -> None
        """A failing publish fires on_failure, not on_success, and still raises."""
        publisher = FailedPublisher()
        callback = MagicMock()
        publisher.register_call_back(callback)

        # publish() re-raises the publish_impl failure after notifying; assert that instead of swallowing it.
        self.assertRaises(Exception, publisher.publish)

        self.assertTrue(callback.on_failure.called)
        self.assertFalse(callback.on_success.called)
class FailedPublisher(Publisher):
    """
    Publisher used by the tests whose publish_impl always raises.
    """

    def __init__(self):
        # type: () -> None
        super(FailedPublisher, self).__init__()

    def init(self, conf):
        # type: (ConfigTree) -> None
        """Nothing to initialize."""
        return None

    def publish_impl(self):
        # type: () -> None
        """Fail unconditionally."""
        raise Exception('Bomb')
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment