RabbitMQ broker for taskiq
Project description
AioPika broker for taskiq
This library provides you with aio-pika broker for taskiq.
Features:
- Supports delayed messages using dead-letter queues or RabbitMQ delayed message exchange plugin.
- Supports message priorities.
- Supports multiple queues and custom routing.
Usage example:
from taskiq_aio_pika import AioPikaBroker
broker = AioPikaBroker(...)
@broker.task
async def test() -> None:
print("nothing")
Delays
Default delays
To send delayed message, you need to specify queue for delayed messages. You can do it by passing delay_queue parameter to the broker. For example:
from taskiq_aio_pika import AioPikaBroker, Queue, QueueType
broker = AioPikaBroker(
...,
delay_queue=Queue(name="taskiq.delay_queue"),
)
After that you have to specify delay label. You can do it with task decorator, or by using kicker.
In this type of delay we are using additional queue with expiration parameter. After declared time message will be deleted from delay queue and sent to the main queue. For example:
broker = AioPikaBroker(...)
@broker.task(delay=3)
async def delayed_task() -> int:
return 1
async def main():
await broker.startup()
# This message will be received by workers
# After 3 seconds delay.
await delayed_task.kiq()
# This message is going to be received after the delay in 4 seconds.
# Since we overridden the `delay` label using kicker.
await delayed_task.kicker().with_labels(delay=4).kiq()
# This message is going to be send immediately. Since we deleted the label.
await delayed_task.kicker().with_labels(delay=None).kiq()
# Of course the delay is managed by rabbitmq, so you don't
# have to wait delay period before message is going to be sent.
Delays with rabbitmq-delayed-message-exchange plugin
First of all please make sure that your RabbitMQ server has rabbitmq-delayed-message-exchange plugin installed.
Also you need to configure you broker by passing delayed_message_exchange_plugin=True to broker.
This plugin can handle tasks with different delay times well, and the delay based on dead letter queue is suitable for tasks with the same delay time. For example:
broker = AioPikaBroker(
delayed_message_exchange_plugin=True,
)
@broker.task(delay=3)
async def delayed_task() -> int:
return 1
async def main():
await broker.startup()
# This message will be received by workers
# After 3 seconds delay.
await delayed_task.kiq()
# This message is going to be received after the delay in 4 seconds.
# Since we overridden the `delay` label using kicker.
await delayed_task.kicker().with_labels(delay=4).kiq()
Priorities
You can define priorities for messages using priority label. Messages with higher priorities are delivered faster.
Before doing so please read the documentation about what downsides you get by using prioritized queues.
broker = AioPikaBroker(...)
# We can define default priority for tasks.
@broker.task(priority=2)
async def prio_task() -> int:
return 1
async def main():
await broker.startup()
# This message has priority = 2.
await prio_task.kiq()
# This message is going to have priority 4.
await prio_task.kicker().with_labels(priority=4).kiq()
# This message is going to have priority 0.
await prio_task.kicker().with_labels(priority=None).kiq()
Custom Queue and Exchange arguments
You can pass custom arguments to the underlying RabbitMQ queues and exchange declaration by using the Queue/Exchange classes from taskiq_aio_pika. If you used faststream before you are probably familiar with this concept.
These arguments will be merged with the default arguments used by the broker (such as dead-lettering and priority settings). If there are any conflicts, the values you provide will take precedence over the broker's defaults. Example:
from taskiq_aio_pika import AioPikaBroker, Queue, QueueType, Exchange
from aio_pika.abc import ExchangeType
broker = AioPikaBroker(
exchange=Exchange(
name="custom_exchange",
type=ExchangeType.TOPIC,
declare=True,
durable=True,
auto_delete=False,
),
task_queues=[
Queue(
name="custom_queue",
type=QueueType.CLASSIC,
declare=True,
durable=True,
max_priority=10,
routing_key="custom_queue",
)
]
)
This will ensure that the queue is created with your custom arguments, in addition to the broker's defaults.
Multiqueue support
You can define multiple queues for your tasks. Each queue can have its own routing key and other settings. And your workers can listen to multiple queues (or specific queue) as well.
You can check multiqueue usage example in examples folder for more details.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file taskiq_aio_pika-0.6.0.tar.gz.
File metadata
- Download URL: taskiq_aio_pika-0.6.0.tar.gz
- Upload date:
- Size: 9.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.7 {"installer":{"name":"uv","version":"0.10.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0a4ec304a5e860e205aaea5077d90d2a009a4842f3ee008b5185c29301992ed9
|
|
| MD5 |
fe1ce4a4f0eba8c2f8bec1be0b612f79
|
|
| BLAKE2b-256 |
d505e9f4e5cbc7f9777a09f493e502242922df2d3e3779364d0292313995d68c
|
File details
Details for the file taskiq_aio_pika-0.6.0-py3-none-any.whl.
File metadata
- Download URL: taskiq_aio_pika-0.6.0-py3-none-any.whl
- Upload date:
- Size: 10.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.7 {"installer":{"name":"uv","version":"0.10.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6bff38b61b24afd7d41b78ea9ffca0702fe9653e82289ca1287b063a53af2145
|
|
| MD5 |
7462ba76df977e666cf2996dded9201c
|
|
| BLAKE2b-256 |
7457b06600675ef8ab6352f30632c0ece20d592f922531b3f490a0559ed792ea
|