Consumer utility for AMQP
aioamqp-consumer-best
Usage
import asyncio
from typing import List

from aioamqp_consumer_best import (
    ConnectionParams,
    Consumer,
    Exchange,
    Message,
    ProcessBulk,
    Queue,
    QueueBinding,
    ToBulks,
    load_json,
)


async def callback(messages: List[Message]) -> None:
    print(messages)


consumer = Consumer(
    middleware=(
        load_json
        | ToBulks(max_bulk_size=10, bulk_timeout=3.0)
        | ProcessBulk(callback)
    ),
    prefetch_count=10,
    queue=Queue(
        name='test-queue',
        bindings=[
            QueueBinding(
                exchange=Exchange('test-exchange'),
                routing_key='test-routing-key',
            ),
        ],
    ),
    connection_params=[  # Round robin
        ConnectionParams(),
        ConnectionParams.from_string('amqp://user@rmq-host:5672/'),
    ],
)

asyncio.run(consumer.start())
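The middleware chain in the usage example groups messages into bulks bounded by `max_bulk_size` and `bulk_timeout`. The sketch below illustrates that general batching idea with plain `asyncio` only. It is not the library's implementation: the `to_bulks` and `demo` functions are hypothetical names introduced here, and the exact flushing semantics of `ToBulks` may differ.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def to_bulks(
    source: "asyncio.Queue[T]",
    max_bulk_size: int,
    bulk_timeout: float,
) -> AsyncIterator[List[T]]:
    """Yield lists of at most max_bulk_size items (hypothetical helper).

    A partially filled bulk is flushed once bulk_timeout seconds have
    passed since its first item arrived.
    """
    loop = asyncio.get_running_loop()
    while True:
        # Wait without a deadline for the first item of the next bulk.
        bulk = [await source.get()]
        deadline = loop.time() + bulk_timeout
        while len(bulk) < max_bulk_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                bulk.append(await asyncio.wait_for(source.get(), remaining))
            except asyncio.TimeoutError:
                # Timeout reached: flush what has been collected so far.
                break
        yield bulk


async def demo() -> List[List[int]]:
    """Feed five integers through to_bulks with a bulk size of two."""
    queue: "asyncio.Queue[int]" = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)

    bulks: List[List[int]] = []
    async for bulk in to_bulks(queue, max_bulk_size=2, bulk_timeout=0.05):
        bulks.append(bulk)
        if queue.empty():
            break
    return bulks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the last bulk is shorter than `max_bulk_size` because the timeout expires before a second item arrives. Under this model, a `prefetch_count` smaller than `max_bulk_size` would mean bulks are flushed by the timeout rather than by size, which is presumably why the usage example sets both to 10.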
Hashes for aioamqp-consumer-best-2.2.2.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 3a5ea052ef08fd56a90871438bd39470b5452f14b4486beaf939e3bd45e15c69
MD5 | 28be0533cc0e90b92f7d8ecce476dc13
BLAKE2b-256 | 64346e499910bd74891db7e3fcac2c37a2162d383a0163c23904a8a1e2b27bd9
Hashes for aioamqp_consumer_best-2.2.2-py2.py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 92d4a508b8352ea9b56d003c7a64a83d5045339e08bc17733ce971af4ce21e14
MD5 | 18dd975d372d67ba737fac6a8971e448
BLAKE2b-256 | 52dc11bf197d3023f5b282bcd47aa2e3e16bd7d278b1f28f2fd6f0474f920140
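A downloaded file can be checked against the SHA256 digests listed above. The following is a minimal sketch using only the standard library; the `sha256_matches` helper is a hypothetical name introduced for illustration, and the file path in the usage note assumes the archive was saved to the current directory.

```python
import hashlib
from pathlib import Path


def sha256_matches(path: Path, expected_hex: str, chunk_size: int = 65536) -> bool:
    """Return True if the file's SHA256 digest equals expected_hex (hypothetical helper)."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        # Read in chunks so large archives are not loaded into memory at once.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest() == expected_hex.strip().lower()
```

For example, `sha256_matches(Path("aioamqp-consumer-best-2.2.2.tar.gz"), "3a5ea052ef08fd56a90871438bd39470b5452f14b4486beaf939e3bd45e15c69")` should return `True` for an intact copy of the source distribution.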