No project description provided
Project description
ploomby
ploomby is a wrapper over aio_pika, which provides a more intuitive and simpler way to declare message handlers coming from RabbitMQ.
⚙️ Install
pip install ploomby
🚀 Quickstart
from pydantic import BaseModel
from ploomby.registry import HandlersRegistry, MessageConsumerRegistry
from ploomby.rabbit import RabbitConsumerFactory
handlers_registry = HandlersRegistry()
consumer_factory = RabbitConsumerFactory("amqp://admin:admin@localhost:5672")
consumer_registry = MessageConsumerRegistry(handlers_registry, consumer_factory)
class CreateUserDTO(BaseModel):
name: str
@handlers_registry.register("create_user")
async def create_user(dto: CreateUserDTO):
await ...
consumer = consumer_registry.register("queue1", "task_name")
After registering of consumer it starts to listen resourse(in RabbitMQ case it is a queue) immediately.
📦 Features
This lib let you to manage your handlers too flexible.
Creating registries for handlers requires nothing. It just need to decorate our handlers using register() method.
def register(self, key: MessageKeyType)
This method requires only key that will be used to identify message coming from broker anf take corresponding handler from registry.
Consumer registry takes over the management of the consumers and their creation and that is cause we need to use factories create consumers in setup we need. ploompy provides factory for creating simpliest consumer's implementation for RabbitMQ using aio_pika. But you can define your factory to define rules of setuping creating consumers. Factory just should implement interface of
class MessageConsumerFactory(Protocol):
async def create(self, message_key_name: str, *args, **kwargs) -> MessageConsumer: ...
where interface of consumer looks like
class MessageConsumer(Protocol):
"""
Interface of consumer that should be implemented to use in registries.
Message key name is a value that consumer should to use to get value from message headers(or other metadata)
to identify incoming messsage and get corresponding handler using get_handler_func provided in consume()
"""
message_key_name: str
async def connect(self) -> None: ...
async def disconnect(self) -> None: ...
async def consume(self, listen_for: str, get_handler_func: HandlerDependencyType):
"""
Starts consume events/messages from resourse
:param listen_for: Name of representation of what the consumer is subscribed to
:type listen_for: str
:param get_handler_func: Wrapper that retrieves message key and returns function, that handles raw data from broker
:type get_handler_func: HandlerDependencyType
examples get_handler_func::
def get_order_handler(key: str) -> Callable[[str | bytes], Awaitable[None]]:
handlers = {
"order.created": handle_order_created,
"order.updated": handle_order_updated,
"order.cancelled": handle_order_cancelled,
}
return handlers.get(key.decode("utf-8"), default_handler)
async def handle_order_created(raw_data: str | bytes) -> None:
order_data = json.loads(raw_data)
# Process order creation logic
await process_new_order(order_data)
async def handle_order_updated(raw_data: str | bytes) -> None:
order_data = json.loads(raw_data)
# Process order update logic
await update_order(order_data)
async def default_handler(raw_data: str | bytes) -> None:
logger.warning(f"Unhandled message key with data: {raw_data}")
"""
Yes, also you can define implementation of consumer.
Next we make a MessageConsumerRegistry. We need literally register():
async def register(
self,
listen_for: str,
message_key_name: str,
consumers_count: int = 1,
*factory_create_args,
**factory_create_kwargs
):
"""
Uses provided factory to create consumer instance and subscribe it on provided resource.
If want to use not built-in factories just define it according to required interface of factore and provide to registry
:param listen_for: Name of representation of what the consumer is subscribed to
:type listen_for: str
:param message_key_name: Value that consumer should to use to get value from message headers(or other metadata)
to identify incoming messsage and get corresponding handler using get_handler_func provided in consume()
:type message_key_name: str
:param consumers_count: Define count of consumers that will listen resource and will looking for message key name
:param factory_create_args: Args using to provide to create method of consumer factory
:param factory_create_kwargs: Kwargs using to provide to create method of consumer factory
"""
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file ploomby-0.1.8-py3-none-any.whl.
File metadata
- Download URL: ploomby-0.1.8-py3-none-any.whl
- Upload date:
- Size: 10.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8fd5c885fed5ec1cbdcf26f9330075c03f7c83f6ec7f8c5cfff9f9d4c873a843
|
|
| MD5 |
a582f0814a9f820fdba310b9bed2ca50
|
|
| BLAKE2b-256 |
f1a36b1afe45aa2eb23276d0d0f10fa33f6ba1a4f505525e6011444137481cd6
|