A package for async communication with RabbitMQ

RabbitMQUtils

Example for reader

Consumer

from aio_rabbitmq_utils import RabbitMQConsumeInputDeviceManager, RabbitMQInputConsumeDevice


async def example():
    input_device_manager = RabbitMQConsumeInputDeviceManager(
        # Candidate RabbitMQ hosts; the manager connects to only one of them.
        hosts=["rabbit-host-1", "rabbit-host-2"],
        user="user",
        password="password",
        vhost="/",
        prefetch_count=10,
    )
    await input_device_manager.connect()
    input_device: RabbitMQInputConsumeDevice = await input_device_manager.get_device("some_queue_name")
    await input_device.connect()
    data, headers, transaction = await input_device.read()
    
    # do something
    
    # Settle the message with exactly one of the two calls below.
    # To ack the message (remove it from the queue):
    await transaction.commit()
    # To nack the message instead (re-queue it):
    # await transaction.rollback()

Basic Get

from aio_rabbitmq_utils import RabbitMQMultiConnectionBasicGetInputDeviceManager, RabbitMQInputBasicGetDevice


async def example():
    input_device_manager = RabbitMQMultiConnectionBasicGetInputDeviceManager(
        # Candidate RabbitMQ hosts; the manager connects to only one of them.
        hosts=["rabbit-host-1", "rabbit-host-2"],
        user="user",
        password="password",
        vhost="/",
        max_connections=10,
        max_channels=50,
    )
    await input_device_manager.connect()
    input_device: RabbitMQInputBasicGetDevice = await input_device_manager.get_device("some_queue_name")
    await input_device.connect()
    data, headers, transaction = await input_device.read()
    
    # do something
    
    # Settle the message with exactly one of the two calls below.
    # To ack the message (remove it from the queue):
    await transaction.commit()
    # To nack the message instead (re-queue it):
    # await transaction.rollback()
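
The choice between commit and rollback can be wrapped in a small helper. The following is a minimal sketch, not part of aio_rabbitmq_utils: `handle_one` and the `handler` callback are hypothetical names, and the sketch assumes only what the examples above show, namely that `read()` returns `(data, headers, transaction)` and that the transaction has awaitable `commit()` and `rollback()` methods. `FakeTransaction` and `FakeDevice` are stand-ins so the code runs without a broker, and the `BytesIO` payload type is an assumption.

```python
import asyncio
from io import BytesIO


class FakeTransaction:
    """Stand-in for the transaction returned by read() (hypothetical)."""

    def __init__(self):
        self.state = "pending"

    async def commit(self):
        self.state = "acked"

    async def rollback(self):
        self.state = "requeued"


class FakeDevice:
    """Stand-in for an input device; every read() yields the same payload."""

    def __init__(self, payload: bytes):
        self.payload = payload
        self.last_transaction = None

    async def read(self):
        self.last_transaction = FakeTransaction()
        # Assumption: the payload arrives as a BytesIO-like stream.
        return BytesIO(self.payload), {"some": "headers"}, self.last_transaction


async def handle_one(input_device, handler) -> bool:
    """Read one message; ack it if handler succeeds, re-queue it otherwise."""
    data, headers, transaction = await input_device.read()
    try:
        await handler(data, headers)
    except Exception:
        await transaction.rollback()
        return False
    await transaction.commit()
    return True


async def demo():
    async def ok(data, headers):
        data.read()

    async def failing(data, headers):
        raise ValueError("cannot process")

    device = FakeDevice(b"Hi")
    results = []
    for handler in (ok, failing):
        handled = await handle_one(device, handler)
        results.append((handled, device.last_transaction.state))
    return results


def run_demo():
    return asyncio.run(demo())
```

With a real device, the same helper would be called with the object returned by `get_device(...)` after `connect()`. Exactly one of `commit()` or `rollback()` runs per message.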

Example for writer

from io import BytesIO
from aio_rabbitmq_utils import RabbitMQOutputDeviceManager, RabbitMQOutputDevice


async def example():
    output_device_manager = RabbitMQOutputDeviceManager(
        # Candidate RabbitMQ hosts; the manager connects to only one of them.
        hosts=["rabbit-host-1", "rabbit-host-2"],
        user="user",
        password="password",
        vhost="/",
        exchange_name="",
    )
    await output_device_manager.connect()
    output_device: RabbitMQOutputDevice = await output_device_manager.get_device("some_routing_key")
    await output_device.connect()
    success = await output_device.send(
        BytesIO(b"Hi"),
        {"some": "headers"},
    )
    if success:
        print("Message sent")
    else:
        raise RuntimeError("Failed to send the message")
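
Because `send()` reports failure through its return value, a caller may want to retry before giving up. The following is a rough sketch under that assumption, not part of aio_rabbitmq_utils: `send_with_retry`, its `attempts` and `delay` parameters, and the `FlakyOutputDevice` stand-in are all hypothetical, and the only behavior taken from the example above is that `send(stream, headers)` is awaitable and returns a truthy value on success.

```python
import asyncio
from io import BytesIO


class FlakyOutputDevice:
    """Stand-in for an output device: send() fails a fixed number of times."""

    def __init__(self, failures: int):
        self.failures = failures
        self.sent = []

    async def send(self, stream, headers) -> bool:
        if self.failures > 0:
            self.failures -= 1
            return False
        self.sent.append((stream.read(), headers))
        return True


async def send_with_retry(output_device, payload: bytes, headers: dict,
                          attempts: int = 3, delay: float = 0.0) -> bool:
    """Try to send payload up to `attempts` times; return True on success."""
    for attempt in range(attempts):
        # Build a fresh stream per attempt: a failed send may have consumed
        # part of the previous one.
        if await output_device.send(BytesIO(payload), headers):
            return True
        if attempt < attempts - 1:
            await asyncio.sleep(delay)
    return False


def run_demo():
    async def demo():
        recovering = FlakyOutputDevice(failures=2)
        broken = FlakyOutputDevice(failures=5)
        first = await send_with_retry(recovering, b"Hi", {"some": "headers"})
        second = await send_with_retry(broken, b"Hi", {"some": "headers"})
        return first, recovering.sent, second, broken.sent

    return asyncio.run(demo())
```

Passing the payload as `bytes` and wrapping it in `BytesIO` inside the loop keeps every attempt independent of the previous one.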
