Skip to main content

High-performance message queue client for Python backed by PostgreSQL

Project description

Queen MQ - Python Client

Modern, high-performance message queue client for Python

PyPI License Python

Quick StartFeaturesDocumentationExamples


What is Queen MQ?

Queen MQ is a PostgreSQL-backed message queue system with a powerful feature set:

  • FIFO Partitions - Unlimited ordered partitions within queues
  • Consumer Groups - Kafka-style consumer groups for scalability
  • Flexible Semantics - Exactly-once, at-least-once, and at-most-once delivery
  • Transactions - Atomic operations across push and ack
  • High Performance - 200K+ messages/sec with proper batching
  • Subscription Modes - Process from beginning, new messages only, or from timestamp
  • Dead Letter Queue - Automatic failure handling and monitoring
  • Message Tracing - Debug distributed workflows with trace timelines
  • Client-Side Buffering - 10x-100x throughput boost for high-volume pushes
  • Real-time Streaming - Windowed aggregation and processing

This client provides a fluent, async/await API for Python applications.


Installation

pip install queen-mq

Requirements: Python 3.8+


Quick Start

import asyncio
from queen import Queen

async def main():
    # Connect to Queen server
    async with Queen('http://localhost:6632') as queen:
        # Create a queue
        await queen.queue('tasks').create()
        
        # Push messages
        await queen.queue('tasks').push([
            {'data': {'task': 'send-email', 'to': 'alice@example.com'}}
        ])
        
        # Consume messages
        await queen.queue('tasks').consume(async def process(message):
            print('Processing:', message['data'])
            # Auto-ack on success, auto-retry on error
        )

asyncio.run(main())

Core Concepts

Queues

Logical containers for messages with configurable settings:

await queen.queue('orders').config({
    'leaseTime': 300,          # 5 minutes
    'retryLimit': 3,
    'priority': 5,
    'encryptionEnabled': False
}).create()

Partitions

Ordered lanes within a queue:

# All messages for user-123 are processed in order
await queen.queue('user-events').partition('user-123').push([
    {'data': {'event': 'login'}},
    {'data': {'event': 'view-page'}},
    {'data': {'event': 'logout'}}
])

Consumer Groups

Multiple consumers sharing work:

# Worker 1 & 2 share the load
await queen.queue('emails').group('processors').consume(async def handler(msg):
    await send_email(msg['data'])
)

# Separate group processes same messages independently
await queen.queue('emails').group('analytics').consume(async def handler(msg):
    await log_metrics(msg['data'])
)

Subscription Modes

Control whether consumer groups process historical messages:

# Default: Process ALL messages (including backlog)
await queen.queue('events').group('batch-analytics').consume(handler)

# Skip history, only new messages
await queen.queue('events').group('realtime-monitor').subscription_mode('new').consume(handler)

# Start from specific timestamp
await queen.queue('events').group('replay').subscription_from('2025-10-28T10:00:00.000Z').consume(handler)

Connection Options

Single Server

queen = Queen('http://localhost:6632')

Multiple Servers (High Availability)

queen = Queen(['http://server1:6632', 'http://server2:6632'])

Full Configuration

queen = Queen({
    'urls': ['http://server1:6632', 'http://server2:6632'],
    'timeout_millis': 30000,
    'retry_attempts': 3,
    'load_balancing_strategy': 'affinity',  # or 'round-robin', 'session'
    'enable_failover': True
})

Basic Usage Patterns

Push Messages

# Simple push
await queen.queue('tasks').push([
    {'data': {'job': 'resize-image', 'imageId': 123}}
])

# With partition
await queen.queue('tasks').partition('tenant-456').push([
    {'data': {'action': 'process'}}
])

# With custom transaction ID (for exactly-once)
await queen.queue('tasks').push([
    {'transactionId': 'unique-id-123', 'data': {'value': 42}}
])

Consume Messages (Long-Running Workers)

# Single message processing (batch=1, default)
# Handler receives a single message
await queen.queue('tasks').concurrency(10).consume(async def handler(message):
    await process_task(message['data'])
    # Auto-ack on success, auto-retry on error
)

