RabbitMQ broker for taskiq

AioPika broker for taskiq

This library provides an aio-pika broker for taskiq.

Features:

  • Supports delayed messages using dead-letter queues or the RabbitMQ delayed message exchange plugin.
  • Supports message priorities.
  • Supports multiple queues and custom routing.

Usage example:

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(...)

@broker.task
async def test() -> None:
    print("nothing")

Delays

Default delays

To send a delayed message, specify a queue for delayed messages by passing the delay_queue parameter to the broker. For example:

from taskiq_aio_pika import AioPikaBroker, Queue

broker = AioPikaBroker(
    ...,
    delay_queue=Queue(name="taskiq.delay_queue"),
)

After that, set the delay label, either in the task decorator or through a kicker.

This type of delay uses an additional queue with message expiration. Once the delay has elapsed, the message is removed from the delay queue and routed to the main queue. For example:

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(...)

@broker.task(delay=3)
async def delayed_task() -> int:
    return 1

async def main():
    await broker.startup()
    # This message will be received by workers
    # after a 3-second delay.
    await delayed_task.kiq()

    # This message will be received after a 4-second delay,
    # because the kicker overrides the `delay` label.
    await delayed_task.kicker().with_labels(delay=4).kiq()

    # This message is sent immediately, because the label is removed.
    await delayed_task.kicker().with_labels(delay=None).kiq()

    # The delay is managed by RabbitMQ, so the publisher does not
    # have to wait for the delay period before the message is sent.
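
To make the mechanism concrete, here is a minimal sketch of the RabbitMQ settings that a dead-letter delay typically relies on. It is not the broker's actual implementation: the helper functions and the "taskiq" exchange and routing key names are hypothetical. The `x-dead-letter-exchange` and `x-dead-letter-routing-key` queue arguments and the per-message `expiration` property (milliseconds, encoded as a string) are standard RabbitMQ.

```python
def delay_queue_arguments(main_exchange: str, main_routing_key: str) -> dict[str, str]:
    """Queue arguments that send expired messages on to the main queue."""
    return {
        "x-dead-letter-exchange": main_exchange,
        "x-dead-letter-routing-key": main_routing_key,
    }


def expiration_property(delay_seconds: float) -> str:
    """AMQP `expiration` value: message TTL in milliseconds, as a string."""
    return str(int(delay_seconds * 1000))


# Hypothetical names, used only for illustration.
args = delay_queue_arguments("taskiq", "taskiq")
print(args)
print(expiration_property(3))  # → 3000
```

A message published to a queue declared with these arguments stays there until its TTL runs out, then RabbitMQ republishes it to the main exchange with the given routing key.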

Delays with rabbitmq-delayed-message-exchange plugin

First, make sure that your RabbitMQ server has the rabbitmq-delayed-message-exchange plugin installed.

You also need to configure your broker by passing delayed_message_exchange_plugin=True to it.

The plugin handles tasks with differing delay times well, whereas the dead-letter-queue delay is best suited to tasks that all share the same delay time. For example:

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(
    delayed_message_exchange_plugin=True,
)

@broker.task(delay=3)
async def delayed_task() -> int:
    return 1

async def main():
    await broker.startup()
    # This message will be received by workers
    # after a 3-second delay.
    await delayed_task.kiq()

    # This message will be received after a 4-second delay,
    # because the kicker overrides the `delay` label.
    await delayed_task.kicker().with_labels(delay=4).kiq()
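
One likely reason for the difference between the two approaches: with per-message TTL, RabbitMQ dead-letters an expired message only once it reaches the head of the queue. Assuming the dead-letter delay uses per-message expiration on a single delay queue (an assumption about the broker's internals, not something confirmed here), a short delay published behind a long one has to wait for the long one. The toy simulation below uses a hypothetical `release_times` helper to show the effect; it does not talk to RabbitMQ.

```python
from collections import deque


def release_times(messages: list[tuple[str, int, int]]) -> dict[str, int]:
    """Simulate a FIFO delay queue where only the head message can expire.

    `messages` holds (name, publish_time, delay) tuples in publish order.
    Returns the time at which each message leaves the delay queue.
    """
    queue = deque(messages)
    clock = 0
    released = {}
    while queue:
        name, published, delay = queue.popleft()
        # The head leaves at its own deadline, but never before
        # the message that was ahead of it has left.
        clock = max(clock, published + delay)
        released[name] = clock
    return released


# "fast" asks for 2 seconds but sits behind "slow" (10 seconds).
times = release_times([("slow", 0, 10), ("fast", 0, 2)])
print(times)  # → {'slow': 10, 'fast': 10}
```

When every task uses the same delay, deadlines are already in publish order and no message is held back, which matches the recommendation above.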

Priorities

You can define priorities for messages using the priority label. When messages are waiting in a queue, those with higher priority are delivered first.

Before doing so, please read the documentation on the downsides of using prioritized queues.

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(...)

# Define a default priority for the task.
@broker.task(priority=2)
async def prio_task() -> int:
    return 1

async def main():
    await broker.startup()
    # This message has priority = 2.
    await prio_task.kiq()

    # This message is going to have priority 4.
    await prio_task.kicker().with_labels(priority=4).kiq()

    # This message is going to have priority 0.
    await prio_task.kicker().with_labels(priority=None).kiq()
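
The ordering can be illustrated with a small model of a RabbitMQ priority queue. This is a sketch only: `delivery_order` is a hypothetical helper, and it assumes all messages are already waiting in the queue when the consumer starts. It follows the documented RabbitMQ rules that a message without a priority is treated as priority 0 and that a priority above the queue maximum is capped at the maximum.

```python
import heapq
from itertools import count
from typing import Optional


def delivery_order(
    messages: list[tuple[str, Optional[int]]],
    max_priority: int = 10,
) -> list[str]:
    """Return message names in the order a priority queue would deliver them."""
    tie_breaker = count()  # keeps FIFO order among equal priorities
    heap: list[tuple[int, int, str]] = []
    for name, priority in messages:
        effective = min(priority or 0, max_priority)
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(heap, (-effective, next(tie_breaker), name))
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]


order = delivery_order([("default", 2), ("boosted", 4), ("unset", None)])
print(order)  # → ['boosted', 'default', 'unset']
```

If consumers are idle, a message is delivered as soon as it arrives and priority has no visible effect.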

Custom Queue and Exchange arguments

You can pass custom arguments to the underlying RabbitMQ queue and exchange declarations by using the Queue and Exchange classes from taskiq_aio_pika. If you have used faststream before, you are probably familiar with this concept.

These arguments will be merged with the default arguments used by the broker (such as dead-lettering and priority settings). If there are any conflicts, the values you provide will take precedence over the broker's defaults. Example:

from taskiq_aio_pika import AioPikaBroker, Queue, QueueType, Exchange
from aio_pika.abc import ExchangeType

broker = AioPikaBroker(
    exchange=Exchange(
        name="custom_exchange",
        type=ExchangeType.TOPIC,
        declare=True,
        durable=True,
        auto_delete=False,
    ),
    task_queues=[
        Queue(
            name="custom_queue",
            type=QueueType.CLASSIC,
            declare=True,
            durable=True,
            max_priority=10,
            routing_key="custom_queue",
        )
    ]
)

This will ensure that the queue is created with your custom arguments, in addition to the broker's defaults.
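
The merge rule described above behaves like a plain dictionary merge in which your values are applied last. The snippet is a sketch of that rule, not the broker's code: `merge_arguments` and the sample default values are hypothetical.

```python
def merge_arguments(defaults: dict, custom: dict) -> dict:
    """Combine two argument mappings; on a key conflict, `custom` wins."""
    return {**defaults, **custom}


# Hypothetical broker defaults and user-supplied arguments.
broker_defaults = {"x-max-priority": 5, "x-dead-letter-exchange": "taskiq"}
custom = {"x-max-priority": 10, "x-message-ttl": 60000}

merged = merge_arguments(broker_defaults, custom)
print(merged)
# → {'x-max-priority': 10, 'x-dead-letter-exchange': 'taskiq', 'x-message-ttl': 60000}
```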

Multiqueue support

You can define multiple queues for your tasks. Each queue can have its own routing key and other settings, and your workers can listen to several queues or to one specific queue.

See the multiqueue usage example in the examples folder for more details.
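
As a rough configuration sketch, assuming task_queues accepts several Queue objects built with the same fields used in the previous section: the exchange and queue names here are made up, and the fragment does not show how a task selects its queue or how a worker picks the queues it listens to. Refer to the examples folder for those parts.

```python
from aio_pika.abc import ExchangeType
from taskiq_aio_pika import AioPikaBroker, Exchange, Queue

# Hypothetical names; one queue per kind of work, each with its own routing key.
broker = AioPikaBroker(
    exchange=Exchange(
        name="tasks_exchange",
        type=ExchangeType.DIRECT,
        declare=True,
        durable=True,
        auto_delete=False,
    ),
    task_queues=[
        Queue(name="emails", routing_key="emails"),
        Queue(name="reports", routing_key="reports"),
    ],
)
```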
