A package for async communication with RabbitMQ
Project description
RabbitMQUtils
Example for reader
Consumer
from aio_rabbitmq_utils import RabbitMQConsumeInputDeviceManager, RabbitMQInputConsumeDevice
async def example():
input_device_manager = RabbitMQConsumeInputDeviceManager(
hosts=["the", "rabbit", "hosts", ", will", "connect", "to", "only", "one"],
user="user",
password="password",
vhost="/",
prefetch_count=10,
)
await input_device_manager.connect()
input_device: RabbitMQInputConsumeDevice = await input_device_manager.get_device("some_queue_name")
await input_device.connect()
data, headers, transaction = await input_device.read()
# do something
# To ack the message (remove from queue)
await transaction.commit()
# To nack the message (re-queue the message)
await transaction.rollback()
Basic Get
from aio_rabbitmq_utils import RabbitMQMultiConnectionBasicGetInputDeviceManager, RabbitMQInputBasicGetDevice
async def example():
input_device_manager = RabbitMQMultiConnectionBasicGetInputDeviceManager(
hosts=["the", "rabbit", "hosts", ", will", "connect", "to", "only", "one"],
user="user",
password="password",
vhost="/",
max_connections=10,
max_channels=50,
)
await input_device_manager.connect()
input_device: RabbitMQInputBasicGetDevice = await input_device_manager.get_device("some_queue_name")
await input_device.connect()
data, headers, transaction = await input_device.read()
# do something
# To ack the message (remove from queue)
await transaction.commit()
# To nack the message (re-queue the message)
await transaction.rollback()
Example for writer
from io import BytesIO
from aio_rabbitmq_utils import RabbitMQOutputDeviceManager, RabbitMQOutputDevice
async def example():
output_device_manager = RabbitMQOutputDeviceManager(
hosts=["the", "rabbit", "hosts", ", will", "connect", "to", "only", "one"],
user="user",
password="password",
vhost="/",
exchange_name="",
)
await output_device_manager.connect()
output_device: RabbitMQOutputDevice = await output_device_manager.get_device("some_routing_key")
await output_device.connect()
success = await output_device.send(
BytesIO(b"Hi"),
{"some": "headers"},
)
if success:
print("Message sent")
else:
raise Exception("Failed to send the message")
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file aio_rabbitmq_utils-1.0.8.tar.gz
.
File metadata
- Download URL: aio_rabbitmq_utils-1.0.8.tar.gz
- Upload date:
- Size: 8.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8a2db61d3470b21ce3554e2484e20640ba47f487197700feaa9357846c671289 |
|
MD5 | 459b2dc2af5ee1ef348b279bf8682759 |
|
BLAKE2b-256 | 9e86cc4db92aef0da32559bf008980e9b6939e902899809fac5767333f770ab7 |
File details
Details for the file aio_rabbitmq_utils-1.0.8-py3-none-any.whl
.
File metadata
- Download URL: aio_rabbitmq_utils-1.0.8-py3-none-any.whl
- Upload date:
- Size: 12.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | a60a715cf40e493ef5d8621aca49c9aa4bf38691e383bdab50dce828566ba2dd |
|
MD5 | 1f82d910a806f986379e6e10c8b8f5bf |
|
BLAKE2b-256 | ede627a47a2e862dee695f19283a92fca4a961360afcf38bd211b9c9827284c4 |