import os
import json
import math
import sys
import threading
import traceback
import time
import uuid
from pika import BasicProperties
from pika.exceptions import ConnectionClosed, ChannelClosed, IncompatibleProtocolError
from ...helper import log, debug_mode_enabled
from ...model import Message, RemoteSignal
from .exception import NoConnectionError
from .helper import active_connection, fill_in_the_blank, SHARED_DIRECT_EXCHANGE_NAME, SHARED_TOPIC_EXCHANGE_NAME, SHARED_SIGNAL_CONNECTION_LOSS
# Default number of reconnection attempts made right away (no delay) before
# Consumer switches to an exponential back-off; overridable through the
# ``immediate_retry_limit`` argument of Consumer.
IMMEDIATE_RETRY_LIMIT = 5
# Default retry budget used when Consumer is not given ``max_retries``.
MAX_RETRY_COUNT = 60
# Default cap, in seconds, on the delay between two reconnection attempts.
# Consumer takes max(max_retry_timeout, MAX_RETRY_TIMEOUT), so this is also a floor.
MAX_RETRY_TIMEOUT = 30
# NOTE(review): not referenced in this section; presumably used elsewhere in the package — verify.
PING_MESSAGE = 'ping'
class Consumer(threading.Thread):
    """ Message consumer

    This is used to handle messages on one particular route/queue.

    :param str url: the URL to the server
    :param str route: the route to observe
    :param callable callback: the callback function / callable object
    :param list shared_stream: the internal message queue for thread synchronization
    :param bool resumable: the flag to indicate whether the consumption is resumable
        (the queues are declared durable and are not auto-deleted)
    :param bool distributed: the flag to make this consumer declare its own temporary,
        server-named queue bound to the topic exchange instead of consuming from the
        shared queue named after the route
    :param dict queue_options: additional queue options
    :param bool simple_handling: the flag to pass the decoded JSON payload to the callback
        as-is instead of wrapping it in a ``Message``
    :param bool unlimited_retries: the flag to disable limited retry count.
    :param callable on_connect: a callback function when the message consumption begins
        (also invoked after a successful reconnection).
    :param callable on_disconnect: a callback function when the message consumption is interrupted due to unexpected disconnection.
    :param callable on_error: a callback function when the message consumption is interrupted due to exception raised from the main callback function.
    :param str controller_id: the associated controller ID
    :param dict exchange_options: the additional options for exchange
        (``name``, ``type``, ``passive``, ``durable``, ``auto_delete``)
    :param bool auto_acknowledge: the flag to determine whether the consumer should auto-acknowledge any delivery (default: ``False``)
    :param bool send_sigterm_on_disconnect: the flag to terminate the process (via ``os._exit(1)``)
        once the connection cannot be recovered (default: ``True``)
    :param float delay_per_message: the delay per message (any negative numbers are regarded as zero, zero or any equivalent value is regarded as "no delay")
    :param int max_retries: the maximum total retries the consumer can have (default: ``MAX_RETRY_COUNT``)
    :param int immediate_retry_limit: the maximum immediate retries the consumer can have before it uses the exponential delay
        (default: ``IMMEDIATE_RETRY_LIMIT``, never more than ``max_retries``)
    :param float max_retry_timeout: the upper bound, in seconds, of the delay between two retries;
        values below ``MAX_RETRY_TIMEOUT`` are raised to ``MAX_RETRY_TIMEOUT``

    Every callback receives this consumer as its last positional argument plus the
    ``controller_id``, ``route`` and ``queue_name`` keyword arguments.

    Here is an example for ``on_connect``.

    .. code-block:: Python

        def on_connect(consumer = None, controller_id = None, route = None, queue_name = None, summary = None):
            ...

    Here is an example for ``on_disconnect``.

    .. code-block:: Python

        def on_disconnect(consumer = None, controller_id = None, route = None, queue_name = None, summary = None):
            ...

    Here is an example for ``on_error``.

    .. code-block:: Python

        def on_error(exception, consumer = None, controller_id = None, route = None, queue_name = None, summary = None):
            ...
    """

    def __init__(self, url, route, callback, shared_stream, resumable, distributed, queue_options,
                 simple_handling, unlimited_retries = False, on_connect = None, on_disconnect = None,
                 on_error = None, controller_id = None, exchange_options = None, auto_acknowledge = False,
                 send_sigterm_on_disconnect = True, delay_per_message = 0, max_retries = None,
                 immediate_retry_limit = None, max_retry_timeout = None):
        super().__init__(daemon = True)

        # Anything that is not a non-empty dict is treated as "no extra options".
        queue_options = queue_options if queue_options and isinstance(queue_options, dict) else {}
        exchange_options = exchange_options if exchange_options and isinstance(exchange_options, dict) else {}

        self.url = url
        self.route = route
        self.callback = callback
        self.resumable = resumable
        self.distributed = distributed
        self.queue_options = queue_options
        self.exchange_options = exchange_options
        self.simple_handling = simple_handling

        self._retry_count = 0
        self._shared_stream = shared_stream
        self._channel = None
        self._queue_name = None
        self._paused = False
        self._stopped = False
        self._controller_id = controller_id
        self._consumer_id = str(uuid.uuid4())

        self._max_retries = max_retries or MAX_RETRY_COUNT
        # NOTE: ``max`` turns MAX_RETRY_TIMEOUT into a floor; a smaller value is ignored.
        self._max_retry_timeout = max(max_retry_timeout or 0, MAX_RETRY_TIMEOUT)
        # Immediate retries can never outnumber the total retry budget.
        self._immediate_retry_limit = min(immediate_retry_limit or IMMEDIATE_RETRY_LIMIT, self._max_retries)

        self._send_sigterm_on_disconnect = send_sigterm_on_disconnect
        # Only positive numbers enable the per-message delay.
        self._delay_per_message = (
            delay_per_message
            if (
                delay_per_message
                and isinstance(delay_per_message, (int, float))
                and delay_per_message > 0
            )
            else 0
        )
        self._auto_acknowledge = auto_acknowledge
        self._unlimited_retries = unlimited_retries

        self._on_connect = on_connect
        self._on_disconnect = on_disconnect
        self._on_error = on_error

        # Failed deliveries are republished here (see _handle_error_during_consumption).
        self._recovery_queue_name = 'RECOVERY.{}'.format(self.route)

        assert not self._on_connect or callable(self._on_connect), 'The connection handler must be callable.'
        assert not self._on_disconnect or callable(self._on_disconnect), 'The disconnection handler must be callable.'
        assert not self._on_error or callable(self._on_error), 'The error handler must be callable.'

        # Properties dumped to the debug log when the thread starts.
        self._inspecting_property_names = sorted([
            '_controller_id',
            '_consumer_id',
            '_max_retries',
            '_max_retry_timeout',
            '_immediate_retry_limit',
            '_auto_acknowledge',
            '_unlimited_retries',
            '_on_connect',
            '_on_disconnect',
            '_on_error',
        ])

    @staticmethod
    def can_handle_route(routing_key):
        """ Check if the consumer can handle the given routing key.

        .. note:: the default implementation will handle all routes.

        :param str routing_key: the routing key
        """
        return True

    @property
    def queue_name(self):
        """ The name of the queue currently consumed (``None`` before the first connection). """
        return self._queue_name

    @property
    def stopped(self):
        """ Whether the consumer has been stopped. """
        return self._stopped

    @property
    def remaining_retries(self):
        """ The number of retries left before the retry budget is exhausted (may be negative). """
        return self._max_retries - self._retry_count

    def run(self):
        """ Consume messages until the consumer is stopped.

        A lost connection (``NoConnectionError`` raised by ``_listen``) is retried: the first
        ``immediate_retry_limit`` attempts reconnect right away and later ones wait according to
        ``_next_retry_delay``. Once the retry budget is spent, the consumer either keeps retrying
        (``unlimited_retries``) or signals the connection loss through the shared stream, hands
        over to ``_handle_complete_disconnection`` and exits the loop.
        """
        log('debug', '{}: Active'.format(self._debug_route_name()))

        for property_name in self._inspecting_property_names:
            log(
                'debug',
                '{}: PROP {} = {}'.format(
                    self._debug_route_name(),
                    property_name,
                    getattr(self, property_name, '(not defined)'),
                )
            )

        while not self._stopped:
            try:
                self._listen()
            except NoConnectionError as e:
                self._retry_count += 1

                remaining_retries = self.remaining_retries
                wait_time = self._next_retry_delay()

                # Attempt to retry and skip the rest of the error handling routine.
                if remaining_retries >= 0:
                    log(
                        'info',
                        '{}: Will reconnect to the queue in {}s ({} attempt(s) left)'.format(
                            self._debug_route_name(),
                            wait_time,
                            remaining_retries,
                        )
                    )

                    # Give a pause between each retry once the immediate retries are used up.
                    if wait_time:
                        time.sleep(wait_time)

                    log('warning', '{}: Reconnecting...'.format(self._debug_route_name()))

                    continue

                if self._on_disconnect:
                    log('warning', '{}: {} the maximum retries (retry #{}/{})'.format(
                        self._debug_route_name(),
                        'Reached' if self._retry_count == self._max_retries else 'Exceeded',
                        self._retry_count,
                        self._max_retries,
                    ))

                if self._unlimited_retries:
                    if self._on_disconnect:
                        # Notify about the exhausted budget, then keep trying.
                        self._handle_complete_disconnection()

                    log('info', '{}: Will re-listen to the queue in {} second (retry #{})'.format(self._debug_route_name(), wait_time, self._retry_count))
                    time.sleep(wait_time)
                    log('warning', '{}: Reconnecting...'.format(self._debug_route_name()))

                    continue

                log('warning', '{}: Unexpected connection loss detected ({})'.format(self._debug_route_name(), e))

                self._shared_stream.append(SHARED_SIGNAL_CONNECTION_LOSS)
                self._handle_complete_disconnection()

                # Give up: looping again would spin on reconnection attempts and
                # notify the disconnection handler on every pass.
                break

        log('debug', '{}: Inactive'.format(self._debug_route_name()))

    def resume(self):
        """ Resume processing deliveries after ``pause``. """
        if self.stopped:
            log('debug', '{}: Already stopped (resume)'.format(self._debug_route_name()))
            return

        log('debug', '{}: Resuming on listening...'.format(self._debug_route_name()))

        self._paused = False

    def pause(self):
        """ Temporarily stop processing deliveries (they are requeued while paused). """
        if self.stopped:
            log('debug', '{}: Already stopped (pause)'.format(self._debug_route_name()))
            return

        log('debug', '{}: Temporarily stop listening...'.format(self._debug_route_name()))

        self._paused = True

    def stop(self):
        """ Stop consumption.

        Marks the consumer as stopped, so that ``run`` does not reconnect, then cancels the
        active consumption (if any) so that ``_listen`` returns.
        """
        log('debug', 'Stopping listening to {}...'.format(self._debug_route_name()))

        self._stopped = True

        if self._channel:
            try:
                self._channel.stop_consuming()
            except (ConnectionClosed, ChannelClosed) as e:
                # The channel left over from a lost connection may already be closed;
                # ``_stopped`` is set, so ``run`` will not reconnect anyway.
                log('debug', '{}: The channel is already closed ({})'.format(self._debug_route_name(), e))

    def _next_retry_delay(self):
        """ Return the delay, in seconds, before the next reconnection attempt.

        The delay is 0 while ``_retry_count`` is within the immediate retry limit; afterwards it
        doubles per attempt (1, 2, 4, ...; exponent capped at 8) and never exceeds
        ``_max_retry_timeout``.
        """
        if self._retry_count <= self._immediate_retry_limit:
            return 0

        exponent = min(self._retry_count - self._immediate_retry_limit - 1, 8)

        return min(math.pow(2, exponent), self._max_retry_timeout)

    def _handle_complete_disconnection(self):
        """ Deal with a connection that could not be recovered.

        With ``send_sigterm_on_disconnect``, the disconnection handler (if any) runs synchronously
        and the process is then terminated with ``os._exit(1)``, even if the handler fails.
        Otherwise the handler (if any) runs on a background daemon thread.
        """
        log('warning', '{}: Unable to recover the connection'.format(self._debug_route_name()))

        if self._send_sigterm_on_disconnect:
            log('warning', '{}: Forwarding to the disconnection handler.'.format(self._debug_route_name()))

            try:
                if self._on_disconnect:
                    self._invoke_callback(self._on_disconnect)
            except Exception:
                # The handler must not prevent the requested termination.
                log('error', '{}: The disconnection handler failed:\n{}'.format(self._debug_route_name(), traceback.format_exc()))

            log('error', '{}: Triggering the process termination due to unrecoverable disconnection.'.format(self._debug_route_name()))

            os._exit(1)

        if self._on_disconnect:
            self._async_invoke_callback(self._on_disconnect)

        log('error', '{}: The consumer is terminated but would trigger the process termination.'.format(self._debug_route_name()))

    def _async_invoke_callback(self, callable_method, *args, **kwargs):
        """ Run ``_invoke_callback`` on a new daemon thread. """
        async_callback = threading.Thread(
            target = self._invoke_callback,
            args = (callable_method, *args),
            kwargs = kwargs,
            daemon = True,
        )
        async_callback.start()

    def _invoke_callback(self, callable_method, *args, **kwargs):
        """ Call ``callable_method(*args, self, controller_id=..., route=..., queue_name=..., **kwargs)``. """
        callable_method(
            *args,
            self,
            controller_id = self._controller_id,
            route = self.route,
            queue_name = self._queue_name,
            **kwargs
        )

    def _listen(self):
        """ Connect, declare the queues and consume until the consumer is cancelled.

        :raises NoConnectionError: when the connection or the channel is closed while waiting for messages
        """
        # The disconnection handler is only handed to the connection on the last allowed attempt.
        on_disconnect = self._on_disconnect if self.remaining_retries < 1 else None

        with active_connection(self.url, self._on_connect, on_disconnect) as channel:
            self._channel = channel

            declare_consumed_queue = self._declare_topic_queue if self.distributed else self._declare_shared_queue

            self._queue_name = declare_consumed_queue(channel)
            self._declare_recovery_queue(channel)

            log(
                'debug',
                '{}: Listening... (ACK: {})'.format(
                    self._debug_route_name(),
                    'AUTO' if self._auto_acknowledge else 'MANUAL',
                ),
            )

            channel.basic_consume(
                self._on_delivery,
                self._queue_name,
                no_ack = self._auto_acknowledge,  # the broker considers deliveries acknowledged when True
            )

            # NOTE there is a bug in start_consuming that prevents stop_consuming from cleanly
            # stopping message consumption. The following is a hack suggested in StackOverflow,
            # used in place of ``channel.start_consuming()``.
            try:
                while channel._consumer_infos:
                    channel.connection.process_data_events(time_limit = 1)

                    # Getting here after a failure means the reconnection succeeded.
                    if self._retry_count:
                        self._retry_count = 0

                        if self._on_connect:
                            self._async_invoke_callback(self._on_connect)

                self._stopped = True
            except (ConnectionClosed, ChannelClosed) as e:
                raise NoConnectionError('the connection has been absurbly disconnected while pending for message') from e

        log('debug', 'Stopped listening to {}'.format(self._debug_route_name()))

    def _on_delivery(self, channel, method_frame, header_frame, body):
        """ Handle one delivery (pika consumer callback).

        Remote signals are handled by ``_handle_remote_signal``; while paused, deliveries are
        requeued; otherwise the payload is passed to ``self.callback``. A failure while decoding
        or processing republishes the payload to the recovery queue and acknowledges it exactly
        once so that it leaves the main queue.
        """
        message_id = str(uuid.uuid4())
        raw_message = None

        log('debug', '{}: MESSAGE {}: Receiving a message...'.format(self._debug_route_name(), message_id))

        try:
            # Decoding and parsing stay inside the try block so that malformed data
            # (invalid UTF-8 or invalid JSON) goes through the error handling routine.
            raw_message = body.decode('utf8')
            decoded_message = json.loads(raw_message)

            if self._handle_remote_signal(channel, method_frame, decoded_message):
                return

            if self._paused:
                log('info', '{}: On STANDBY'.format(self._debug_route_name()))

                # Requeue the delivery so that it is processed once resumed.
                self._nack_delivery(channel, method_frame)

                log('debug', '{}: Temporarily block itself for a moment'.format(self._debug_route_name()))
                time.sleep(3)
                log('debug', '{}: Ready (standby)'.format(self._debug_route_name()))

                return

            message = (
                decoded_message
                if self.simple_handling
                else Message(
                    decoded_message,
                    {
                        'header': header_frame,
                        'method': method_frame,
                    }
                )
            )

            self.callback(message)
        except Exception as unexpected_error:
            self._handle_error_during_consumption(
                message_id,
                raw_message if raw_message is not None else body.decode('utf8', errors = 'replace'),
                unexpected_error,
                traceback.format_exc(),
                'error detected while processing MESSAGE {}'.format(message_id),
            )

            log('error', '{}: MESSAGE {}: Error detected while processing the message'.format(self._debug_route_name(), message_id))

            # Acknowledge the delivery when an error occurs to DEQUEUE the message
            # (a copy is now in the recovery queue). This is the only ack for it.
            self._ack_delivery(channel, method_frame)

            log('warning', '{}: MESSAGE {}: Recovered from the unexpected error'.format(self._debug_route_name(), message_id))
        else:
            # Acknowledge the delivery after the work is done.
            try:
                self._ack_delivery(channel, method_frame)
            except Exception as unexpected_error:
                self._handle_error_during_consumption(
                    message_id,
                    raw_message,
                    unexpected_error,
                    traceback.format_exc(),
                    'error detected while acknowledging MESSAGE {}'.format(message_id),
                )

                log('error', '{}: MESSAGE {}: Error detected while acknowledging the message delivery ({})'.format(self._debug_route_name(), message_id, method_frame.delivery_tag))

        if self._stopped:
            log('warning', '{}: Consumer is now terminating.'.format(self._debug_route_name()))
            return

        if self._delay_per_message:
            log('debug', '{}: Pausing for {:.3f}s (PAUSED)'.format(self._debug_route_name(), self._delay_per_message))
            time.sleep(self._delay_per_message)
            log('debug', '{}: Back to action (RESUMED)'.format(self._debug_route_name()))

        log('debug', '{}: Ready (OK)'.format(self._debug_route_name()))

    def _handle_remote_signal(self, channel, method_frame, decoded_message):
        """ Handle PING/RESUME/PAUSE remote signals.

        :return: ``True`` when the delivery has been fully dealt with, ``False`` when it must be
            processed as a regular message
        """
        remote_signal = None

        if isinstance(decoded_message, dict) and 'remote_signal' in decoded_message:
            log('debug', '{}: Received a remote signal'.format(self._debug_route_name()))

            remote_signal = decoded_message['remote_signal']
            remote_target = decoded_message.get('controller_id', None) or None

            # Every signal except PING must target a controller.
            if remote_signal != RemoteSignal.PING:
                if not remote_target:
                    log('debug', '{}: Unable to find the remote target.'.format(
                        self._debug_route_name(),
                    ))

                    # Acknowledge the message to discard it as it is an invalid remote command.
                    self._ack_delivery(channel, method_frame)

                    log('debug', '{}: Discard to an invalid remote command.'.format(
                        self._debug_route_name(),
                    ))

                    return True

                if remote_target != self._controller_id:
                    log('debug', '{}: Ignoring the remote signal (TARGET {}).'.format(
                        self._debug_route_name(),
                        remote_target
                    ))

                    # Not acknowledge the message to requeue it as this remote command
                    # is not for the current consumer.
                    self._nack_delivery(channel, method_frame)

                    log('debug', '{}: Ignored the remote signal (TARGET {}).'.format(
                        self._debug_route_name(),
                        remote_target
                    ))

                    return True

        if remote_signal == RemoteSignal.PING:
            log('debug', '{}: Detected PING signal'.format(self._debug_route_name()))
            self._ack_delivery(channel, method_frame)
            log('debug', '{}: Ready (post-ping)'.format(self._debug_route_name()))
            return True

        if remote_signal == RemoteSignal.RESUME:
            log('debug', '{}: Receive RESUME signal'.format(self._debug_route_name()))
            self.resume()
            self._ack_delivery(channel, method_frame)
            log('debug', '{}: Reactivated'.format(self._debug_route_name()))
            return True

        if remote_signal == RemoteSignal.PAUSE:
            log('debug', '{}: Receive PAUSE signal'.format(self._debug_route_name()))
            self.pause()
            self._ack_delivery(channel, method_frame)
            log('debug', '{}: Standing by...'.format(self._debug_route_name()))
            return True

        return False

    def _ack_delivery(self, channel, method_frame):
        """ Acknowledge the delivery unless the broker already auto-acknowledges it. """
        if not self._auto_acknowledge:
            channel.basic_ack(delivery_tag = method_frame.delivery_tag)

    def _nack_delivery(self, channel, method_frame):
        """ Negatively acknowledge the delivery unless the broker already auto-acknowledges it. """
        if not self._auto_acknowledge:
            channel.basic_nack(delivery_tag = method_frame.delivery_tag)

    def _handle_error_during_consumption(self, message_id, raw_message, error, execution_trace, summary):
        """ Record a failed delivery in the recovery queue and notify the error handler.

        A separate connection is opened to republish the payload, together with the error
        details, to the ``RECOVERY.<route>`` queue.

        :raises NoConnectionError: when ``error`` is a ``ConnectionClosed``/``ChannelClosed``,
            so that ``run`` goes through the reconnection routine
        """
        with active_connection(self.url, self._on_connect, self._on_disconnect) as channel:
            error_info = {
                'type'      : type(error).__name__,
                'message'   : str(error),
                'traceback' : execution_trace,
            }

            log('error', '{}: Unexpected error detected: {type}: {message}\n{traceback}'.format(
                self._debug_route_name(),
                **error_info,
            ))

            # Store the message that cause error in the recovery queue.
            republishing_options = {
                'exchange'    : '',
                'routing_key' : self._recovery_queue_name,
                'properties'  : BasicProperties(content_type = 'application/json'),
                'body'        : json.dumps(
                    {
                        'controller' : self._controller_id,
                        'error'      : error_info,
                        'message'    : raw_message,
                        'when'       : time.time(),
                    },
                    indent = 4,
                    sort_keys = True,
                ),
            }

            channel.basic_publish(**republishing_options)

        if isinstance(error, (ConnectionClosed, ChannelClosed)):
            log('error', '{}: Connection Error: {type}: {message}\n{traceback}'.format(
                self._debug_route_name(),
                **error_info,
            ))

            raise NoConnectionError('unexpected disconnected while handling a message ({})'.format(summary)) from error

        if self._on_error:
            self._async_invoke_callback(self._on_error, error, summary = summary)

    def _debug_route_name(self):
        """ Build the ``ROUTE x/CONTROLLER y[/QUEUE z]`` prefix used in log messages. """
        segments = [
            'ROUTE {}'.format(self.route),
            'CONTROLLER {}'.format(self._controller_id),
        ]

        if self.route != self._queue_name:
            segments.append('QUEUE {}'.format(self._queue_name))

        return '/'.join(segments)

    def _build_exchange_options(self, default_name, default_type):
        """ Build the ``exchange_declare`` keyword arguments, with ``self.exchange_options`` overriding the defaults. """
        return dict(
            exchange = self.exchange_options.get('name', default_name),
            exchange_type = self.exchange_options.get('type', default_type),
            passive = self.exchange_options.get('passive', False),
            durable = self.exchange_options.get('durable', True),
            auto_delete = self.exchange_options.get('auto_delete', False),
        )

    def _declare_shared_queue(self, channel):
        """ Declare the queue named after the route and bind it to the direct exchange. """
        queue_name = self._declare_queue(
            channel,
            {
                'auto_delete' : not self.resumable,
                'durable'     : self.resumable,
                'queue'       : self.route,
            }
        )

        log('info', '[_declare_shared_queue] CONTROLLER {}: Declared a shared queue "{}"'.format(self._controller_id, queue_name))

        self._bind_queue(channel, queue_name, self._build_exchange_options(SHARED_DIRECT_EXCHANGE_NAME, 'direct'))

        return self.route

    def _declare_recovery_queue(self, channel):
        """ Declare the ``RECOVERY.<route>`` queue that receives failed deliveries. """
        queue_name = self._declare_queue(
            channel,
            {
                'auto_delete': not self.resumable,
                'durable'    : self.resumable,
                'queue'      : self._recovery_queue_name,
            }
        )

        log('info', '[_declare_recovery_queue] CONTROLLER {}: Declared a recovery queue for ROUTE {}'.format(self._controller_id, queue_name))

        return self.route

    def _declare_topic_queue(self, channel):
        """ Declare a temporary, server-named queue and bind it to the topic exchange. """
        # Currently not supporting resumability.
        temp_queue_name = self._declare_queue(
            channel,
            {
                'auto_delete': True,
                'queue'      : '',
            }
        )

        log('info', '[_declare_topic_queue] CONTROLLER {}: Declared a distributed queue "{}"'.format(self._controller_id, temp_queue_name))

        self._bind_queue(channel, temp_queue_name, self._build_exchange_options(SHARED_TOPIC_EXCHANGE_NAME, 'topic'))

        return temp_queue_name

    def _declare_queue(self, channel, default_queue_options):
        """ Declare a queue from the defaults merged with ``self.queue_options``; return the broker's queue name. """
        queue_options = fill_in_the_blank(
            default_queue_options,
            self.queue_options or {}
        )

        response = channel.queue_declare(**queue_options)

        return response.method.queue

    def _bind_queue(self, channel, queue_name, exchange_options):
        """ Declare the exchange described by ``exchange_options`` and bind the queue to it on the route. """
        assert 'exchange' in exchange_options

        exchange_name = exchange_options['exchange']
        debugging_info = (self._controller_id, queue_name, self.route, exchange_name)

        channel.exchange_declare(**exchange_options)

        log('info', '[_bind_queue] CONTROLLER {}: Binding a queue "{}" to route {} on exchange {}'.format(*debugging_info))

        channel.queue_bind(queue_name, exchange_name, self.route)

        log('info', '[_bind_queue] CONTROLLER {}: Bound a queue "{}" to route {} on exchange {}'.format(*debugging_info))