Async RabbitMQ worker utilities
Project description
WorkerLib - асинхронная работа с RabbitMQ
Быстрый старт
import asyncio
from workerlib import WorkerPool
async def task_handler(data: dict) -> bool:
print(f"Обработка: {data}")
return True
async def main():
async with WorkerPool() as pool:
pool.add_worker("tasks", task_handler)
await pool.send("tasks", {"id": 1, "cmd": "start"})
await asyncio.sleep(2)
asyncio.run(main())
Формат сообщений
JSON сообщение Библиотека автоматически сериализует dict в JSON при отправке:
# Отправка простого сообщения
await pool.send("queue", {
"event": "user_created",
"user_id": 123,
"email": "user@example.com",
"timestamp": "2024-01-15T10:30:00Z"
})
# Отправка вложенных структур
await pool.send("queue", {
"type": "order",
"data": {
"order_id": "ORD-12345",
"items": [
{"id": 1, "quantity": 2},
{"id": 2, "quantity": 1}
],
"total": 299.99
},
"metadata": {
"source": "api",
"version": "1.0"
}
})
Основные примеры
- Пул с несколькими воркерами
from workerlib import WorkerPool, ErrorHandlingStrategy
async def main():
async with WorkerPool() as pool:
# Email воркер с DLQ
pool.add_worker(
queue_name="emails",
handler=email_handler,
error_strategy=ErrorHandlingStrategy.RETRY_THEN_DLQ,
prefetch_count=5
)
# Обработчик платежей
pool.add_worker(
queue_name="payments",
handler=payment_handler,
error_strategy=ErrorHandlingStrategy.REQUEUE_END
)
# Отправка задач
await pool.send("emails", {"to": "user@test.com"})
await pool.send("payments", {"amount": 100})
- Кастомное подключение и retry
from workerlib import ConnectionParams, RetryConfig
params = ConnectionParams(
host="rabbit.local",
username="admin",
password="secret"
)
retry_config = RetryConfig(
max_attempts=3,
initial_delay=1.0,
backoff_factor=2.0
)
async with WorkerPool(connection_params=params) as pool:
pool.add_worker(
queue_name="critical",
handler=critical_handler,
retry_config=retry_config
)
- Обработка ошибок
from workerlib import ErrorHandlingStrategy
# Варианты:
# IGNORE - проигнорировать ошибку
# REQUEUE_END - в конец очереди с задержкой
# REQUEUE_FRONT - в начало очереди
# DLQ - в Dead Letter Queue
# RETRY_THEN_DLQ - повторить, затем в DLQ
pool.add_worker(
queue_name="tasks",
handler=my_handler,
error_strategy=ErrorHandlingStrategy.RETRY_THEN_DLQ,
dlq_enabled=True,
requeue_delay=5.0 # задержка повторной обработки
)
- Отдельные компоненты
from workerlib import (
RabbitMQConnection,
RabbitMQQueue,
RabbitMQConsumer,
RabbitMQProducer
)
# Создание вручную
connection = RabbitMQConnection()
await connection.connect()
queue = RabbitMQQueue(connection, QueueConfig(name="my_queue"))
producer = RabbitMQProducer(connection, queue)
await producer.send({"test": "data"})
consumer = RabbitMQConsumer(queue, my_handler)
await consumer.consume()
- Batch отправка
async with WorkerPool() as pool:
messages = [
{"id": i, "data": f"item_{i}"}
for i in range(100)
]
tasks = [
pool.send("batch_queue", msg)
for msg in messages
]
await asyncio.gather(*tasks)
- Метрики
async with WorkerPool() as pool:
pool.add_worker("monitored", handler)
# Отправляем задачи
for i in range(10):
await pool.send("monitored", {"task": i})
# Получаем метрики
metrics = pool.get_metrics("monitored")
print(f"Обработано: {metrics['consumer']['processed']}")
print(f"Ошибок: {metrics['consumer']['failed']}")
- FastAPI интеграция
from fastapi import FastAPI
from workerlib import WorkerPool
app = FastAPI()
worker_pool = WorkerPool(auto_start=False)
@app.on_event("startup")
async def startup():
await worker_pool.start()
worker_pool.add_worker("api_tasks", task_handler)
@app.on_event("shutdown")
async def shutdown():
await worker_pool.stop()
@app.post("/task")
async def create_task(data: dict):
await worker_pool.send("api_tasks", data)
return {"status": "queued"}
Конфигурация
ConnectionParams
ConnectionParams(
host="127.0.0.1",
port=5672,
username="guest",
password="guest",
heartbeat=60,
timeout=10
)
QueueConfig
QueueConfig(
name="queue_name",
durable=True,
prefetch_count=1
)
RetryConfig
RetryConfig(
max_attempts=3,
initial_delay=1.0,
backoff_factor=2.0,
max_delay=60.0
)
Установка
pip install workerlib
Требования: Python 3.8+, aio_pika
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
workerlib-0.3.1.tar.gz
(14.7 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
workerlib-0.3.1-py3-none-any.whl
(11.2 kB
view details)
File details
Details for the file workerlib-0.3.1.tar.gz.
File metadata
- Download URL: workerlib-0.3.1.tar.gz
- Upload date:
- Size: 14.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2831bac95b5fe3a412b4e1968f98dc09ee8c03a03c7ec3565a88771331faebd8
|
|
| MD5 |
0ee374f14056817e2d72b78221a351bd
|
|
| BLAKE2b-256 |
0114c07090c0d8ceabfcade23a0f2f0f69de47561a185e267015423b6fb9a605
|
File details
Details for the file workerlib-0.3.1-py3-none-any.whl.
File metadata
- Download URL: workerlib-0.3.1-py3-none-any.whl
- Upload date:
- Size: 11.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cca2350f8d1051c0a4f56177c74d6f4e3afa5850c40b1f587e92b6a416fe0b51
|
|
| MD5 |
b8a89a7f17dd8db731c61529c46817a7
|
|
| BLAKE2b-256 |
dbd62e8de1fc65d5cc33e848a80d6f0f08f6ff489b1a312ccbf2290d631e3dbf
|