TogoMQ SDK for Python

The official Python SDK for TogoMQ - a modern, high-performance message queue service. This SDK provides a simple and intuitive API for publishing and subscribing to messages using gRPC streaming.

Features

  • 🚀 High Performance: Built on gRPC for efficient communication
  • 📡 Streaming Support: Native support for streaming pub/sub operations
  • 🔒 Secure: TLS encryption and token-based authentication
  • 🎯 Simple API: Easy-to-use client with fluent configuration
  • 📝 Comprehensive Logging: Configurable log levels for debugging
  • Thread-Safe: Safe for concurrent use with threading
  • Well Tested: Comprehensive test coverage
  • 🐍 Type Hints: Full typing support for better IDE integration

Requirements

  • Python 3.9 or higher
  • Access to a TogoMQ server
  • Valid TogoMQ authentication token

Installation

Install the SDK using pip:

pip install togomq-sdk

Quick Start

from togomq import Client, Config, Message

# Create client with your token
config = Config(token="your-token-here")
client = Client(config)

try:
    # Publish a message
    messages = [Message("orders", b"Hello TogoMQ!")]
    response = client.pub_batch(messages)
    print(f"Published {response.messages_received} messages")
finally:
    client.close()

Configuration

The SDK supports flexible configuration with sensible defaults:

Default Configuration

from togomq import Config, Client

# Create client with defaults (only token is required)
config = Config(token="your-token-here")
client = Client(config)

Configuration Options

Option     Default       Description
host       q.togomq.io   TogoMQ server hostname
port       5123          TogoMQ server port
log_level  info          Logging level (debug, info, warn, error, none)
token      (required)    Authentication token
use_tls    True          Whether to use TLS encryption

Custom Configuration

config = Config(
    token="your-token-here",
    host="custom.togomq.io",
    port=9000,
    log_level="debug",
    use_tls=True,
)
client = Client(config)
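
To keep the token out of source code, the options above can be read from environment variables. The following is a minimal sketch, not part of the SDK: the helper config_kwargs_from_env and the TOGOMQ_* variable names are hypothetical, and the only assumption about the SDK is that Config accepts the keyword arguments listed in the table above.

```python
import os
from typing import Dict, Mapping, Optional, Union

ConfigValue = Union[str, int, bool]


def config_kwargs_from_env(
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, ConfigValue]:
    """Build keyword arguments for Config from (hypothetical) TOGOMQ_* variables."""
    env = os.environ if env is None else env

    token = env.get("TOGOMQ_TOKEN", "").strip()
    if not token:
        # token is the only required option, so fail early when it is missing
        raise ValueError("TOGOMQ_TOKEN is not set")

    kwargs: Dict[str, ConfigValue] = {"token": token}
    # Optional settings are only included when present, so that Config
    # falls back to its own defaults for everything else.
    if "TOGOMQ_HOST" in env:
        kwargs["host"] = env["TOGOMQ_HOST"]
    if "TOGOMQ_PORT" in env:
        kwargs["port"] = int(env["TOGOMQ_PORT"])
    if "TOGOMQ_LOG_LEVEL" in env:
        kwargs["log_level"] = env["TOGOMQ_LOG_LEVEL"].strip().lower()
    if "TOGOMQ_USE_TLS" in env:
        kwargs["use_tls"] = env["TOGOMQ_USE_TLS"].strip().lower() in ("1", "true", "yes")
    return kwargs
```

With the SDK installed, the result could then be passed on as Config(**config_kwargs_from_env()).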

Usage

Publishing Messages

Note: Every published message must specify a topic name.

Publishing a Batch of Messages

from togomq import Client, Config, Message

# Create client
config = Config(token="your-token")
client = Client(config)

try:
    # Create messages - topic is required for each message
    messages = [
        Message("orders", b"order-1"),
        Message("orders", b"order-2").with_variables({
            "priority": "high",
            "customer": "12345",
        }),
        Message("orders", b"order-3")
            .with_postpone(60)      # Delay 60 seconds
            .with_retention(3600),  # Keep for 1 hour
    ]
    
    # Publish
    response = client.pub_batch(messages)
    print(f"Published {response.messages_received} messages")
finally:
    client.close()
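
When there are many messages to publish, it may be preferable to send them in several fixed-size batches rather than one very large list. The sketch below shows one way to split any iterable into batches; chunked is a hypothetical helper, not an SDK function, and the README does not state a maximum batch size, so any size used with it is an arbitrary choice.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items from `items`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        # islice consumes up to `size` items from the shared iterator
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk
```

Each chunk could then be passed to client.pub_batch() in a loop, for example: for batch in chunked(messages, 500): client.pub_batch(batch).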

Publishing via Generator (Streaming)

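from typing import Generator

from togomq import Client, Config, Message

config = Config(token="your-token")

def message_generator() -> Generator[Message, None, None]:
    # Yield messages one at a time instead of building a list up front
    for i in range(100):
        yield Message("events", f"event-{i}".encode())

# Publish streaming; the context manager closes the client on exit
with Client(config) as client:
    response = client.pub(message_generator())
    print(f"Published {response.messages_received} messages")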

Subscribing to Messages

Note: Topic is required for subscriptions. Use wildcards like "orders.*" for pattern matching, or "*" to receive messages from all topics.
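
To get a feel for which topic names a pattern such as "orders.*" is meant to cover, the following local sketch uses shell-style matching from the standard library. This is only an illustration: the exact wildcard semantics are decided by the TogoMQ server and are not specified here, so fnmatch is an assumed stand-in, and topic_matches is a hypothetical helper that is not part of the SDK.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List


def topic_matches(pattern: str, topic: str) -> bool:
    """Case-sensitive shell-style match; an approximation, not the server's rule."""
    return fnmatchcase(topic, pattern)


def filter_topics(pattern: str, topics: Iterable[str]) -> List[str]:
    """Return the topics that the pattern would select under this approximation."""
    return [topic for topic in topics if topic_matches(pattern, topic)]
```

Note that fnmatch also gives special meaning to "?" and "[...]", and its "*" matches across dots, which the server may or may not do.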

Basic Subscription

from togomq import Client, Config, SubscribeOptions

# Create client
config = Config(token="your-token")
client = Client(config)

try:
    # Subscribe to specific topic
    # Topic is required - use "*" to subscribe to all topics
    options = SubscribeOptions("orders")
    msg_gen, err_gen = client.sub(options)
    
    # Receive messages
    for msg in msg_gen:
        print(f"Received message from {msg.topic}: {msg.body.decode()}")
        print(f"Message UUID: {msg.uuid}")
        
        # Access variables
        if "priority" in msg.variables:
            print(f"Priority: {msg.variables['priority']}")
finally:
    client.close()

Advanced Subscription with Options

# Subscribe with batch size and rate limiting
# Default values: batch = 0 (server default 1000), speed_per_sec = 0 (unlimited)
options = (SubscribeOptions("orders.*")  # Wildcard topic
    .with_batch(10)                      # Receive up to 10 messages at once
    .with_speed_per_sec(100))            # Limit to 100 messages per second

msg_gen, err_gen = client.sub(options)

Subscription Options:

  • batch: Maximum number of messages to receive at once (default: 0 = server default 1000)
  • speed_per_sec: Rate limit for message delivery per second (default: 0 = unlimited)
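
The meaning of the zero defaults can be expressed as a small calculation. The sketch below is illustrative only: effective_batch and min_drain_seconds are hypothetical helpers, the value 1000 is taken from the description above, and the drain estimate assumes that the rate limit is the only bottleneck.

```python
from typing import Optional

SERVER_DEFAULT_BATCH = 1000  # server default documented above for batch = 0


def effective_batch(batch: int = 0) -> int:
    """Batch size actually in effect: 0 means the server default."""
    return batch if batch > 0 else SERVER_DEFAULT_BATCH


def min_drain_seconds(backlog: int, speed_per_sec: int = 0) -> Optional[float]:
    """Lower bound on the time to receive `backlog` messages.

    Returns None when speed_per_sec is 0, i.e. delivery is not rate limited.
    """
    if speed_per_sec <= 0:
        return None
    return backlog / speed_per_sec
```

For example, with speed_per_sec set to 100, a backlog of 1,000 messages would take at least 10 seconds to arrive.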

Subscribe to All Topics (Wildcard)

# Subscribe to all topics using "*" wildcard
options = SubscribeOptions("*")  # "*" = all topics
msg_gen, err_gen = client.sub(options)

Subscribe with Pattern Wildcards

# Subscribe to all orders topics (orders.new, orders.updated, etc.)
options = SubscribeOptions("orders.*")
msg_gen, err_gen = client.sub(options)

Message Structure

Publishing Message

Important: Topic is required when publishing messages.

class Message:
    topic: str                 # Message topic (required)
    body: bytes                # Message payload
    variables: Dict[str, str]  # Custom key-value metadata
    postpone: int              # Delay in seconds before message is available
    retention: int             # How long to keep message (seconds)
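
Because postpone and retention are plain integers in seconds, durations written as raw numbers can be hard to read. One option is to express them with datetime.timedelta and convert at the call site. The helper whole_seconds below is hypothetical and not part of the SDK.

```python
from datetime import timedelta


def whole_seconds(duration: timedelta) -> int:
    """Convert a non-negative timedelta to an integer number of seconds."""
    seconds = duration.total_seconds()
    if seconds < 0:
        raise ValueError("duration must not be negative")
    # Fractions of a second are dropped because the fields are integers
    return int(seconds)
```

Used with the fluent methods shown earlier, this might read: Message("orders", b"order-3").with_postpone(whole_seconds(timedelta(minutes=1))).with_retention(whole_seconds(timedelta(hours=1))).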

Received Message

class Message:
    topic: str                 # Message topic
    uuid: str                  # Unique message identifier
    body: bytes                # Message payload
    variables: Dict[str, str]  # Custom key-value metadata
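
Since body is raw bytes, structured payloads have to be serialized by the application. The sketch below uses JSON encoded as UTF-8, which is an assumption about the payload format and not something the SDK requires; encode_body and decode_body are hypothetical helpers.

```python
import json
from typing import Any


def encode_body(payload: Any) -> bytes:
    """Serialize a JSON-compatible object to compact UTF-8 bytes."""
    return json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")


def decode_body(body: bytes) -> Any:
    """Parse UTF-8 JSON bytes back into Python objects."""
    return json.loads(body.decode("utf-8"))
```

A publisher could pass encode_body({...}) as the message body, and a subscriber could call decode_body(msg.body) on receipt.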

Error Handling

The SDK provides detailed error information:

from togomq import TogoMQError, ErrorCode

try:
    response = client.pub_batch(messages)
except TogoMQError as e:
    # Check error type
    if e.code == ErrorCode.AUTH:
        print("Authentication failed")
    elif e.code == ErrorCode.CONNECTION:
        print("Connection error")
    elif e.code == ErrorCode.VALIDATION:
        print("Validation error")
    else:
        print(f"Error: {e}")

Error Codes

  • ErrorCode.CONNECTION - Connection or network errors
  • ErrorCode.AUTH - Authentication failures
  • ErrorCode.VALIDATION - Invalid input or configuration
  • ErrorCode.PUBLISH - Publishing errors
  • ErrorCode.SUBSCRIBE - Subscription errors
  • ErrorCode.STREAM - General streaming errors
  • ErrorCode.CONFIGURATION - Configuration errors

Logging

Control logging verbosity with the log_level configuration:

config = Config(
    token="your-token",
    log_level="debug",  # debug, info, warn, error, none
)

Log levels:

  • debug - All logs including debug information
  • info - Informational messages and above
  • warn - Warnings and errors only
  • error - Error messages only
  • none - Disable logging
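
If the log level comes from user input or a configuration file, it can be validated against the documented values before it is passed to Config. This is a small sketch; normalize_log_level is a hypothetical helper, and whether the SDK itself accepts other spellings (for example upper case) is not stated, so the helper simply normalizes to the lower-case names listed above.

```python
VALID_LOG_LEVELS = ("debug", "info", "warn", "error", "none")


def normalize_log_level(value: str, default: str = "info") -> str:
    """Return a documented log level name, or raise ValueError."""
    # An empty value falls back to the documented default level
    level = (value or default).strip().lower()
    if level not in VALID_LOG_LEVELS:
        raise ValueError(
            f"unknown log level {value!r}; expected one of {', '.join(VALID_LOG_LEVELS)}"
        )
    return level
```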

Best Practices

  1. Reuse Clients: Create one client per application/thread and reuse it
  2. Handle Errors: Always check and handle errors appropriately
  3. Close Connections: Always close clients using client.close() or a context manager
  4. Use Context Manager: Prefer the with statement for automatic cleanup
  5. Batch Messages: Use pub_batch() for better performance when publishing multiple messages
  6. Monitor Subscriptions: Always handle both message and error generators in subscriptions

# ✅ Good - Using context manager
with Client(config) as client:
    client.pub_batch(messages)

# ✅ Good - Manual cleanup
client = Client(config)
try:
    client.pub_batch(messages)
finally:
    client.close()
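
Practice 6 asks that both generators returned by client.sub() be handled. This README does not describe how err_gen behaves, so the following is only a sketch under the assumption that it is an ordinary iterable that yields exception objects and may block while waiting. consume_with_errors is a hypothetical helper, not part of the SDK.

```python
import threading
from typing import Any, Callable, Iterable, List


def consume_with_errors(
    msg_gen: Iterable[Any],
    err_gen: Iterable[Exception],
    handle: Callable[[Any], None],
    join_timeout: float = 1.0,
) -> List[Exception]:
    """Process messages while collecting errors on a background thread."""
    errors: List[Exception] = []

    def drain_errors() -> None:
        # Assumption: iterating err_gen yields errors until the stream ends
        for err in err_gen:
            errors.append(err)

    # Daemon thread so a blocked error stream cannot keep the process alive
    watcher = threading.Thread(target=drain_errors, daemon=True)
    watcher.start()

    for msg in msg_gen:
        handle(msg)

    # Give the watcher a moment to finish once the messages are exhausted
    watcher.join(timeout=join_timeout)
    return errors
```

With the SDK, the call might look like: errors = consume_with_errors(*client.sub(options), handle=process), where process is the application's own callback.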

Examples

Check out the examples/ directory for complete working examples:

  • examples/publish.py - Publishing examples
  • examples/subscribe.py - Subscription examples
  • examples/complete_workflow.py - Complete workflow with threading

Development

Setup Development Environment

# Clone the repository
git clone https://github.com/TogoMQ/togomq-sdk-python.git
cd togomq-sdk-python

# Install dependencies
pip install -e ".[dev]"

Running Tests

# Run all tests
pytest

# Run with coverage
pytest --cov=togomq --cov-report=html

Code Quality

# Format code
black togomq tests examples

# Lint code
ruff check togomq tests examples

# Type checking
mypy togomq

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Changelog

See Releases for version history and changes.
