Asyncio-compatible wrapper around the aio-pika library for consuming messages.

aio-bunny

An asyncio-compatible wrapper around aio-pika that brings the following advantages:

  • Simplified API: Abstracts the complexity of defining queues and exchanges, making it easy to set up consumers and producers.
  • Graceful Consumer Shutdown: Provides an API to stop consumers without disrupting unacknowledged tasks, ensuring a smooth and safe shutdown process.
  • Idempotent Consumers: Offers a straightforward API for integrating with external databases (e.g., Redis, MySQL) to ensure idempotency. The RabbitMQ documentation recommends that consumers that use manual acknowledgment and need to avoid processing a message twice be implemented idempotently.
  • Prometheus Integration: Integrates with Prometheus to collect useful metrics for monitoring the state of consumers and producers. You can easily choose whether to enable metric collection based on your needs.
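
To make the idempotency idea concrete, here is a minimal sketch of the general pattern in plain Python. It is not aio-bunny's API: `InMemoryDedupStore`, `message_id_for`, and `handle_once` are hypothetical names chosen for illustration, the in-memory set stands in for an external store such as Redis or MySQL, and hashing the body is only an assumed way to identify a message.

```python
import asyncio
import hashlib


class InMemoryDedupStore:
    """Hypothetical stand-in for Redis/MySQL: remembers processed message IDs."""

    def __init__(self):
        self._seen = set()

    async def claim(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False on a repeat."""
        if message_id in self._seen:
            return False
        self._seen.add(message_id)
        return True

    async def release(self, message_id: str) -> None:
        """Forget an ID so that a failed message can be retried later."""
        self._seen.discard(message_id)


def message_id_for(body: bytes) -> str:
    # Assumption: no broker-supplied ID is available, so hash the content.
    return hashlib.sha256(body).hexdigest()


async def handle_once(store, body: bytes, handler) -> bool:
    """Run handler only if this message has not been processed before."""
    message_id = message_id_for(body)
    if not await store.claim(message_id):
        return False  # duplicate delivery: skip the side effect
    try:
        await handler(body)
    except Exception:
        # Processing failed, so allow a later redelivery to try again.
        await store.release(message_id)
        raise
    return True


async def demo():
    store = InMemoryDedupStore()
    processed = []

    async def handler(body: bytes):
        processed.append(body.decode())

    # The second "order-1" simulates a redelivery of the same message.
    results = [await handle_once(store, body, handler)
               for body in (b"order-1", b"order-1", b"order-2")]
    return results, processed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real external store, the claim step would need to be atomic (for example, a set-if-absent operation) so that two consumers cannot both claim the same message.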

Installation

pip install aio-bunny

Usage examples

In the simplest form, you can do the following to spin up a consumer:

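import asyncio

from aio_bunny import Bunny


async def main():
    bunny = Bunny("amqp://guest:guest@127.0.0.1/")

    # Register a handler for messages routed to "myqueue".
    @bunny.consumer(
        queue="myqueue",
        exchange="myexchange",
        routing_key="key")
    async def on_message(msg: bytes):
        await asyncio.sleep(1)  # simulate slow work
        print('received:', msg.decode())

    await bunny.start()


if __name__ == "__main__":
    # Create the loop explicitly: calling asyncio.get_event_loop() when no
    # loop is running is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(main())
    loop.run_forever()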


To shut down the consumers gracefully, call the stop method on the Bunny instance.
This method stops the consumers from accepting new messages from the broker, waits for all in-flight tasks to finish, and exits cleanly.

The following example stops the application whenever SIGINT or SIGTERM is received:

import asyncio
import signal

from aio_bunny import Bunny


async def main():
    bunny = Bunny("amqp://guest:guest@127.0.0.1/")

    # "sig" avoids shadowing the imported signal module.
    async def shutdown(sig, loop):
        print(f"Received exit signal: {sig.name}")
        await bunny.stop()  # Stopping the consumers here
        loop.stop()

    # main() runs inside the loop, so fetch the running loop.
    loop = asyncio.get_running_loop()
    # Call the shutdown function on any of these signals.
    signals = (signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    @bunny.consumer(
        queue="myqueue",
        exchange="myexchange",
        routing_key="key")
    async def on_message(msg: bytes):
        await asyncio.sleep(1)
        print('received:', msg.decode())

    await bunny.start()


if __name__ == "__main__":
    # Create the loop explicitly: calling asyncio.get_event_loop() when no
    # loop is running is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(main())
    loop.run_forever()
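
For intuition about what a graceful stop involves, the sketch below shows the "stop accepting, then drain" pattern using only asyncio. It is an illustration under assumptions, not aio-bunny's implementation: `DrainingWorker` and its methods are hypothetical, and `submit` stands in for a message arriving from the broker.

```python
import asyncio


class DrainingWorker:
    """Hypothetical worker illustrating stop-then-drain (not aio-bunny code)."""

    def __init__(self):
        self._accepting = True
        self._inflight = set()
        self.completed = []

    def submit(self, item: str) -> bool:
        """Start handling an item unless shutdown has already begun."""
        if not self._accepting:
            return False
        task = asyncio.create_task(self._handle(item))
        self._inflight.add(task)
        # Drop the task from the in-flight set as soon as it finishes.
        task.add_done_callback(self._inflight.discard)
        return True

    async def _handle(self, item: str):
        await asyncio.sleep(0.01)  # simulated work
        self.completed.append(item)

    async def stop(self):
        """Refuse new work, then wait for in-flight handlers to finish."""
        self._accepting = False
        if self._inflight:
            await asyncio.gather(*self._inflight)


async def demo():
    worker = DrainingWorker()
    accepted = [worker.submit("a"), worker.submit("b")]
    await worker.stop()            # both handlers finish before this returns
    rejected = worker.submit("c")  # refused: the worker is shutting down
    return accepted, rejected, sorted(worker.completed)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The ordering is the important part: the flag flips before the wait, so no new handler can start while the existing ones drain.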

Versioning

This software follows Semantic Versioning.

Development

Setting up development environment

Clone the project:

git clone repo_url
cd aio-bunny

Create a virtualenv for aio-bunny:

python3 -m venv venv
source venv/bin/activate

Install the requirements:

pip install -e '.[develop]'
