A package for async communication with RabbitMQ

Project description

RabbitMQUtils

Example for reader

Consumer

from aio_rabbitmq_utils import RabbitMQConsumeInputDeviceManager, RabbitMQInputConsumeDevice


async def example():
    input_device_manager = RabbitMQConsumeInputDeviceManager(
        # List every RabbitMQ host; the manager will connect to only one of them.
        hosts=["rabbit-host-1", "rabbit-host-2"],
        user="user",
        password="password",
        vhost="/",
        prefetch_count=10,
    )
    await input_device_manager.connect()
    input_device: RabbitMQInputConsumeDevice = await input_device_manager.get_device("some_queue_name")
    await input_device.connect()
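    data, headers, transaction = await input_device.read()

    # Ack or nack each message exactly once, never both.
    try:
        ...  # do something with data and headers
    except Exception:
        # To nack the message (re-queue the message)
        await transaction.rollback()
        raise
    else:
        # To ack the message (remove from queue)
        await transaction.commit()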
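
The ack/nack decision shown above can be wrapped in a small helper so that each message is settled exactly once. The following is a minimal sketch, not part of the package: `handle_one`, `FakeDevice`, and `FakeTransaction` are hypothetical names, and the fakes only mimic the `read()`, `commit()`, and `rollback()` calls that appear in the example, so the sketch runs without a broker.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def handle_one(
    device: Any,
    handler: Callable[[Any, dict], Awaitable[None]],
) -> bool:
    """Read one message, run the handler, then ack or nack exactly once."""
    data, headers, transaction = await device.read()
    try:
        await handler(data, headers)
    except Exception:
        # Handler failed: nack so the message is re-queued.
        await transaction.rollback()
        return False
    # Handler succeeded: ack so the message is removed from the queue.
    await transaction.commit()
    return True


class FakeTransaction:
    """Hypothetical stand-in that records which call was made."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


class FakeDevice:
    """Hypothetical stand-in for an input device; no broker is involved."""

    def __init__(self, data: bytes) -> None:
        self.transaction = FakeTransaction()
        self._data = data

    async def read(self):
        # Same three-part result as in the example: data, headers, transaction.
        return self._data, {"some": "headers"}, self.transaction
```

With a real device, `handle_one(input_device, handler)` would be awaited in a loop; whether a failed message should be re-queued or dropped depends on the application.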

Basic Get

from aio_rabbitmq_utils import RabbitMQMultiConnectionBasicGetInputDeviceManager, RabbitMQInputBasicGetDevice


async def example():
    input_device_manager = RabbitMQMultiConnectionBasicGetInputDeviceManager(
        # List every RabbitMQ host; the manager will connect to only one of them.
        hosts=["rabbit-host-1", "rabbit-host-2"],
        user="user",
        password="password",
        vhost="/",
        max_connections=10,
        max_channels=50,
    )
    await input_device_manager.connect()
    input_device: RabbitMQInputBasicGetDevice = await input_device_manager.get_device("some_queue_name")
    await input_device.connect()
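    data, headers, transaction = await input_device.read()

    # Ack or nack each message exactly once, never both.
    try:
        ...  # do something with data and headers
    except Exception:
        # To nack the message (re-queue the message)
        await transaction.rollback()
        raise
    else:
        # To ack the message (remove from queue)
        await transaction.commit()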
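
When polling a queue, it can be useful to bound how long a single `read()` may take. The sketch below uses only the standard library's `asyncio.wait_for`; `read_with_timeout` and the two fake devices are hypothetical names introduced for illustration. How the real device behaves on an empty queue, and whether cancelling a pending `read()` is safe for it, is not stated in this description, so treat this as an assumption to verify.

```python
import asyncio
from typing import Any, Optional, Tuple


async def read_with_timeout(
    device: Any,
    timeout: float,
) -> Optional[Tuple[Any, dict, Any]]:
    """Return (data, headers, transaction), or None if read() takes too long."""
    try:
        # wait_for cancels the pending read() when the timeout expires.
        return await asyncio.wait_for(device.read(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


class SlowDevice:
    """Hypothetical stand-in whose read() never completes in time."""

    async def read(self):
        await asyncio.sleep(3600)
        return b"late", {}, None


class ReadyDevice:
    """Hypothetical stand-in whose read() returns immediately."""

    async def read(self):
        return b"Hi", {"some": "headers"}, None
```

A caller would check for `None` before unpacking the result and decide whether to retry or back off.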

Example for writer

from io import BytesIO
from aio_rabbitmq_utils import RabbitMQOutputDeviceManager, RabbitMQOutputDevice


async def example():
    output_device_manager = RabbitMQOutputDeviceManager(
        # List every RabbitMQ host; the manager will connect to only one of them.
        hosts=["rabbit-host-1", "rabbit-host-2"],
        user="user",
        password="password",
        vhost="/",
        exchange_name="",
    )
    await output_device_manager.connect()
    output_device: RabbitMQOutputDevice = await output_device_manager.get_device("some_routing_key")
    await output_device.connect()
    success = await output_device.send(
        BytesIO(b"Hi"),
        {"some": "headers"},
    )
    if success:
        print("Message sent")
    else:
        raise Exception("Failed to send the message")
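
Because `send` reports success as a boolean in the example above, a caller may want to retry before giving up. This is a minimal sketch under that assumption: `send_with_retry` and `FlakyDevice` are hypothetical names, the fake device only mimics the `send(stream, headers)` call shown above, and the retry count and delay are arbitrary illustration values.

```python
import asyncio
from io import BytesIO
from typing import Any, Dict, List


async def send_with_retry(
    device: Any,
    payload: bytes,
    headers: Dict[str, Any],
    attempts: int = 3,
    delay: float = 0.1,
) -> bool:
    """Try to send the payload up to `attempts` times; return True on success."""
    for attempt in range(1, attempts + 1):
        # Build a fresh BytesIO per attempt so the stream always starts at 0.
        if await device.send(BytesIO(payload), headers):
            return True
        if attempt < attempts:
            await asyncio.sleep(delay)
    return False


class FlakyDevice:
    """Hypothetical stand-in: fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.attempts = 0
        self.sent: List[bytes] = []

    async def send(self, stream: BytesIO, headers: Dict[str, Any]) -> bool:
        self.attempts += 1
        if self.attempts <= self.failures:
            return False
        self.sent.append(stream.read())
        return True
```

With a real device, `await send_with_retry(output_device, b"Hi", {"some": "headers"})` would replace the single `send` call; retrying may produce duplicates if a send that was reported as failed actually reached the broker.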

Download files

Source Distribution

aio_rabbitmq_utils-1.0.8.tar.gz (8.5 kB)

Uploaded Source

Built Distribution

aio_rabbitmq_utils-1.0.8-py3-none-any.whl (12.9 kB)

Uploaded Python 3

File details

Details for the file aio_rabbitmq_utils-1.0.8.tar.gz.

File metadata

  • Download URL: aio_rabbitmq_utils-1.0.8.tar.gz
  • Size: 8.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.6

File hashes

Hashes for aio_rabbitmq_utils-1.0.8.tar.gz
Algorithm Hash digest
SHA256 8a2db61d3470b21ce3554e2484e20640ba47f487197700feaa9357846c671289
MD5 459b2dc2af5ee1ef348b279bf8682759
BLAKE2b-256 9e86cc4db92aef0da32559bf008980e9b6939e902899809fac5767333f770ab7

File details

Details for the file aio_rabbitmq_utils-1.0.8-py3-none-any.whl.

File hashes

Hashes for aio_rabbitmq_utils-1.0.8-py3-none-any.whl
Algorithm Hash digest
SHA256 a60a715cf40e493ef5d8621aca49c9aa4bf38691e383bdab50dce828566ba2dd
MD5 1f82d910a806f986379e6e10c8b8f5bf
BLAKE2b-256 ede627a47a2e862dee695f19283a92fca4a961360afcf38bd211b9c9827284c4
