Skip to main content

No project description provided

Project description

Event Bus in Python with RabbitMQ: Exploring Synchronous and Asynchronous Solutions

Today we're going to build an event bus with Python. It's an event bus according to my personal needs. The idea is to create a scalable event bus, with RabbitMQ as the message broker but easy to replace with another message broker such as MQTT or Redis. In fact, I've started with a memomry-based message broker. I'm not going to use never this on-memory message broker, but it was a good start to understand the basics of the event bus.

That's the on memory version:

import logging

from bus_queue.backend.memory.bus import MemoryEventBus as Bus
from bus_queue import EventBus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


def main():
    backend = Bus()
    bus = EventBus(backend)

    bus.subscribe("test", callback)

    bus.publish("test", dict(hola="Gonzalo"))
    bus.wait()


if __name__ == "__main__":
    main()

This on-memory version uses this implementation:

from time import sleep
from typing import Callable, Dict, List, Any

from bus_queue import Backend


class MemoryEventBus(Backend):
    def __init__(self):
        self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}

    def publish(self, topic: str, message: str) -> None:
        if topic in self.subscribers:
            for callback in self.subscribers[topic]:
                callback(topic, message)

    def broadcast(self, topic: str, payload: Any):
        self.publish(topic, payload)

    def subscribe(self, topic: str, callback: Callable[[str, Any], None]) -> None:
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(callback)

    def wait(self):
        while True:
            sleep(1)

This implementation is a synchronous version. I also want to create an asynchronous version.

import asyncio
import logging

from bus_queue.backend.memory.assync_bus import AsyncMemoryEventBus as Bus
from bus_queue import AsyncEventBus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for l in ['asyncio', ]:
    logging.getLogger(l).setLevel(logging. WARNING)

logger = logging.getLogger(__name__)


async def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


async def main():
    backend = Bus()
    bus = AsyncEventBus(backend)

    await bus.subscribe("test", callback)

    await bus.publish("test", dict(hola="Gonzalo"))
    await bus.wait()


if __name__ == "__main__":
    asyncio.run(main())
import asyncio
from typing import Callable, Dict, List, Any, Awaitable

from bus_queue import AsyncBackend


class AsyncMemoryEventBus(AsyncBackend):
    def __init__(self):
        self.subscribers: Dict[str, List[Callable[[str, Any], Awaitable[None]]]] = {}

    async def publish(self, topic: str, message: str):
        if topic in self.subscribers:
            tasks = [
                asyncio.create_task(subscriber(topic, message))
                for subscriber in self.subscribers[topic]
            ]
            await asyncio.gather(*tasks)

    async def broadcast(self, topic: str, message: str):
        await self.publish(topic, message)

    async def subscribe(self, topic: str, handler: Callable[[str, Any], Awaitable[None]]):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(handler)

    async def wait(self):
        await asyncio.Event().wait()

But this on-memory version is not useful for me. I want to use RabbitMQ as the message broker. I'm going to create also a synchronous and an asynchronous version also. In this version I´m going to create two kind of ways to publish messages. One way is a simple publish, and the other way is a broadcast. The broadcast is going to send the message to all the subscribers of the topic, and the publishing is going to send the message to only one subscriber, using a round-robin strategy.

The synchronous version: The listener:

import logging

from bus_queue import EventBus
from bus_queue.backend.rabbit.bus import RabbitEventBus as Bus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for l in ['pika', ]:
    logging.getLogger(l).setLevel(logging.WARNING)

logger = logging.getLogger(__name__)


def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = EventBus(backend)

    bus.subscribe("test", callback)
    bus.wait()


if __name__ == "__main__":
    main()

And the publisher:

import logging

from bus_queue.backend.rabbit.bus import RabbitEventBus as Bus
from bus_queue import EventBus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for l in ['pika',]:
    logging.getLogger(l).setLevel(logging. WARNING)

logger = logging.getLogger(__name__)


def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = EventBus(backend)

    bus.publish("test", dict(hola="Gonzalo"))
    bus.broadcast("test", "Hola, broadcast")


if __name__ == "__main__":
    main()

The implementation is like that:

import logging
from typing import Callable, Dict, Any, List

import pika

from bus_queue import Backend

logger = logging.getLogger(__name__)


def get_broadcast_exchange_from_topic(topic: str):
    return f"broadcast_{topic}"


class RabbitEventBus(Backend):
    def __init__(self, rabbitmq_url: str, max_retries: int = 3):
        self.rabbitmq_url = rabbitmq_url
        self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}
        self.connection = None
        self.channel = None
        self.max_retries = max_retries

    def connect(self):
        self.connection = pika.BlockingConnection(pika.URLParameters(self.rabbitmq_url))
        self.channel = self.connection.channel()

    def broadcast(self, topic: str, payload: Any):
        if self.channel is None:
            self.connect()
        exchange = get_broadcast_exchange_from_topic(topic)
        self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')
        self.channel.basic_publish(exchange=exchange, routing_key='', body=payload.encode())

    def publish(self, topic: str, payload: Any):
        if self.channel is None:
            self.connect()
        self.channel.basic_publish(exchange='', routing_key=topic, body=payload.encode())

    def subscribe(self, topic: str, handler: Callable[[str, Any], None]):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(handler)

        if self.channel is None:
            self.connect()

        self.channel.queue_declare(queue=topic, auto_delete=True)
        exchange = get_broadcast_exchange_from_topic(topic)
        self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')

        result = self.channel.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue

        self.channel.queue_bind(exchange=exchange, queue=queue_name)

        def on_message(ch, method, properties, body):
            for subscriber in self.subscribers[topic]:
                try:
                    subscriber(topic, body.decode())
                    ch.basic_ack(delivery_tag=method.delivery_tag)
                except Exception as ex:
                    logger.exception(ex)
                    if method.delivery_tag <= self.max_retries:
                        logger.info(f"Retrying message ({method.delivery_tag}/{self.max_retries})")
                        self.channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
                    else:
                        logger.info(f"Max retries. max_retries: {self.max_retries})")
                        self.channel.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(queue=topic, on_message_callback=on_message, auto_ack=False)
        self.channel.basic_consume(queue=queue_name, on_message_callback=on_message, auto_ack=False)

    def wait(self):
        if self.channel is None:
            self.connect()
        self.channel.basic_qos(prefetch_count=1)
        self.channel.start_consuming()

