Event Bus in Python with RabbitMQ: Exploring Synchronous and Asynchronous Solutions

Today we're going to build an event bus with Python, tailored to my own needs. The idea is a scalable event bus that uses RabbitMQ as the message broker but makes it easy to swap in another broker such as MQTT or Redis. In fact, I started with an in-memory message broker. I'm never going to use this in-memory broker in practice, but it was a good starting point for understanding the basics of an event bus.

This is the in-memory version:

import logging

from bus_queue.backend.memory.bus import MemoryEventBus as Bus
from bus_queue import EventBus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


def main():
    backend = Bus()
    bus = EventBus(backend)

    bus.subscribe("test", callback)

    bus.publish("test", dict(hola="Gonzalo"))
    bus.wait()


if __name__ == "__main__":
    main()

This in-memory version uses the following implementation:

from time import sleep
from typing import Callable, Dict, List, Any

from bus_queue import Backend


class MemoryEventBus(Backend):
    def __init__(self):
        self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}

    def publish(self, topic: str, message: str) -> None:
        if topic in self.subscribers:
            for callback in self.subscribers[topic]:
                callback(topic, message)

    def broadcast(self, topic: str, payload: Any):
        self.publish(topic, payload)

    def subscribe(self, topic: str, callback: Callable[[str, Any], None]) -> None:
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(callback)

    def wait(self):
        while True:
            sleep(1)
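
The `Backend` base class and the `EventBus` facade imported above are not shown in this post. As a rough sketch of how such a pair could be shaped, here is an abstract contract plus a facade that only delegates to it. The names `SketchBackend`, `SketchEventBus` and `ListBackend` are hypothetical, and the real `bus_queue` classes may well differ:

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List

Handler = Callable[[str, Any], None]


class SketchBackend(ABC):
    """Hypothetical contract; the real bus_queue.Backend may differ."""

    @abstractmethod
    def publish(self, topic: str, payload: Any) -> None:
        """Send payload to the topic."""

    @abstractmethod
    def broadcast(self, topic: str, payload: Any) -> None:
        """Send payload to every subscriber of the topic."""

    @abstractmethod
    def subscribe(self, topic: str, handler: Handler) -> None:
        """Register handler for the topic."""

    @abstractmethod
    def wait(self) -> None:
        """Block while messages are consumed."""


class SketchEventBus:
    """Facade: application code depends on this, never on a concrete broker."""

    def __init__(self, backend: SketchBackend) -> None:
        self.backend = backend

    def publish(self, topic: str, payload: Any) -> None:
        self.backend.publish(topic, payload)

    def broadcast(self, topic: str, payload: Any) -> None:
        self.backend.broadcast(topic, payload)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self.backend.subscribe(topic, handler)

    def wait(self) -> None:
        self.backend.wait()


class ListBackend(SketchBackend):
    """Tiny in-process backend used only to exercise the facade."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Handler]] = {}

    def publish(self, topic: str, payload: Any) -> None:
        # Like the in-memory version above, every handler gets the message.
        for handler in self.handlers.get(topic, []):
            handler(topic, payload)

    def broadcast(self, topic: str, payload: Any) -> None:
        self.publish(topic, payload)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self.handlers.setdefault(topic, []).append(handler)

    def wait(self) -> None:
        return None  # nothing to wait for in this sketch


received = []
bus = SketchEventBus(ListBackend())
bus.subscribe("test", lambda topic, msg: received.append((topic, msg)))
bus.publish("test", {"hola": "Gonzalo"})
print(received)
```

With this shape, swapping the broker means passing a different backend object to the facade; the calling code stays the same.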

This implementation is synchronous. I also want an asynchronous version:

import asyncio
import logging

from bus_queue.backend.memory.assync_bus import AsyncMemoryEventBus as Bus
from bus_queue import AsyncEventBus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for name in ['asyncio']:
    logging.getLogger(name).setLevel(logging.WARNING)

logger = logging.getLogger(__name__)


async def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


async def main():
    backend = Bus()
    bus = AsyncEventBus(backend)

    await bus.subscribe("test", callback)

    await bus.publish("test", dict(hola="Gonzalo"))
    await bus.wait()


if __name__ == "__main__":
    asyncio.run(main())

And its implementation:

import asyncio
from typing import Callable, Dict, List, Any, Awaitable

from bus_queue import AsyncBackend


class AsyncMemoryEventBus(AsyncBackend):
    def __init__(self):
        self.subscribers: Dict[str, List[Callable[[str, Any], Awaitable[None]]]] = {}

    async def publish(self, topic: str, message: str):
        if topic in self.subscribers:
            tasks = [
                asyncio.create_task(subscriber(topic, message))
                for subscriber in self.subscribers[topic]
            ]
            await asyncio.gather(*tasks)

    async def broadcast(self, topic: str, message: str):
        await self.publish(topic, message)

    async def subscribe(self, topic: str, handler: Callable[[str, Any], Awaitable[None]]):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(handler)

    async def wait(self):
        await asyncio.Event().wait()
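
Because `publish` wraps each handler in a task and then awaits `asyncio.gather`, the handlers of a topic run concurrently rather than one after another. The standalone sketch below illustrates that pattern outside the library; `dispatch`, `slow` and `fast` are names made up for the example:

```python
import asyncio
from typing import Any, Awaitable, Callable, List

AsyncHandler = Callable[[str, Any], Awaitable[None]]


async def dispatch(handlers: List[AsyncHandler], topic: str, message: Any) -> None:
    # Same pattern as publish() above: one task per handler, then wait for all.
    tasks = [asyncio.create_task(handler(topic, message)) for handler in handlers]
    await asyncio.gather(*tasks)


async def demo() -> List[str]:
    finished: List[str] = []

    async def slow(topic: str, msg: Any) -> None:
        await asyncio.sleep(0.05)
        finished.append("slow")

    async def fast(topic: str, msg: Any) -> None:
        await asyncio.sleep(0.01)
        finished.append("fast")

    # slow is registered first, but both run concurrently,
    # so the handler with the shorter sleep finishes first.
    await dispatch([slow, fast], "test", {"hola": "Gonzalo"})
    return finished


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

One thing to keep in mind with this pattern: with the default `return_exceptions=False`, `asyncio.gather` propagates the first exception raised by a handler to the caller of `publish`, while the remaining tasks keep running.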

But this in-memory version isn't useful to me; I want to use RabbitMQ as the message broker. Again, I'm going to create both a synchronous and an asynchronous version. In this version there are two ways to send messages: a simple publish and a broadcast. A broadcast delivers the message to every subscriber of the topic, while a publish delivers it to only one subscriber, chosen with a round-robin strategy.

First, the synchronous version. The listener:
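
To make the difference between the two delivery modes concrete, here is a toy model with no broker involved. In RabbitMQ the round-robin behaviour comes from several consumers sharing one queue, and the broadcast behaviour from a fanout exchange copying the message to every bound queue; the `DeliveryModel` class below is a hypothetical stand-in that only imitates the outcome:

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Handler = Callable[[str, Any], None]


class DeliveryModel:
    """Toy model of publish (round-robin) versus broadcast (fanout)."""

    def __init__(self) -> None:
        self.consumers: Dict[str, List[Handler]] = defaultdict(list)
        self._next: Dict[str, int] = defaultdict(int)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self.consumers[topic].append(handler)

    def publish(self, topic: str, payload: Any) -> None:
        # Exactly one consumer receives the message; consumers take turns.
        consumers = self.consumers[topic]
        if not consumers:
            return
        index = self._next[topic] % len(consumers)
        self._next[topic] += 1
        consumers[index](topic, payload)

    def broadcast(self, topic: str, payload: Any) -> None:
        # Every consumer receives its own copy of the message.
        for handler in list(self.consumers[topic]):
            handler(topic, payload)


log = []
model = DeliveryModel()
model.subscribe("test", lambda topic, msg: log.append(("a", msg)))
model.subscribe("test", lambda topic, msg: log.append(("b", msg)))

for number in range(3):
    model.publish("test", number)
print(log)  # consumers a and b alternate

log.clear()
model.broadcast("test", "Hola, broadcast")
print(log)  # both consumers get the message
```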

import logging

from bus_queue import EventBus
from bus_queue.backend.rabbit.bus import RabbitEventBus as Bus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for l in ['pika', ]:
    logging.getLogger(l).setLevel(logging.WARNING)

logger = logging.getLogger(__name__)


def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = EventBus(backend)

    bus.subscribe("test", callback)
    bus.wait()


if __name__ == "__main__":
    main()

And the publisher:

import logging

from bus_queue.backend.rabbit.bus import RabbitEventBus as Bus
from bus_queue import EventBus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for name in ['pika']:
    logging.getLogger(name).setLevel(logging.WARNING)

logger = logging.getLogger(__name__)


def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = EventBus(backend)

    bus.publish("test", dict(hola="Gonzalo"))
    bus.broadcast("test", "Hola, broadcast")


if __name__ == "__main__":
    main()
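
The publisher sends both a `dict` and a `str`, while the RabbitMQ backend shown below calls `payload.encode()`, a method that exists on `str` but not on `dict`. Whether the `EventBus` facade serializes payloads before they reach the backend is not shown in this post. Assuming it does not, a pair of helpers along these lines could do the conversion; `to_body` and `from_body` are hypothetical names, not part of `bus_queue`:

```python
import json
from typing import Any


def to_body(payload: Any) -> bytes:
    """Hypothetical helper: turn a payload into bytes for a message body."""
    if isinstance(payload, bytes):
        return payload
    if isinstance(payload, str):
        return payload.encode("utf-8")
    # dicts, lists, numbers and so on go through JSON
    return json.dumps(payload).encode("utf-8")


def from_body(body: bytes) -> Any:
    """Decode a body; JSON if it parses, plain text otherwise.

    Caveat: a plain string that happens to be valid JSON (for example "123")
    comes back as the parsed value, not as the original string.
    """
    text = body.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


print(from_body(to_body({"hola": "Gonzalo"})))
print(from_body(to_body("Hola, broadcast")))
```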

The implementation looks like this:

import logging
from typing import Callable, Dict, Any, List

import pika

from bus_queue import Backend

logger = logging.getLogger(__name__)


def get_broadcast_exchange_from_topic(topic: str):
    return f"broadcast_{topic}"


class RabbitEventBus(Backend):
    def __init__(self, rabbitmq_url: str, max_retries: int = 3):
        self.rabbitmq_url = rabbitmq_url
        self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}
        self.connection = None
        self.channel = None
        self.max_retries = max_retries

    def connect(self):
        self.connection = pika.BlockingConnection(pika.URLParameters(self.rabbitmq_url))
        self.channel = self.connection.channel()

    def broadcast(self, topic: str, payload: Any):
        if self.channel is None:
            self.connect()
        exchange = get_broadcast_exchange_from_topic(topic)
        self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')
        self.channel.basic_publish(exchange=exchange, routing_key='', body=payload.encode())

    def publish(self, topic: str, payload: Any):
        if self.channel is None:
            self.connect()
        self.channel.basic_publish(exchange='', routing_key=topic, body=payload.encode())

    def subscribe(self, topic: str, handler: Callable[[str, Any], None]):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(handler)

        if self.channel is None:
            self.connect()

        self.channel.queue_declare(queue=topic, auto_delete=True)
        exchange = get_broadcast_exchange_from_topic(topic)
        self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')

        result = self.channel.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue

        self.channel.queue_bind(exchange=exchange, queue=queue_name)

        def on_message(ch, method, properties, body):
            for subscriber in self.subscribers[topic]:
                try:
                    subscriber(topic, body.decode())
                    ch.basic_ack(delivery_tag=method.delivery_tag)
                except Exception as ex:
                    logger.exception(ex)
                    if method.delivery_tag <= self.max_retries:
                        logger.info(f"Retrying message ({method.delivery_tag}/{self.max_retries})")
                        self.channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
                    else:
                        logger.info(f"Max retries. Discarding event (max_retries: {self.max_retries})")
                        self.channel.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(queue=topic, on_message_callback=on_message, auto_ack=False)
        self.channel.basic_consume(queue=queue_name, on_message_callback=on_message, auto_ack=False)

    def wait(self):
        if self.channel is None:
            self.connect()
        self.channel.basic_qos(prefetch_count=1)
        self.channel.start_consuming()
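
A note on the retry check in `on_message`: in AMQP the `delivery_tag` is a sequence number scoped to the channel that identifies a delivery, so it grows with every message the channel delivers and does not count how many times one particular message has been retried. One common alternative is to carry an attempt counter in a message header. The sketch below shows only the decision logic, as a pure function; the header name `x-retries` and the function `next_step` are assumptions made for the example, not something `bus_queue` defines:

```python
from typing import Dict, Optional, Tuple

RETRY_HEADER = "x-retries"  # hypothetical header name, not part of bus_queue


def next_step(headers: Optional[Dict[str, int]], max_retries: int) -> Tuple[str, Dict[str, int]]:
    """Decide what to do with a message whose handler just failed.

    Returns ("retry", updated_headers) while attempts remain,
    otherwise ("discard", headers) with the counter left untouched.
    """
    current = dict(headers or {})
    attempts = int(current.get(RETRY_HEADER, 0))
    if attempts < max_retries:
        current[RETRY_HEADER] = attempts + 1
        return "retry", current
    return "discard", current


# Simulate a handler that always fails, with max_retries=3.
headers: Dict[str, int] = {}
steps = []
while True:
    step, headers = next_step(headers, max_retries=3)
    steps.append(step)
    if step == "discard":
        break
print(steps)
```

On "retry", a consumer would typically ack the original delivery and publish a copy carrying the updated headers (with pika, headers travel in `pika.BasicProperties(headers=...)`); on "discard", it would ack and drop the message or route it to a dead-letter queue.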

And the asynchronous version:

The listener:

import asyncio
import logging

from bus_queue import AsyncEventBus
from bus_queue.backend.rabbit.assync_bus import AsyncRabbitEventBus as Bus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for name in ['asyncio', 'aio_pika']:
    logging.getLogger(name).setLevel(logging.WARNING)

logger = logging.getLogger(__name__)


async def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


async def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = AsyncEventBus(backend)

    await bus.subscribe("test", callback)
    await bus.wait()


if __name__ == "__main__":
    asyncio.run(main())

The implementation looks like this:

import asyncio
import logging
from typing import Callable, Dict, List, Any, Awaitable

import aio_pika

from bus_queue import AsyncBackend

logger = logging.getLogger(__name__)


def get_broadcast_exchange_from_topic(topic: str):
    return f"broadcast_{topic}"


class AsyncRabbitEventBus(AsyncBackend):
    def __init__(self, rabbitmq_url: str, max_retries: int = 3):
        self.subscribers: Dict[str, List[Callable[[str, Any], Awaitable[None]]]] = {}
        self.rabbitmq_url = rabbitmq_url
        self.max_retries = max_retries

    async def broadcast(self, topic: str, payload: Any):
        connection = await aio_pika.connect_robust(self.rabbitmq_url)
        exchange_type = aio_pika.ExchangeType.FANOUT
        exchange = get_broadcast_exchange_from_topic(topic)
        async with connection:
            channel = await connection.channel()
            exchange = await channel.declare_exchange(exchange, exchange_type)
            await exchange.publish(
                aio_pika.Message(body=payload.encode()),
                routing_key=topic
            )

    async def publish(self, topic: str, payload: Any):
        connection = await aio_pika.connect_robust(self.rabbitmq_url)
        async with connection:
            channel = await connection.channel()
            await channel.default_exchange.publish(
                aio_pika.Message(body=payload.encode()),
                routing_key=topic
            )

    async def subscribe(self, topic: str, handler: Callable[[str, Any], Awaitable[None]]):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(handler)
        exchange = get_broadcast_exchange_from_topic(topic)

        connection = await aio_pika.connect_robust(self.rabbitmq_url)
        async with connection:
            channel = await connection.channel()

            direct_queue = await channel.declare_queue(topic, auto_delete=True)
            broadcast_exchange = await channel.declare_exchange(exchange, aio_pika.ExchangeType.FANOUT)
            broadcast_queue = await channel.declare_queue('', exclusive=True)
            await broadcast_queue.bind(broadcast_exchange)

            async def process_queue(queue_iter):
                async for message in queue_iter:
                    try:
                        await handler(topic, message.body.decode())
                        await message.ack()
                    except Exception as ex:
                        if message.delivery_tag <= self.max_retries:
                            logger.info(f"Retrying message ({message.delivery_tag}/{self.max_retries})")
                            await message.nack(requeue=True)
                        else:
                            logger.exception(ex)
                            logger.info(
                                f"Max retries. Discarding event (max_retries: {self.max_retries})")
                            await message.ack()

            async with direct_queue.iterator() as direct_queue_iter, broadcast_queue.iterator() as broadcast_queue_iter:
                await asyncio.gather(
                    process_queue(direct_queue_iter),
                    process_queue(broadcast_queue_iter)
                )

    async def wait(self):
        await asyncio.Event().wait()
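
As written, `subscribe` awaits `asyncio.gather` over two queue iterators that do not finish on their own, so the coroutine keeps running for as long as messages are being consumed. If the caller needs `subscribe` to return right away (for example to register several topics before calling `wait`), one option is to run the consuming loop as a background task. The sketch below shows that idea with an `asyncio.Queue` standing in for the broker queue; `BackgroundConsumer` and everything inside it is hypothetical:

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple

AsyncHandler = Callable[[str, Any], Awaitable[None]]


class BackgroundConsumer:
    """Runs one consuming loop per subscription as an asyncio task."""

    def __init__(self) -> None:
        self.tasks: List[asyncio.Task] = []

    async def subscribe(self, topic: str, source: asyncio.Queue, handler: AsyncHandler) -> None:
        async def consume() -> None:
            while True:
                message = await source.get()
                await handler(topic, message)
                source.task_done()

        # Schedule the loop and return immediately instead of awaiting it.
        self.tasks.append(asyncio.create_task(consume()))

    async def close(self) -> None:
        # Cancel the loops and wait until they have actually stopped.
        for task in self.tasks:
            task.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)


async def demo() -> List[Tuple[str, Any]]:
    received: List[Tuple[str, Any]] = []

    async def handler(topic: str, msg: Any) -> None:
        received.append((topic, msg))

    queue: asyncio.Queue = asyncio.Queue()
    consumer = BackgroundConsumer()
    await consumer.subscribe("test", queue, handler)  # returns at once
    await queue.put("hola")
    await queue.join()  # wait until the message has been handled
    await consumer.close()
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real broker, the connection and channel would have to outlive `subscribe` as well, so they would be stored on the object rather than opened inside an `async with` block that closes them on exit.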

And that's all. The library comes in two flavors, sync and async, and either one can be installed with Poetry or pip.

For the sync version:

```bash
poetry add bus_queue --extras "sync"
pip install bus_queue[sync]
```

And for the async version:

```bash
poetry add bus_queue --extras "async"
pip install bus_queue[async]
```
