Bunnymq

Talk to RabbitMQ in Python.

The official Python library to interact with RabbitMQ is Pika. However, I have never been a big fan of the interface Pika provides. It is too bloated for simple tasks. This wrapper tries to provide a nicer interface for a very small albeit useful portion of the aforementioned library.

Pull requests that are aimed at

  • making the code cleaner, more understandable and pythonic
  • better abstraction and interface
  • better implementation
  • bug fixes
  • better documentation

are most welcome.

When in doubt always refer to the RabbitMQ guides and the Pika docs.

Features

  • Automatic serialization and deserialization of messages. Using a custom serializer is almost trivial.
  • Multiple consumer patterns.
  • Automatic retry while publishing.
  • Automatic handling of connection failures while consuming.
  • Automatic handling of message redeliveries because of failure to send acknowledgement at the end of processing. This is a frequent scenario for long running consumer tasks. If you have encountered this problem, do read the details.
  • Easy parallelization by starting multiple workers to share the load. No two consumers will ever get the same message.

Install

pip install bunnymq

Usage

Whether you want a producer or a consumer, you create the same object:

>>> from bunnymq import Queue
>>> queue = Queue('test') 

This creates a queue named test, assuming default values for the other parameters:

  1. username defaults to 'guest'
  2. password defaults to 'guest'
  3. serializer defaults to the pickle module
  4. any keyword argument that pika.ConnectionParameters takes, except credentials

This library works with the default exchange.

If a queue exists in the broker with the same name but different properties (created by some other means), an exception will be raised.

There is only one way to publish a message to the queue: the put method. For consuming, however, multiple patterns are available. These are described below.

Basic Interface

Put

>>> queue.put({'a': 1})

The message is automatically serialized.

The queue, by default, is a priority queue with priorities ranging from 1 (low) through 10 (high); the default priority is 5. To put a message with a custom priority:

>>> queue.put({'b': 1}, priority=8)

This message will be consumed before the ones with lower priority.
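To picture the consumption order without a broker, here is a minimal local analogy using the standard library's heapq (this is an illustration only, not how bunnymq is implemented):

```python
import heapq

# Higher priority values are delivered first, so we push negated
# priorities onto a min-heap. The counter breaks ties so that messages
# with equal priority keep their insertion order.
heap = []
counter = 0

def put(msg, priority=5):
    global counter
    heapq.heappush(heap, (-priority, counter, msg))
    counter += 1

def get():
    return heapq.heappop(heap)[2]

put({'a': 1})              # default priority 5
put({'b': 1}, priority=8)  # higher priority

print(get())  # {'b': 1} -- the priority-8 message comes out first
print(get())  # {'a': 1}
```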

Get

>>> queue.get()
{'a': 1}

The message is automatically deserialized.

>>> queue.get()

Calling get again, before the previous message is marked done or requeued, raises an exception:

Exception: The previous message was neither marked done nor requeued.

Mark done

>>> queue.task_done()

This informs the broker that the message has been successfully processed. It may also be used when the message was not successfully processed but should not be retried.

The semantics of this method differ somewhat from those of queue.Queue in the standard library: there is no queue.join here waiting for task_done to be invoked.

Requeue

>>> queue.requeue()

This informs the broker that the message was not successfully processed and should be redelivered so that it can be retried.
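The get/task_done/requeue contract can be pictured as a small state machine: at most one message is outstanding at a time, and it must be resolved before the next get. Here is a minimal sketch of that observable behaviour (an in-memory stand-in, not bunnymq's actual implementation):

```python
from collections import deque

class StrictQueue:
    """Mimics the one-outstanding-message contract of bunnymq's Queue."""

    def __init__(self):
        self._messages = deque()
        self._pending = None  # message awaiting task_done/requeue

    def put(self, msg):
        self._messages.append(msg)

    def get(self):
        if self._pending is not None:
            raise Exception(
                'The previous message was neither marked done nor requeued.')
        self._pending = self._messages.popleft()
        return self._pending

    def task_done(self):
        self._pending = None  # acknowledge: the message is gone for good

    def requeue(self):
        self._messages.append(self._pending)  # redeliver later
        self._pending = None
```

Two gets in a row without an intervening task_done or requeue raise the same exception shown above.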

Queue size

>>> len(queue)
2

The standard library queue module has a Queue.qsize method, and that is arguably the right name for this operation; len, however, is convenient. This may change in the future.

Iterable Interface

The Queue class implements the iterable protocol, namely the __iter__ method, which enables the following:

for msg in queue:
    try:
        handle(msg)  # your processing logic
    except NonRetriableError:
        # do some logging, alerting ...
        queue.task_done()
    except RetriableError:
        # do some logging, alerting ...
        queue.requeue()
    except Exception:
        raise
    else:
        queue.task_done()

Decorator Interface

This one is the recommended usage because it is the cleanest.

@queue.on('message')  # <-- marked done automatically, if there are no errors
def process(msg):
    pass


@queue.on('error', NotFound, NotNeeded, requeue=False)  # <-- marked done automatically
def _(msg, e):
    # log the event
    pass


@queue.on('error', BlockedContent, requeue=False)  # <-- marked done automatically
def _(msg, e):
    # send an alert and log the event
    pass


@queue.on('error', RateLimited)  # <-- requeued automatically
def _(msg, e):
    # log the event
    pass


if __name__ == '__main__':
    queue.consume()

Any number of error handlers can be registered. In the Iterable interface this would make the try/except block extremely ugly.

Workflow

The queue object has an on method with which we can register handlers for two types of events:

  1. 'message': bound to a single callable, that processes the message
  2. 'error': bound to a list of callables, that handle the errors. The error handlers allow us to capture and process domain specific exceptions.

Once a message is consumed, the 'message' handler is invoked with the message:

  1. If there are no exceptions, the message is marked done.
  2. If an exception is raised, the matching error handler is invoked, based on the type of the exception. The message is then marked done or requeued depending on the requeue argument, which defaults to True.
  3. If no handler matches, the message is requeued and the exception is re-raised.
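The dispatch described above can be sketched in plain Python. The function and handler-registry shapes below are illustrative assumptions, not bunnymq internals:

```python
# error_handlers: list of (exception_types, requeue_flag, handler) triples,
# in registration order. queue only needs task_done() and requeue().
def dispatch(queue, msg, message_handler, error_handlers):
    try:
        message_handler(msg)
    except Exception as e:
        for exc_types, requeue, handler in error_handlers:
            if isinstance(e, exc_types):
                handler(msg, e)
                if requeue:
                    queue.requeue()
                else:
                    queue.task_done()
                return
        # no handler matched: requeue and re-raise
        queue.requeue()
        raise
    else:
        queue.task_done()
```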

Custom Serializers

A serializer is any object that implements two methods:

  1. dumps, which returns str/bytes
  2. loads, which takes bytes

Hence the json module is a drop-in replacement.

>>> import json
>>> queue = Queue('test', serializer=json)

Here is an abstract base class for a custom serializer:

class MySerializer:
    def dumps(self, msg) -> bytes:
        raise NotImplementedError

    def loads(self, content: bytes):
        raise NotImplementedError
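As a concrete example, here is a small serializer that encodes JSON as UTF-8 bytes (the class name is illustrative; any object with these two methods works):

```python
import json

class JsonBytesSerializer:
    """Serialize messages as UTF-8 encoded JSON."""

    def dumps(self, msg) -> bytes:
        return json.dumps(msg).encode('utf-8')

    def loads(self, content: bytes):
        return json.loads(content.decode('utf-8'))
```

It would then be passed as Queue('test', serializer=JsonBytesSerializer()).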

Multiple Consumers

Let consumer.py be the module that can be run as a main program.

python consumer.py

Alternatively, if the module lives inside a package pkg:

python -m pkg.consumer

This starts a worker. To start another one, open another terminal and invoke this again. Now you have two consumers.

If the application is containerized, the consumers can run in the background, and scaling up or down is trivial with docker-compose.

Redelivery Issues

Pika recommends the consumers be designed to be idempotent.

In the event of network failure (or a node failure), messages can be redelivered, and consumers must be prepared to handle deliveries they have seen in the past.

This is important because if the task takes a long time and the connection has closed by the time it finishes, the acknowledgement cannot be sent. The reason being:

Because delivery tags are scoped per channel, deliveries must be acknowledged on the same channel they were received on.

If the consumer is not idempotent, a redelivered message of this kind must be handled before the consumer receives it again.

Two situations arise:

  1. The long running task completes successfully and wants to acknowledge. But the connection has closed, and the new connection (and a new channel) won't accept the old delivery tag.
  2. The long running task encounters an error and wants to requeue. But the connection has closed, and the new connection (and a new channel) won't accept the old delivery tag.

Unacked messages are redelivered by default, so the second situation is not a problem. Solving the first situation, however, requires keeping in mind that the consumer might have explicitly requested a requeue.

The current implementation addresses both the situations.
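Independently of the library, an idempotent consumer sidesteps duplicate deliveries entirely. Here is one common pattern as a minimal sketch; the 'id' field and the in-memory set are illustrative assumptions (a real deployment would have the producer attach a unique id and would persist processed ids in a durable store):

```python
# Skip messages whose unique id has already been processed, so a
# redelivered duplicate becomes a harmless no-op.
processed_ids = set()

def handle_idempotently(msg):
    msg_id = msg['id']  # assumes the producer attaches a unique id
    if msg_id in processed_ids:
        return  # redelivered duplicate: safe to acknowledge and move on
    # ... do the actual work here ...
    processed_ids.add(msg_id)
```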
