Official Python SDK for TogoMQ - a modern, high-performance message queue service
TogoMQ SDK for Python
The official Python SDK for TogoMQ - a modern, high-performance message queue service. This SDK provides a simple and intuitive API for publishing and subscribing to messages using gRPC streaming.
Features
- 🚀 High Performance: Built on gRPC for efficient communication
- 📡 Streaming Support: Native support for streaming pub/sub operations
- 🔒 Secure: TLS encryption and token-based authentication
- 🎯 Simple API: Easy-to-use client with fluent configuration
- 📝 Comprehensive Logging: Configurable log levels for debugging
- ⚡ Thread-Safe: Safe for concurrent use with threading
- ✅ Well Tested: Comprehensive test coverage
- 🐍 Type Hints: Full typing support for better IDE integration
Requirements
- Python 3.9 or higher
- Access to a TogoMQ server
- Valid TogoMQ authentication token
Installation
Install the SDK using pip:
pip install togomq-sdk
Quick Start
from togomq import Client, Config, Message
# Create client with your token
config = Config(token="your-token-here")
client = Client(config)
try:
    # Publish a message
    messages = [Message("orders", b"Hello TogoMQ!")]
    response = client.pub_batch(messages)
    print(f"Published {response.messages_received} messages")
finally:
    client.close()
Configuration
The SDK supports flexible configuration with sensible defaults:
Default Configuration
from togomq import Config, Client
# Create client with defaults (only token is required)
config = Config(token="your-token-here")
client = Client(config)
Configuration Options
| Option | Default | Description |
|---|---|---|
| host | q.togomq.io | TogoMQ server hostname |
| port | 5123 | TogoMQ server port |
| log_level | info | Logging level (debug, info, warn, error, none) |
| token | (required) | Authentication token |
| use_tls | True | Whether to use TLS encryption |
Custom Configuration
config = Config(
    token="your-token-here",
    host="custom.togomq.io",
    port=9000,
    log_level="debug",
    use_tls=True,
)
client = Client(config)
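Hard-coding a token is rarely desirable outside of examples. The following is a minimal sketch, not part of the SDK, that collects the options from the table above out of environment variables. The variable names (TOGOMQ_TOKEN, TOGOMQ_HOST, and so on) and the helper config_kwargs_from_env are assumptions made for illustration; the defaults mirror the table.

```python
import os
from typing import Any, Dict, Mapping, Optional

# Log levels listed in the configuration table above.
VALID_LOG_LEVELS = {"debug", "info", "warn", "error", "none"}


def config_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build keyword arguments for Config from environment variables.

    The TOGOMQ_* variable names are hypothetical; use whatever fits your deployment.
    """
    env = os.environ if env is None else env
    token = env.get("TOGOMQ_TOKEN", "")
    if not token:
        raise ValueError("TOGOMQ_TOKEN is not set")
    log_level = env.get("TOGOMQ_LOG_LEVEL", "info").lower()
    if log_level not in VALID_LOG_LEVELS:
        raise ValueError(f"unsupported log level: {log_level!r}")
    return {
        "token": token,
        "host": env.get("TOGOMQ_HOST", "q.togomq.io"),
        "port": int(env.get("TOGOMQ_PORT", "5123")),
        "log_level": log_level,
        "use_tls": env.get("TOGOMQ_USE_TLS", "true").lower() in {"1", "true", "yes"},
    }
```

The resulting dictionary uses the same keyword names as the Custom Configuration example, so it could be passed on as Config(**config_kwargs_from_env()).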
Usage
Publishing Messages
Note: Topic name is required for all published messages. Each message must specify a topic.
Publishing a Batch of Messages
from togomq import Client, Config, Message
# Create client
config = Config(token="your-token")
client = Client(config)
try:
    # Create messages - topic is required for each message
    messages = [
        Message("orders", b"order-1"),
        Message("orders", b"order-2").with_variables({
            "priority": "high",
            "customer": "12345",
        }),
        Message("orders", b"order-3")
        .with_postpone(60)  # Delay 60 seconds
        .with_retention(3600),  # Keep for 1 hour
    ]
    # Publish
    response = client.pub_batch(messages)
    print(f"Published {response.messages_received} messages")
finally:
    client.close()
Publishing via Generator (Streaming)
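from typing import Generator
from togomq import Client, Config, Message
# Create config
config = Config(token="your-token")
# Yield messages one at a time instead of building a list up front
def message_generator() -> Generator[Message, None, None]:
    for i in range(100):
        msg = Message("events", f"event-{i}".encode())
        yield msg
# Publish streaming
with Client(config) as client:
    response = client.pub(message_generator())
    print(f"Published {response.messages_received} messages")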
Subscribing to Messages
Note: Topic is required for subscriptions. Use wildcards like "orders.*" for pattern matching, or "*" to receive messages from all topics.
Basic Subscription
from togomq import Client, Config, SubscribeOptions
# Create client
config = Config(token="your-token")
client = Client(config)
try:
    # Subscribe to specific topic
    # Topic is required - use "*" to subscribe to all topics
    options = SubscribeOptions("orders")
    msg_gen, err_gen = client.sub(options)
    # Receive messages
    for msg in msg_gen:
        print(f"Received message from {msg.topic}: {msg.body.decode()}")
        print(f"Message UUID: {msg.uuid}")
        # Access variables
        if "priority" in msg.variables:
            print(f"Priority: {msg.variables['priority']}")
finally:
    client.close()
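The example above iterates only msg_gen. One way to watch err_gen at the same time is to drain it on a background thread. This is a sketch under assumptions: it treats both values returned by client.sub() as plain iterables and assumes err_gen finishes when the stream ends, neither of which this document confirms. The helper name consume_with_errors is hypothetical.

```python
import threading
from typing import Any, Callable, Iterable, List


def consume_with_errors(
    msg_gen: Iterable[Any],
    err_gen: Iterable[Any],
    handle: Callable[[Any], None],
    join_timeout: float = 1.0,
) -> List[Any]:
    """Process every message while collecting errors on a background thread."""
    errors: List[Any] = []

    def drain_errors() -> None:
        # Assumption: err_gen yields error objects and stops when the stream ends.
        for err in err_gen:
            errors.append(err)

    worker = threading.Thread(target=drain_errors, daemon=True)
    worker.start()
    for msg in msg_gen:
        handle(msg)
    # Wait briefly for the error thread; as a daemon it never blocks interpreter exit.
    worker.join(timeout=join_timeout)
    return errors
```

With the SDK this might be called as consume_with_errors(msg_gen, err_gen, process), where process is your own message handler.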
Advanced Subscription with Options
# Subscribe with batch size and rate limiting
# Default values: batch = 0 (server default 1000), speed_per_sec = 0 (unlimited)
options = (
    SubscribeOptions("orders.*")  # Wildcard topic
    .with_batch(10)  # Receive up to 10 messages at once
    .with_speed_per_sec(100)  # Limit to 100 messages per second
)
msg_gen, err_gen = client.sub(options)
Subscription Options:
- batch: Maximum number of messages to receive at once (default: 0 = server default 1000)
- speed_per_sec: Rate limit for message delivery per second (default: 0 = unlimited)
Subscribe to All Topics (Wildcard)
# Subscribe to all topics using "*" wildcard
options = SubscribeOptions("*") # "*" = all topics
msg_gen, err_gen = client.sub(options)
Subscribe with Pattern Wildcards
# Subscribe to all orders topics (orders.new, orders.updated, etc.)
options = SubscribeOptions("orders.*")
msg_gen, err_gen = client.sub(options)
# Subscribe to all topics
options = SubscribeOptions("*")
msg_gen, err_gen = client.sub(options)
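To get a feel for which topics a pattern would select, the matching can be approximated locally. The sketch below uses Python's fnmatch module as a stand-in; it is an assumption that the server behaves the same way. In particular, fnmatch lets "*" span dots and also gives "?" and "[...]" special meaning, so check the TogoMQ documentation before relying on edge cases. The helper name topic_matches is hypothetical.

```python
from fnmatch import fnmatchcase


def topic_matches(pattern: str, topic: str) -> bool:
    """Approximate wildcard matching locally (not the server's implementation)."""
    return fnmatchcase(topic, pattern)


topics = ["orders.new", "orders.updated", "payments.new"]
# Select the topics that the "orders.*" pattern would cover under this approximation.
selected = [t for t in topics if topic_matches("orders.*", t)]
print(selected)
```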
Message Structure
Publishing Message
Important: Topic is required when publishing messages.
class Message:
    topic: str  # Message topic (required)
    body: bytes  # Message payload
    variables: Dict[str, str]  # Custom key-value metadata
    postpone: int  # Delay in seconds before message is available
    retention: int  # How long to keep message (seconds)
Received Message
class Message:
    topic: str  # Message topic
    uuid: str  # Unique message identifier
    body: bytes  # Message payload
    variables: Dict[str, str]  # Custom key-value metadata
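Because body is raw bytes, structured payloads need to be serialized before publishing and parsed after receiving. A minimal sketch using JSON follows; the choice of JSON and the helper names encode_body and decode_body are assumptions for illustration, and the SDK does not require any particular encoding.

```python
import json
from typing import Any


def encode_body(payload: Any) -> bytes:
    """Serialize a JSON-compatible object to compact UTF-8 bytes."""
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")


def decode_body(body: bytes) -> Any:
    """Parse UTF-8 JSON bytes back into Python objects."""
    return json.loads(body.decode("utf-8"))
```

On the publishing side this could be used as Message("orders", encode_body({"id": 1})), and on the receiving side as decode_body(msg.body).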
Error Handling
The SDK provides detailed error information:
from togomq import TogoMQError, ErrorCode
try:
    response = client.pub_batch(messages)
except TogoMQError as e:
    # Check error type
    if e.code == ErrorCode.AUTH:
        print("Authentication failed")
    elif e.code == ErrorCode.CONNECTION:
        print("Connection error")
    elif e.code == ErrorCode.VALIDATION:
        print("Validation error")
    else:
        print(f"Error: {e}")
Error Codes
- ErrorCode.CONNECTION - Connection or network errors
- ErrorCode.AUTH - Authentication failures
- ErrorCode.VALIDATION - Invalid input or configuration
- ErrorCode.PUBLISH - Publishing errors
- ErrorCode.SUBSCRIBE - Subscription errors
- ErrorCode.STREAM - General streaming errors
- ErrorCode.CONFIGURATION - Configuration errors
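Some of these errors, connection errors in particular, may be transient and worth retrying, while authentication or validation failures usually are not. The following is a generic sketch of retrying with exponential backoff. It is not an SDK feature: the helper retry_with_backoff and its parameters are hypothetical, and whether a retried publish can lead to duplicate messages is not covered here.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, retrying retryable failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            return operation()
        except Exception as exc:
            last_attempt = attempt == attempts - 1
            # Re-raise immediately when out of attempts or the error is not retryable.
            if last_attempt or not is_retryable(exc):
                raise
            # Delay doubles each time: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * (2 ** attempt))
    raise ValueError("attempts must be at least 1")
```

With the SDK, is_retryable could be written as lambda e: isinstance(e, TogoMQError) and e.code == ErrorCode.CONNECTION, and operation as lambda: client.pub_batch(messages).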
Logging
Control logging verbosity with the log_level configuration:
config = Config(
    token="your-token",
    log_level="debug",  # debug, info, warn, error, none
)
Log levels:
- debug - All logs including debug information
- info - Informational messages and above
- warn - Warnings and errors only
- error - Error messages only
- none - Disable logging
Best Practices
- Reuse Clients: Create one client per application/thread and reuse it
- Handle Errors: Always check and handle errors appropriately
- Close Connections: Always close clients using client.close() or a context manager
- Use Context Manager: Prefer the with statement for automatic cleanup
- Batch Messages: Use pub_batch() for better performance when publishing multiple messages
- Monitor Subscriptions: Always handle both message and error generators in subscriptions
# ✅ Good - Using context manager
with Client(config) as client:
    client.pub_batch(messages)
# ✅ Good - Manual cleanup
client = Client(config)
try:
    client.pub_batch(messages)
finally:
    client.close()
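When the number of messages is large or unknown, one way to follow the batching advice is to split the input into fixed-size chunks and publish each chunk separately. The helper below is a sketch, not part of the SDK; the name chunked is hypothetical, and a suitable chunk size depends on your workload.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most size items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk
```

Each chunk could then be passed to client.pub_batch(chunk) inside a single with Client(config) as client: block, so that one connection is reused for all batches.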
Examples
Check out the examples/ directory for complete working examples:
- examples/publish.py - Publishing examples
- examples/subscribe.py - Subscription examples
- examples/complete_workflow.py - Complete workflow with threading
Development
Setup Development Environment
# Clone the repository
git clone https://github.com/TogoMQ/togomq-sdk-python.git
cd togomq-sdk-python
# Install dependencies
pip install -e ".[dev]"
Running Tests
# Run all tests
pytest
# Run with coverage
pytest --cov=togomq --cov-report=html
Code Quality
# Format code
black togomq tests examples
# Lint code
ruff check togomq tests examples
# Type checking
mypy togomq
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Support
- Documentation: https://togomq.io/docs
- Issues: https://github.com/TogoMQ/togomq-sdk-python/issues
- TogoMQ Website: https://togomq.io
Related Projects
- togomq-grpc-python - Auto-generated gRPC protobuf definitions
- togomq-sdk-go - TogoMQ SDK for Go
Changelog
See Releases for version history and changes.