RabbitMQ client based on the pika library.
RabbitMQ client helpers based on pika
This project provides helper classes for using RabbitMQ in Python. It is
pika, which is an awesome no-dependency client library for
RabbitMQ. Similarly, this project strives for zero dependencies (except for
By using this project, users should be able to get started with RabbitMQ in
Python instantly, by simply instantiating and starting a
RMQConsumer extends the
RMQConnection base class with only one extra
consume. Consume can be passed parameters for declaring queues and
exchanges, as well as binding them together, and consume parameters, all of
which have corresponding kwargs in the pika library. The idea is not to
re-invent the wheel, but simply the process of declaring a queue -> declaring an
exchange -> binding the exchange and queue together -> consuming from the queue.
Here is an example:
from rabbitmq_client import RMQConsumer, ConsumeParams, QueueParams def on_message(msg, ack=None): ... consumer = RMQConsumer() consumer.start() consumer.consume(ConsumeParams(on_message), queue_params=QueueParams("queue_name"))
The flow of declaring, binding, and consuming is quite straightforward. The above example will declare a queue with the name "queue_name" and consume from it.
NOTE! Although the above may look synchronous, it is not. Start is
and any consume started while the consumer is not fully started will simply be
delayed until it is. When a consume has been successfully started, the bound
callback will receive a
ConsumeOK object containing the resulting
Acknowledging received messages
By default, received messages need to be acknowledged when received. By calling
ack kwarg function a message is acknowledged and won't be sent again.
If a message isn't acknowledged using this function, it will be re-sent by
RabbitMQ on consuming from a queue again.
from rabbitmq_client import RMQConsumer, ConsumeParams, QueueParams from some_other_module import handle_msg def on_message(msg, ack=None): error = handle_msg(msg) if not error: ack() consumer = RMQConsumer() consumer.start() consumer.consume(ConsumeParams(on_message), queue_params=QueueParams("queue_name"))
To enable automatic acknowledgement of messages, pass the
ConsumeParams, set to
ack kwarg is unset.
RMQProducer extends the
RMQConnection base class with two additional
activate_confirm_mode. Publish is used, as it
sounds, to publish messages towards queues and/or exchanges. The confirm
mode activation method enabled confirm mode so that users can verify that
messages have been delivered successfully.
from rabbitmq_client import RMQProducer, ExchangeParams def on_confirm(confirmation): ... producer = RMQProducer() producer.start() producer.activate_confirm_mode(on_confirm) # Or don't, depends on your needs producer.publish(b"body", exchange_params=ExchangeParams("exchange_name"), routing_key="some.routing.key")
activate_confirm_mode isn't synchronous either, but you don't have to
worry about that. Calling
activate_confirm_mode will lead
to the publish not happening until confirm mode has been activated
successfully. The callback passed to
activate_confirm_mode will also
ConfirmModeOK once confirm mode is on. Any publish between
activate_confirm_mode and the producer receiving a
confirm_select_ok from RabbitMQ will be buffered and not issues until
confirm mode is on. When confirm mode is on,
publish also returns a key that
clients can use to correlate successful delivered with calls to publish.
publish call with key X is confirmed, the callback passed to
activate_confirm_mode will be called with X.
Abstract connection helper
RMQConnection class can be subclassed to get a head start in
Channel objects as it wraps them
and provides an easy-to-use interface as well as event hooks on important
RMQConnection lifecycle hooks
RMQConnection requires the implementer to override three methods:
on_ready is called when
RMQConnection has established a connection and
opened a channel.
on_close is called when either the connection, or the channel closes for
any reason. This means the implementer may receive two calls for one failed
connection, one for the channel and one for the connection itself. This
makes it important that
on_close is made idempotent.
on_error is called when a recent action failed, such as an exchange
declaration failure. These hooks are meant to enable the implementer to react
to the connection state and restore an operating state. The
abstract base class is used by the
rabbitmq_client project to implement its
RMQConnection interface methods
In addition to the hooks that need to be implemented by implementing classes,
RMQConnection provides three public methods that can be used to interact
with the connection object:
start initiates the connection, establishing a
if that's successful, opening a
pika.Channel for the opened connection. Once
a channel has been opened,
RMQConnection will issue a call to
Subsequent calls to
start have no effect if the connection has already been
restart closes the open connection and ensures that it is started again once
is has been fully closed.
restart is only meant to be used on successfully
established connections, it will have no effect on closed connections.
restart is meant to be used as a means to change
on the fly.
stop permanently closes an open connection and will have no effect on a
closed connection. A connection for which
stop has been called cannot be
on_close is called once the connection is completely stopped.
Aside from the connection-related methods, the
RMQConnection also exposes
interations with the
pika.Channel, named similarily. See here for what is
exposed: Pika docs.
RMQConnection will re-establish lost connections, but not lost channels.
Reconnections will not be done for any reason though, among the reasons for
These two exceptions cover the cases where the broker has been shut down, either expectedly or unexpectedly, or when the connection is lost for some other reason.
Again, if the channel is lost, but the connection remains intact,
RMQConnection will not recover the channel.
Reconnection attempts will be made with an increasing delay between attempts. The first attempt is instantaneous, the second is delayed by 1 second, the third by 2 seconds, etc. After the 9th attempt, the following reconnects will be made at 30 second intervals.
rabbitmq_client follows Python logging standards and is by default disabled.
To enable logging, attach a handler to
import logging logging.getLogger("rabbitmq_client").addHandler(logging.StreamHandler())
By default, a
logging.NullHandler() is attached to this logger.
Release history Release notifications | RSS feed
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Hashes for rabbitmq_client-2.4.0-py2.py3-none-any.whl