Skip to main content

Python STOMP client with pleasant API

Project description

stompman

A Python client for STOMP asynchronous messaging protocol that is:

  • asynchronous,
  • not abandoned,
  • has typed, modern, comprehensible API.

How To Use

Before you start using stompman, make sure you have it installed:

uv add stompman
poetry add stompman

Initialize a client:

async with stompman.Client(
    servers=[
        stompman.ConnectionParameters(host="171.0.0.1", port=61616, login="user1", passcode="passcode1"),
        stompman.ConnectionParameters(host="172.0.0.1", port=61616, login="user2", passcode="passcode2"),
    ],

    # Handlers:
    on_error_frame=lambda error_frame: print(error_frame.body),
    on_heartbeat=lambda: print("Server sent a heartbeat"),  # also can be async

    # SSL — can be either `None` (default), `True`, or `ssl.SSLContext'
    ssl=None,

    # Optional parameters with sensible defaults:
    heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000),
    connect_retry_attempts=3,
    connect_retry_interval=1,
    connect_timeout=2,
    connection_confirmation_timeout=2,
    disconnect_confirmation_timeout=2,
    read_timeout=2,
    write_retry_attempts=3,
) as client:
    ...

Sending Messages

To send a message, use the following code:

await client.send(b"hi there!", destination="DLQ", headers={"persistent": "true"})

Or, to send messages in a transaction:

async with client.begin() as transaction:
    for _ in range(10):
        await transaction.send(body=b"hi there!", destination="DLQ", headers={"persistent": "true"})
        await asyncio.sleep(0.1)

Listening for Messages

Now, let's subscribe to a destination and listen for messages:

async def handle_message_from_dlq(message_frame: stompman.MessageFrame) -> None:
    print(message_frame.body)


await client.subscribe("DLQ", handle_message_from_dlq, on_suppressed_exception=print)

Entered stompman.Client will block forever waiting for messages if there are any active subscriptions.

Sometimes it's useful to avoid that:

dlq_subscription = await client.subscribe("DLQ", handle_message_from_dlq, on_suppressed_exception=print)
await dlq_subscription.unsubscribe()

By default, subscription have ACK mode "client-individual". If handler successfully processes the message, an ACK frame will be sent. If handler raises an exception, a NACK frame will be sent. You can catch (and log) exceptions using on_suppressed_exception parameter:

await client.subscribe(
    "DLQ",
    handle_message_from_dlq,
    on_suppressed_exception=lambda exception, message_frame: print(exception, message_frame),
)

You can change the ack mode used by specifying the ack parameter:

# Server will assume that all messages sent to the subscription before the ACK'ed message are received and processed:
await client.subscribe("DLQ", handle_message_from_dlq, ack="client", on_suppressed_exception=print)

# Server will assume that messages are received as soon as it send them to client:
await client.subscribe("DLQ", handle_message_from_dlq, ack="auto", on_suppressed_exception=print)

You can pass custom headers to client.subscribe():

await client.subscribe("DLQ", handle_message_from_dlq, ack="client", headers={"selector": "location = 'Europe'"}, on_suppressed_exception=print)

Cleaning Up

stompman takes care of cleaning up resources automatically. When you leave the context of async context managers stompman.Client(), or client.begin(), the necessary frames will be sent to the server.

Handling Connectivity Issues

  • If multiple servers were provided, stompman will attempt to connect to each one simultaneously and will use the first that succeeds. If all servers fail to connect, an stompman.FailedAllConnectAttemptsError will be raised. In normal situation it doesn't need to be handled: tune retry and timeout parameters in stompman.Client() to your needs.

  • When connection is lost, stompman will attempt to handle it automatically. stompman.FailedAllConnectAttemptsError will be raised if all connection attempts fail. stompman.FailedAllWriteAttemptsError will be raised if connection succeeds but sending a frame or heartbeat lead to losing connection.

...and caveats

  • stompman supports Python 3.11 and newer.
  • It implements STOMP 1.2 — the latest version of the protocol.
  • Heartbeats are required, and sent automatically in background (defaults to 1 second).

Also, I want to pointed out that:

  • Protocol parsing is inspired by aiostomp (meaning: consumed by me and refactored from).
  • stompman is tested and used with ActiveMQ Artemis and ActiveMQ Classic.
  • Specification says that headers in CONNECT and CONNECTED frames shouldn't be escaped for backwards compatibility. stompman escapes headers in CONNECT frame (outcoming), but does not unescape headers in CONNECTED (outcoming).

Examples

See producer and consumer examples in examples/.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

stompman-1.6.0.tar.gz (25.2 kB view details)

Uploaded Source

Built Distribution

stompman-1.6.0-py3-none-any.whl (18.0 kB view details)

Uploaded Python 3

File details

Details for the file stompman-1.6.0.tar.gz.

File metadata

  • Download URL: stompman-1.6.0.tar.gz
  • Upload date:
  • Size: 25.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.4.30

File hashes

Hashes for stompman-1.6.0.tar.gz
Algorithm Hash digest
SHA256 16e536d674b8c315e51ddbb877266fdd1b31ea6b00c4d13396f3d76051b85dfb
MD5 f33810c846d2823388de4491d6d35026
BLAKE2b-256 a83d49f5d2e11b024feb1bbe4b741168c7f945b591b8aefafa6b15eaedcf34ef

See more details on using hashes here.

File details

Details for the file stompman-1.6.0-py3-none-any.whl.

File metadata

File hashes

Hashes for stompman-1.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2d1a440b18e370559baae3ad02314a793674abdfeab5367ed6de0eb81a3fa526
MD5 be22bf268b28c8d046eb66274c5f589c
BLAKE2b-256 8fce3769c995ad3e2850b810393309f0ae73c5240d52504cb7f6d0aa3e999bcd

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page