import json
import threading
import traceback
import time
from pika import BasicProperties
from pika.exceptions import ConnectionClosed, ChannelClosed, IncompatibleProtocolError
from ...helper import log
from ...model import Message, RemoteSignal
from .exception import NoConnectionError
from .helper import active_connection, fill_in_the_blank, SHARED_TOPIC_EXCHANGE_NAME, SHARED_SIGNAL_CONNECTION_LOSS
# Consumer.run keeps re-listening at one-second intervals while its count of
# consecutive connection failures stays below this value.
MAX_RETRY_COUNT = 120
# NOTE(review): not referenced in the visible part of this module; presumably
# the payload of a ping message -- confirm against the publisher side.
PING_MESSAGE = 'ping'
class Consumer(threading.Thread):
    """ Message consumer

        This is used to handle messages on one particular route/queue. The consumer runs as
        a daemon thread and keeps listening until it is stopped or the connection is lost
        for good.

        :param str url: the URL to the RabbitMQ server
        :param str route: the route to observe
        :param callable callback: the callback function / callable object
        :param list shared_stream: the internal message queue for thread synchronization
        :param bool resumable: the flag to indicate whether the consumption is resumable
                               (the queues are declared durable and not auto-deleted)
        :param bool distributed: when true, the consumer listens on a temporary auto-deleted
                                 queue bound to the shared topic exchange; otherwise it listens
                                 on the queue named after the route
        :param dict queue_options: additional queue options
        :param bool simple_handling: when true, the callback receives the decoded JSON payload
                                     as is; otherwise it receives a ``Message`` wrapping the
                                     payload with the header and method frames
        :param bool unlimited_retries: the flag to disable limited retry count.
        :param callable on_connect: a callback function when the message consumption begins.
        :param callable on_disconnect: a callback function when the message consumption is interrupted due to unexpected disconnection.
        :param callable on_error: a callback function when the message consumption is interrupted due to exception raised from the main callback function.
        :param str controller_id: the identifier of the owner of this consumer; it is used in
                                  log messages, to match targeted remote signals and in the
                                  records written to the recovery queue

        Here is an example for ``on_connect``.

        .. code-block:: Python

            def on_connect(consumer = None):
                ...

        Here is an example for ``on_disconnect``.

        .. code-block:: Python

            def on_disconnect(consumer = None):
                ...

        Here is an example for ``on_error``.

        .. code-block:: Python

            def on_error(exception, consumer = None):
                ...
    """
    def __init__(self, url, route, callback, shared_stream, resumable, distributed, queue_options,
                 simple_handling, unlimited_retries = False, on_connect = None, on_disconnect = None,
                 on_error = None, controller_id = None):
        super().__init__(daemon = True)

        self.url             = url
        self.route           = route
        self.callback        = callback
        self.resumable       = resumable
        self.distributed     = distributed
        self.queue_options   = queue_options
        self.simple_handling = simple_handling

        self._retry_count       = 0
        self._shared_stream     = shared_stream
        self._channel           = None
        self._queue_name        = None
        self._paused            = False
        self._stopped           = False
        self._controller_id     = controller_id
        self._unlimited_retries = unlimited_retries
        self._on_connect        = on_connect
        self._on_disconnect     = on_disconnect
        self._on_error          = on_error

        self._recovery_queue_name = 'RECOVERY.{}'.format(self.route)

        # Every hook is executed on a helper thread, where a non-callable value would only
        # fail later and out of sight. Check all of them up front.
        hooks = (
            ('on_connect',    self._on_connect),
            ('on_disconnect', self._on_disconnect),
            ('on_error',      self._on_error),
        )

        for hook_name, hook in hooks:
            assert not hook or callable(hook), 'The {} handler must be callable.'.format(hook_name)

    @staticmethod
    def can_handle_route(routing_key):
        """ Check if the consumer can handle the given routing key.

            .. note:: the default implementation will handle all routes.

            :param str routing_key: the routing key
        """
        return True

    @property
    def queue_name(self):
        """ The name of the queue being consumed (``None`` until a queue is declared) """
        return self._queue_name

    @property
    def stopped(self):
        """ The flag indicating that the consumer has been stopped """
        return self._stopped

    def run(self):
        """ Listen to the queue until the consumer is stopped.

            On connection loss, the consumer re-listens after one second while the number of
            consecutive failures is below ``MAX_RETRY_COUNT``. After that, it either keeps
            retrying every five seconds (unlimited retries) or reports the loss through the
            shared stream and gives up.
        """
        log('debug', '{}: Active'.format(self._debug_route_name()))

        while not self._stopped:
            try:
                self._listen()
            except NoConnectionError as e:
                self._retry_count += 1

                if self._on_disconnect:
                    self._async_execute(self._on_disconnect)
                    log('error', '{}: Passed the error information occurred to the error handler'.format(self._debug_route_name()))

                if self._retry_count < MAX_RETRY_COUNT:
                    log('info', '{}: Will re-listen to the queue in 1 second ({} attempt(s) left)'.format(self._debug_route_name(), MAX_RETRY_COUNT - self._retry_count))
                    time.sleep(1)
                    log('warning', 'Reconnecting to listen to {}'.format(self._debug_route_name()))

                    continue

                if self._unlimited_retries:
                    log('info', '{}: Will re-listen to the queue in 5 second (unlimited retries)'.format(self._debug_route_name()))
                    time.sleep(5)
                    log('warning', 'Reconnecting to listen to {}'.format(self._debug_route_name()))

                    continue

                log('warning', '{}: Unexpected connection loss detected ({})'.format(self._debug_route_name(), e))
                self._shared_stream.append(SHARED_SIGNAL_CONNECTION_LOSS)

                # The retries are exhausted and the loss has been reported. Leave the loop;
                # otherwise the thread would reconnect without any delay and flood the
                # shared stream with the same signal.
                break

        log('debug', '{}: Inactive'.format(self._debug_route_name()))

    def resume(self):
        """ Resume handling messages after :meth:`pause` (no effect once stopped) """
        if self.stopped:
            log('debug', '{}: Already stopped (resume)'.format(self._debug_route_name()))

            return

        log('debug', '{}: Resuming on listening...'.format(self._debug_route_name()))

        self._paused = False

    def pause(self):
        """ Put the consumer on standby (no effect once stopped)

            While on standby, incoming messages are negatively acknowledged instead of
            being passed to the callback.
        """
        if self.stopped:
            log('debug', '{}: Already stopped (pause)'.format(self._debug_route_name()))

            return

        log('debug', '{}: Temporarily stop listening...'.format(self._debug_route_name()))

        self._paused = True

    def stop(self):
        """ Stop consumption """
        log('debug', 'Stopping listening to {}...'.format(self._debug_route_name()))

        channel = self._channel

        if channel is None:
            # No channel has been opened yet, so there is nothing to cancel. Flag the
            # consumer as stopped so that run() does not start listening again.
            self._stopped = True

            return

        channel.stop_consuming()

    def _async_execute(self, callable_method, *args):
        """ Run the given callable on a daemon thread.

            The callable receives ``*args`` followed by this consumer as the last argument.
        """
        async_callback = threading.Thread(target = callable_method, args = (*args, self), daemon = True)
        async_callback.start()

    def _listen(self):
        """ Open a connection, declare the queues and consume messages until stopped.

            :raises NoConnectionError: when the connection is closed during the consumption
        """
        with active_connection(self.url, self._on_connect, self._on_disconnect) as channel:
            self._channel    = channel
            self._queue_name = self._declare_topic_queue(channel) if self.distributed else self._declare_shared_queue(channel)

            self._declare_recovery_queue(channel)

            # Declare the callback wrapper for this route.
            def callback_wrapper(channel, method_frame, header_frame, body):
                # Kept outside the try block so the error handler can tell whether the
                # payload has been decoded.
                raw_message = None

                try:
                    # Decoding and parsing are inside the try-catch block to deal with malformed data.
                    raw_message = body.decode('utf8')

                    log('info', '{}: Processing {}...'.format(self._debug_route_name(), raw_message))

                    decoded_message = json.loads(raw_message)

                    remote_signal = None
                    remote_target = None

                    if isinstance(decoded_message, dict) and 'remote_signal' in decoded_message:
                        log('debug', '{}: Received a remote signal'.format(self._debug_route_name()))

                        remote_signal = decoded_message['remote_signal']
                        remote_target = decoded_message.get('controller_id') or None

                        # Except PING, a remote signal is only for the targeted controller.
                        if remote_signal != RemoteSignal.PING and (not remote_target or remote_target != self._controller_id):
                            log('debug', '{}: Ignoring the remote signal (TARGET {}).'.format(
                                self._debug_route_name(),
                                remote_target or '(UNDEFINED)'
                            ))

                            channel.basic_ack(delivery_tag = method_frame.delivery_tag)

                            log('debug', '{}: Ignored the remote signal (TARGET {}).'.format(
                                self._debug_route_name(),
                                remote_target or '(UNDEFINED)'
                            ))

                            return

                    if remote_signal == RemoteSignal.PING:
                        log('debug', '{}: Detected PING signal'.format(self._debug_route_name()))

                        channel.basic_ack(delivery_tag = method_frame.delivery_tag)

                        log('debug', '{}: Ready (post-ping)'.format(self._debug_route_name()))

                        return

                    if remote_signal == RemoteSignal.RESUME:
                        log('debug', '{}: Receive RESUME signal'.format(self._debug_route_name()))

                        self.resume()
                        channel.basic_ack(delivery_tag = method_frame.delivery_tag)

                        log('debug', '{}: Reactivated'.format(self._debug_route_name()))

                        return

                    if remote_signal == RemoteSignal.PAUSE:
                        log('debug', '{}: Receive PAUSE signal'.format(self._debug_route_name()))

                        self.pause()
                        channel.basic_ack(delivery_tag = method_frame.delivery_tag)

                        log('debug', '{}: Standing by...'.format(self._debug_route_name()))

                        return

                    if self._paused:
                        log('info', '{}: On STANDBY'.format(self._debug_route_name()))

                        # Reject the delivery, then hold this consumer back for a moment.
                        channel.basic_nack(delivery_tag = method_frame.delivery_tag)

                        log('debug', '{}: Temporarily block itself for a moment'.format(self._debug_route_name()))

                        time.sleep(3)

                        log('debug', '{}: Ready (standby)'.format(self._debug_route_name()))

                        return

                    message = (
                        decoded_message
                        if self.simple_handling
                        else Message(
                            decoded_message,
                            {
                                'header': header_frame,
                                'method': method_frame,
                            }
                        )
                    )

                    self.callback(message)

                    # Acknowledge the delivery after the work is done.
                    channel.basic_ack(delivery_tag = method_frame.delivery_tag)
                except Exception as unexpected_error:
                    if raw_message is None:
                        # The payload is not valid UTF-8. Keep a lossy copy for the recovery record.
                        raw_message = body.decode('utf8', errors = 'replace')

                    error_info = {
                        'type'      : type(unexpected_error).__name__,
                        'message'   : str(unexpected_error),
                        'traceback' : traceback.format_exc(),
                    }

                    log('error', '{}: Exception raised while processing the message: {type}: {message}\n{traceback}'.format(
                        self._debug_route_name(),
                        **error_info,
                    ))

                    # Store the message that cause error in the recovery queue.
                    republishing_options = {
                        'exchange'    : '',
                        'routing_key' : self._recovery_queue_name,
                        'properties'  : BasicProperties(content_type = 'application/json'),
                        'body'        : json.dumps(
                            {
                                'controller' : self._controller_id,
                                'error'      : error_info,
                                'message'    : raw_message,
                                'when'       : time.time(),
                            },
                            indent = 4
                        ),
                    }

                    channel.basic_publish(**republishing_options)

                    # Acknowledge the delivery when an error occurs to DEQUEUE the message.
                    channel.basic_ack(delivery_tag = method_frame.delivery_tag)

                    if self._on_error:
                        self._async_execute(self._on_error, unexpected_error)

                    log('warning', '{}: Recovered from the unexpected error'.format(self._debug_route_name()))

                log('debug', '{}: Ready (OK)'.format(self._debug_route_name()))

            log('debug', '{}: Listening...'.format(self._debug_route_name()))

            channel.basic_consume(callback_wrapper, self._queue_name)

            # NOTE there is a bug in start_consuming that prevents stop_consuming from cleanly
            #      stopping message consumption. The following is a hack suggested in StackOverflow.
            # channel.start_consuming()
            try:
                while channel._consumer_infos:
                    channel.connection.process_data_events(time_limit = 1)

                    # Getting here after a failure means the connection is back.
                    if self._retry_count:
                        self._retry_count = 0

                        if self._on_connect:
                            self._async_execute(self._on_connect)

                self._stopped = True
            except ConnectionClosed as e:
                raise NoConnectionError('The connection has been absurbly disconnected.') from e

            log('debug', 'Stopped listening to {}'.format(self._debug_route_name()))

    def _debug_route_name(self):
        """ Compose the label of this consumer for log messages """
        result = 'ROUTE {}/{}'.format(self._controller_id, self.route)

        if self._queue_name:
            return '{} (QUEUE {})'.format(result, self._queue_name)

        return result

    def _declare_shared_queue(self, channel):
        """ Declare the queue named after the route.

            :return: the name of the queue (the route)
        """
        queue_options = fill_in_the_blank(
            {
                'auto_delete': not self.resumable,
                'durable'    : self.resumable,
                'queue'      : self.route,
            },
            self.queue_options or {}
        )

        channel.queue_declare(**queue_options)

        log('info', 'CONTROLLER {}: Declared a queue "{}"'.format(self._controller_id, self.route))

        return self.route

    def _declare_recovery_queue(self, channel):
        """ Declare the queue for the messages that cannot be processed.

            :return: the name of the recovery queue
        """
        queue_options = fill_in_the_blank(
            {
                'auto_delete': not self.resumable,
                'durable'    : self.resumable,
                'queue'      : self._recovery_queue_name,
            },
            self.queue_options or {}
        )

        channel.queue_declare(**queue_options)

        log('info', 'CONTROLLER {}: Declared a recovery queue for ROUTE {}'.format(self._controller_id, self.route))

        return self._recovery_queue_name

    def _declare_topic_queue(self, channel):
        """ Declare a temporary queue and bind it to the shared topic exchange with the route.

            :return: the name of the temporary queue, taken from the declaration response
        """
        # Currently not supporting resumability.
        queue_options = fill_in_the_blank(
            {
                'auto_delete': True,
                'queue'      : '',
            },
            self.queue_options or {}
        )

        channel.exchange_declare(
            exchange      = SHARED_TOPIC_EXCHANGE_NAME,
            exchange_type = 'topic',
            passive       = False,
            durable       = True,
            auto_delete   = False,
        )

        response        = channel.queue_declare(**queue_options)
        temp_queue_name = response.method.queue

        log('info', 'CONTROLLER {}: Declared a temporary queue "{}"'.format(self._controller_id, temp_queue_name))

        log('info', 'CONTROLLER {}: Binding a temporary queue "{}" to route {}'.format(self._controller_id, temp_queue_name, self.route))

        channel.queue_bind(temp_queue_name, SHARED_TOPIC_EXCHANGE_NAME, self.route)

        log('info', 'CONTROLLER {}: Bound a temporary queue "{}" to route {}'.format(self._controller_id, temp_queue_name, self.route))

        return temp_queue_name