Skip to main content

Official Python SDK for TogoMQ - a modern, high-performance message queue service

Project description

TogoMQ SDK for Python

PyPI version Python versions License: MIT

The official Python SDK for TogoMQ - a modern, high-performance message queue service. This SDK provides a simple and intuitive API for publishing and subscribing to messages using gRPC streaming.

Features

  • 🚀 High Performance: Built on gRPC for efficient communication
  • 📡 Streaming Support: Native support for streaming pub/sub operations
  • 🔒 Secure: TLS encryption and token-based authentication
  • 🎯 Simple API: Easy-to-use client with fluent configuration
  • 📝 Comprehensive Logging: Configurable log levels for debugging
  • Thread-Safe: Safe for concurrent use with threading
  • Well Tested: Comprehensive test coverage
  • 🐍 Type Hints: Full typing support for better IDE integration

Requirements

  • Python 3.9 or higher
  • Access to a TogoMQ server
  • Valid TogoMQ authentication token

Installation

Install the SDK using pip:

pip install togomq-sdk

Quick Start

from togomq import Client, Config, Message

# Create client with your token
config = Config(token="your-token-here")
client = Client(config)

try:
    # Publish a message
    messages = [Message("orders", b"Hello TogoMQ!")]
    response = client.pub_batch(messages)
    print(f"Published {response.messages_received} messages")
finally:
    client.close()

Configuration

The SDK supports flexible configuration with sensible defaults:

Default Configuration

from togomq import Config, Client

# Create client with defaults (only token is required)
config = Config(token="your-token-here")
client = Client(config)

Configuration Options

Option Default Description
host q.togomq.io TogoMQ server hostname
port 5123 TogoMQ server port
log_level info Logging level (debug, info, warn, error, none)
token (required) Authentication token
use_tls True Whether to use TLS encryption

Custom Configuration

config = Config(
    token="your-token-here",
    host="custom.togomq.io",
    port=9000,
    log_level="debug",
    use_tls=True,
)
client = Client(config)

Usage

Publishing Messages

Note: Topic name is required for all published messages. Each message must specify a topic.

Publishing a Batch of Messages

from togomq import Client, Config, Message

# Create client
config = Config(token="your-token")
client = Client(config)

try:
    # Create messages - topic is required for each message
    messages = [
        Message("orders", b"order-1"),
        Message("orders", b"order-2").with_variables({
            "priority": "high",
            "customer": "12345",
        }),
        Message("orders", b"order-3")
            .with_postpone(60)      # Delay 60 seconds
            .with_retention(3600),  # Keep for 1 hour
    ]
    
    # Publish
    response = client.pub_batch(messages)
    print(f"Published {response.messages_received} messages")
finally:
    client.close()

Publishing via Generator (Streaming)

from typing import Generator

def message_generator() -> Generator[Message, None, None]:
    for i in range(100):
        msg = Message("events", f"event-{i}".encode())
        yield msg

# Publish streaming
with Client(config) as client:
    response = client.pub(message_generator())
    print(f"Published {response.messages_received} messages")

Subscribing to Messages

Note: Topic is required for subscriptions. Use wildcards like "orders.*" for pattern matching, or "*" to receive messages from all topics.

Basic Subscription

from togomq import Client, Config, SubscribeOptions

# Create client
config = Config(token="your-token")
client = Client(config)

try:
    # Subscribe to specific topic
    # Topic is required - use "*" to subscribe to all topics
    options = SubscribeOptions("orders")
    msg_gen, err_gen = client.sub(options)
    
    # Receive messages
    for msg in msg_gen:
        print(f"Received message from {msg.topic}: {msg.body.decode()}")
        print(f"Message UUID: {msg.uuid}")
        
        # Access variables
        if "priority" in msg.variables:
            print(f"Priority: {msg.variables['priority']}")
finally:
    client.close()

Advanced Subscription with Options

# Subscribe with batch size and rate limiting
# Default values: Batch = 0 (server default 1000), SpeedPerSec = 0 (unlimited)
options = (SubscribeOptions("orders.*")  # Wildcard topic
    .with_batch(10)                      # Receive up to 10 messages at once
    .with_speed_per_sec(100))            # Limit to 100 messages per second

msg_gen, err_gen = client.sub(options)

Subscription Options:

  • batch: Maximum number of messages to receive at once (default: 0 = server default 1000)
  • speed_per_sec: Rate limit for message delivery per second (default: 0 = unlimited)

Subscribe to All Topics (Wildcard)

# Subscribe to all topics using "*" wildcard
options = SubscribeOptions("*")  # "*" = all topics
msg_gen, err_gen = client.sub(options)

Subscribe with Pattern Wildcards

# Subscribe to all orders topics (orders.new, orders.updated, etc.)
options = SubscribeOptions("orders.*")
msg_gen, err_gen = client.sub(options)

