Python STOMP client with pleasant API

stompman

A Python client for the STOMP asynchronous messaging protocol that is:

  • asynchronous,
  • not abandoned,
  • equipped with a typed, modern, comprehensible API.

How To Use

Before you start using stompman, make sure it is installed. If you want to use stompman over a WebSocket, install stompman[ws] instead of stompman:

uv add stompman
poetry add stompman

Initialize a client:

async with stompman.Client(
    servers=[
        stompman.ConnectionParameters(host="171.0.0.1", port=61616, login="user1", passcode="passcode1"),
        stompman.ConnectionParameters(host="172.0.0.1", port=61616, login="user2", passcode="passcode2"),
    ],


    # SSL: can be either `None` (default), `True`, or an `ssl.SSLContext`
    ssl=None,

    # Error frame handler:
    on_error_frame=lambda error_frame: print(error_frame.body),

    # Optional parameters with sensible defaults:
    heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000),
    connect_retry_attempts=3,
    connect_retry_interval=1,
    connect_timeout=2,
    connection_confirmation_timeout=2,
    disconnect_confirmation_timeout=2,
    write_retry_attempts=3,
    check_server_alive_interval_factor=3,
    no_message_restart_interval=datetime.timedelta(hours=1),  # None to disable
) as client:
    ...

Initialize a client with a custom connection class, for example to connect to a STOMP producer over WebSocket:

# uv/poetry add stompman[ws] to get WebSocketConnection support
from stompman.connection_ws import WebSocketConnection

async with stompman.Client(
    servers=[
        stompman.ConnectionParameters(host="171.0.0.1", port=8080, login="", passcode="", ws_uri_path="/ws/path"),
    ],
    connection_class=WebSocketConnection,
    ...
) as client:
    ...

Sending Messages

To send a message, use the following code:

await client.send(b"hi there!", destination="DLQ", headers={"persistent": "true"})

Or, to send messages in a transaction:

async with client.begin() as transaction:
    for _ in range(10):
        await transaction.send(body=b"hi there!", destination="DLQ", headers={"persistent": "true"})
        await asyncio.sleep(0.1)

Listening for Messages

Now, let's subscribe to a destination and listen for messages:

async def handle_message_from_dlq(message_frame: stompman.MessageFrame) -> None:
    print(message_frame.body)


await client.subscribe("DLQ", handle_message_from_dlq, on_suppressed_exception=print)

An entered stompman.Client will block forever waiting for messages if there are any active subscriptions.

Sometimes it's useful to avoid that:

dlq_subscription = await client.subscribe("DLQ", handle_message_from_dlq, on_suppressed_exception=print)
await dlq_subscription.unsubscribe()

By default, subscriptions have ACK mode "client-individual". If the handler successfully processes the message, an ACK frame will be sent. If the handler raises an exception, a NACK frame will be sent. You can catch (and log) exceptions using the on_suppressed_exception parameter:

await client.subscribe(
    "DLQ",
    handle_message_from_dlq,
    on_suppressed_exception=lambda exception, message_frame: print(exception, message_frame),
)
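
The ACK/NACK behaviour described above can be pictured with a small stand-alone sketch. It does not use stompman and is not its implementation: `FakeMessageFrame` and `run_with_auto_ack` are hypothetical names invented for illustration, and the only thing taken from the text is the rule "ACK on success, NACK plus `on_suppressed_exception` on failure".

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeMessageFrame:
    """Hypothetical stand-in for a received MESSAGE frame (not a stompman class)."""

    body: bytes
    sent: list[str] = field(default_factory=list)  # frames "sent" back to the server

    async def ack(self) -> None:
        self.sent.append("ACK")

    async def nack(self) -> None:
        self.sent.append("NACK")


async def run_with_auto_ack(frame, handler, on_suppressed_exception) -> None:
    """Call the handler, then ACK on success or NACK and report on failure."""
    try:
        await handler(frame)
    except Exception as exception:
        await frame.nack()
        on_suppressed_exception(exception, frame)
    else:
        await frame.ack()


async def ok_handler(frame: FakeMessageFrame) -> None:
    print(frame.body)


async def failing_handler(frame: FakeMessageFrame) -> None:
    raise ValueError("cannot process message")


async def demo() -> tuple[list[str], list[str], list[str]]:
    reported: list[str] = []
    good, bad = FakeMessageFrame(b"ok"), FakeMessageFrame(b"broken")
    await run_with_auto_ack(good, ok_handler, lambda exc, frame: reported.append(repr(exc)))
    await run_with_auto_ack(bad, failing_handler, lambda exc, frame: reported.append(repr(exc)))
    return good.sent, bad.sent, reported


good_sent, bad_sent, reported = asyncio.run(demo())
```

The callback receives both the exception and the frame, mirroring the two-argument lambda in the example above.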

You can change the ack mode used by specifying the ack parameter:

# Server will assume that all messages sent to the subscription before the ACK'ed message are received and processed:
await client.subscribe("DLQ", handle_message_from_dlq, ack="client", on_suppressed_exception=print)

# Server will assume that messages are received as soon as it sends them to the client:
await client.subscribe("DLQ", handle_message_from_dlq, ack="auto", on_suppressed_exception=print)
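
To make the difference between the ack modes concrete, here is a minimal simulation of the bookkeeping a server might do, based on the STOMP 1.2 description of the modes. The function `acknowledge` and the message ids are hypothetical. No stompman API is involved.

```python
def acknowledge(pending: list[str], message_id: str, ack_mode: str) -> list[str]:
    """Return the message ids still unacknowledged after ACKing `message_id`.

    `pending` lists delivered-but-unacknowledged ids in delivery order (oldest first).
    The "auto" mode is not modelled: there the client sends no ACK frames at all.
    """
    if message_id not in pending:
        raise ValueError(f"unknown message id: {message_id}")
    if ack_mode == "client":
        # Cumulative: the ACK covers this message and everything delivered before it.
        return pending[pending.index(message_id) + 1 :]
    if ack_mode == "client-individual":
        # Only the named message is acknowledged.
        return [pending_id for pending_id in pending if pending_id != message_id]
    raise ValueError(f"unsupported ack mode: {ack_mode}")


delivered = ["m1", "m2", "m3"]
after_cumulative_ack = acknowledge(delivered, "m2", "client")
after_individual_ack = acknowledge(delivered, "m2", "client-individual")
```

With "client", acknowledging `m2` also settles `m1`. With "client-individual", `m1` stays pending until it is acknowledged on its own.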

You can pass custom headers to client.subscribe():

await client.subscribe("DLQ", handle_message_from_dlq, ack="client", headers={"selector": "location = 'Europe'"}, on_suppressed_exception=print)

Handling ACK/NACKs yourself

If you want to send ACK and NACK frames yourself, you can use client.subscribe_with_manual_ack():

async def handle_message_from_dlq(message_frame: stompman.AckableMessageFrame) -> None:
    print(message_frame.body)
    await message_frame.ack()

await client.subscribe_with_manual_ack("DLQ", handle_message_from_dlq, ack="client")

Note that this way exceptions won't be suppressed automatically.

Cleaning Up

stompman takes care of cleaning up resources automatically. When you leave the async context manager stompman.Client() or client.begin(), the necessary frames will be sent to the server.
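
As an illustration of what "the necessary frames" could look like for a transaction, the sketch below records STOMP frame names in a list rather than talking to a server. `fake_transaction` is a hypothetical helper. The BEGIN/COMMIT/ABORT frames come from the STOMP 1.2 specification, but that an exception inside the block leads to ABORT is an assumption here, not something the text above confirms for stompman.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


@asynccontextmanager
async def fake_transaction(sent_frames: list[str], transaction_id: str) -> AsyncIterator[str]:
    """Record BEGIN on enter, COMMIT on clean exit, ABORT if the body raises."""
    sent_frames.append(f"BEGIN {transaction_id}")
    try:
        yield transaction_id
    except Exception:
        sent_frames.append(f"ABORT {transaction_id}")
        raise
    else:
        sent_frames.append(f"COMMIT {transaction_id}")


async def demo() -> tuple[list[str], list[str]]:
    committed: list[str] = []
    async with fake_transaction(committed, "tx-1") as tx:
        committed.append(f"SEND {tx}")

    aborted: list[str] = []
    try:
        async with fake_transaction(aborted, "tx-2") as tx:
            aborted.append(f"SEND {tx}")
            raise RuntimeError("something went wrong mid-transaction")
    except RuntimeError:
        pass  # the error still propagates to the caller after ABORT is recorded
    return committed, aborted


committed_frames, aborted_frames = asyncio.run(demo())
```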

Handling Connectivity Issues

  • If multiple servers were provided, stompman will attempt to connect to each one simultaneously and will use the first that succeeds. If all servers fail to connect, a stompman.FailedAllConnectAttemptsError will be raised. In normal situations it doesn't need to be handled: tune the retry and timeout parameters in stompman.Client() to your needs.

  • When the connection is lost, stompman will attempt to handle it automatically. stompman.FailedAllConnectAttemptsError will be raised if all connection attempts fail. stompman.FailedAllWriteAttemptsError will be raised if the connection succeeds but sending a frame or heartbeat leads to losing the connection.

  • If no messages are received for no_message_restart_interval (defaults to 1 hour), stompman will force a reconnect. Set to None to disable.

  • To implement health checks, use stompman.Client.is_alive() — it will return True if everything is OK and False if server is not responding.

  • stompman will log warnings when the connection is lost, after a successful reconnection, and on invalid state during ack/nack.
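
The health-check bullet above might translate into something like the following sketch. Only `is_alive()` comes from the text. `SupportsIsAlive`, `health_check`, `FakeClient`, and the HTTP-style status codes are hypothetical choices made for illustration, and the fake client stands in for a real `stompman.Client` so the snippet runs without a broker.

```python
from typing import Protocol


class SupportsIsAlive(Protocol):
    """Anything exposing is_alive(), such as a stompman.Client per the text above."""

    def is_alive(self) -> bool: ...


def health_check(client: SupportsIsAlive) -> tuple[int, dict[str, str]]:
    """Map the client's liveness onto an HTTP-style status code and payload."""
    if client.is_alive():
        return 200, {"status": "ok"}
    return 503, {"status": "STOMP server is not responding"}


class FakeClient:
    """Hypothetical test double; replace it with the real client in an application."""

    def __init__(self, alive: bool) -> None:
        self._alive = alive

    def is_alive(self) -> bool:
        return self._alive


healthy_response = health_check(FakeClient(alive=True))
unhealthy_response = health_check(FakeClient(alive=False))
```

In a web service, `health_check` would typically be called from a readiness or liveness endpoint with the long-lived client instance.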

...and caveats

  • stompman supports Python 3.11 and newer.
  • It implements STOMP 1.2 — the latest version of the protocol.
  • Heartbeats are required and are sent automatically in the background (the interval defaults to 1 second).
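
For context on the heartbeat caveat: in STOMP 1.2 the client and server each advertise a `heart-beat` header, and the effective intervals are negotiated from both. The sketch below follows the rule from the specification. `negotiate_heartbeat` is a hypothetical helper, not a stompman function, and it says nothing about how stompman treats a server that disables heartbeats.

```python
def negotiate_heartbeat(client_header: str, server_header: str) -> tuple[int, int]:
    """Return (client_to_server_ms, server_to_client_ms); 0 means "no heartbeats".

    Each header has the form "<can send every N ms>,<wants to receive every N ms>",
    as sent in the CONNECT frame (client) and the CONNECTED frame (server).
    """
    client_sends, client_wants = (int(value) for value in client_header.split(","))
    server_sends, server_wants = (int(value) for value in server_header.split(","))
    # A direction is active only if the sender can send and the receiver wants to
    # receive; the interval is then the larger of the two values.
    client_to_server = max(client_sends, server_wants) if client_sends and server_wants else 0
    server_to_client = max(server_sends, client_wants) if server_sends and client_wants else 0
    return client_to_server, server_to_client


both_directions = negotiate_heartbeat("1000,1000", "5000,2000")
server_disabled = negotiate_heartbeat("1000,1000", "0,0")
```

Here `"1000,1000"` corresponds to the `will_send_interval_ms=1000, want_to_receive_interval_ms=1000` values shown in the client example earlier.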

Also, I want to point out that:

  • Protocol parsing is inspired by aiostomp (meaning: consumed by me and refactored from).
  • stompman is tested and used with ActiveMQ Artemis and ActiveMQ Classic.
    • Caveat: a message sent by a STOMP client is converted into a JMS TextMessage/BytesMessage based on the content-length header (see the ActiveMQ documentation). In order to send a TextMessage, Client.send needs to be invoked with add_content_length set to False.
  • The specification says that headers in CONNECT and CONNECTED frames shouldn't be escaped, for backwards compatibility. stompman escapes headers in the CONNECT frame (outgoing), but does not unescape headers in the CONNECTED frame (incoming).
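
The last two points concern what ends up on the wire. The sketch below builds a SEND frame by hand to show the `content-length` header and the STOMP 1.2 header escaping rules. `escape_header` and `build_send_frame` are hypothetical helpers written for illustration, not stompman internals. The `add_content_length` name is borrowed from the caveat above.

```python
def escape_header(value: str) -> str:
    """Apply STOMP 1.2 header escaping (backslash first, so escapes aren't doubled)."""
    return (
        value.replace("\\", "\\\\")
        .replace("\r", "\\r")
        .replace("\n", "\\n")
        .replace(":", "\\c")
    )


def build_send_frame(body: bytes, destination: str, *, add_content_length: bool = True) -> bytes:
    """Serialize a SEND frame: command, header lines, blank line, body, NUL byte."""
    headers = {"destination": destination}
    if add_content_length:
        headers["content-length"] = str(len(body))
    header_lines = "".join(
        f"{escape_header(name)}:{escape_header(value)}\n" for name, value in headers.items()
    )
    return f"SEND\n{header_lines}\n".encode() + body + b"\x00"


with_length = build_send_frame(b"hi", "DLQ")
without_length = build_send_frame(b"hi", "DLQ", add_content_length=False)
escaped = escape_header("a:b\nc")
```

Per the caveat above, a frame like `without_length` is the kind ActiveMQ would convert into a JMS TextMessage, since it carries no `content-length` header.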

FastStream STOMP broker

An implementation of STOMP broker for FastStream.

Examples

See examples in examples/.
