Source code for vireo.observer

from uuid import uuid4

from .core   import Core
from .helper import log
from .model  import RemoteSignal

ASYNC_START = 1
SYNC_START  = 2


[docs]class UnknownRunningModeError(RuntimeError): """ Error for unknown running mode """
[docs]class Observer(Core): """ Event Observer """ def __init__(self, driver): Core.__init__(self, driver) self._identifier = str(uuid4()) self._broadcast_event_to_observer_map = {} self._normal_event_to_observer_map = {} log('debug', 'Observer ID: {}'.format(self.id)) @property def id(self) -> str: """ Observer Identifier """ return self._identifier
[docs] def on(self, event_name, callback, resumable = False, simple_handling = True, options = None, delay_per_message = 0, max_retries = None, immediate_retry_limit = None, max_retry_timeout = None): """ Listen to an event with a callback function. :param str event_name: the name of the event :param callable callback: the callback callable :param bool resumable: the flag to indicate whether the event consumption can be resumed (as the data stream will never be deleted). :param bool simple_handling: the flag to instruct the code to return the content of the message, instead of returning the whole :class:`vireo.model.Message` object. :param dict options: the extra options to the method ``observe`` of the driver :param float delay_per_message: the delay per message (any negative numbers are regarded as zero, zero or any equivalent value is regarded as "no delay") :param int max_retries: maximum allowed retry count :param int immediate_retry_limit: allowed immediate retry count :param int max_retry_timeout: maximum retry timeout The callback is a callable object, e.g., function, class method and lambda object, which takes only one parameter which is a JSON-decoded object. For example, .. code-block:: python def on_foo(self, message): print('on_foo:', message) app.on('foo', on_foo) app.on('foo.lambda', lambda x: print('foo_lambda:', x)) Here is an example for ``error_handler``. .. code-block:: Python def error_handler(consumer, exception): ... """ extra_options = dict( simple_handling = simple_handling, controller_id = self.id, delay_per_message = delay_per_message, max_retries = max_retries, immediate_retry_limit = immediate_retry_limit, max_retry_timeout = max_retry_timeout, ) internal_observer = self._driver.observe(event_name, callback, resumable, False, options, **extra_options) self._register_event_handler(self._normal_event_to_observer_map, event_name, internal_observer) return internal_observer
[docs] def on_broadcast(self, event_name, callback, simple_handling = True, options = None, delay_per_message = 0, max_retries = None, immediate_retry_limit = None, max_retry_timeout = None): """ Listen to an distributed event with a callback function. :param str event_name: the name of the event :param callable callback: the callback callable :param bool simple_handling: the flag to instruct the code to return the content of the message, instead of returning the whole :class:`vireo.model.Message` object. :param dict options: the extra options to the method ``observe`` of the driver :param float delay_per_message: the delay per message (any negative numbers are regarded as zero, zero or any equivalent value is regarded as "no delay") :param int max_retries: maximum allowed retry count :param int immediate_retry_limit: allowed immediate retry count :param int max_retry_timeout: maximum retry timeout The callback is a callable object, e.g., function, class method and lambda object, which takes only one parameter which is a JSON-decoded object. For example, .. code-block:: python def on_foo(self, message): print('on_foo:', message) app.on('foo', on_foo) app.on('foo.lambda', lambda x: print('foo_lambda:', x)) Here is an example for ``error_handler``. .. code-block:: Python def error_handler(consumer, exception): ... """ extra_options = dict( simple_handling = simple_handling, controller_id = self.id, delay_per_message = delay_per_message, max_retries = max_retries, immediate_retry_limit = immediate_retry_limit, max_retry_timeout = max_retry_timeout, ) internal_observer = self._driver.observe(event_name, callback, False, True, options, **extra_options) self._register_event_handler(self._broadcast_event_to_observer_map, event_name, internal_observer) return internal_observer
def pause_on(self, event_name, remote_identifier = None): if not remote_identifier: self._remote_to_all_observers_on(self._normal_event_to_observer_map, event_name, 'pause') return self.emit( event_name, { 'remote_signal' : RemoteSignal.PAUSE, 'controller_id' : remote_identifier, } ) def resume_on(self, event_name, remote_identifier = None): if not remote_identifier: self._remote_to_all_observers_on(self._normal_event_to_observer_map, event_name, 'resume') return self.emit( event_name, { 'remote_signal' : RemoteSignal.RESUME, 'controller_id' : remote_identifier, } ) def pause_on_broadcast(self, event_name, remote_identifier = None): if not remote_identifier: self._remote_to_all_observers_on(self._broadcast_event_to_observer_map, event_name, 'pause') return self.broadcast( event_name, { 'remote_signal' : RemoteSignal.PAUSE, 'controller_id' : remote_identifier, } ) def resume_on_broadcast(self, event_name, remote_identifier = None): if not remote_identifier: self._remote_to_all_observers_on(self._broadcast_event_to_observer_map, event_name, 'resume') return self.broadcast( event_name, { 'remote_signal' : RemoteSignal.RESUME, 'controller_id' : remote_identifier, } )
[docs] def join(self, running_mode = SYNC_START): """ Wait for all handlers to stop. There are two mode: synchronous (``vireo.observer.SYNC_START``) and asynchronous (``vireo.observer.ASYNC_START``) joins. .. code-block:: python app.join(ASYNC_START) """ if running_mode == ASYNC_START: self._driver.setup_async_cleanup() return if running_mode == SYNC_START: self._driver.join() return raise UnknownRunningModeError(running_mode)
[docs] def stop(self): """ Send the signal to all handlers to stop observation. .. warning:: This method does not block the caller thread while waiting all handlers to stop. .. code-block:: python app.stop() """ if self._driver: self._driver.stop_consuming()
def _register_event_handler(self, observer_map, event_name, observer): if event_name not in observer_map: observer_map[event_name] = [] observer_map[event_name].append(observer) def _find_all_event_handlers(self, observer_map, event_name, observer): if event_name not in observer_map: return [] return observer_map[event_name] def _remote_to_all_observers_on(self, observer_map, event_name, remote_command): internal_observers = self._find_all_event_handlers(observer_map, event_name) for internal_observer in internal_observers: if not hasattr(internal_observer, remote_command): continue actual_remote_command = getattr(internal_observer, remote_command) if not callable(actual_remote_command): continue actual_remote_command()