And the asynchronous version:

The listener:

import asyncio
import logging

from bus_queue import AsyncEventBus
from bus_queue.backend.rabbit.assync_bus import AsyncRabbitEventBus as Bus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for l in ['asyncio', 'aio-pika']:
    logging.getLogger(l).setLevel(logging. WARNING)

logger = logging.getLogger(__name__)


async def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


async def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = AsyncEventBus(backend)

    await bus.subscribe("test", callback)
    await bus.wait()


if __name__ == "__main__":
    asyncio.run(main())

The implementation is like that:

import asyncio
import logging
from typing import Callable, Dict, List, Any, Awaitable

import aio_pika

from bus_queue import AsyncBackend

logger = logging.getLogger(__name__)


def get_broadcast_exchange_from_topic(topic: str):
    return f"broadcast_{topic}"


class AsyncRabbitEventBus(AsyncBackend):
    def __init__(self, rabbitmq_url: str, max_retries: int = 3):
        self.subscribers: Dict[str, List[Callable[[str, Any], Awaitable[None]]]] = {}
        self.rabbitmq_url = rabbitmq_url
        self.max_retries = max_retries

    async def broadcast(self, topic: str, payload: Any):
        connection = await aio_pika.connect_robust(self.rabbitmq_url)
        exchange_type = aio_pika.ExchangeType.FANOUT
        exchange = get_broadcast_exchange_from_topic(topic)
        async with connection:
            channel = await connection.channel()
            exchange = await channel.declare_exchange(exchange, exchange_type)
            await exchange.publish(
                aio_pika.Message(body=payload.encode()),
                routing_key=topic
            )

    async def publish(self, topic: str, payload: Any):
        connection = await aio_pika.connect_robust(self.rabbitmq_url)
        async with connection:
            channel = await connection.channel()
            await channel.default_exchange.publish(
                aio_pika.Message(body=payload.encode()),
                routing_key=topic
            )

    async def subscribe(self, topic: str, handler: Callable[[str, Any], Awaitable[None]]):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(handler)
        exchange = get_broadcast_exchange_from_topic(topic)

        connection = await aio_pika.connect_robust(self.rabbitmq_url)
        async with connection:
            channel = await connection.channel()

            direct_queue = await channel.declare_queue(topic, auto_delete=True)
            broadcast_exchange = await channel.declare_exchange(exchange, aio_pika.ExchangeType.FANOUT)
            broadcast_queue = await channel.declare_queue('', exclusive=True)
            await broadcast_queue.bind(broadcast_exchange)

            async def process_queue(queue_iter):
                async for message in queue_iter:
                    try:
                        await handler(topic, message.body.decode())
                        await message.ack()
                    except Exception as ex:
                        if message.delivery_tag <= self.max_retries:
                            logger.info(f"Retrying message ({message.delivery_tag}/{self.max_retries})")
                            await message.nack(requeue=True)
                        else:
                            logger.exception(ex)
                            logger.info(
                                f"Max retries. Discarding event (max_retries: {self.max_retries})")
                            await message.ack()

            async with direct_queue.iterator() as direct_queue_iter, broadcast_queue.iterator() as broadcast_queue_iter:
                await asyncio.gather(
                    process_queue(direct_queue_iter),
                    process_queue(broadcast_queue_iter)
                )

    async def wait(self):
        await asyncio.Event().wait()

And that's all. The library can be installed with poetry in both versions: async and sync. You can use pip or poetry to install the library.

For the sync version:
```bash
poetry add bus_queue --extras "sync"
pip install bus_queue[sync]

and for the async version:

poetry add bus_queue --extras "async"
pip install bus_queue[async]

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

bus_queue-0.5.1.tar.gz (5.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

bus_queue-0.5.1-py3-none-any.whl (8.0 kB view details)

Uploaded Python 3

File details

Details for the file bus_queue-0.5.1.tar.gz.

File metadata

  • Download URL: bus_queue-0.5.1.tar.gz
  • Upload date:
  • Size: 5.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.9.13 Darwin/24.2.0

File hashes

Hashes for bus_queue-0.5.1.tar.gz
Algorithm Hash digest
SHA256 8ef05083e642209be93b7a058c7807ffa73e745bedef6051b28e4f3ba64f7db8
MD5 5291c3e78e299b8ad49877e1753a65b4
BLAKE2b-256 8aeadbeaa62d0d9d8149eaa7e15c5dd7a7d9834f12524f43da8630ed15652ce5

See more details on using hashes here.

File details

Details for the file bus_queue-0.5.1-py3-none-any.whl.

File metadata

  • Download URL: bus_queue-0.5.1-py3-none-any.whl
  • Upload date:
  • Size: 8.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.9.13 Darwin/24.2.0

File hashes

Hashes for bus_queue-0.5.1-py3-none-any.whl
Algorithm Hash digest
SHA256 e92659cec96cd4571f8ad6699132ed37131a1fe201bddc9af879be08ea0d68fa
MD5 18aff257aabcf7b2b692eb5ae3fb3e69
BLAKE2b-256 75996c355ae192ece7aad942aad3f12b56184effff1a4177d7752c35fd1eca55

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page