
Queen MQ - Python Client

Modern, high-performance message queue client for Python



What is Queen MQ?

Queen MQ is a PostgreSQL-backed message queue system with a powerful feature set:

  • FIFO Partitions - Unlimited ordered partitions within queues
  • Consumer Groups - Kafka-style consumer groups for scalability
  • Flexible Semantics - Exactly-once, at-least-once, and at-most-once delivery
  • Transactions - Atomic operations across push and ack
  • High Performance - 200K+ messages/sec with proper batching
  • Subscription Modes - Process from beginning, new messages only, or from timestamp
  • Dead Letter Queue - Automatic failure handling and monitoring
  • Message Tracing - Debug distributed workflows with trace timelines
  • Client-Side Buffering - 10x-100x throughput boost for high-volume pushes
  • Real-time Streaming - Windowed aggregation and processing

This client provides a fluent, async/await API for Python applications.


Installation

pip install queen-mq

Requirements: Python 3.8+


Quick Start

import asyncio
from queen import Queen

async def main():
    # Connect to Queen server
    async with Queen('http://localhost:6632') as queen:
        # Create a queue
        await queen.queue('tasks').create()
        
        # Push messages
        await queen.queue('tasks').push([
            {'data': {'task': 'send-email', 'to': 'alice@example.com'}}
        ])
        
        # Consume messages
        async def process(message):
            print('Processing:', message['data'])
            # Auto-ack on success, auto-retry on error

        await queen.queue('tasks').consume(process)

asyncio.run(main())

Core Concepts

Queues

Logical containers for messages with configurable settings:

await queen.queue('orders').config({
    'leaseTime': 300,          # 5 minutes
    'retryLimit': 3,
    'priority': 5,
    'encryptionEnabled': False
}).create()

Partitions

Ordered lanes within a queue:

# All messages for user-123 are processed in order
await queen.queue('user-events').partition('user-123').push([
    {'data': {'event': 'login'}},
    {'data': {'event': 'view-page'}},
    {'data': {'event': 'logout'}}
])

Consumer Groups

Multiple consumers sharing work:

# Worker 1 & 2 share the load
async def send_handler(msg):
    await send_email(msg['data'])

await queen.queue('emails').group('processors').consume(send_handler)

# Separate group processes the same messages independently
async def analytics_handler(msg):
    await log_metrics(msg['data'])

await queen.queue('emails').group('analytics').consume(analytics_handler)

Subscription Modes

Control whether consumer groups process historical messages:

# Default: Process ALL messages (including backlog)
await queen.queue('events').group('batch-analytics').consume(handler)

# Skip history, only new messages
await queen.queue('events').group('realtime-monitor').subscription_mode('new').consume(handler)

# Start from specific timestamp
await queen.queue('events').group('replay').subscription_from('2025-10-28T10:00:00.000Z').consume(handler)

Connection Options

Single Server

queen = Queen('http://localhost:6632')

Multiple Servers (High Availability)

queen = Queen(['http://server1:6632', 'http://server2:6632'])

Full Configuration

queen = Queen({
    'urls': ['http://server1:6632', 'http://server2:6632'],
    'timeout_millis': 30000,
    'retry_attempts': 3,
    'load_balancing_strategy': 'affinity',  # or 'round-robin', 'session'
    'enable_failover': True
})

Basic Usage Patterns

Push Messages

# Simple push
await queen.queue('tasks').push([
    {'data': {'job': 'resize-image', 'imageId': 123}}
])

# With partition
await queen.queue('tasks').partition('tenant-456').push([
    {'data': {'action': 'process'}}
])

# With custom transaction ID (for exactly-once)
await queen.queue('tasks').push([
    {'transactionId': 'unique-id-123', 'data': {'value': 42}}
])

Consume Messages (Long-Running Workers)

# Single message processing (batch=1, default)
# Handler receives a single message
async def handle_one(message):
    await process_task(message['data'])
    # Auto-ack on success, auto-retry on error

await queen.queue('tasks').concurrency(10).consume(handle_one)

# Batch processing (batch>1)
# Handler receives a list of messages
async def handle_batch(messages):
    for message in messages:
        await process_task(message['data'])

await queen.queue('tasks').batch(20).concurrency(5).consume(handle_batch)

# Process up to 100 messages, then stop
await queen.queue('tasks').limit(100).consume(handle_one)

Pop Messages (On-Demand Processing)

# Grab messages manually
messages = await queen.queue('tasks').batch(10).wait(True).pop()

# Manual acknowledgment
for message in messages:
    try:
        await process_message(message['data'])
        await queen.ack(message, True)  # Success
    except Exception as error:
        await queen.ack(message, False)  # Retry

Transactions (Atomic Operations)

# Pop from queue A
messages = await queen.queue('input').pop()

# Atomically: ack input AND push output
await (queen.transaction()
    .ack(messages[0])
    .queue('output')
    .push([{'data': processed_result}])
    .commit())

Client-Side Buffering (High Throughput)

# Buffer messages locally, batch to server
for i in range(10000):
    await queen.queue('events').buffer({'message_count': 500, 'time_millis': 1000}).push([
        {'data': {'id': i}}
    ])

# Flush remaining buffered messages
await queen.flush_all_buffers()

# Result: 10x-100x faster than individual pushes

Dead Letter Queue

# Enable DLQ on queue
await queen.queue('risky').config({'retryLimit': 3, 'dlqAfterMaxRetries': True}).create()

# Query failed messages
dlq = await queen.queue('risky').dlq().limit(10).get()

print(f"Found {dlq['total']} failed messages")
for msg in dlq['messages']:
    print('Error:', msg.get('errorMessage'))

Message Tracing

async def handle_order(msg):
    order_id = msg['data']['orderId']

    # Record trace with a name for cross-service correlation
    await msg['trace']({
        'traceName': f"order-{order_id}",
        'eventType': 'info',
        'data': {'text': 'Order processing started'}
    })

    await process_order(msg['data'])

    await msg['trace']({
        'traceName': f"order-{order_id}",
        'eventType': 'processing',
        'data': {'text': 'Order completed', 'total': msg['data']['total']}
    })

await queen.queue('orders').consume(handle_order)

# View traces in webapp: Traces → Search "order-12345"

API Reference

Queue Operations

# Create
await queen.queue('my-queue').create()
await queen.queue('my-queue').config({'priority': 5}).create()

# Delete
await queen.queue('my-queue').delete()

Push

await queen.queue('q').push([{'data': {'value': 1}}])
await queen.queue('q').partition('p1').push([{'data': {'value': 1}}])
await queen.queue('q').buffer({'message_count': 100, 'time_millis': 1000}).push([...])

Pop

msgs = await queen.queue('q').pop()
msgs = await queen.queue('q').batch(10).pop()
msgs = await queen.queue('q').batch(10).wait(True).pop()

Consume

# batch=1 (default): handler receives a single message
async def handle_one(msg): ...
await queen.queue('q').consume(handle_one)

# batch>1: handler receives a list of messages
async def handle_many(msgs): ...
await queen.queue('q').batch(10).consume(handle_many)

# Other options
await queen.queue('q').limit(10).consume(handler)
await queen.queue('q').concurrency(5).consume(handler)
await queen.queue('q').group('my-group').consume(handler)

Acknowledgment

await queen.ack(message, True)   # Success
await queen.ack(message, False)  # Retry
await queen.ack(message, False, {'error': 'reason'})
await queen.ack([msg1, msg2], True)  # Batch ack

Transactions

await (queen.transaction()
    .ack(message)
    .queue('output')
    .push([{'data': {'result': 'processed'}}])
    .commit())

Lease Renewal

await queen.renew(message)
await queen.renew([msg1, msg2, msg3])
await queen.queue('q').renew_lease(True, 60000).consume(handler)

Buffering

await queen.flush_all_buffers()
await queen.queue('q').flush_buffer()
stats = queen.get_buffer_stats()

Dead Letter Queue

dlq = await queen.queue('q').dlq().limit(10).get()
dlq = await queen.queue('q').dlq('consumer-group').limit(10).get()

Shutdown

await queen.close()  # Flush buffers and close connections

Configuration Defaults

Client Defaults

{
    'timeout_millis': 30000,
    'retry_attempts': 3,
    'retry_delay_millis': 1000,
    'load_balancing_strategy': 'affinity',
    'enable_failover': True
}

Queue Defaults

{
    'leaseTime': 300,           # 5 minutes
    'retryLimit': 3,
    'priority': 0,
    'delayedProcessing': 0,
    'windowBuffer': 0,
    'maxSize': 0,              # Unlimited
    'retentionSeconds': 0,     # Keep forever
    'encryptionEnabled': False
}

Consume Defaults

{
    'concurrency': 1,
    'batch': 1,
    'auto_ack': True,
    'wait': True,              # Long polling
    'timeout_millis': 30000,
    'limit': None,             # Run forever
    'renew_lease': False
}

Logging

Enable detailed logging for debugging:

export QUEEN_CLIENT_LOG=true
python your_app.py

Example output:

[2025-10-28T10:30:45.123Z] [INFO] [Queen.constructor] {"status":"initialized","urls":1}
[2025-10-28T10:30:45.234Z] [INFO] [QueueBuilder.push] {"queue":"tasks","partition":"Default","count":5}

Type Hints

Full type hints included for IDE support:

from queen import Queen, Message
from typing import Dict, Any

queen: Queen = Queen('http://localhost:6632')

async def handler(message: Message) -> None:
    data: Dict[str, Any] = message['data']
    print(data)

await queen.queue('orders').consume(handler)

Best Practices

  1. Use consume() for workers - Simpler API, handles retries automatically
  2. Use pop() for control - When you need precise control over acking
  3. Buffer for speed - Always use buffering when pushing many messages
  4. Partitions for order - Use partitions when message order matters
  5. Consumer groups for scale - Run multiple workers in the same group
  6. Transactions for consistency - Use transactions for atomic operations
  7. Enable DLQ - Always enable DLQ in production
  8. Renew long leases - Use auto-renewal for long-running tasks
  9. Graceful shutdown - Use async context manager or call queen.close()
  10. Monitor DLQ - Regularly check for failed messages

📝 Important Notes

Handler Signatures:

  • When batch=1 (default), the handler receives a single message: async def handler(message): ...
  • When batch>1, the handler receives a list of messages: async def handler(messages): ...
  • When each=True, the handler always receives single messages regardless of batch size
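To illustrate these signatures in plain Python (this dispatcher is an illustration of the semantics above, not the client's actual internals):

```python
import asyncio

# Illustrative dispatcher mimicking how the consume loop would invoke a
# handler depending on the batch setting described above.
async def dispatch(handler, messages, batch):
    if batch == 1:
        for message in messages:   # batch=1: one message per call
            await handler(message)
    else:
        await handler(messages)    # batch>1: the whole list per call

received = []

async def single_handler(message):
    received.append(message['data'])

async def batch_handler(messages):
    received.append([m['data'] for m in messages])

msgs = [{'data': 1}, {'data': 2}]
asyncio.run(dispatch(single_handler, msgs, batch=1))
asyncio.run(dispatch(batch_handler, msgs, batch=2))
print(received)  # [1, 2, [1, 2]]
```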

License

Apache 2.0 - See LICENSE

