Python STOMP client with pleasant API
Project description
stompman
A Python client for STOMP asynchronous messaging protocol that is:
- asynchronous,
- not abandoned,
- has pleasant and comprehensible API. Also: async generators, match statements, heavy typing coverage and no callbacks.
There were no such one—and now there're is.
How To Use
Before you start using stompman, make sure you have it installed:
pip install stompman
poetry add stompman
Initialize a client:
async with stompman.Client(
servers=[
stompman.ConnectionParameters(host="171.0.0.1", port=61616, login="user1", passcode="passcode1"),
stompman.ConnectionParameters(host="172.0.0.1", port=61616, login="user2", passcode="passcode2"),
],
# Optional parameters with sensible defaults:
heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000),
connect_retry_attempts=3,
connect_retry_interval=1,
connect_timeout=2,
connection_confirmation_timeout=2,
read_timeout=2,
) as client:
...
Sending Messages
To send a message, use the following code:
await client.send(body=b"hi there!", destination="DLQ", headers={"persistent": "true"})
Or, to send messages in a transaction:
async with client.enter_transaction() as transaction:
for _ in range(10):
await client.send(body=b"hi there!", destination="DLQ", transaction=transaction)
await asyncio.sleep(0.1)
Listening for Messages
Now, let's subscribe to a queue and listen for messages.
Notice that listen()
is not bound to a destination: it will listen to all subscribed destinations. If you want separate subscribtions, create separate clients for that.
async with client.subscribe("DLQ"):
async for event in client.listen():
...
...
—and that's where it gets interesting.
Before learning how to processing messages from server, we need to understand how other libraries do it. They use callbacks. Damn callbacks in asynchronous programming.
I wanted to avoid them, and came up with an elegant solution: combining async generator and match statement. Here how it looks like:
async for event in client.listen():
match event:
case stompman.MessageEvent(body=body):
print(f"message: {body!s}")
await event.ack()
case stompman.ErrorEvent(message_header=short_description, body=body):
print(f"{short_description}:\n{body!s}")
More complex example, that involves handling all possible events, and auto-acknowledgement:
async with asyncio.TaskGroup() as task_group:
async for event in client.listen():
match event:
case stompman.MessageEvent(body=body):
task_group.create_task(handle_message(body))
case stompman.ErrorEvent(message_header=short_description, body=body):
logger.error(
"Received an error from server", short_description=short_description, body=body, event=event
)
case stompman.HeartbeatEvent():
task_group.create_task(update_healthcheck_status())
async def handle_message(event: stompman.MessageEvent) -> None:
try:
validated_message = MyMessageModel.model_validate_json(event.body)
await run_business_logic(validated_message)
except Exception:
await event.nack()
logger.exception("Failed to handle message", event=event)
else:
await event.ack()
Cleaning Up
stompman takes care of cleaning up resources automatically. When you leave the context of async context managers stompman.Client()
, client.subscribe()
, or client.enter_transaction()
, the necessary frames will be sent to the server.
Handling Connectivity Issues
-
If multiple servers were provided, stompman will attempt to connect to each one simultaneously and will use the first that succeeds.
-
If all servers fail to connect, an
stompman.FailedAllConnectAttemptsError
will be raised. In normal situation it doesn't need to be handled: tune retry and timeout parameters instompman.Client()
to your needs. -
If a connection is lost, a
stompman.ConnectionLostError
will be raised. You should implement reconnect logic manually, for example, with stamina:for attempt in stamina.retry_context(on=stompman.ConnectionLostError): with attempt: async with stompman.Client(...) as client: ...
...and caveats
- stompman only runs on Python 3.11 and newer.
- It implements STOMP 1.2 — the latest version of the protocol.
- The client-individual ack mode is used, which means that server requires
ack
ornack
. In contrast, withclient
ack mode server assumes you don't care about messages that occured before you connected. And, withauto
ack mode server assumes client successfully received the message. - Heartbeats are required, and sent automatically on
listen()
(defaults to 1 second).
Also, I want to pointed out that:
- Protocol parsing is inspired by aiostomp (meaning: consumed by me and refactored from).
- stompman is tested and used with Artemis ActiveMQ.
No docs
I try to keep it simple and easy to understand. May be counter-intuitive for some, but concise high-quality code speaks for itself. There're no comments and little indirection. Read the source if you wish, leave an issue if it's not enough or you want to add or fix something.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.