Skip to main content

RabbitMQ broker for taskiq

Project description

AioPika broker for taskiq

This lirary provides you with aio-pika broker for taskiq.

Usage:

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker()

@broker.task
async def test() -> None:
    print("nothing")

Non-obvious things

You can send delayed messages and set priorities to messages using labels.

Delays

To send delayed message, you have to specify delay label. You can do it with task decorator, or by using kicker. For example:

broker = AioPikaBroker()

@broker.task(delay=3)
async def delayed_task() -> int:
    return 1

async def main():
    await broker.startup()
    # This message will be received by workers
    # After 3 seconds delay.
    await delayed_task.kiq()

    # This message is going to be received after the delay in 4 seconds.
    # Since we overriden the `delay` label using kicker.
    await delayed_task.kicker().with_labels(delay=4).kiq()

    # This message is going to be send immediately. Since we deleted the label.
    await delayed_task.kicker().with_labels(delay=None).kiq()

    # Of course the delay is managed by rabbitmq, so you don't
    # have to wait delay period before message is going to be sent.

Priorities

You can define priorities for messages using priority label. Messages with higher priorities are delivered faster. But to use priorities you need to define max_priority of the main queue, by passing max_priority parameter in broker's init. This parameter sets maximum priority for the queue and declares it as the prority queue.

Before doing so please read the documentation about what downsides you get by using prioritized queues.

broker = AioPikaBroker(max_priority=10)

# We can define default priority for tasks.
@broker.task(priority=2)
async def prio_task() -> int:
    return 1

async def main():
    await broker.startup()
    # This message has priority = 2.
    await prio_task.kiq()

    # This message is going to have priority 4.
    await prio_task.kicker().with_labels(priority=4).kiq()

    # This message is going to have priority 0.
    await prio_task.kicker().with_labels(priority=None).kiq()

Configuration

AioPikaBroker parameters:

  • url - url to rabbitmq. If None, "amqp://guest:guest@localhost:5672" is used.
  • result_backend - custom result backend.
  • task_id_generator - custom task_id genertaor.
  • exchange_name - name of exchange that used to send messages.
  • exchange_type - type of the exchange. Used only if declare_exchange is True.
  • queue_name - queue that used to get incoming messages.
  • routing_key - that used to bind that queue to the exchange.
  • declare_exchange - whether you want to declare new exchange if it doesn't exist.
  • max_priority - maximum priority for messages.
  • delay_queue_name - custom delay queue name. This queue is used to deliver messages with delays.
  • dead_letter_queue_name - custom dead letter queue name. This queue is used to receive negatively acknowleged messages from the main queue.
  • qos - number of messages that worker can prefetch.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

taskiq-aio-pika-0.0.6.tar.gz (8.0 kB view details)

Uploaded Source

Built Distribution

taskiq_aio_pika-0.0.6-py3-none-any.whl (7.8 kB view details)

Uploaded Python 3

File details

Details for the file taskiq-aio-pika-0.0.6.tar.gz.

File metadata

  • Download URL: taskiq-aio-pika-0.0.6.tar.gz
  • Upload date:
  • Size: 8.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.15 CPython/3.9.13 Linux/5.15.0-1017-azure

File hashes

Hashes for taskiq-aio-pika-0.0.6.tar.gz
Algorithm Hash digest
SHA256 5d92be01a00463526915445e91af454e6ae98b08e7e5c840d75664ddeb4c08fd
MD5 027f9ff4c644c4bb5502113c7486243b
BLAKE2b-256 ac6f472c23cccdb09a597bcdb4a4d8f332242cd926d2b81acd4d406d75abe5e3

See more details on using hashes here.

File details

Details for the file taskiq_aio_pika-0.0.6-py3-none-any.whl.

File metadata

  • Download URL: taskiq_aio_pika-0.0.6-py3-none-any.whl
  • Upload date:
  • Size: 7.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.15 CPython/3.9.13 Linux/5.15.0-1017-azure

File hashes

Hashes for taskiq_aio_pika-0.0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 e64d80e66bbe42dc75eec9eb1c44b46cc8f8a58a4ce7cb43cf3819e945a223fd
MD5 b7b6915984b7165ede55c496527d95a2
BLAKE2b-256 d91877066689a1766094d001c2274aef476a6c9e1e16d67ec8652ec3e09e7968

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page