# Batch processing (batch>1)
# Handler receives an array of messages
await queen.queue('tasks').batch(20).concurrency(5).consume(async def handler(messages):
    for message in messages:
        await process_task(message['data'])
)

# Process with limit and stop
await queen.queue('tasks').limit(100).consume(async def handler(message):
    await process_task(message['data'])
)

Pop Messages (On-Demand Processing)

# Grab messages manually
messages = await queen.queue('tasks').batch(10).wait(True).pop()

# Manual acknowledgment
for message in messages:
    try:
        await process_message(message['data'])
        await queen.ack(message, True)  # Success
    except Exception as error:
        await queen.ack(message, False)  # Retry

Transactions (Atomic Operations)

# Pop from queue A
messages = await queen.queue('input').pop()

# Atomically: ack input AND push output
await (queen.transaction()
    .ack(messages[0])
    .queue('output')
    .push([{'data': processed_result}])
    .commit())

Client-Side Buffering (High Throughput)

# Buffer messages locally, batch to server
for i in range(10000):
    await queen.queue('events').buffer({'message_count': 500, 'time_millis': 1000}).push([
        {'data': {'id': i}}
    ])

# Flush remaining buffered messages
await queen.flush_all_buffers()

# Result: 10x-100x faster than individual pushes

Dead Letter Queue

# Enable DLQ on queue
await queen.queue('risky').config({'retryLimit': 3, 'dlqAfterMaxRetries': True}).create()

# Query failed messages
dlq = await queen.queue('risky').dlq().limit(10).get()

print(f"Found {dlq['total']} failed messages")
for msg in dlq['messages']:
    print('Error:', msg.get('errorMessage'))

Message Tracing

await queen.queue('orders').consume(async def handler(msg):
    order_id = msg['data']['orderId']
    
    # Record trace with name for cross-service correlation
    await msg['trace']({
        'traceName': f"order-{order_id}",
        'eventType': 'info',
        'data': {'text': 'Order processing started'}
    })
    
    await process_order(msg['data'])
    
    await msg['trace']({
        'traceName': f"order-{order_id}",
        'eventType': 'processing',
        'data': {'text': 'Order completed', 'total': msg['data']['total']}
    })
)

# View traces in webapp: Traces → Search "order-12345"

API Reference

Queue Operations

# Create
await queen.queue('my-queue').create()
await queen.queue('my-queue').config({'priority': 5}).create()

# Delete
await queen.queue('my-queue').delete()

Push

await queen.queue('q').push([{'data': {'value': 1}}])
await queen.queue('q').partition('p1').push([{'data': {'value': 1}}])
await queen.queue('q').buffer({'message_count': 100, 'time_millis': 1000}).push([...])

Pop

msgs = await queen.queue('q').pop()
msgs = await queen.queue('q').batch(10).pop()
msgs = await queen.queue('q').batch(10).wait(True).pop()

Consume

# batch=1 (default): handler receives single message
await queen.queue('q').consume(async def handler(msg): ...)

# batch>1: handler receives array of messages
await queen.queue('q').batch(10).consume(async def handler(msgs): ...)

# Other options
await queen.queue('q').limit(10).consume(handler)
await queen.queue('q').concurrency(5).consume(handler)
await queen.queue('q').group('my-group').consume(handler)

Acknowledgment

await queen.ack(message, True)   # Success
await queen.ack(message, False)  # Retry
await queen.ack(message, False, {'error': 'reason'})
await queen.ack([msg1, msg2], True)  # Batch ack

Transactions

await (queen.transaction()
    .ack(message)
    .queue('output')
    .push([{'data': {'result': 'processed'}}])
    .commit())

Lease Renewal

await queen.renew(message)
await queen.renew([msg1, msg2, msg3])
await queen.queue('q').renew_lease(True, 60000).consume(handler)

Buffering

await queen.flush_all_buffers()
await queen.queue('q').flush_buffer()
stats = queen.get_buffer_stats()

Dead Letter Queue