# Subscribe to all topics
options = SubscribeOptions("*")
msg_gen, err_gen = client.sub(options)

Message Structure

Publishing Message

Important: Topic is required when publishing messages.

class Message:
    topic: str              # Message topic (required)
    body: bytes            # Message payload
    variables: Dict[str, str]  # Custom key-value metadata
    postpone: int          # Delay in seconds before message is available
    retention: int         # How long to keep message (seconds)

Received Message

class Message:
    topic: str              # Message topic
    uuid: str              # Unique message identifier
    body: bytes            # Message payload
    variables: Dict[str, str]  # Custom key-value metadata

Error Handling

The SDK provides detailed error information:

from togomq import TogoMQError, ErrorCode

try:
    response = client.pub_batch(messages)
except TogoMQError as e:
    # Check error type
    if e.code == ErrorCode.AUTH:
        print("Authentication failed")
    elif e.code == ErrorCode.CONNECTION:
        print("Connection error")
    elif e.code == ErrorCode.VALIDATION:
        print("Validation error")
    else:
        print(f"Error: {e}")

Error Codes

  • ErrorCode.CONNECTION - Connection or network errors
  • ErrorCode.AUTH - Authentication failures
  • ErrorCode.VALIDATION - Invalid input or configuration
  • ErrorCode.PUBLISH - Publishing errors
  • ErrorCode.SUBSCRIBE - Subscription errors
  • ErrorCode.STREAM - General streaming errors
  • ErrorCode.CONFIGURATION - Configuration errors

Logging

Control logging verbosity with the log_level configuration:

config = Config(
    token="your-token",
    log_level="debug",  # debug, info, warn, error, none
)

Log levels:

  • debug - All logs including debug information
  • info - Informational messages and above
  • warn - Warnings and errors only
  • error - Error messages only
  • none - Disable logging

Best Practices

  1. Reuse Clients: Create one client per application/thread and reuse it
  2. Handle Errors: Always check and handle errors appropriately
  3. Close Connections: Always close clients using client.close() or context manager
  4. Use Context Manager: Prefer with statement for automatic cleanup
  5. Batch Messages: Use pub_batch() for better performance when publishing multiple messages
  6. Monitor Subscriptions: Always handle both message and error generators in subscriptions
# ✅ Good - Using context manager
with Client(config) as client:
    client.pub_batch(messages)

# ✅ Good - Manual cleanup
client = Client(config)
try:
    client.pub_batch(messages)
finally:
    client.close()

Examples

Check out the examples/ directory for complete working examples:

  • examples/publish.py - Publishing examples
  • examples/subscribe.py - Subscription examples
  • examples/complete_workflow.py - Complete workflow with threading

Development

Setup Development Environment

# Clone the repository
git clone https://github.com/TogoMQ/togomq-sdk-python.git
cd togomq-sdk-python

# Install dependencies
pip install -e ".[dev]"

Running Tests

# Run all tests
pytest

# Run with coverage
pytest --cov=togomq --cov-report=html

Code Quality

# Format code
black togomq tests examples

# Lint code
ruff check togomq tests examples

# Type checking
mypy togomq

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

Related Projects

Changelog

See Releases for version history and changes. TogoMQ SDK library for Python

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

togomq_sdk-0.1.1.tar.gz (26.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

togomq_sdk-0.1.1-py3-none-any.whl (13.5 kB view details)

Uploaded Python 3

File details

Details for the file togomq_sdk-0.1.1.tar.gz.

File metadata

  • Download URL: togomq_sdk-0.1.1.tar.gz
  • Upload date:
  • Size: 26.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for togomq_sdk-0.1.1.tar.gz
Algorithm Hash digest
SHA256 fd3fd08d2d71402af317a8aae7406b30d2b47dca07a026d1fa96915bc4d9431d
MD5 440ccfad9962c6699a2be08cccd64abe
BLAKE2b-256 be83d4852b16a84fce2ae6f15fe741d3e5d643ab4198957ff8648d1648ef352c

See more details on using hashes here.

Provenance

The following attestation bundles were made for togomq_sdk-0.1.1.tar.gz:

Publisher: release.yml on TogoMQ/togomq-sdk-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file togomq_sdk-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: togomq_sdk-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 13.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for togomq_sdk-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 c56b7125c9284c00d9e5a2bf618add361d6a2fbcd0b7462b854cac83876aff80
MD5 6f5bb1526d1bccca9ef4b77284c1d2ea
BLAKE2b-256 4870369400baa0953225aa509a1223e6dd626c1a9211c3b6140120e90805ce8a

See more details on using hashes here.

Provenance

The following attestation bundles were made for togomq_sdk-0.1.1-py3-none-any.whl:

Publisher: release.yml on TogoMQ/togomq-sdk-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page