Consumer utility for AMQP
Project description
aioamqp-consumer-best
Usage
import asyncio
from typing import List
from aioamqp_consumer_best import (
ConnectionParams,
Consumer,
Exchange,
Message,
ProcessBulk,
Queue,
QueueBinding,
ToBulks,
load_json,
)
async def callback(messages: List[Message]) -> None:
print(messages)
consumer = Consumer(
middleware=(
load_json
| ToBulks(max_bulk_size=10, bulk_timeout=3.0)
| ProcessBulk(callback)
),
prefetch_count=10,
queue=Queue(
name='test-queue',
bindings=[
QueueBinding(
exchange=Exchange('test-exchange'),
routing_key='test-routing-key',
),
],
),
connection_params=[ # Round robin
ConnectionParams(),
ConnectionParams.from_string('amqp://user@rmq-host:5672/'),
],
)
asyncio.run(consumer.start())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for aioamqp-consumer-best-2.2.1.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | c029e94bb87cfdec37311ffa5a0e94d336eb403b36e28386665989b814c31fff |
|
MD5 | d5dc57864ed30ef4c7bc5846d0553674 |
|
BLAKE2b-256 | e8ea5a0a8c894d510da06df14b2f7642c7ff513755c89b725c99fd20ff2a4fef |
Close
Hashes for aioamqp_consumer_best-2.2.1-py2.py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 662594ab910c5b1d60fc4dcd54c358495fd81169f93cc8d13afc0ecbf81ab93e |
|
MD5 | 52a4d13a2d3b86577b7b519b34bb2335 |
|
BLAKE2b-256 | 93a903c1368f4e383e304ebda159b6713fefeb53d0054df9c6e8b307651fa7df |