Consumer utility for AMQP

aioamqp-consumer-best

Usage

import asyncio
from typing import List

from aioamqp_consumer_best import (
    ConnectionParams,
    Consumer,
    Exchange,
    Message,
    ProcessBulk,
    Queue,
    QueueBinding,
    ToBulks,
    load_json,
)


async def callback(messages: List[Message]) -> None:
    # Receives one bulk (a list of messages) at a time.
    print(messages)


consumer = Consumer(
    middleware=(
        load_json
        | ToBulks(max_bulk_size=10, bulk_timeout=3.0)
        | ProcessBulk(callback)
    ),
    prefetch_count=10,
    queue=Queue(
        name='test-queue',
        bindings=[
            QueueBinding(
                exchange=Exchange('test-exchange'),
                routing_key='test-routing-key',
            ),
        ],
    ),
    connection_params=[  # Multiple entries are used round robin
        ConnectionParams(),
        ConnectionParams.from_string('amqp://user@rmq-host:5672/'),
    ],
)

asyncio.run(consumer.start())
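
The `max_bulk_size` and `bulk_timeout` arguments above suggest that a bulk is handed to the callback once it holds 10 messages or once 3 seconds have passed, whichever comes first. The following is a minimal standalone sketch of that batching idea using only `asyncio`; it is not the library's implementation, and `collect_bulk` and `demo` are hypothetical names introduced purely for illustration.

```python
import asyncio
from typing import Any, List


async def collect_bulk(
    queue: "asyncio.Queue[Any]",
    max_bulk_size: int,
    bulk_timeout: float,
) -> List[Any]:
    """Hypothetical helper: gather items until the bulk is full or the timeout expires."""
    bulk = [await queue.get()]  # block until at least one item arrives
    loop = asyncio.get_running_loop()
    deadline = loop.time() + bulk_timeout
    while len(bulk) < max_bulk_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            # Wait for the next item, but never past the bulk deadline.
            bulk.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # timeout reached: flush a partial bulk
    return bulk


async def demo() -> List[List[int]]:
    queue: "asyncio.Queue[int]" = asyncio.Queue()
    for i in range(25):
        queue.put_nowait(i)
    bulks = []
    while not queue.empty():
        bulks.append(await collect_bulk(queue, max_bulk_size=10, bulk_timeout=0.05))
    return bulks


bulk_sizes = [len(bulk) for bulk in asyncio.run(demo())]
print(bulk_sizes)
```

With 25 queued items, the first two bulks fill up to the size limit and the last one is flushed by the timeout with the remaining 5 items. A size limit alone would leave a partial bulk waiting indefinitely on a quiet queue, which is why the timeout is paired with it.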

Download files

Source Distribution

aioamqp-consumer-best-2.4.0.tar.gz (11.4 kB)

Uploaded Source

Built Distribution

aioamqp_consumer_best-2.4.0-py3-none-any.whl (11.2 kB)

Uploaded Python 3

File details

Details for the file aioamqp-consumer-best-2.4.0.tar.gz.

File metadata

  • Download URL: aioamqp-consumer-best-2.4.0.tar.gz
  • Upload date:
  • Size: 11.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.5

File hashes

Hashes for aioamqp-consumer-best-2.4.0.tar.gz
Algorithm     Hash digest
SHA256        0bb6cff9c369c9a501bbceb42c78ea9714d6e1cc19281f011a246eec91f7bb03
MD5           3a1d097b0da47d0765319dafb4384420
BLAKE2b-256   8fcf3a4b94c2295aac02a1552906be458d362c52b660ca638c7e563cef7993f5

File details

Details for the file aioamqp_consumer_best-2.4.0-py3-none-any.whl.

File hashes

Hashes for aioamqp_consumer_best-2.4.0-py3-none-any.whl
Algorithm     Hash digest
SHA256        137be99e3fd5dcbc5409dcb5b0d4795ff7d3e20470704f7c1fe27da813f3d176
MD5           432af1d5e7b38a3ae28ccd0173d7cfc3
BLAKE2b-256   1d17026450f843518f298aaaa95965c316815ddd5b038e42188274e6c270342f
