KubeMQ Python SDK - Production-grade messaging with async support, pub/sub, queues, and commands/queries
KubeMQ Python SDK
Description
KubeMQ is an enterprise-grade message broker for containers, designed for any workload and architecture running in Kubernetes. This SDK provides a production-ready Python client supporting all KubeMQ messaging patterns: Events (pub/sub), Events Store (persistent pub/sub), Queues (with ack/reject), Commands, and Queries (RPC).
Table of Contents
- Installation
- Quick Start
- Messaging Patterns
- Configuration
- Error Handling
- Troubleshooting
- Security
- Additional Resources
- Contributing
- License
Installation
Requires Python 3.11+.
pip install kubemq
For optional features:
pip install "kubemq[docs]"  # API reference generation
pip install "kubemq[otel]"  # OpenTelemetry integration (quotes keep shells such as zsh from expanding the brackets)
Quick Start
Prerequisites: Python 3.11+ and a KubeMQ server running at localhost:50000 (install KubeMQ).
Send and Receive Events
Send an event:
from kubemq import PubSubClient, EventMessage

# The context manager closes the connection when the block exits.
with PubSubClient(address="localhost:50000") as client:
    client.publish_event(
        EventMessage(channel="quickstart", body=b"Hello KubeMQ!")
    )
    print("Event sent!")
Subscribe to events:
import time

from kubemq import PubSubClient, EventsSubscription, CancellationToken


def on_event(event):
    # The body arrives as bytes; decode it before printing.
    print(f"Received: {event.body.decode('utf-8')}")


with PubSubClient(address="localhost:50000") as client:
    client.subscribe_to_events(
        subscription=EventsSubscription(
            channel="quickstart",
            on_receive_event_callback=on_event,
            on_error_callback=lambda e: print(f"Error: {e}"),
        ),
        cancel=CancellationToken(),
    )
    time.sleep(60)  # Keep listening
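The `time.sleep(60)` call above only keeps the main thread alive while callbacks fire. One possible variation, sketched here without the SDK, is to have the callback push decoded payloads onto a `queue.Queue` and do the real work elsewhere. `FakeEvent` is a hypothetical stand-in that models only the `body` attribute used above, and the sketch assumes the callback may run on a different thread than the consumer, which the source does not state explicitly.

```python
import queue
from dataclasses import dataclass


@dataclass
class FakeEvent:
    """Hypothetical stand-in for the SDK event object; only `body` is modeled."""
    body: bytes


inbox: "queue.Queue[str]" = queue.Queue()


def on_event(event) -> None:
    # Keep the callback short: decode and hand off to the queue.
    inbox.put(event.body.decode("utf-8"))


def drain(timeout: float = 0.1) -> list[str]:
    """Collect messages until the inbox stays empty for `timeout` seconds."""
    messages = []
    while True:
        try:
            messages.append(inbox.get(timeout=timeout))
        except queue.Empty:
            return messages


# Simulate two deliveries, then drain them on the main thread.
on_event(FakeEvent(body=b"Hello KubeMQ!"))
on_event(FakeEvent(body=b"second"))
received = drain()
print(received)
```

With the real client, `on_event` would be passed as `on_receive_event_callback` exactly as in the subscription example, and `drain()` (or a blocking `inbox.get()`) would replace the fixed sleep.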
See also: Queues Quick Start | RPC Quick Start
Messaging Patterns
| Pattern | Delivery Guarantee | Use When | Example Use Case |
|---|---|---|---|
| Events | At-most-once | Fire-and-forget broadcasting to multiple subscribers | Real-time notifications, log streaming |
| Events Store | At-least-once (persistent) | Subscribers must not miss messages, even if offline | Audit trails, event sourcing, replay |
| Queues | At-least-once (with ack) | Work must be processed by exactly one consumer with acknowledgment | Job processing, task distribution |
| Commands | At-most-once (request/reply) | You need a response confirming the action was executed | Device control, configuration changes |
| Queries | At-most-once (request/reply) | You need to retrieve data from a responder | Data lookups, service-to-service reads |
Events
Fire-and-forget pub/sub. Use PubSubClient (sync) or AsyncPubSubClient (async).
View examples →
Events Store
Persistent pub/sub with replay. Subscribers can start from a sequence number, timestamp, or receive only new messages. View examples →
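To make the three start positions concrete, here is a toy in-memory model of a persistent log. It is not the SDK or the server implementation; `EventLog`, `StoredEvent`, `from_sequence`, `from_time`, and `new_only` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class StoredEvent:
    sequence: int      # 1-based position in the log
    timestamp: float   # seconds; supplied by the caller to keep the demo deterministic
    body: bytes


@dataclass
class EventLog:
    events: list[StoredEvent] = field(default_factory=list)

    def append(self, body: bytes, timestamp: float) -> StoredEvent:
        event = StoredEvent(len(self.events) + 1, timestamp, body)
        self.events.append(event)
        return event

    def from_sequence(self, sequence: int) -> list[StoredEvent]:
        # Replay everything at or after the given sequence number.
        return [e for e in self.events if e.sequence >= sequence]

    def from_time(self, timestamp: float) -> list[StoredEvent]:
        # Replay everything stored at or after the given time.
        return [e for e in self.events if e.timestamp >= timestamp]

    def new_only(self) -> list[StoredEvent]:
        # A "new only" subscriber replays nothing that is already stored.
        return []


log = EventLog()
for i, body in enumerate([b"a", b"b", b"c"]):
    log.append(body, timestamp=100.0 + i)

print([e.body for e in log.from_sequence(2)])
print([e.body for e in log.from_time(102.0)])
print(log.new_only())
```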
Queues
Pull-based message queues with acknowledgment, reject, requeue, dead-letter queues, and delayed delivery. View examples →
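The interaction between ack, reject, requeue, and a dead-letter queue can be illustrated with a small in-memory model. This is a sketch of the general technique, not KubeMQ's implementation: `ToyQueue` and `max_receives` are hypothetical names, and the rule "dead-letter after N receives" is an assumption chosen for the demo.

```python
from collections import deque


class ToyQueue:
    """Toy model: rejected messages are requeued until max_receives, then dead-lettered."""

    def __init__(self, max_receives: int = 3):
        self.pending = deque()   # items are (body, receive_count)
        self.dead_letter = []
        self.max_receives = max_receives

    def send(self, body: bytes) -> None:
        self.pending.append((body, 0))

    def receive(self):
        """Pull the next message as (body, receive_count), or None if empty."""
        if not self.pending:
            return None
        body, count = self.pending.popleft()
        return body, count + 1

    def ack(self, message) -> None:
        # The message was removed on receive; acking simply lets it go.
        pass

    def reject(self, message) -> None:
        body, count = message
        if count >= self.max_receives:
            self.dead_letter.append(body)      # give up on this message
        else:
            self.pending.append((body, count))  # requeue for another attempt


q = ToyQueue(max_receives=2)
q.send(b"good")
q.send(b"poison")

processed = []
while (message := q.receive()) is not None:
    body, _ = message
    if body == b"poison":
        q.reject(message)   # simulate a handler failure
    else:
        processed.append(body)
        q.ack(message)

print(processed, q.dead_letter)
```

The "poison" message is delivered twice before landing in the dead-letter list, which is the at-least-once behavior the table above describes for Queues.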
Commands
Request/reply where the sender expects confirmation of execution (no response payload). View examples →
Queries
Request/reply where the sender expects a data response. View examples →
Configuration
| Option | Type | Default | Description |
|---|---|---|---|
| address | str | "localhost:50000" | KubeMQ server gRPC address |
| client_id | str | hostname | Unique client identifier |
| auth_token | str | "" | Authentication token |
| tls | TLSConfig | disabled | TLS configuration (cert, key, CA files) |
| max_send_size | int | 104857600 | Maximum send message size in bytes |
| max_receive_size | int | 104857600 | Maximum receive message size in bytes |
| auto_reconnect | bool | True | Auto-reconnect on connection loss |
| reconnect_interval_seconds | int | 1 | Seconds between reconnect attempts |
| log_level | int \| None | None (no logging) | Python logging level |
All clients accept these options as constructor arguments:
from kubemq import PubSubClient, TLSConfig

client = PubSubClient(
    address="kubemq-server:50000",
    client_id="my-service",
    auth_token="your-token",
    # Supplying a client cert and key alongside the CA file enables mTLS.
    tls=TLSConfig(
        enabled=True,
        cert_file="/path/to/cert.pem",
        key_file="/path/to/key.pem",
        ca_file="/path/to/ca.pem",
    ),
)
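In containerized deployments these options are often read from the environment rather than hard-coded. The helper below is a minimal sketch of that idea and does not import the SDK. The option names come from the table above, but the `KUBEMQ_*` variable names and the `client_options_from_env` function are hypothetical.

```python
import os


def client_options_from_env(env=None) -> dict:
    """Build constructor keyword arguments from environment variables.

    The KUBEMQ_* variable names are hypothetical; only the option names
    (address, client_id, auth_token, auto_reconnect,
    reconnect_interval_seconds) come from the configuration table.
    """
    env = os.environ if env is None else env
    options = {"address": env.get("KUBEMQ_ADDRESS", "localhost:50000")}
    if "KUBEMQ_CLIENT_ID" in env:
        options["client_id"] = env["KUBEMQ_CLIENT_ID"]
    if "KUBEMQ_AUTH_TOKEN" in env:
        options["auth_token"] = env["KUBEMQ_AUTH_TOKEN"]
    if "KUBEMQ_AUTO_RECONNECT" in env:
        value = env["KUBEMQ_AUTO_RECONNECT"].strip().lower()
        options["auto_reconnect"] = value in ("1", "true", "yes")
    if "KUBEMQ_RECONNECT_INTERVAL" in env:
        options["reconnect_interval_seconds"] = int(env["KUBEMQ_RECONNECT_INTERVAL"])
    return options


# Passing a plain dict instead of os.environ keeps the demo deterministic.
options = client_options_from_env(
    {"KUBEMQ_ADDRESS": "kubemq-server:50000", "KUBEMQ_AUTO_RECONNECT": "false"}
)
print(options)
```

Since all clients accept these options as constructor arguments, the resulting dict could be unpacked as `PubSubClient(**options)`. Options left out of the dict fall back to the defaults listed in the table.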
Error Handling
All SDK errors extend KubeMQError:
from kubemq import (
    PubSubClient, EventMessage,
    KubeMQError, KubeMQConnectionError, KubeMQTimeoutError,
)

try:
    with PubSubClient(address="localhost:50000") as client:
        client.publish_event(
            EventMessage(channel="ch1", body=b"hello")
        )
# Catch the specific subclasses first; KubeMQError is the catch-all base.
except KubeMQConnectionError as e:
    print(f"Connection failed: {e}")
except KubeMQTimeoutError as e:
    print(f"Operation timed out: {e}")
except KubeMQError as e:
    print(f"KubeMQ error: {e}")
See the Troubleshooting Guide for solutions to common errors.
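Connection and timeout errors are often transient, so callers sometimes wrap an operation in a retry loop with exponential backoff. The sketch below is generic Python rather than an SDK feature: `retry` and `flaky_publish` are hypothetical names, and the built-in `ConnectionError` and `TimeoutError` stand in for the SDK exceptions so the example runs without a broker.

```python
import time


def retry(operation, retry_on=(ConnectionError, TimeoutError),
          attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call operation(); on a listed exception, wait and try again.

    Delays double each time (base_delay, 2 * base_delay, ...).
    The final failure is re-raised unchanged.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))


# Demo with a stand-in operation that fails twice, then succeeds.
calls = {"count": 0}
delays = []


def flaky_publish():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("broker not reachable yet")
    return "sent"


# Recording the delays instead of sleeping keeps the demo instant.
result = retry(flaky_publish, sleep=delays.append)
print(result, delays)
```

With the SDK, one would presumably pass `retry_on=(KubeMQConnectionError, KubeMQTimeoutError)` and a function that performs the publish. Whether this is needed on top of the client's own `auto_reconnect` option depends on the workload.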
Troubleshooting
| Problem | Solution |
|---|---|
| Connection refused | Ensure KubeMQ is running: docker run -p 50000:50000 kubemq/kubemq-community |
| Authentication failed | Verify auth_token matches server configuration |
| Channel not found | Create the channel first or enable auto-create on the server |
| Timeout / deadline exceeded | Increase timeout_in_seconds or check network latency |
| No messages received | Verify subscriber is connected before sender publishes |
→ Full Troubleshooting Guide (11+ entries)
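For the "Connection refused" case, a quick way to separate network problems from SDK problems is to test whether the port accepts TCP connections at all. This sketch uses only the standard library; `can_connect` is a hypothetical helper, and it checks reachability only, not whether the listener is actually a KubeMQ server.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


if __name__ == "__main__":
    print("KubeMQ port reachable:", can_connect("localhost", 50000))
```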
Security
See SECURITY.md for vulnerability reporting. The SDK supports TLS and mTLS connections — for configuration details, see How to Connect with TLS.
Additional Resources
- KubeMQ Documentation — Official KubeMQ documentation and guides
- API Reference — Auto-generated API documentation
- Full Documentation Index — Complete SDK documentation index
- KubeMQ Concepts — Core KubeMQ messaging concepts
- SDK Feature Parity Matrix — Cross-SDK feature comparison
- CHANGELOG.md — Release history
- TROUBLESHOOTING.md — Common issues and solutions
- Examples — Runnable code examples for all patterns
Contributing
Contributions are welcome! See CONTRIBUTING.md for development setup, coding standards, and pull request guidelines.
License
This project is licensed under the MIT License — see the LICENSE file for details.