Skip to main content

KubeMQ Python SDK - Production-grade messaging with async support, pub/sub, queues, and commands/queries

Project description

KubeMQ Python SDK

PyPI version CI codecov Python 3.11+ License: MIT

Description

KubeMQ is an enterprise-grade message broker for containers, designed for any workload and architecture running in Kubernetes. This SDK provides a production-ready Python client supporting all KubeMQ messaging patterns: Events (pub/sub), Events Store (persistent pub/sub), Queues (with ack/reject), Commands, and Queries (RPC).

Table of Contents

Installation

Requires Python 3.11+.

pip install kubemq

For optional features:

pip install kubemq[docs]    # API reference generation
pip install kubemq[otel]    # OpenTelemetry integration

Quick Start

Prerequisites: Python 3.11+, KubeMQ server running at localhost:50000 (install KubeMQ)

Send and Receive Events

Send an event:

from kubemq import PubSubClient, EventMessage

with PubSubClient(address="localhost:50000") as client:
    client.publish_event(
        EventMessage(channel="quickstart", body=b"Hello KubeMQ!")
    )
    print("Event sent!")

Subscribe to events:

import time
from kubemq import PubSubClient, EventsSubscription, CancellationToken

def on_event(event):
    print(f"Received: {event.body.decode('utf-8')}")

with PubSubClient(address="localhost:50000") as client:
    client.subscribe_to_events(
        subscription=EventsSubscription(
            channel="quickstart",
            on_receive_event_callback=on_event,
            on_error_callback=lambda e: print(f"Error: {e}"),
        ),
        cancel=CancellationToken(),
    )
    time.sleep(60)  # Keep listening

See also: Queues Quick Start | RPC Quick Start

Messaging Patterns

Pattern Delivery Guarantee Use When Example Use Case
Events At-most-once Fire-and-forget broadcasting to multiple subscribers Real-time notifications, log streaming
Events Store At-least-once (persistent) Subscribers must not miss messages, even if offline Audit trails, event sourcing, replay
Queues At-least-once (with ack) Work must be processed exactly by one consumer with acknowledgment Job processing, task distribution
Commands At-most-once (request/reply) You need a response confirming the action was executed Device control, configuration changes
Queries At-most-once (request/reply) You need to retrieve data from a responder Data lookups, service-to-service reads

Events

Fire-and-forget pub/sub. Use PubSubClient (sync) or AsyncPubSubClient (async). View examples →

Events Store

Persistent pub/sub with replay. Subscribers can start from a sequence number, timestamp, or receive only new messages. View examples →

Queues

Pull-based message queues with acknowledgment, reject, requeue, dead-letter queues, and delayed delivery. View examples →

Commands

Request/reply where the sender expects confirmation of execution (no response payload). View examples →

Queries

Request/reply where the sender expects a data response. View examples →

Configuration

Option Type Default Description
address str "localhost:50000" KubeMQ server gRPC address
client_id str hostname Unique client identifier
auth_token str "" Authentication token
tls TLSConfig disabled TLS configuration (cert, key, CA files)
max_send_size int 104857600 Maximum send message size in bytes
max_receive_size int 104857600 Maximum receive message size in bytes
auto_reconnect bool True Auto-reconnect on connection loss
reconnect_interval_seconds int 1 Seconds between reconnect attempts
log_level int | None None (no logging) Python logging level

All clients accept these options as constructor arguments:

from kubemq import PubSubClient, TLSConfig

client = PubSubClient(
    address="kubemq-server:50000",
    client_id="my-service",
    auth_token="your-token",
    tls=TLSConfig(
        enabled=True,
        cert_file="/path/to/cert.pem",
        key_file="/path/to/key.pem",
        ca_file="/path/to/ca.pem",
    ),
)

Error Handling

All SDK errors extend KubeMQError:

from kubemq import (
    PubSubClient, EventMessage,
    KubeMQError, KubeMQConnectionError, KubeMQTimeoutError,
)

try:
    with PubSubClient(address="localhost:50000") as client:
        client.publish_event(
            EventMessage(channel="ch1", body=b"hello")
        )
except KubeMQConnectionError as e:
    print(f"Connection failed: {e}")
except KubeMQTimeoutError as e:
    print(f"Operation timed out: {e}")
except KubeMQError as e:
    print(f"KubeMQ error: {e}")

See the Troubleshooting Guide for solutions to common errors.

Troubleshooting

Problem Solution
Connection refused Ensure KubeMQ is running: docker run -p 50000:50000 kubemq/kubemq-community
Authentication failed Verify auth_token matches server configuration
Channel not found Create the channel first or enable auto-create on the server
Timeout / deadline exceeded Increase timeout_in_seconds or check network latency
No messages received Verify subscriber is connected before sender publishes

Full Troubleshooting Guide (11+ entries)

Security

See SECURITY.md for vulnerability reporting. The SDK supports TLS and mTLS connections — for configuration details, see How to Connect with TLS.

Additional Resources

Contributing

Contributions are welcome! See CONTRIBUTING.md for development setup, coding standards, and pull request guidelines.

License

This project is licensed under the MIT License — see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kubemq-4.1.2.tar.gz (320.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kubemq-4.1.2-py3-none-any.whl (187.1 kB view details)

Uploaded Python 3

File details

Details for the file kubemq-4.1.2.tar.gz.

File metadata

  • Download URL: kubemq-4.1.2.tar.gz
  • Upload date:
  • Size: 320.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.4

File hashes

Hashes for kubemq-4.1.2.tar.gz
Algorithm Hash digest
SHA256 c6ef085334efdbcc4e5c51b62cfbccfd657f6fd4767c8ea38fb2574255c0aff5
MD5 9323041f0410ce2858c607fb35065ded
BLAKE2b-256 e8b365616bebedbaf50f176a0106e6bd3958e7f19e014f2d66eab7e64e398f4b

See more details on using hashes here.

File details

Details for the file kubemq-4.1.2-py3-none-any.whl.

File metadata

  • Download URL: kubemq-4.1.2-py3-none-any.whl
  • Upload date:
  • Size: 187.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.4

File hashes

Hashes for kubemq-4.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 a1c4c71cd27eb4e6ce5cdca7d62b96664a4f58ab32e8570ce72872e034870cd8
MD5 3c0a978ae6ee11ccebf4362887047865
BLAKE2b-256 81c6b687540d943abe3fe8c14a864de924414954df6086ec4c67ba42d23e06cb

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page