KubeMQ Python SDK - Production-grade messaging with async support, pub/sub, queues, and commands/queries

KubeMQ Python SDK

Description

KubeMQ is an enterprise-grade message broker for containers, designed for any workload and architecture running in Kubernetes. This SDK provides a production-ready Python client supporting all KubeMQ messaging patterns: Events (pub/sub), Events Store (persistent pub/sub), Queues (with ack/reject), Commands, and Queries (RPC).

Installation

Requires Python 3.11+.

pip install kubemq

For optional features:

pip install "kubemq[docs]"    # API reference generation
pip install "kubemq[otel]"    # OpenTelemetry integration

Quick Start

Prerequisites: Python 3.11+ and a KubeMQ server running at localhost:50000 (see the KubeMQ installation instructions).

Send and Receive Events

Send an event:

from kubemq import PubSubClient, EventMessage

with PubSubClient(address="localhost:50000") as client:
    client.publish_event(
        EventMessage(channel="quickstart", body=b"Hello KubeMQ!")
    )
    print("Event sent!")

Subscribe to events:

import time
from kubemq import PubSubClient, EventsSubscription, CancellationToken

def on_event(event):
    print(f"Received: {event.body.decode('utf-8')}")

with PubSubClient(address="localhost:50000") as client:
    client.subscribe_to_events(
        subscription=EventsSubscription(
            channel="quickstart",
            on_receive_event_callback=on_event,
            on_error_callback=lambda e: print(f"Error: {e}"),
        ),
        cancel=CancellationToken(),
    )
    time.sleep(60)  # Keep listening

See also: Queues Quick Start | RPC Quick Start

Messaging Patterns

| Pattern | Delivery Guarantee | Use When | Example Use Case |
| --- | --- | --- | --- |
| Events | At-most-once | Fire-and-forget broadcasting to multiple subscribers | Real-time notifications, log streaming |
| Events Store | At-least-once (persistent) | Subscribers must not miss messages, even if offline | Audit trails, event sourcing, replay |
| Queues | At-least-once (with ack) | Work must be processed by exactly one consumer, with acknowledgment | Job processing, task distribution |
| Commands | At-most-once (request/reply) | You need a response confirming the action was executed | Device control, configuration changes |
| Queries | At-most-once (request/reply) | You need to retrieve data from a responder | Data lookups, service-to-service reads |
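
The "Use When" column can be read as a small decision procedure. The sketch below encodes it in plain Python. `Pattern` and `choose_pattern` are hypothetical names used only for illustration; they are not part of the kubemq package, and the order of the checks is one reasonable reading of the table rather than an official rule.

```python
from enum import Enum


class Pattern(Enum):
    """The five messaging patterns listed in the table (illustrative only)."""
    EVENTS = "Events"
    EVENTS_STORE = "Events Store"
    QUEUES = "Queues"
    COMMANDS = "Commands"
    QUERIES = "Queries"


def choose_pattern(*, needs_reply=False, needs_data=False,
                   must_not_miss=False, single_consumer=False):
    """Map requirements onto a pattern, following the 'Use When' column."""
    if needs_reply:
        # Request/reply: Queries return data, Commands only confirm execution.
        return Pattern.QUERIES if needs_data else Pattern.COMMANDS
    if single_consumer:
        # Work handled by exactly one consumer, with acknowledgment.
        return Pattern.QUEUES
    if must_not_miss:
        # Persistent pub/sub for subscribers that may be offline.
        return Pattern.EVENTS_STORE
    # Default: fire-and-forget broadcast.
    return Pattern.EVENTS


if __name__ == "__main__":
    print(choose_pattern(single_consumer=True).value)
    print(choose_pattern(needs_reply=True, needs_data=True).value)
```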

Events

Fire-and-forget pub/sub. Use PubSubClient (sync) or AsyncPubSubClient (async). View examples →

Events Store

Persistent pub/sub with replay. Subscribers can start from a sequence number, timestamp, or receive only new messages. View examples →

Queues

Pull-based message queues with acknowledgment, reject, requeue, dead-letter queues, and delayed delivery. View examples →

Commands

Request/reply where the sender expects confirmation of execution (no response payload). View examples →

Queries

Request/reply where the sender expects a data response. View examples →

Configuration

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| address | str | "localhost:50000" | KubeMQ server gRPC address |
| client_id | str | hostname | Unique client identifier |
| auth_token | str | "" | Authentication token |
| tls | TLSConfig | disabled | TLS configuration (cert, key, CA files) |
| max_send_size | int | 104857600 | Maximum send message size in bytes |
| max_receive_size | int | 104857600 | Maximum receive message size in bytes |
| auto_reconnect | bool | True | Auto-reconnect on connection loss |
| reconnect_interval_seconds | int | 1 | Seconds between reconnect attempts |
| log_level | int \| None | None (no logging) | Python logging level |

All clients accept these options as constructor arguments:

from kubemq import PubSubClient, TLSConfig

client = PubSubClient(
    address="kubemq-server:50000",
    client_id="my-service",
    auth_token="your-token",
    tls=TLSConfig(
        enabled=True,
        cert_file="/path/to/cert.pem",
        key_file="/path/to/key.pem",
        ca_file="/path/to/ca.pem",
    ),
)
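
In containerized deployments it is common to take these options from the environment rather than hard-coding them. The sketch below builds a dictionary of keyword arguments using the option names and defaults from the table. The `KUBEMQ_*` variable names and the `client_options_from_env` helper are assumptions made for this example; the SDK itself is not known to read them. Note that the default size limit of 104857600 bytes is exactly 100 MiB.

```python
import os
import socket

# 100 MiB, the default for max_send_size and max_receive_size in the table.
DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024  # 104857600 bytes

_TRUE_VALUES = {"1", "true", "yes", "on"}


def client_options_from_env(env=None):
    """Build client keyword arguments from (hypothetical) KUBEMQ_* variables."""
    env = os.environ if env is None else env
    return {
        "address": env.get("KUBEMQ_ADDRESS", "localhost:50000"),
        # The table lists the hostname as the default client_id.
        "client_id": env.get("KUBEMQ_CLIENT_ID", socket.gethostname()),
        "auth_token": env.get("KUBEMQ_AUTH_TOKEN", ""),
        "max_send_size": int(
            env.get("KUBEMQ_MAX_SEND_SIZE", DEFAULT_MAX_MESSAGE_SIZE)),
        "max_receive_size": int(
            env.get("KUBEMQ_MAX_RECEIVE_SIZE", DEFAULT_MAX_MESSAGE_SIZE)),
        "auto_reconnect": env.get(
            "KUBEMQ_AUTO_RECONNECT", "true").strip().lower() in _TRUE_VALUES,
        "reconnect_interval_seconds": int(
            env.get("KUBEMQ_RECONNECT_INTERVAL_SECONDS", 1)),
    }


if __name__ == "__main__":
    for name, value in client_options_from_env().items():
        print(f"{name} = {value!r}")
```

Because all clients accept these options as constructor arguments, the result could then be passed along as `PubSubClient(**client_options_from_env())`.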

Error Handling

All SDK errors extend KubeMQError:

from kubemq import (
    PubSubClient, EventMessage,
    KubeMQError, KubeMQConnectionError, KubeMQTimeoutError,
)

try:
    with PubSubClient(address="localhost:50000") as client:
        client.publish_event(
            EventMessage(channel="ch1", body=b"hello")
        )
except KubeMQConnectionError as e:
    print(f"Connection failed: {e}")
except KubeMQTimeoutError as e:
    print(f"Operation timed out: {e}")
except KubeMQError as e:
    print(f"KubeMQ error: {e}")

See the Troubleshooting Guide for solutions to common errors.
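
Transient failures such as a dropped connection or a timeout are often worth retrying before giving up. The following is a minimal, SDK-independent sketch of retrying with exponential backoff. `call_with_retry` and its parameters are hypothetical names chosen for this example; the exception types to retry on are supplied by the caller.

```python
import time


def call_with_retry(operation, retry_on, attempts=3, base_delay=0.5,
                    sleep=time.sleep):
    """Call operation(), retrying on the given exception type(s).

    The delay doubles after each failed attempt: base_delay, 2 * base_delay,
    and so on. The last failure is re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the original error.
            sleep(base_delay * 2 ** (attempt - 1))


if __name__ == "__main__":
    state = {"calls": 0}

    def flaky():
        # Fails twice, then succeeds, to exercise the retry path.
        state["calls"] += 1
        if state["calls"] < 3:
            raise ConnectionError("simulated outage")
        return "ok"

    print(call_with_retry(flaky, ConnectionError, base_delay=0.01))
```

With the SDK, `retry_on` could be the tuple `(KubeMQConnectionError, KubeMQTimeoutError)` and `operation` a function that opens a client and publishes. Keep in mind that retrying a send after a timeout may deliver the same message twice, so consumers should tolerate duplicates.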

Troubleshooting

| Problem | Solution |
| --- | --- |
| Connection refused | Ensure KubeMQ is running: docker run -p 50000:50000 kubemq/kubemq-community |
| Authentication failed | Verify auth_token matches server configuration |
| Channel not found | Create the channel first or enable auto-create on the server |
| Timeout / deadline exceeded | Increase timeout_in_seconds or check network latency |
| No messages received | Verify subscriber is connected before sender publishes |
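
For the "Connection refused" case, a quick way to narrow things down is to check whether anything is listening on the server port at all. The sketch below uses only the Python standard library; `is_port_open` is a hypothetical helper, not an SDK function. A successful TCP connection shows that some process accepts connections on that port, not that it is a healthy KubeMQ server.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # localhost:50000 is the default address used throughout this README.
    if is_port_open("localhost", 50000):
        print("Port 50000 is reachable")
    else:
        print("Nothing is listening on localhost:50000")
```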

Full Troubleshooting Guide (11+ entries)

Security

See SECURITY.md for vulnerability reporting. The SDK supports TLS and mTLS connections — for configuration details, see How to Connect with TLS.

Contributing

Contributions are welcome! See CONTRIBUTING.md for development setup, coding standards, and pull request guidelines.

License

This project is licensed under the MIT License — see the LICENSE file for details.
