Bunnymq
Talk to RabbitMQ in Python.
The official Python library to interact with RabbitMQ is Pika. However, I have never been a big fan of the interface Pika provides. It is too bloated for simple tasks. This wrapper tries to provide a nicer interface for a very small albeit useful portion of the aforementioned library.
Pull requests that are aimed at

- making the code cleaner, more understandable and pythonic
- better abstraction and interface
- better implementation
- bug fixes
- better documentation

are most welcome.
When in doubt always refer to the RabbitMQ guides and the Pika docs.
Features
- Automatic serialization and deserialization of messages. Using a custom serializer is almost trivial.
- Multiple consumer patterns.
- Automatic retry while publishing.
- Automatic handling of connection failures while consuming.
- Automatic handling of message redeliveries caused by failure to send an acknowledgement at the end of processing. This is a frequent scenario for long-running consumer tasks. If you have encountered this problem, do read the details.
- Easy parallelization by starting multiple workers to share the load. No two consumers will ever get the same message.
Install
pip install bunnymq
Usage
Whether you want a producer or a consumer, create the following object:
>>> from bunnymq import Queue
>>> queue = Queue('test')
This creates a queue named `test`, assuming default values for the other parameters:

- `username`: defaults to `'guest'`
- `password`: defaults to `'guest'`
- `serializer`: defaults to the `pickle` module
- Any keyword arguments that `pika.ConnectionParameters` takes, except `credentials`.
This library works with the default exchange.
If a queue exists in the broker with the same name but different properties (created by some other means), an exception will be raised.
There is only one simple way to publish a message to the queue: the `put` method. However, multiple patterns are available for consumers:
- Basic interface
- Iterable interface
- Decorator interface (recommended)
These are described below.
Basic Interface
Put
>>> queue.put({'a': 1})
The message is automatically serialized.
The queue, by default, is a priority queue with priorities ranging from 1 (low) through 10 (high); the default priority is 5. To put a message with a custom priority:
>>> queue.put({'b': 1}, priority=8)
This message will be consumed before the ones with lower priority.
Get
>>> queue.get()
{'a': 1}
The message is automatically deserialized.
>>> queue.get()
This will raise an exception:
Exception: The previous message was neither marked done nor requeued.
Mark done
>>> queue.task_done()
This informs the broker that the message has been successfully processed. It may also be used when the message was not successfully processed but should not be retried.

The semantics of this method differ somewhat from `queue.Queue.task_done` in the standard library, in that there is no `Queue.join` in our case waiting for `task_done` to be invoked.
Requeue
>>> queue.requeue()
This informs the broker that the message was not successfully processed and should be redelivered so that it can be retried.
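Putting the three methods together, a worker loop over the basic interface might look like the sketch below. `FakeQueue` and `RetriableError` are stand-ins invented for illustration so the sketch runs offline; with bunnymq you would use `Queue('test')` and your own exception types.

```python
class RetriableError(Exception):
    """Hypothetical application error that warrants a retry."""
    pass

class FakeQueue:
    """Stand-in for bunnymq.Queue so this sketch is runnable offline."""
    def __init__(self, msgs):
        self.msgs = list(msgs)
        self.acked = 0
        self.requeued = 0

    def get(self):
        return self.msgs.pop(0)

    def task_done(self):
        self.acked += 1

    def requeue(self):
        self.requeued += 1

def work(queue, handle):
    # Drain the queue: every message must end in task_done() or requeue().
    while queue.msgs:
        msg = queue.get()
        try:
            handle(msg)
        except RetriableError:
            queue.requeue()      # broker will redeliver for another attempt
        except Exception:
            queue.task_done()    # non-retriable failure: do not retry
        else:
            queue.task_done()    # success: acknowledge

queue = FakeQueue([{'a': 1}, {'b': 2}])
work(queue, lambda msg: None)
```

The key invariant is that every `get` is eventually paired with exactly one `task_done` or `requeue`.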
Queue size
>>> len(queue)
2
The Python standard library `queue` module has a `Queue.qsize` method, and that is the [right] choice. However, `len` is convenient. This may change in the future.
Iterable Interface
The `Queue` class implements the iterable protocol, namely the `__iter__` method, which enables the following:
    for msg in queue:
        try:
            process(msg)  # handle the message
        except NonRetriableError:
            # do some logging, alerting ...
            queue.task_done()
        except RetriableError:
            # do some logging, alerting ...
            queue.requeue()
        except Exception:
            raise
        else:
            queue.task_done()
Decorator Interface
This one is the recommended usage because it is the cleanest.
    @queue.on('message')  # <-- marked done automatically, if there are no errors
    def process(msg):
        pass

    @queue.on('error', NotFound, NotNeeded, requeue=False)  # <-- marked done automatically
    def _(msg, e):
        # log the event
        pass

    @queue.on('error', BlockedContent, requeue=False)  # <-- marked done automatically
    def _(msg, e):
        # send an alert and log the event
        pass

    @queue.on('error', RateLimited)  # <-- requeued automatically
    def _(msg, e):
        # log the event
        pass

    if __name__ == '__main__':
        queue.consume()
Any number of error handlers can be registered. In the Iterable interface this would make the try/except block extremely ugly.
Workflow
The `queue` object has an `on` method with which we can register handlers for two types of events:

- `'message'`: bound to a single callable that processes the message.
- `'error'`: bound to a list of callables that handle errors. The error handlers allow us to capture and process domain-specific exceptions.

Once a message is consumed, the `'message'` handler is invoked, passing the message:

- If there are no exceptions, the message is marked done.
- If there are errors, the appropriate handler is invoked, depending on the type of the raised exception. The message is marked done or requeued depending on the `requeue` argument, which defaults to `True`.
- If none of the handlers match, the message is requeued and the exception is re-raised.
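This dispatch workflow can be sketched roughly as follows. This is an illustration of the rules above, not bunnymq's actual implementation; the class and names here are invented for the sketch.

```python
class Dispatcher:
    """Illustrative sketch of the on/consume workflow (not bunnymq internals)."""

    def __init__(self):
        self.message_handler = None
        self.error_handlers = []   # list of (exc_types, requeue, handler)

    def on(self, event, *exc_types, requeue=True):
        def register(fn):
            if event == 'message':
                self.message_handler = fn
            else:
                self.error_handlers.append((exc_types, requeue, fn))
            return fn
        return register

    def dispatch(self, msg):
        """Return 'done' or 'requeue', mirroring the rules above."""
        try:
            self.message_handler(msg)
            return 'done'                      # no exception: mark done
        except Exception as e:
            for exc_types, requeue, handler in self.error_handlers:
                if isinstance(e, exc_types):
                    handler(msg, e)
                    return 'requeue' if requeue else 'done'
            raise                              # no handler matched: re-raise

class NotFound(Exception):
    pass

d = Dispatcher()

@d.on('message')
def process(msg):
    if msg == 'bad':
        raise NotFound(msg)

@d.on('error', NotFound, requeue=False)
def _(msg, e):
    pass  # log the event
```

Here `d.dispatch('ok')` returns `'done'`, and `d.dispatch('bad')` also returns `'done'` because the `NotFound` handler was registered with `requeue=False`.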
Custom Serializers
A serializer is any object that implements two methods:

- `dumps`, which returns `str`/`bytes`
- `loads`, which takes `bytes`

Hence the `json` module can be a drop-in replacement.
>>> import json
>>> queue = Queue('test', serializer=json)
Here is an abstract class:

    class MySerializer:
        def dumps(self, msg) -> bytes:
            raise NotImplementedError

        def loads(self, content: bytes):
            raise NotImplementedError
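As a concrete example, here is a hypothetical serializer that stores messages as zlib-compressed JSON. The class name and scheme are assumptions for illustration; any object with compatible `dumps`/`loads` methods works the same way.

```python
import json
import zlib

class CompressedJson:
    """A hypothetical serializer: JSON text compressed with zlib."""

    def dumps(self, msg) -> bytes:
        return zlib.compress(json.dumps(msg).encode('utf-8'))

    def loads(self, content: bytes):
        return json.loads(zlib.decompress(content).decode('utf-8'))

serializer = CompressedJson()
payload = serializer.dumps({'a': 1})
assert serializer.loads(payload) == {'a': 1}
```

It could then be passed as `Queue('test', serializer=CompressedJson())`.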
Multiple Consumers
Let `consumer.py` be a module that can be run as a main program:

    python consumer.py

Alternatively, if the module lives inside a package `pkg`:

    python -m pkg.consumer
This starts a worker. To start another one, open another terminal and invoke this again. Now you have two consumers.
If the application is containerized, the consumers can be run in the background, and scaling up and down is trivial with docker-compose.
Redelivery Issues
Pika recommends that consumers be designed to be idempotent:

> In the event of network failure (or a node failure), messages can be redelivered, and consumers must be prepared to handle deliveries they have seen in the past.

This is important because if the task takes time and the connection has closed by the time it finishes, the acknowledgement cannot be sent. The reason:

> Because delivery tags are scoped per channel, deliveries must be acknowledged on the same channel they were received on.

Assuming the consumer is not idempotent, in case of redelivery of such a message, it should be handled before the consumer receives it again.

Two situations arise:

- The long-running task completes successfully and wants to acknowledge, but the connection has closed and the new connection (and a new channel) won't accept the old delivery tag.
- The long-running task encounters an error and wants to requeue, but the connection has closed and the new connection (and a new channel) won't accept the old delivery tag.

Unacked messages are redelivered by default, so the second situation should not be a problem. However, to solve the first situation, it must be kept in mind that the consumer might have explicitly requested a requeue.

The current implementation addresses both situations.
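If you do want an idempotent consumer, one common approach is to remember which messages have already been processed and skip redeliveries. The sketch below assumes an `'id'` field in the message and uses an in-memory set; a real deployment would use a persistent store such as a database.

```python
# Seen-message ids; in production this would live in durable storage.
processed = set()

def handle_idempotently(msg, do_work):
    """Run do_work(msg) at most once per message id."""
    key = msg['id']
    if key in processed:
        return False           # a redelivery we have already handled
    do_work(msg)
    processed.add(key)
    return True

calls = []
handle_idempotently({'id': 1}, calls.append)   # processed
handle_idempotently({'id': 1}, calls.append)   # redelivery: skipped
assert len(calls) == 1
```

With this guard in place, a redelivered message is acknowledged without repeating its side effects.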