Source code for vireo.drivers.rabbitmq.driver

import json
import threading
import time
import uuid

from pika            import BasicProperties
from pika.exceptions import ConnectionClosed, ChannelClosed

from ...helper import fill_in_the_blank, log

from .consumer  import Consumer
from .exception import NoConnectionError, SubscriptionNotAllowedError
from .helper    import active_connection, SHARED_TOPIC_EXCHANGE_NAME, SHARED_SIGNAL_CONNECTION_LOSS


class Driver(object):
    """ Driver for RabbitMQ

        :param str url: the URL to the server
        :param list consumer_classes: the list of :class:`.consumer.Consumer`-based classes
        :param bool unlimited_retries: the flag to disable limited retry count.
        :param callable on_connect: a callback function when the message consumption begins.
        :param callable on_disconnect: a callback function when the message consumption is
                                       interrupted due to unexpected disconnection.
        :param callable on_error: a callback function when the message consumption is interrupted
                                  due to exception raised from the main callback function.

        Here is an example for ``on_connect``.

        .. code-block:: Python

            def on_connect(consumer = None):
                ...

        Here is an example for ``on_disconnect``.

        .. code-block:: Python

            def on_disconnect(consumer = None):
                ...

        Here is an example for ``on_error``.

        .. code-block:: Python

            def on_error(exception, consumer = None):
                ...
    """
    def __init__(self, url, consumer_classes = None, unlimited_retries = False, on_connect = None,
                 on_disconnect = None, on_error = None):
        for consumer_class in consumer_classes or []:
            # The entries are classes, not instances: ``observe`` calls
            # ``can_handle_route`` on them and instantiates them. The check therefore has to
            # be ``issubclass``; the ``isinstance(..., type)`` guard keeps a non-class entry
            # failing with the same AssertionError instead of a TypeError.
            assert isinstance(consumer_class, type) and issubclass(consumer_class, Consumer), \
                'This ({}) needs to be a subclass of vireo.drivers.rabbitmq.Consumer.'.format(consumer_class)

        self._url               = url
        self._consumer_classes  = consumer_classes or []
        self._async_listener    = None   # thread running ``join`` (see ``setup_async_cleanup``)
        self._shared_stream     = []     # signal list handed to every consumer created by ``observe``
        self._consumers         = []     # consumers started by ``observe``
        self._has_term_signal   = False  # set by ``stop_consuming``; polled by ``join``
        self._active_routes     = []     # not used anywhere in the visible part of this class
        self._unlimited_retries = unlimited_retries
        self._on_connect        = on_connect
        self._on_disconnect     = on_disconnect
        self._on_error          = on_error

    def set_on_connect(self, on_connect):
        """ Replace the ``on_connect`` callback. """
        self._on_connect = on_connect

    def set_on_disconnect(self, on_disconnect):
        """ Replace the ``on_disconnect`` callback. """
        self._on_disconnect = on_disconnect

    def set_on_error(self, on_error):
        """ Replace the ``on_error`` callback. """
        self._on_error = on_error

    def _default_publishing_parameters(self, route, message):
        """ Build the default keyword arguments for ``basic_publish``.

            The message is serialized as JSON and sent to the default exchange (``''``)
            with ``route`` as the routing key.
        """
        return {
            'exchange'    : '',
            'routing_key' : route or '',
            'body'        : json.dumps(message),
            'properties'  : BasicProperties(content_type = 'application/json'),
        }

    def _fire_on_disconnect(self):
        """ Run the ``on_disconnect`` callback, if any, in a daemon thread. """
        if self._on_disconnect:
            async_callback = threading.Thread(target = self._on_disconnect, daemon = True)
            async_callback.start()

    def setup_async_cleanup(self):
        """ Prepare to cleanly join all consumers asynchronously.

            :raises SubscriptionNotAllowedError: if a previously started listener thread is
                                                 still alive.
        """
        if self._async_listener and self._async_listener.is_alive():
            raise SubscriptionNotAllowedError('Unable to consume messages as this driver is currently active.')

        self._async_listener = threading.Thread(target = self.join)
        self._async_listener.start()

    def stop_consuming(self):
        """ Send the signal to stop consumption. """
        self._has_term_signal = True

    def join(self):
        """ Synchronously join all consumers.

            Blocks (polling once per second) until ``stop_consuming`` is called, a
            connection-loss signal appears in the shared stream, or ``KeyboardInterrupt`` is
            raised; then stops and joins every consumer started by ``observe``.

            :raises NoConnectionError: after the cleanup, if the connection-loss signal was
                                       found in the shared stream.
        """
        try:
            while True:
                if self._has_term_signal:
                    log('warning', 'Stopping all route listeners')
                    break

                if SHARED_SIGNAL_CONNECTION_LOSS in self._shared_stream:
                    log('error', 'Unexpected connection loss detected')
                    log('warning', 'Terminating all route listeners')
                    break

                time.sleep(1)
        except KeyboardInterrupt:
            # NOTE(review): KeyboardInterrupt normally corresponds to SIGINT, not SIGTERM;
            # the log text is kept as-is in case anything matches on it.
            log('warning', 'SIGTERM received')
            log('debug', 'Terminating all route listeners')

        connection_lost = SHARED_SIGNAL_CONNECTION_LOSS in self._shared_stream

        for consumer in self._consumers:
            # Only ask for a graceful stop when the connection is still believed to be up.
            if not connection_lost:
                if not consumer.is_alive():
                    log('info', 'Route {}: Already stopped listening (not alive).'.format(consumer.route))
                    continue

                log('warning', 'Route {}: Sending the signal to stop listening.'.format(consumer.route))
                consumer.stop()

            try:
                log('debug', 'Route {}: Terminating the listener.'.format(consumer.route))
                consumer._stop()
            except AssertionError: # this is raised if the thread lock is still locked.
                log('warning', 'Route {}: Probably already stopped'.format(consumer.route))

            if not consumer.is_alive():
                log('info', 'Route {}: Termination confirmed (killed)'.format(consumer.route))
                continue

            log('debug', 'Route {}: Waiting the listener to join back to the parent thread.'.format(consumer.route))
            consumer.join()
            log('info', 'Route {}: Termination confirmed (joined).'.format(consumer.route))

        if connection_lost:
            raise NoConnectionError('Unexpectedly losed the connection during message consumption')

    def publish(self, route, message, options = None):
        """ Synchronously publish a message

            :param str route:    the route
            :param str message:  the message
            :param dict options: additional options for basic_publish

            :raises NoConnectionError: if the connection is closed while publishing.
        """
        default_parameters = self._default_publishing_parameters(route, message)

        # Presumably ``fill_in_the_blank`` adds the defaults for keys missing from ``options``.
        options = fill_in_the_blank(options or {}, default_parameters)

        with active_connection(self._url, self._on_connect, self._on_disconnect) as channel:
            try:
                log('debug', 'Publishing: route={} message={} options={}'.format(route, message, options))
                channel.basic_publish(**options)
                log('debug', 'Published: route={} message={} options={}'.format(route, message, options))
            except ConnectionClosed:
                # NOTE(review): unlike ``broadcast``, the ``on_disconnect`` callback is not
                # fired here; confirm whether that is intentional.
                raise NoConnectionError('Unexpectedly losed the connection while publishing a message')

    def declare_queue_with_delegation(self, origin_queue_name, ttl, fallback_queue_name = None,
                                      common_queue_options = None, exchange_options = None):
        """ Declare an origin queue whose dead letters are routed to a fallback queue.

            Declares an exchange named ``fallback/<origin>/<fallback>``, the fallback queue
            bound to that exchange, and the origin queue configured with that exchange as its
            dead-letter exchange.

            :param str origin_queue_name:    the name of the origin queue
            :param ttl:                      the value for the ``x-message-ttl`` argument of
                                             the origin queue
            :param str fallback_queue_name:  the name of the fallback queue (default:
                                             ``<origin_queue_name>.delegated``)
            :param dict common_queue_options: options applied to both queues
            :param dict exchange_options:    options for ``exchange_declare``

            :raises NoConnectionError: if the connection is closed during the setup.
        """
        actual_fallback_queue_name = fallback_queue_name or '{}.delegated'.format(origin_queue_name)
        exchange_name              = 'fallback/{}/{}'.format(origin_queue_name, actual_fallback_queue_name)

        exchange_options     = fill_in_the_blank(exchange_options or {}, {'internal': True})
        common_queue_options = common_queue_options or {}

        default_fallback_queue_options = {'auto_delete': False}
        default_origin_queue_options   = {
            'auto_delete' : False,
            'arguments'   : {
                'x-dead-letter-exchange'    : exchange_name,
                'x-dead-letter-routing-key' : actual_fallback_queue_name,
                'x-message-ttl'             : ttl,
            }
        }

        # Each queue gets its own copy of the common options.
        fallback_queue_options = fill_in_the_blank(dict(common_queue_options), default_fallback_queue_options)
        origin_queue_options   = fill_in_the_blank(dict(common_queue_options), default_origin_queue_options)

        # The return value is not used here; ``exchange_options`` is passed on as-is below.
        fill_in_the_blank(exchange_options, {'exchange': exchange_name, 'exchange_type': 'direct'})

        with active_connection(self._url, self._on_connect, self._on_disconnect) as channel:
            try:
                channel.exchange_declare(**exchange_options)

                # NOTE(review): ``declare_queue`` is not defined in the visible part of this
                # class; confirm that it is provided elsewhere.
                self.declare_queue(actual_fallback_queue_name, fallback_queue_options)

                channel.queue_bind(
                    queue    = actual_fallback_queue_name,
                    exchange = exchange_name,
                )

                self.declare_queue(origin_queue_name, origin_queue_options)
            except ConnectionClosed:
                self._fire_on_disconnect()

                raise NoConnectionError('Unexpectedly losed the connection while orchestrating queues and exchange for delegation')

    def broadcast(self, route, message, options = None):
        """ Broadcast a message to a particular route.

            The message goes through a durable topic exchange, which is declared before
            publishing. The shared topic exchange is used when ``options`` does not name one.

            :param str route:    the route
            :param str message:  the message
            :param dict options: additional options for basic_publish

            :raises NoConnectionError: if the connection is closed while broadcasting.
        """
        default_parameters = self._default_publishing_parameters(route, message)

        options = fill_in_the_blank(options or {}, default_parameters)

        # An absent or empty exchange name falls back to the shared topic exchange.
        if 'exchange' not in options or not options['exchange']:
            options['exchange'] = SHARED_TOPIC_EXCHANGE_NAME

        exchange_name = options['exchange']

        with active_connection(self._url, self._on_connect, self._on_disconnect) as channel:
            try:
                log('debug', 'Declaring a shared topic exchange')

                channel.exchange_declare(
                    exchange      = exchange_name,
                    exchange_type = 'topic',
                    passive       = False,
                    durable       = True,
                    auto_delete   = False,
                )

                log('debug', 'Declared a shared topic exchange')

                log('debug', 'Broadcasting: route={} message={} options={}'.format(route, message, options))
                channel.basic_publish(**options)
                log('debug', 'Broadcasted: route={} message={} options={}'.format(route, message, options))
            except ConnectionClosed:
                self._fire_on_disconnect()

                raise NoConnectionError('Unexpectedly losed the connection while broadcasting an event')

    def observe(self, route, callback, resumable, distributed, options = None,
                simple_handling = True, controller_id = None):
        """ Create and start a consumer for the given route.

            The first class in ``consumer_classes`` whose ``can_handle_route(route)`` is
            truthy is used; otherwise the default ``Consumer`` is used. The remaining
            arguments are passed through to the consumer's constructor.

            :param str route:          the route
            :param callable callback:  the callback passed to the consumer
            :param resumable:          passed through to the consumer
            :param distributed:        passed through to the consumer
            :param dict options:       passed through to the consumer
            :param simple_handling:    passed through to the consumer
            :param str controller_id:  the controller ID; a random UUID4 string is assigned
                                       when omitted

            :return: the started consumer
        """
        consumer_class = Consumer

        for overriding_consumer_class in self._consumer_classes:
            if overriding_consumer_class.can_handle_route(route):
                consumer_class = overriding_consumer_class
                break

        if not controller_id:
            controller_id = str(uuid.uuid4())
            log('info', 'Observer on {} will have the self-assigned controller ID {}'.format(route, controller_id))

        consumer = consumer_class(
            self._url,
            route,
            callback,
            self._shared_stream,
            resumable,
            distributed,
            options,
            simple_handling,
            self._unlimited_retries,
            self._on_connect,
            self._on_disconnect,
            self._on_error,
            controller_id,
        )

        self._consumers.append(consumer)

        consumer.start()

        return consumer