Simple transparent wrapper for a small portion of Pika. It has a nice interface.
Project description
Bunnymq
Simple transparent wrapper for a small portion of Pika. It has a nice interface.
Install
pip install bunnymq
If pika
works, this will also work.
Usage
Wheather you want a producer or a consumer, you create the following object
>>> from bunnymq import Queue
>>> queue = Queue('test')
This creates a queue named test
assuming default values for other parameters listed below.
username
defaults to'guest'
password
defaults to'guest'
serializer
defaults topickle
module. This can be any object that implements the interfacedumps
andloads
. Hencejson
module will also work out of the box.- Any keyword arguments that
pika.ConnectionParameters
takes except the parametercredentials
.
This library works with the default exchange
Producing to the queue has one interface, the put
method. However, there are multiple interfaces available for consumers
- Basic interface
- Iterable interface
- Decorator interface (recommended)
These are described below.
Basic Interface
Put
>>> queue.put({'a': 1})
The message is automatically serialized.
The queue, by default, is a priority queue with priorities ranging from 1 (low) through 10 (high). By default the value is 5. To put a message a with a custom priority,
>>> queue.put({'b': 1}, priority=8)
This message will be consumed before the ones with lower priority.
Get
>>> queue.get()
{'a': 1}
The message is automatically deserialized.
>>> q.get()
This will raise an exception:
Exception: The previous message was neither marked done nor requeued.
Mark done
>>> queue.task_done()
This sends a basic_ack
.
The semantics of this method is somewhat different from the one in
queue.Queue
in the standard library, in that there is noqueue.join
in our case that is waiting for invocation oftask_done
.
Requeue
>>> queue.requeue()
This sends a basic_reject
with requeue=True
Queue size
>>> len(queue)
2
Iterable Interface
for msg in queue:
try:
# handle the message
except NonRetriableError:
# do some logging, alerting ...
queue.task_done()
except RetriableError:
# do some logging, alerting ...
queue.requeue()
except Exception:
raise
else:
queue.task_done()
Decorator Interface
This one is the recommended usage because it is the cleanest.
@queue.handler
def process(msg):
pass
@queue.error_handler(E1, E2, requeue=False)
def non_retriable_error_1(msg, e):
pass
@queue.error_handler(E3, E4, requeue=False)
def non_retriable_error_2(msg, e):
pass
@queue.error_handler(E5, E6, ...)
def retriable_error(msg, e):
pass
Any number of error handlers can be registered. In the Iterable interface this would make the try/except block extremely ugly.
Workflow
The queue
object has two methods that are registration decorators:
queue.handler
: bound to a single callable, that processes the messagequeue.error_handler
: bound to a list of callables, that handle the errors
Once a message is consumed, queue.handler
callable is invoked passing the message
- if there are no exceptions the message is marked done.
- If there are errors, the appropriate handler is invoked, depending on the type of the raised exception. The message is marked done or requeued depending on the
requeue
argument. By default it isTrue
. - If none of the handlers match, the message is requeued and the exception is raised.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for bunnymq-0.0.1-py2.py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 751d60d5e1d0f7b01ad0d8621a709b3045c42ad3bd3f579d3497e2883f5d1fd8 |
|
MD5 | d28dc23747af770d261fc1ae5ca0bcd0 |
|
BLAKE2b-256 | d7ea301fdfe7e02cb1f39910da6402cf3d8134cd41015f6dc4e3c42d81731180 |