Queen MQ - Python Client
Modern, high-performance message queue client for Python
What is Queen MQ?
Queen MQ is a PostgreSQL-backed message queue system with a powerful feature set:
- FIFO Partitions - Unlimited ordered partitions within queues
- Consumer Groups - Kafka-style consumer groups for scalability
- Flexible Semantics - Exactly-once, at-least-once, and at-most-once delivery
- Transactions - Atomic operations across push and ack
- High Performance - 200K+ messages/sec with proper batching
- Subscription Modes - Process from beginning, new messages only, or from timestamp
- Dead Letter Queue - Automatic failure handling and monitoring
- Message Tracing - Debug distributed workflows with trace timelines
- Client-Side Buffering - 10x-100x throughput boost for high-volume pushes
- Real-time Streaming - Windowed aggregation and processing
This client provides a fluent, async/await API for Python applications.
Installation
pip install queen-mq
Requirements: Python 3.8+
Quick Start
import asyncio
from queen import Queen

async def main():
    # Connect to Queen server
    async with Queen('http://localhost:6632') as queen:
        # Create a queue
        await queen.queue('tasks').create()

        # Push messages
        await queen.queue('tasks').push([
            {'data': {'task': 'send-email', 'to': 'alice@example.com'}}
        ])

        # Consume messages
        async def process(message):
            print('Processing:', message['data'])
            # Auto-ack on success, auto-retry on error

        await queen.queue('tasks').consume(process)

asyncio.run(main())
Core Concepts
Queues
Logical containers for messages with configurable settings:
await queen.queue('orders').config({
    'leaseTime': 300,  # 5 minutes
    'retryLimit': 3,
    'priority': 5,
    'encryptionEnabled': False
}).create()
Partitions
Ordered lanes within a queue:
# All messages for user-123 are processed in order
await queen.queue('user-events').partition('user-123').push([
    {'data': {'event': 'login'}},
    {'data': {'event': 'view-page'}},
    {'data': {'event': 'logout'}}
])
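Conceptually, a partition is a deterministic lane per key: every message pushed with the same key lands in the same lane, in push order. The pure-Python sketch below illustrates that idea with hash-based lane routing; it is not the Queen client (in Queen you name the partition explicitly), just a demonstration of why keyed lanes preserve per-key order.

```python
from collections import defaultdict
from zlib import crc32

# Route each message to a lane by hashing its key; messages that
# share a key always land in the same lane, in push order.
lanes = defaultdict(list)

def route(key, message, num_lanes=4):
    lane = crc32(key.encode()) % num_lanes
    lanes[lane].append((key, message))

for event in ['login', 'view-page', 'logout']:
    route('user-123', {'event': event})

# All of user-123's events sit in one lane, still in order.
lane_id = crc32(b'user-123') % 4
assert [m['event'] for k, m in lanes[lane_id]] == ['login', 'view-page', 'logout']
```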
Consumer Groups
Multiple consumers sharing work:
# Workers 1 and 2 share the load
async def process_handler(msg):
    await send_email(msg['data'])

await queen.queue('emails').group('processors').consume(process_handler)

# A separate group processes the same messages independently
async def analytics_handler(msg):
    await log_metrics(msg['data'])

await queen.queue('emails').group('analytics').consume(analytics_handler)
Subscription Modes
Control whether consumer groups process historical messages:
# Default: Process ALL messages (including backlog)
await queen.queue('events').group('batch-analytics').consume(handler)
# Skip history, only new messages
await queen.queue('events').group('realtime-monitor').subscription_mode('new').consume(handler)
# Start from specific timestamp
await queen.queue('events').group('replay').subscription_from('2025-10-28T10:00:00.000Z').consume(handler)
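The three modes differ only in where a new consumer group's cursor starts: the default reads the full backlog, 'new' starts at the current head, and a timestamp starts partway through. A toy illustration of that cursor logic (plain Python, not the client's implementation; the log and function names are made up for the example):

```python
# A message log as (iso_timestamp, payload) pairs, oldest first.
log = [
    ('2025-10-28T09:00:00.000Z', 'old-event'),
    ('2025-10-28T10:30:00.000Z', 'recent-event'),
]

def starting_messages(mode, from_ts=None):
    if mode == 'all':
        return [m for _, m in log]              # full backlog
    if mode == 'new':
        return []                               # only messages pushed from now on
    # 'from': skip everything before the timestamp
    # (ISO-8601 strings in one format compare correctly as text)
    return [m for ts, m in log if ts >= from_ts]

assert starting_messages('all') == ['old-event', 'recent-event']
assert starting_messages('new') == []
assert starting_messages('from', '2025-10-28T10:00:00.000Z') == ['recent-event']
```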
Connection Options
Single Server
queen = Queen('http://localhost:6632')
Multiple Servers (High Availability)
queen = Queen(['http://server1:6632', 'http://server2:6632'])
Full Configuration
queen = Queen({
    'urls': ['http://server1:6632', 'http://server2:6632'],
    'timeout_millis': 30000,
    'retry_attempts': 3,
    'load_balancing_strategy': 'affinity',  # or 'round-robin', 'session'
    'enable_failover': True
})
Basic Usage Patterns
Push Messages
# Simple push
await queen.queue('tasks').push([
    {'data': {'job': 'resize-image', 'imageId': 123}}
])

# With partition
await queen.queue('tasks').partition('tenant-456').push([
    {'data': {'action': 'process'}}
])

# With custom transaction ID (for exactly-once)
await queen.queue('tasks').push([
    {'transactionId': 'unique-id-123', 'data': {'value': 42}}
])
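A custom transaction ID is what makes a retried push safe: the server can drop a duplicate instead of enqueuing it twice. The deduplication idea can be sketched in plain Python (a conceptual model with made-up names, not Queen's server code):

```python
# Deduplicate pushes by transaction ID: a retried push with the
# same ID is ignored instead of being enqueued a second time.
seen_ids = set()
queue = []

def push_once(transaction_id, data):
    if transaction_id in seen_ids:
        return False          # duplicate: dropped, not enqueued
    seen_ids.add(transaction_id)
    queue.append(data)
    return True

assert push_once('unique-id-123', {'value': 42}) is True
assert push_once('unique-id-123', {'value': 42}) is False  # retry is a no-op
assert queue == [{'value': 42}]
```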
Consume Messages (Long-Running Workers)
# Single-message processing (batch=1, default)
# Handler receives a single message
async def handler(message):
    await process_task(message['data'])
    # Auto-ack on success, auto-retry on error

await queen.queue('tasks').concurrency(10).consume(handler)

# Batch processing (batch > 1)
# Handler receives a list of messages
async def batch_handler(messages):
    for message in messages:
        await process_task(message['data'])

await queen.queue('tasks').batch(20).concurrency(5).consume(batch_handler)

# Process up to 100 messages, then stop
await queen.queue('tasks').limit(100).consume(handler)
Pop Messages (On-Demand Processing)
# Grab messages manually
messages = await queen.queue('tasks').batch(10).wait(True).pop()

# Manual acknowledgment
for message in messages:
    try:
        await process_message(message['data'])
        await queen.ack(message, True)   # Success
    except Exception as error:
        await queen.ack(message, False)  # Retry
Transactions (Atomic Operations)
# Pop from queue A
messages = await queen.queue('input').pop()

# Atomically: ack input AND push output
await (queen.transaction()
    .ack(messages[0])
    .queue('output')
    .push([{'data': processed_result}])
    .commit())
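The point of the transaction is atomicity: the input ack and the output push both happen, or neither does. A toy all-or-nothing sketch of that guarantee (a conceptual model, not Queen's wire protocol):

```python
import copy

class Transaction:
    """Collect operations, then apply all of them or none."""
    def __init__(self, state):
        self.state = state
        self.ops = []

    def ack(self, queue, msg):
        self.ops.append(lambda s: s[queue].remove(msg))
        return self

    def push(self, queue, msg):
        self.ops.append(lambda s: s[queue].append(msg))
        return self

    def commit(self):
        staged = copy.deepcopy(self.state)
        for op in self.ops:
            op(staged)            # any failure here leaves self.state untouched
        self.state.clear()
        self.state.update(staged)

state = {'input': ['job-1'], 'output': []}
Transaction(state).ack('input', 'job-1').push('output', 'result-1').commit()
assert state == {'input': [], 'output': ['result-1']}
```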
Client-Side Buffering (High Throughput)
# Buffer messages locally, batch to server
for i in range(10000):
    await queen.queue('events').buffer({'message_count': 500, 'time_millis': 1000}).push([
        {'data': {'id': i}}
    ])

# Flush remaining buffered messages
await queen.flush_all_buffers()

# Result: 10x-100x faster than individual pushes
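The buffer ships a batch whenever either threshold trips: message_count messages accumulated, or time_millis elapsed since the first buffered message. A minimal sketch of that flush rule (illustrative only, not the client's implementation):

```python
import time

class Buffer:
    """Accumulate messages; flush when a count or time threshold is hit."""
    def __init__(self, message_count=500, time_millis=1000, send=print):
        self.max_count = message_count
        self.max_age = time_millis / 1000.0
        self.send = send          # callable that ships one batch (one network call)
        self.items = []
        self.started = None

    def push(self, message):
        if not self.items:
            self.started = time.monotonic()
        self.items.append(message)
        age = time.monotonic() - self.started
        if len(self.items) >= self.max_count or age >= self.max_age:
            self.flush()

    def flush(self):
        if self.items:
            self.send(self.items)
            self.items = []

batches = []
buf = Buffer(message_count=3, time_millis=60000, send=lambda b: batches.append(list(b)))
for i in range(7):
    buf.push({'id': i})
buf.flush()  # ship the remainder, like flush_all_buffers()
assert [len(b) for b in batches] == [3, 3, 1]
```

Batching amortizes the per-request overhead across many messages, which is where the 10x-100x figure comes from.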
Dead Letter Queue
# Enable DLQ on queue
await queen.queue('risky').config({'retryLimit': 3, 'dlqAfterMaxRetries': True}).create()
# Query failed messages
dlq = await queen.queue('risky').dlq().limit(10).get()
print(f"Found {dlq['total']} failed messages")
for msg in dlq['messages']:
    print('Error:', msg.get('errorMessage'))
Message Tracing
async def handler(msg):
    order_id = msg['data']['orderId']

    # Record trace with name for cross-service correlation
    await msg['trace']({
        'traceName': f"order-{order_id}",
        'eventType': 'info',
        'data': {'text': 'Order processing started'}
    })

    await process_order(msg['data'])

    await msg['trace']({
        'traceName': f"order-{order_id}",
        'eventType': 'processing',
        'data': {'text': 'Order completed', 'total': msg['data']['total']}
    })

await queen.queue('orders').consume(handler)
# View traces in webapp: Traces → Search "order-12345"
API Reference
Queue Operations
# Create
await queen.queue('my-queue').create()
await queen.queue('my-queue').config({'priority': 5}).create()
# Delete
await queen.queue('my-queue').delete()
Push
await queen.queue('q').push([{'data': {'value': 1}}])
await queen.queue('q').partition('p1').push([{'data': {'value': 1}}])
await queen.queue('q').buffer({'message_count': 100, 'time_millis': 1000}).push([...])
Pop
msgs = await queen.queue('q').pop()
msgs = await queen.queue('q').batch(10).pop()
msgs = await queen.queue('q').batch(10).wait(True).pop()
Consume
# batch=1 (default): handler receives a single message
async def handler(msg): ...
await queen.queue('q').consume(handler)

# batch > 1: handler receives a list of messages
async def batch_handler(msgs): ...
await queen.queue('q').batch(10).consume(batch_handler)
# Other options
await queen.queue('q').limit(10).consume(handler)
await queen.queue('q').concurrency(5).consume(handler)
await queen.queue('q').group('my-group').consume(handler)
Acknowledgment
await queen.ack(message, True) # Success
await queen.ack(message, False) # Retry
await queen.ack(message, False, {'error': 'reason'})
await queen.ack([msg1, msg2], True) # Batch ack
Transactions
await (queen.transaction()
    .ack(message)
    .queue('output')
    .push([{'data': {'result': 'processed'}}])
    .commit())
Lease Renewal
await queen.renew(message)
await queen.renew([msg1, msg2, msg3])
await queen.queue('q').renew_lease(True, 60000).consume(handler)
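For long-running tasks, the pattern behind automatic lease renewal is a background coroutine that keeps calling renew at a fixed interval until the handler finishes. A generic asyncio sketch of that pattern (the renew callable here is a stand-in, not Queen's internal code):

```python
import asyncio

async def with_lease_renewal(work, renew, interval=0.05):
    """Run `work`, renewing the lease every `interval` seconds until it finishes."""
    async def keep_alive():
        while True:
            await asyncio.sleep(interval)
            await renew()

    renewer = asyncio.create_task(keep_alive())
    try:
        return await work()
    finally:
        renewer.cancel()   # stop renewing once the work is done

renewals = []

async def demo():
    async def renew():
        renewals.append('renewed')

    async def slow_task():
        await asyncio.sleep(0.2)   # stands in for long-running work
        return 'done'

    return await with_lease_renewal(slow_task, renew)

result = asyncio.run(demo())
assert result == 'done'
assert len(renewals) >= 1   # the lease was renewed while the task ran
```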
Buffering
await queen.flush_all_buffers()
await queen.queue('q').flush_buffer()
stats = queen.get_buffer_stats()
Dead Letter Queue
dlq = await queen.queue('q').dlq().limit(10).get()
dlq = await queen.queue('q').dlq('consumer-group').limit(10).get()
Shutdown
await queen.close() # Flush buffers and close connections
Configuration Defaults
Client Defaults
{
    'timeout_millis': 30000,
    'retry_attempts': 3,
    'retry_delay_millis': 1000,
    'load_balancing_strategy': 'affinity',
    'enable_failover': True
}
Queue Defaults
{
    'leaseTime': 300,          # 5 minutes
    'retryLimit': 3,
    'priority': 0,
    'delayedProcessing': 0,
    'windowBuffer': 0,
    'maxSize': 0,              # Unlimited
    'retentionSeconds': 0,     # Keep forever
    'encryptionEnabled': False
}
Consume Defaults
{
    'concurrency': 1,
    'batch': 1,
    'auto_ack': True,
    'wait': True,              # Long polling
    'timeout_millis': 30000,
    'limit': None,             # Run forever
    'renew_lease': False
}
Logging
Enable detailed logging for debugging:
export QUEEN_CLIENT_LOG=true
python your_app.py
Example output:
[2025-10-28T10:30:45.123Z] [INFO] [Queen.constructor] {"status":"initialized","urls":1}
[2025-10-28T10:30:45.234Z] [INFO] [QueueBuilder.push] {"queue":"tasks","partition":"Default","count":5}
Type Hints
Full type hints included for IDE support:
from queen import Queen, Message
from typing import Dict, Any
queen: Queen = Queen('http://localhost:6632')
async def handler(message: Message) -> None:
    data: Dict[str, Any] = message['data']
    print(data)
await queen.queue('orders').consume(handler)
Best Practices
- ✅ Use consume() for workers - Simpler API, handles retries automatically
- ✅ Use pop() for control - When you need precise control over acking
- ✅ Buffer for speed - Always use buffering when pushing many messages
- ✅ Partitions for order - Use partitions when message order matters
- ✅ Consumer groups for scale - Run multiple workers in the same group
- ✅ Transactions for consistency - Use transactions for atomic operations
- ✅ Enable DLQ - Always enable DLQ in production
- ✅ Renew long leases - Use auto-renewal for long-running tasks
- ✅ Graceful shutdown - Use the async context manager or call queen.close()
- ✅ Monitor DLQ - Regularly check for failed messages
📝 Important Notes
Handler Signatures:
- When batch=1 (default), the handler receives a single message: async def handler(message): ...
- When batch>1, the handler receives a list of messages: async def handler(messages): ...
- When each=True, the handler always receives single messages regardless of batch size
Documentation
- Node.js Client - Node.js client documentation
- HTTP API Reference - Raw HTTP endpoints
- Server Guide - Server setup and configuration
- Architecture Guide - Deep dive into internals
License
Apache 2.0 - See LICENSE
Support
- GitHub: smartpricing/queen
- Issues: GitHub Issues
- LinkedIn: Smartness