dlq = await queen.queue('q').dlq().limit(10).get()
dlq = await queen.queue('q').dlq('consumer-group').limit(10).get()

Shutdown

await queen.close()  # Flush buffers and close connections

Configuration Defaults

Client Defaults

{
    'timeout_millis': 30000,
    'retry_attempts': 3,
    'retry_delay_millis': 1000,
    'load_balancing_strategy': 'affinity',
    'enable_failover': True
}

Queue Defaults

{
    'leaseTime': 300,           # 5 minutes
    'retryLimit': 3,
    'priority': 0,
    'delayedProcessing': 0,
    'windowBuffer': 0,
    'maxSize': 0,              # Unlimited
    'retentionSeconds': 0,     # Keep forever
    'encryptionEnabled': False
}

Consume Defaults

{
    'concurrency': 1,
    'batch': 1,
    'auto_ack': True,
    'wait': True,              # Long polling
    'timeout_millis': 30000,
    'limit': None,             # Run forever
    'renew_lease': False
}

Logging

Enable detailed logging for debugging:

export QUEEN_CLIENT_LOG=true
python your_app.py

Example output:

[2025-10-28T10:30:45.123Z] [INFO] [Queen.constructor] {"status":"initialized","urls":1}
[2025-10-28T10:30:45.234Z] [INFO] [QueueBuilder.push] {"queue":"tasks","partition":"Default","count":5}

Type Hints

Full type hints included for IDE support:

from queen import Queen, Message
from typing import Dict, Any

queen: Queen = Queen('http://localhost:6632')

async def handler(message: Message) -> None:
    data: Dict[str, Any] = message['data']
    print(data)

await queen.queue('orders').consume(handler)

Best Practices

  1. Use consume() for workers - Simpler API, handles retries automatically
  2. Use pop() for control - When you need precise control over acking
  3. Buffer for speed - Always use buffering when pushing many messages
  4. Partitions for order - Use partitions when message order matters
  5. Consumer groups for scale - Run multiple workers in the same group
  6. Transactions for consistency - Use transactions for atomic operations
  7. Enable DLQ - Always enable DLQ in production
  8. Renew long leases - Use auto-renewal for long-running tasks
  9. Graceful shutdown - Use async context manager or call queen.close()
  10. Monitor DLQ - Regularly check for failed messages

📝 Important Notes

Handler Signatures:

  • When batch=1 (default), handler receives a single message: async def handler(message): ...
  • When batch>1, handler receives an array of messages: async def handler(messages): ...
  • When each=True, always receives single messages regardless of batch size

Documentation


License

Apache 2.0 - See LICENSE


Support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queen_mq-0.12.2.tar.gz (52.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

queen_mq-0.12.2-py3-none-any.whl (48.2 kB view details)

Uploaded Python 3

File details

Details for the file queen_mq-0.12.2.tar.gz.

File metadata

  • Download URL: queen_mq-0.12.2.tar.gz
  • Upload date:
  • Size: 52.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.8

File hashes

Hashes for queen_mq-0.12.2.tar.gz
Algorithm Hash digest
SHA256 53d77940f9a11a5f4085f9a3c713ca4a3e9c9848ee489f1072f86e92b673bf3d
MD5 cc28c8a1caa8298eb7f46a22f9b2deb2
BLAKE2b-256 d90552f3b8d6a959412f7b7a8459a071fb02447f39b984bafa59d95e3309071d

See more details on using hashes here.

File details

Details for the file queen_mq-0.12.2-py3-none-any.whl.

File metadata

  • Download URL: queen_mq-0.12.2-py3-none-any.whl
  • Upload date:
  • Size: 48.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.8

File hashes

Hashes for queen_mq-0.12.2-py3-none-any.whl
Algorithm Hash digest
SHA256 44316013756818ba264d04a154fe0669d21636621b3b328c4b8f9af9ba28b95d
MD5 cbbd52a8ae552ac9c1098117ff3c4a56
BLAKE2b-256 aa943053b3aa48d6c4b8d967f316d170b8ca027541f336794f5284dc9578b52b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page