Skip to main content

A Message Queue Client API Supporting Apache Pulsar, RabbitMQ, and NATS.io

Project description

PyPI GitHub release (latest by date including pre-releases) Versions PyPI - License GitHub issues GitHub pull requests

MQClient

MQClient is a powerful and flexible message-queue client API that provides a unified interface for working with multiple messaging systems, including Apache Pulsar, RabbitMQ, and NATS.io. It is designed for resilient, asynchronous message publishing and consumption.

Features

  • Unified API – Work seamlessly with different message brokers using a consistent interface.
  • Pluggable Broker Support – Easily swap between supported brokers without changing application logic.
  • Automatic Error Handling – Built-in support for message acknowledgments, retries, and failure recovery.
  • Flexible Consumer Patterns – Supports streaming consumers, batch processing, concurrent message handling, and more.

Installation

You must choose the message broker protocol at install time, these are pulsar, rabbitmq,and nats:

pip install oms-mqclient[pulsar]  

or

pip install oms-mqclient[rabbitmq]  

or

pip install oms-mqclient[nats]  

Usage

Initializing a Queue

To use MQClient, instantiate a Queue with the required broker client:

from mqclient.queue import Queue
import os

# Ensure that broker_client matches what was installed
broker_client = "rabbitmq"  # Change this to "pulsar" or "nats" if installed accordingly

queue = Queue(broker_client=broker_client, name="my_queue", auth_token=os.getenv('MY_QUEUE_AUTH'))

Use Cases / Patterns / Recipes

The most common use case of MQClient is to open a pub and/or sub stream.

Streaming Publisher

Use open_pub() to open a pub stream.

async def stream_publisher(queue: Queue):
    """Publish all messages."""
    async with queue.open_pub() as pub:
        while True:
            msg = await generate_message()
            await pub.send(msg)
            print(f"Sent: {msg}")

Serialization

pub.send() only accepts JSON-serializable data.

Any non-compliant data will need to pre-serialized prior to pub.send(). Then, every "consumer code" will need to implement the inverse function.

One way to do this is:

import base64
import pickle
from typing import Any


def encode_pkl_b64_data(my_data: Any) -> dict:
    """Encode a Python object to message-friendly dict."""
    print(f"want to send: {my_data}")
    out = {'my-data': base64.b64encode(pickle.dumps(my_data)).decode()}
    print("data is now ready to be sent with `pub.send()`!")
    return out


def decode_pkl_b64_data(b64_string: dict) -> Any:
    """Decode a message-friendly dict back to a Python object."""
    print("attempting to read the data just gotten from the `open_sub` iterator...")
    my_data = pickle.loads(base64.b64decode(b64_string))['my-data']
    print(f"got: {my_data}")
    return my_data

Streaming Consumer

Use open_sub() to open a sub stream. Each message will be automatically acknowledged upon the following iteration. If an Exception is raised, the message will immediately be nacked. By default, any un-excepted exceptions will be excepted by the open_sub() context manager. This can be turned off by setting Queue.except_errors to False.

async def stream_consumer(queue: Queue):
    """Consume messages until timeout."""
    async with queue.open_sub() as sub:
        async for msg in sub:
            print(f"Received: {msg}")
            await process_message(msg)  # may raise an Exception -> auto nack

Less Common Use Cases / Patterns / Recipes

Consuming a Single Message

The most common use case is to open an open_sub() stream to receive messages due to the overhead of opening a sub. Nonetheless, open_sub_one() can be used to consume a single message.

async def consume_one(queue: Queue):
    """Get one message only."""
    async with queue.open_sub_one() as msg:
        print(f"Received: {msg}")

Consuming Messages in Batches and/or Concurrently

Since open_sub()'s built-in ack/nack mechanism enforces one-by-one message consumption—i.e., the previous message must be acked/nacked before an additional message can be consumed—you will need to use open_sub_manual_acking() to manually acknowledge (or nack) messages.

Warning: If a message is not acked/nacked within a certain time, it may be re-enqueued. Client code will need to account for this. The exact behavior of this depends on the broker server configuration.

Batch Processing
async def batch_processing_consumer(queue: Queue):
    """Manually process messages in batches before acking."""
    batch_size = 5
    messages_pending_ack = []

    async with queue.open_sub_manual_acking() as sub:
        async for msg in sub.iter_messages():
            messages_pending_ack.append(msg)

            if len(messages_pending_ack) < batch_size:
                continue  # need more messages!

            try:
                await process_batch([m.data for m in messages_pending_ack])
            except Exception:
                print("Batch processing failed, nacking all messages")
                for m in messages_pending_ack:
                    await sub.nack(m)
            else:
                print("Success!")
                for m in messages_pending_ack:
                    await sub.ack(m)
            finally:
                messages_pending_ack = []
Concurrent Processing
import asyncio


async def concurrent_processing_consumer(queue: Queue):
    """Process messages concurrently and ack/nack as soon as one is done."""
    async with queue.open_sub_manual_acking() as sub:
        tasks = {}

        async for msg in sub.iter_messages():
            task = asyncio.create_task(process_message(msg.data))
            tasks[task] = msg  # Track task-to-message mapping

            # Wait for at least one task to complete
            done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)

            for finished_task in done:
                msg = tasks.pop(finished_task)
                try:
                    await finished_task  # Raises if task failed
                except Exception:
                    print(f"Processing failed for {msg}, nacking")
                    await sub.nack(msg)
                else:
                    print(f"Successfully processed {msg}, acking")
                    await sub.ack(msg)

Configuration

MQClient supports various configurations via environment variables or direct parameters:

Parameter Description Default Value
broker_url Connection URL for the message broker localhost
queue_name Name of the message queue autogenerated
prefetch Number of messages to prefetch 1
timeout Time in seconds to wait for a message 60
retries Number of retry attempts on failure 2 (i.e., 3 attempts total)

Contributing

Contributions are welcome! Feel free to submit issues or pull requests to improve MQClient.

License

This project is licensed under the MIT License. See the LICENSE file for details.


For more details, visit the repository.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

oms_mqclient-2.7.1.tar.gz (54.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

oms_mqclient-2.7.1-py3-none-any.whl (51.7 kB view details)

Uploaded Python 3

File details

Details for the file oms_mqclient-2.7.1.tar.gz.

File metadata

  • Download URL: oms_mqclient-2.7.1.tar.gz
  • Upload date:
  • Size: 54.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for oms_mqclient-2.7.1.tar.gz
Algorithm Hash digest
SHA256 ce0303ac079c7063cc63c30f445b212bf4ebb7b3a3b7e1798ac234e1e210c8ea
MD5 a4b35e887d9457a8c064d4dc0957a0a7
BLAKE2b-256 2372f6eff1b762bcd3b1eab766e4c0e0d6799172d5d1afdf89dd37ba2569b355

See more details on using hashes here.

File details

Details for the file oms_mqclient-2.7.1-py3-none-any.whl.

File metadata

  • Download URL: oms_mqclient-2.7.1-py3-none-any.whl
  • Upload date:
  • Size: 51.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for oms_mqclient-2.7.1-py3-none-any.whl
Algorithm Hash digest
SHA256 d85a789710a1129e58c01c4659536479a769bf1e90606f95c4b0633fcdffc22d
MD5 4a089e32eb331c8178e5bc4168fa1d20
BLAKE2b-256 9fc23c5398f04402387486774206be2def96a769b3534ace05cf655ec45e3214

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page