aioamqp-consumer-best
Usage
import asyncio
from typing import List

from aioamqp_consumer_best import (
    ConnectionParams,
    Consumer,
    Exchange,
    Message,
    ProcessBulk,
    Queue,
    QueueBinding,
    ToBulks,
    load_json,
)


async def callback(messages: List[Message]) -> None:
    print(messages)


consumer = Consumer(
    middleware=(
        load_json
        | ToBulks(max_bulk_size=10, bulk_timeout=3.0)
        | ProcessBulk(callback)
    ),
    prefetch_count=10,
    queue=Queue(
        name='test-queue',
        bindings=[
            QueueBinding(
                exchange=Exchange('test-exchange'),
                routing_key='test-routing-key',
            ),
        ],
    ),
    connection_params=[  # Round robin
        ConnectionParams(),
        ConnectionParams.from_string('amqp://user@rmq-host:5672/'),
    ],
)

asyncio.get_event_loop().run_until_complete(consumer.start())
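
The usage above passes `max_bulk_size` and `bulk_timeout` to `ToBulks` without describing what they do. Going by the parameter names, a reasonable reading is that messages are grouped into lists of at most `max_bulk_size` items, and that a partial list is flushed once `bulk_timeout` seconds have passed. The sketch below illustrates that batching idea with plain `asyncio` only. It is not the library's implementation, and `to_bulks` and `demo` are hypothetical names introduced for illustration.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def to_bulks(
    queue: "asyncio.Queue[T]",
    max_bulk_size: int,
    bulk_timeout: float,
) -> AsyncIterator[List[T]]:
    """Yield lists of up to max_bulk_size items (hypothetical helper).

    A partial list is yielded once bulk_timeout seconds have passed
    since its first item arrived.
    """
    loop = asyncio.get_running_loop()
    while True:
        # Block until the first item of the next bulk arrives.
        bulk = [await queue.get()]
        deadline = loop.time() + bulk_timeout
        while len(bulk) < max_bulk_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                item = await asyncio.wait_for(queue.get(), timeout=remaining)
            except asyncio.TimeoutError:
                # Timeout reached: flush whatever has been collected.
                break
            bulk.append(item)
        yield bulk


async def demo() -> List[List[int]]:
    """Feed five items through to_bulks with a bulk size of two."""
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    bulks: List[List[int]] = []
    async for bulk in to_bulks(queue, max_bulk_size=2, bulk_timeout=0.05):
        bulks.append(bulk)
        if queue.empty():
            break
    return bulks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With five queued items and a bulk size of two, the first two bulks fill up immediately, and the last one holds a single item that is flushed by the timeout. Under this reading, a `callback` passed to `ProcessBulk` would receive lists shaped like these bulks.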