Python STOMP client with pleasant API

stompman

A Python client for the STOMP asynchronous messaging protocol that is:

  • asynchronous,
  • not abandoned,
  • equipped with a pleasant and comprehensible API. Also: async generators, match statements, heavy typing coverage and no callbacks.

There was no such client before, and now there is.

How To Use

Before you start using stompman, make sure you have it installed:

pip install stompman
poetry add stompman

Initialize a client:

async with stompman.Client(
    servers=[
        stompman.ConnectionParameters(host="171.0.0.1", port=61616, login="user1", passcode="passcode1"),
        stompman.ConnectionParameters(host="172.0.0.1", port=61616, login="user2", passcode="passcode2"),
    ],
    # Optional parameters with sensible defaults:
    heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000),
    connect_retry_attempts=3,
    connect_retry_interval=1,
    connect_timeout=2,
    connection_confirmation_timeout=2,
    read_timeout=2,
) as client:
    ...

Sending Messages

To send a message, use the following code:

await client.send(body=b"hi there!", destination="DLQ", headers={"persistent": "true"})

Or, to send messages in a transaction:

async with client.enter_transaction() as transaction:
    for _ in range(10):
        await client.send(body=b"hi there!", destination="DLQ", transaction=transaction)
        await asyncio.sleep(0.1)

Listening for Messages

Now, let's subscribe to a queue and listen for messages.

Notice that listen_to_events() is not bound to a destination: it will listen to all subscribed destinations. If you want separate subscriptions, create separate clients for that.

async with client.subscribe("DLQ"):
    async for event in client.listen_to_events():
        ...

...—and that's where it gets interesting.

Before learning how to process messages from the server, we need to understand how other libraries do it. They use callbacks. Damn callbacks in asynchronous programming.

I wanted to avoid them, and came up with an elegant solution: combining an async generator and a match statement. Here's what it looks like:

async for event in client.listen_to_events():
    match event:
        case stompman.MessageEvent(body=body):
            print(f"message: {body!s}")
            await event.ack()
        case stompman.ErrorEvent(message_header=short_description, body=body):
            print(f"{short_description}:\n{body!s}")

A more complex example that handles all possible events:

async with asyncio.TaskGroup() as task_group:
    async for event in client.listen_to_events():
        match event:
            case stompman.MessageEvent(body=body):
                # Validate message ASAP and ack/nack, so that server won't assume we're not reliable
                try:
                    validated_message = MyMessageModel.model_validate_json(body)
                except ValidationError:
                    await event.nack()
                    raise

                await event.ack()
                task_group.create_task(run_business_logic(validated_message))
            case stompman.ErrorEvent(message_header=short_description, body=body):
                logger.error(
                    "Received an error from server", short_description=short_description, body=body, event=event
                )
            case stompman.HeartbeatEvent():
                task_group.create_task(update_healthcheck_status())

Cleaning Up

stompman takes care of cleaning up resources automatically. When you leave the context of async context managers stompman.Client(), client.subscribe(), or client.enter_transaction(), the necessary frames will be sent to the server.
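
The idea can be sketched with plain contextlib, without stompman. Everything below is a hypothetical stand-in: the fake_* helpers only record frame names, and the choice of frames (DISCONNECT, UNSUBSCRIBE, COMMIT or ABORT) follows the STOMP 1.2 frame set and is an assumption about what "necessary frames" means, not a description of stompman's internals.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

# Records what a real client would write to the socket (illustration only).
sent_frames: list[str] = []


@asynccontextmanager
async def fake_client() -> AsyncIterator[None]:
    sent_frames.append("CONNECT")
    try:
        yield
    finally:
        # Runs on normal exit and on error alike.
        sent_frames.append("DISCONNECT")


@asynccontextmanager
async def fake_subscribe(destination: str) -> AsyncIterator[None]:
    sent_frames.append(f"SUBSCRIBE {destination}")
    try:
        yield
    finally:
        sent_frames.append(f"UNSUBSCRIBE {destination}")


@asynccontextmanager
async def fake_transaction() -> AsyncIterator[None]:
    sent_frames.append("BEGIN")
    try:
        yield
    except Exception:
        # Assumption: a failed block aborts the transaction.
        sent_frames.append("ABORT")
        raise
    else:
        sent_frames.append("COMMIT")


async def main() -> list[str]:
    sent_frames.clear()
    async with fake_client():
        async with fake_subscribe("DLQ"):
            pass
        try:
            async with fake_transaction():
                raise RuntimeError("business logic failed")
        except RuntimeError:
            pass
    return list(sent_frames)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the clean-up lives in the context managers, the calling code never has to remember to unsubscribe or disconnect, even when an exception interrupts the block.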

Handling Connectivity Issues

  • If multiple servers are provided, stompman will attempt to connect to each one simultaneously and use the first that succeeds.
  • If all servers fail to connect, a stompman.FailedAllConnectAttemptsError will be raised. There's no need to handle it: instead, tune the retry and timeout parameters to your needs.
  • If a connection is lost, a stompman.ReadTimeoutError will be raised. You'll need to implement reconnect logic manually. Implementing reconnect logic in the library would be too complex, since there's no global state and clean-ups are automatic (e.g. it won't be possible to re-subscribe to a destination because the client doesn't keep track of subscriptions).
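
One possible shape for manual reconnect logic is a retry loop around a "session" coroutine that opens a fresh client, subscribes and listens. This is only a sketch under assumptions: run_with_reconnect, its parameters and FakeReadTimeoutError are hypothetical names made up for this example; in real code the except clause would catch stompman.ReadTimeoutError instead.

```python
import asyncio
from collections.abc import Awaitable, Callable


class FakeReadTimeoutError(Exception):
    """Hypothetical stand-in for stompman.ReadTimeoutError."""


async def run_with_reconnect(
    session: Callable[[], Awaitable[None]],
    *,
    max_attempts: int = 5,
    base_delay: float = 0.5,
) -> int:
    """Run `session` until it returns normally; return the number of attempts used."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts):
        try:
            # `session` should create a new client and re-subscribe itself,
            # so every retry starts from a clean state.
            await session()
        except FakeReadTimeoutError:
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
        else:
            return attempt
    # Last attempt: let the error propagate to the caller.
    await session()
    return max_attempts
```

Since the client keeps no record of subscriptions, putting the subscribe calls inside the session coroutine is what makes each retry restore them.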

...and caveats

  • stompman only runs on Python 3.11 and newer.
  • It implements STOMP 1.2 — the latest version of the protocol.
  • The client-individual ack mode is used, which means that the server requires an ack or nack for every message. In contrast, with client ack mode acknowledgments are cumulative: an ack also covers all messages sent to the subscription before the acknowledged one. And with auto ack mode the server assumes the client has received a message as soon as it is sent.
  • Heartbeats are required and are sent automatically by listen_to_events() (the interval defaults to 1 second).
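
For readers who want to see what the ack mode looks like on the wire, here is a small sketch that serializes STOMP 1.2 frames by hand. build_frame is a hypothetical helper written for this example, not stompman's serializer, and it skips header escaping; the subscription id and ack id values are made up.

```python
def build_frame(command: str, headers: dict[str, str], body: bytes = b"") -> bytes:
    """Serialize a STOMP 1.2 frame: command line, headers, blank line, body, NUL."""
    # Note: header escaping required by STOMP 1.2 is omitted for brevity.
    lines = [command, *(f"{name}:{value}" for name, value in headers.items())]
    return "\n".join(lines).encode() + b"\n\n" + body + b"\x00"


# The ack mode is chosen per subscription, in the SUBSCRIBE frame.
subscribe_frame = build_frame(
    "SUBSCRIBE",
    {"id": "sub-0", "destination": "DLQ", "ack": "client-individual"},
)

# In STOMP 1.2 the `id` header of ACK/NACK must match the `ack` header
# of the MESSAGE frame being acknowledged ("ack-1" is a made-up value).
ack_frame = build_frame("ACK", {"id": "ack-1"})
nack_frame = build_frame("NACK", {"id": "ack-1"})

if __name__ == "__main__":
    print(subscribe_frame)
    print(ack_frame)
```

With client-individual mode, each ACK or NACK frame settles exactly one message, which is why every message event needs its own ack() or nack() call.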

Also, I want to point out that:

  • Protocol parsing is inspired by aiostomp (meaning: consumed by me and refactored from).
  • stompman is tested and used with ActiveMQ Artemis.

No docs

I try to keep it simple and easy to understand. It may be counter-intuitive for some, but concise high-quality code speaks for itself. There are no comments and little indirection. Read the source if you wish, and open an issue if it's not enough or you want to add or fix something.
