KubeMQ Python SDK - Production-grade messaging with async support, pub/sub, queues, and commands/queries

KubeMQ Python SDK

Description

KubeMQ is an enterprise-grade message broker for containers, designed for any workload and architecture running in Kubernetes. This SDK provides a production-ready Python client supporting all KubeMQ messaging patterns: Events (pub/sub), Events Store (persistent pub/sub), Queues (with ack/reject), Commands, and Queries (RPC).

Installation

Requires Python 3.11+.

pip install kubemq

For optional features:

pip install kubemq[docs]    # API reference generation
pip install kubemq[otel]    # OpenTelemetry integration

Quick Start

Prerequisites: Python 3.11+, KubeMQ server running at localhost:50000 (install KubeMQ)

Send and Receive Events

Send an event:

from kubemq import PubSubClient, EventMessage

with PubSubClient(address="localhost:50000") as client:
    client.publish_event(
        EventMessage(channel="quickstart", body=b"Hello KubeMQ!")
    )
    print("Event sent!")

Subscribe to events:

import time
from kubemq import PubSubClient, EventsSubscription, CancellationToken

def on_event(event):
    print(f"Received: {event.body.decode('utf-8')}")

with PubSubClient(address="localhost:50000") as client:
    client.subscribe_to_events(
        subscription=EventsSubscription(
            channel="quickstart",
            on_receive_event_callback=on_event,
            on_error_callback=lambda e: print(f"Error: {e}"),
        ),
        cancel=CancellationToken(),
    )
    time.sleep(60)  # Keep listening
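
The fixed `time.sleep(60)` above stops the subscriber after one minute. A long-running service would more likely wait until it is asked to stop. The following is a minimal sketch using only the standard library; `wait_for_shutdown` and `install_signal_handlers` are hypothetical helper names, not part of the SDK.

```python
import signal
import threading
from typing import Optional


def wait_for_shutdown(stop_event: threading.Event,
                      timeout: Optional[float] = None) -> bool:
    """Block until stop_event is set or the timeout elapses.

    Returns True if a shutdown was requested, False if the wait timed out.
    """
    return stop_event.wait(timeout)


def install_signal_handlers(stop_event: threading.Event) -> None:
    """Set stop_event on SIGINT (Ctrl+C) or SIGTERM.

    signal.signal() may only be called from the main thread.
    """
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, lambda signum, frame: stop_event.set())
```

Inside the `with PubSubClient(...)` block, the `time.sleep(60)` call could then be replaced by creating a `threading.Event`, passing it to `install_signal_handlers`, and calling `wait_for_shutdown` on it, so the client closes cleanly when the process is interrupted.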

See also: Queues Quick Start | RPC Quick Start

Messaging Patterns

| Pattern | Delivery Guarantee | Use When | Example Use Case |
|---|---|---|---|
| Events | At-most-once | Fire-and-forget broadcasting to multiple subscribers | Real-time notifications, log streaming |
| Events Store | At-least-once (persistent) | Subscribers must not miss messages, even if offline | Audit trails, event sourcing, replay |
| Queues | At-least-once (with ack) | Work must be processed by exactly one consumer with acknowledgment | Job processing, task distribution |
| Commands | At-most-once (request/reply) | You need a response confirming the action was executed | Device control, configuration changes |
| Queries | At-most-once (request/reply) | You need to retrieve data from a responder | Data lookups, service-to-service reads |
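
The "Use When" column above can be read as a small decision procedure. The sketch below encodes it in plain Python for illustration only; the `Pattern` enum and `choose_pattern` function are hypothetical and do not exist in the SDK.

```python
from enum import Enum


class Pattern(Enum):
    EVENTS = "Events"
    EVENTS_STORE = "Events Store"
    QUEUES = "Queues"
    COMMANDS = "Commands"
    QUERIES = "Queries"


def choose_pattern(*, needs_reply: bool = False, returns_data: bool = False,
                   single_consumer: bool = False,
                   must_not_miss: bool = False) -> Pattern:
    """Map requirements onto the patterns in the table above."""
    if needs_reply:
        # Request/reply: Queries return data, Commands only confirm execution.
        return Pattern.QUERIES if returns_data else Pattern.COMMANDS
    if single_consumer:
        # Work that exactly one consumer should process and acknowledge.
        return Pattern.QUEUES
    if must_not_miss:
        # Persistent pub/sub for subscribers that may be offline.
        return Pattern.EVENTS_STORE
    # Default: fire-and-forget broadcast.
    return Pattern.EVENTS
```

For example, `choose_pattern(needs_reply=True, returns_data=True)` yields `Pattern.QUERIES`, and calling it with no arguments yields `Pattern.EVENTS`.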

Events

Fire-and-forget pub/sub. Use PubSubClient (sync) or AsyncPubSubClient (async). View examples →

Events Store

Persistent pub/sub with replay. Subscribers can start from a sequence number, timestamp, or receive only new messages. View examples →

Queues

Pull-based message queues with acknowledgment, reject, requeue, dead-letter queues, and delayed delivery. View examples →
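
To make the terms concrete, here is a toy in-memory model of pull, ack, reject with requeue, and dead-lettering. It is a conceptual sketch only: `ToyQueue`, `ToyMessage`, and `max_receive_count` are invented for illustration, this is not the SDK's queue API, and the server's actual semantics may differ.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyMessage:
    body: bytes
    receive_count: int = 0  # how many times a consumer has pulled this message


class ToyQueue:
    """In-memory illustration of ack/reject/dead-letter behavior."""

    def __init__(self, max_receive_count: int = 3) -> None:
        self.max_receive_count = max_receive_count
        self.pending = deque()   # messages waiting to be pulled
        self.in_flight = []      # pulled but not yet acked or rejected
        self.dead_letter = []    # messages rejected too many times

    def send(self, body: bytes) -> None:
        self.pending.append(ToyMessage(body))

    def pull(self) -> Optional[ToyMessage]:
        """Hand the next message to a consumer, or None if the queue is empty."""
        if not self.pending:
            return None
        msg = self.pending.popleft()
        msg.receive_count += 1
        self.in_flight.append(msg)
        return msg

    def ack(self, msg: ToyMessage) -> None:
        """Consumer finished successfully; the message is gone for good."""
        self.in_flight.remove(msg)

    def reject(self, msg: ToyMessage) -> None:
        """Consumer failed; requeue, or dead-letter after too many attempts."""
        self.in_flight.remove(msg)
        if msg.receive_count >= self.max_receive_count:
            self.dead_letter.append(msg)
        else:
            self.pending.append(msg)
```

The point of the model is that a message leaves the queue only on `ack`, which is what gives the at-least-once guarantee listed in the patterns table.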

Commands

Request/reply where the sender expects confirmation of execution (no response payload). View examples →

Queries

Request/reply where the sender expects a data response. View examples →

Configuration

| Option | Type | Default | Description |
|---|---|---|---|
| address | str | "localhost:50000" | KubeMQ server gRPC address |
| client_id | str | hostname | Unique client identifier |
| auth_token | str | "" | Authentication token |
| tls | TLSConfig | disabled | TLS configuration (cert, key, CA files) |
| max_send_size | int | 104857600 | Maximum send message size in bytes (100 MiB) |
| max_receive_size | int | 104857600 | Maximum receive message size in bytes (100 MiB) |
| auto_reconnect | bool | True | Auto-reconnect on connection loss |
| reconnect_interval_seconds | int | 1 | Seconds between reconnect attempts |
| log_level | int or None | None (no logging) | Python logging level |

All clients accept these options as constructor arguments:

from kubemq import PubSubClient, TLSConfig

client = PubSubClient(
    address="kubemq-server:50000",
    client_id="my-service",
    auth_token="your-token",
    tls=TLSConfig(
        enabled=True,
        cert_file="/path/to/cert.pem",
        key_file="/path/to/key.pem",
        ca_file="/path/to/ca.pem",
    ),
)
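
In containerized deployments these options are often supplied through environment variables rather than hard-coded. The sketch below builds the constructor keyword arguments from the environment. The variable names `KUBEMQ_ADDRESS`, `KUBEMQ_CLIENT_ID`, and `KUBEMQ_AUTH_TOKEN` and the helper `client_kwargs_from_env` are assumptions made for this example; nothing here implies the SDK reads them itself.

```python
import os
from typing import Mapping, Optional


def client_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect client options from environment variables (hypothetical names)."""
    env = os.environ if env is None else env
    # Fall back to the documented default address when none is provided.
    kwargs = {"address": env.get("KUBEMQ_ADDRESS", "localhost:50000")}
    # Only pass optional settings that are actually set, so the SDK's own
    # defaults (hostname as client_id, empty token) still apply otherwise.
    if env.get("KUBEMQ_CLIENT_ID"):
        kwargs["client_id"] = env["KUBEMQ_CLIENT_ID"]
    if env.get("KUBEMQ_AUTH_TOKEN"):
        kwargs["auth_token"] = env["KUBEMQ_AUTH_TOKEN"]
    return kwargs
```

The result can then be unpacked into any client constructor, for example `PubSubClient(**client_kwargs_from_env())`.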

Error Handling

All SDK errors extend KubeMQError:

from kubemq import (
    PubSubClient, EventMessage,
    KubeMQError, KubeMQConnectionError, KubeMQTimeoutError,
)

try:
    with PubSubClient(address="localhost:50000") as client:
        client.publish_event(
            EventMessage(channel="ch1", body=b"hello")
        )
except KubeMQConnectionError as e:
    print(f"Connection failed: {e}")
except KubeMQTimeoutError as e:
    print(f"Operation timed out: {e}")
except KubeMQError as e:
    print(f"KubeMQ error: {e}")

See the Troubleshooting Guide for solutions to common errors.
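
Connection and timeout errors are often transient, so callers may want to retry an operation a few times before giving up. The following is a generic sketch with exponential backoff, written without importing the SDK so it stays self-contained; the `retry` helper and its parameters are hypothetical.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry(operation: Callable[[], T],
          retry_on: Tuple[Type[BaseException], ...],
          attempts: int = 3,
          base_delay: float = 0.5,
          sleep: Callable[[float], None] = time.sleep) -> T:
    """Call operation(), retrying on the given exception types.

    The delay doubles after each failure: base_delay, 2*base_delay, ...
    The last failure is re-raised unchanged.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")
```

With the SDK this might be used as `retry(lambda: client.publish_event(msg), retry_on=(KubeMQConnectionError, KubeMQTimeoutError))`. Whether a retry is safe depends on the pattern: resending after a timeout can produce duplicates if the first attempt actually reached the server.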

Troubleshooting

| Problem | Solution |
|---|---|
| Connection refused | Ensure KubeMQ is running: docker run -p 50000:50000 kubemq/kubemq-community |
| Authentication failed | Verify auth_token matches server configuration |
| Channel not found | Create the channel first or enable auto-create on the server |
| Timeout / deadline exceeded | Increase timeout_in_seconds or check network latency |
| No messages received | Verify subscriber is connected before sender publishes |
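
For the "Connection refused" case, a quick way to check whether anything is listening on the server port is a plain TCP connect using the standard library. This is a sketch under the assumption that the server is reachable over TCP at the configured address; `can_connect` is a hypothetical helper, and a successful connect only shows the port is open, not that gRPC or authentication is working.

```python
import socket


def can_connect(host: str = "localhost", port: int = 50000,
                timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False
```

Calling `can_connect()` with no arguments probes the default address from the Configuration table.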

Full Troubleshooting Guide (11+ entries)

Security

See SECURITY.md for vulnerability reporting. The SDK supports TLS and mTLS connections — for configuration details, see How to Connect with TLS.

Contributing

Contributions are welcome! See CONTRIBUTING.md for development setup, coding standards, and pull request guidelines.

License

This project is licensed under the MIT License — see the LICENSE file for details.
