Skip to main content

KubeMQ Python SDK - Production-grade messaging with async support, pub/sub, queues, and commands/queries

Project description

KubeMQ Python SDK

PyPI version CI codecov Python 3.11+ License: MIT

Description

KubeMQ is an enterprise-grade message broker for containers, designed for any workload and architecture running in Kubernetes. This SDK provides a production-ready Python client supporting all KubeMQ messaging patterns: Events (pub/sub), Events Store (persistent pub/sub), Queues (with ack/reject), Commands, and Queries (RPC).

Table of Contents

Installation

Requires Python 3.11+.

pip install kubemq

For optional features:

pip install kubemq[docs]    # API reference generation
pip install kubemq[otel]    # OpenTelemetry integration

Quick Start

Prerequisites: Python 3.11+, KubeMQ server running at localhost:50000 (install KubeMQ)

Send and Receive Events

Send an event:

from kubemq import PubSubClient, EventMessage

with PubSubClient(address="localhost:50000") as client:
    client.publish_event(
        EventMessage(channel="quickstart", body=b"Hello KubeMQ!")
    )
    print("Event sent!")

Subscribe to events:

import time
from kubemq import PubSubClient, EventsSubscription, CancellationToken

def on_event(event):
    print(f"Received: {event.body.decode('utf-8')}")

with PubSubClient(address="localhost:50000") as client:
    client.subscribe_to_events(
        subscription=EventsSubscription(
            channel="quickstart",
            on_receive_event_callback=on_event,
            on_error_callback=lambda e: print(f"Error: {e}"),
        ),
        cancel=CancellationToken(),
    )
    time.sleep(60)  # Keep listening

See also: Queues Quick Start | RPC Quick Start

Messaging Patterns

Pattern Delivery Guarantee Use When Example Use Case
Events At-most-once Fire-and-forget broadcasting to multiple subscribers Real-time notifications, log streaming
Events Store At-least-once (persistent) Subscribers must not miss messages, even if offline Audit trails, event sourcing, replay
Queues At-least-once (with ack) Work must be processed exactly by one consumer with acknowledgment Job processing, task distribution
Commands At-most-once (request/reply) You need a response confirming the action was executed Device control, configuration changes
Queries At-most-once (request/reply) You need to retrieve data from a responder Data lookups, service-to-service reads

Events

Fire-and-forget pub/sub. Use PubSubClient (sync) or AsyncPubSubClient (async). View examples →

Events Store

Persistent pub/sub with replay. Subscribers can start from a sequence number, timestamp, or receive only new messages. View examples →

Queues

Pull-based message queues with acknowledgment, reject, requeue, dead-letter queues, and delayed delivery. View examples →

Commands

Request/reply where the sender expects confirmation of execution (no response payload). View examples →

Queries

Request/reply where the sender expects a data response. View examples →

Configuration

Option Type Default Description
address str "localhost:50000" KubeMQ server gRPC address
client_id str hostname Unique client identifier
auth_token str "" Authentication token
tls TLSConfig disabled TLS configuration (cert, key, CA files)
max_send_size int 104857600 Maximum send message size in bytes
max_receive_size int 104857600 Maximum receive message size in bytes
auto_reconnect bool True Auto-reconnect on connection loss
reconnect_interval_seconds int 1 Seconds between reconnect attempts
log_level int | None None (no logging) Python logging level

All clients accept these options as constructor arguments:

from kubemq import PubSubClient, TLSConfig

client = PubSubClient(
    address="kubemq-server:50000",
    client_id="my-service",
    auth_token="your-token",
    tls=TLSConfig(
        enabled=True,
        cert_file="/path/to/cert.pem",
        key_file="/path/to/key.pem",
        ca_file="/path/to/ca.pem",
    ),
)

Error Handling

All SDK errors extend KubeMQError:

from kubemq import (
    PubSubClient, EventMessage,
    KubeMQError, KubeMQConnectionError, KubeMQTimeoutError,
)

try:
    with PubSubClient(address="localhost:50000") as client:
        client.publish_event(
            EventMessage(channel="ch1", body=b"hello")
        )
except KubeMQConnectionError as e:
    print(f"Connection failed: {e}")
except KubeMQTimeoutError as e:
    print(f"Operation timed out: {e}")
except KubeMQError as e:
    print(f"KubeMQ error: {e}")

See the Troubleshooting Guide for solutions to common errors.

Troubleshooting

Problem Solution
Connection refused Ensure KubeMQ is running: docker run -p 50000:50000 kubemq/kubemq-community
Authentication failed Verify auth_token matches server configuration
Channel not found Create the channel first or enable auto-create on the server
Timeout / deadline exceeded Increase timeout_in_seconds or check network latency
No messages received Verify subscriber is connected before sender publishes

Full Troubleshooting Guide (11+ entries)

Security

See SECURITY.md for vulnerability reporting. The SDK supports TLS and mTLS connections — for configuration details, see How to Connect with TLS.

Additional Resources

Contributing

Contributions are welcome! See CONTRIBUTING.md for development setup, coding standards, and pull request guidelines.

License

This project is licensed under the MIT License — see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kubemq-4.1.4.tar.gz (322.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kubemq-4.1.4-py3-none-any.whl (187.2 kB view details)

Uploaded Python 3

File details

Details for the file kubemq-4.1.4.tar.gz.

File metadata

  • Download URL: kubemq-4.1.4.tar.gz
  • Upload date:
  • Size: 322.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.4

File hashes

Hashes for kubemq-4.1.4.tar.gz
Algorithm Hash digest
SHA256 39a3b17b5f338d2074cc3be817f60be226ff302901a4935018da82feabc566f0
MD5 c0805fe12f1aa572382603de28aeab85
BLAKE2b-256 10e99dba165e19b1f9ec0bca99a726f2902a6c3480a5ee904bc946acc00d846b

See more details on using hashes here.

File details

Details for the file kubemq-4.1.4-py3-none-any.whl.

File metadata

  • Download URL: kubemq-4.1.4-py3-none-any.whl
  • Upload date:
  • Size: 187.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.4

File hashes

Hashes for kubemq-4.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 6e02aa7c9c3f3d6a5cbf7467e7c5f9c324408bc409590b327d14bd44fb78618e
MD5 60c5499bff13b223398481372997f9d9
BLAKE2b-256 9fd079a1733a5bce53a5695a0682f208a5c14b354c46ada5acde2683cd36e5a1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page