
ploomby


ploomby is a wrapper over aio_pika that provides a simpler, more intuitive way to declare handlers for messages coming from RabbitMQ.


⚙️ Install

pip install ploomby

🚀 Quickstart

from pydantic import BaseModel

from ploomby.registry import HandlersRegistry, MessageConsumerRegistry
from ploomby.rabbit import RabbitConsumerFactory

handlers_registry = HandlersRegistry()
consumer_factory = RabbitConsumerFactory("amqp://admin:admin@localhost:5672")
consumer_registry = MessageConsumerRegistry(handlers_registry, consumer_factory)


class CreateUserDTO(BaseModel):
    name: str


@handlers_registry.register("create_user")
async def create_user(dto: CreateUserDTO):
    await ...


consumer = await consumer_registry.register("queue1", "task_name")  # register() is a coroutine; call it from an async context

Once registered, the consumer immediately starts listening to its resource (in the RabbitMQ case, a queue).
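For reference, a message that this setup would route to create_user could look like the following (the header-based key placement is an assumption here; the exact metadata layout depends on the consumer implementation):

```python
import json

# Hypothetical wire format: the "task_name" header carries the handler key
# registered above, and the JSON body is validated into CreateUserDTO.
headers = {"task_name": "create_user"}
body = json.dumps({"name": "alice"}).encode()
```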


📦 Features

This library lets you manage your handlers flexibly.

Creating a handlers registry requires no setup; you just decorate your handlers with the register() method.

def register(self, key: MessageKeyType)

This method takes only the key that will be used to identify a message coming from the broker and to look up the corresponding handler in the registry.
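Under the hood this is just a key-to-coroutine mapping; a minimal stand-in (illustrative only, not ploomby's actual implementation) shows the pattern:

```python
from typing import Awaitable, Callable


class MiniRegistry:
    """Toy key -> handler mapping illustrating the register() pattern."""

    def __init__(self) -> None:
        self._handlers: dict[str, Callable[..., Awaitable[None]]] = {}

    def register(self, key: str):
        def decorator(func):
            self._handlers[key] = func  # the key identifies the message type
            return func
        return decorator

    def get(self, key: str):
        return self._handlers[key]


registry = MiniRegistry()


@registry.register("create_user")
async def create_user(name: str) -> None:
    print(f"creating user {name}")
```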


The consumer registry manages consumers and their creation, which is why factories are used to build consumers with the setup you need. ploomby ships a factory that creates the simplest consumer implementation for RabbitMQ on top of aio_pika, but you can define your own factory to control how consumers are created and configured. A factory just has to implement the interface:

class MessageConsumerFactory(Protocol):
    async def create(self, message_key_name: str, *args, **kwargs) -> MessageConsumer: ...

where interface of consumer looks like

class MessageConsumer(Protocol):
    """
    Interface of consumer that should be implemented to use in registries.
    Message key name is a value that consumer should to use to get value from message headers(or other metadata)
    to identify incoming messsage and get corresponding handler using get_handler_func provided in consume() 
    """
    message_key_name: str

    async def connect(self) -> None: ...
    async def disconnect(self) -> None: ...

    async def consume(self, listen_for: str, get_handler_func: HandlerDependencyType):
        """
        Starts consume events/messages from resourse

        :param listen_for: Name of representation of what the consumer is subscribed to
        :type listen_for: str
        :param get_handler_func: Wrapper that retrieves message key and returns function, that handles raw data from broker
        :type get_handler_func: HandlerDependencyType

        examples get_handler_func::

            def get_order_handler(key: str) -> Callable[[str | bytes], Awaitable[None]]:
                handlers = {
                    "order.created": handle_order_created,
                    "order.updated": handle_order_updated,
                    "order.cancelled": handle_order_cancelled,
                }
                return handlers.get(key.decode("utf-8"), default_handler)

            async def handle_order_created(raw_data: str | bytes) -> None:
                order_data = json.loads(raw_data)
                # Process order creation logic
                await process_new_order(order_data)

            async def handle_order_updated(raw_data: str | bytes) -> None:
                order_data = json.loads(raw_data)
                # Process order update logic
                await update_order(order_data)

            async def default_handler(raw_data: str | bytes) -> None:
                logger.warning(f"Unhandled message key with data: {raw_data}")
        """

You can also provide your own consumer implementation.


Next comes the MessageConsumerRegistry. All it needs is register():

async def register(
    self,
    listen_for: str,
    message_key_name: str,
    consumers_count: int = 1,
    *factory_create_args,
    **factory_create_kwargs
):
    """
    Uses provided factory to create consumer instance and subscribe it on provided resource.
    If want to use not built-in factories just define it according to required interface of factore and provide to registry

    :param listen_for: Name of representation of what the consumer is subscribed to
    :type listen_for: str
    :param message_key_name: Value that consumer should to use to get value from message headers(or other metadata)
    to identify incoming messsage and get corresponding handler using get_handler_func provided in consume() 
    :type message_key_name: str
    :param consumers_count: Define count of consumers that will listen resource and will looking for message key name
    :param factory_create_args: Args using to provide to create method of consumer factory
    :param factory_create_kwargs: Kwargs using to provide to create method of consumer factory
    """
