Skip to main content

A simple, robust RabbitMQ manager for Python applications

Project description

RabbitMQ Easy

PyPI version Python Support License: MIT

A simple, robust RabbitMQ manager for Python applications with built-in connection management, retry logic, and dead letter queue support.

Features

  • Automatic Connection Management: Built-in retry logic and connection recovery
  • Dead Letter Queue Support: Automatic setup of dead letter exchanges and queues
  • Comprehensive Logging: Both file and console logging with emoji indicators
  • Environment Variable Support: Easy configuration through environment variables
  • Idempotent Operations: Safe to run multiple times without errors
  • Context Manager Support: Proper resource cleanup
  • Production Ready: Used in production environments
  • Resource Management: Built-in cleanup and deletion functions

Installation

pip install rabbitmq-easy

Quick Start

Basic Usage

from rabbitmq_easy import RabbitMQManager

# Simple setup
manager = RabbitMQManager(
    host='localhost',
    port=5672,
    username='guest',
    password='guest',
    queues=['orders', 'payments'],
    routing_keys=['orders.*', 'payments.*'],
    exchange='my_exchange'
)

# Publish a message
manager.publish_message('my_exchange', 'orders.new', '{"order_id": 123}')

# Use as context manager
with RabbitMQManager() as manager:  # Uses environment variables
    manager.publish_message('my_exchange', 'orders.new', '{"order_id": 123}')

Environment Variables Setup

Create a .env file:

RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_EXCHANGE=my_exchange
RABBITMQ_QUEUES=orders,payments,notifications
RABBITMQ_ROUTING_KEYS=orders.*,payments.*,notifications.*

Then use:

from rabbitmq_easy import create_rabbitmq_manager

# Auto-loads from environment variables
manager = create_rabbitmq_manager()

Advanced Usage

Custom Configuration

from rabbitmq_easy import RabbitMQManager

# Advanced configuration
manager = RabbitMQManager(
    host='rabbitmq.example.com',
    port=5672,
    username='user',
    password='pass',
    queues=['high_priority', 'low_priority'],
    routing_keys=['urgent.*', 'normal.*'],
    exchange='task_exchange',
    dead_letter_exchange='failed_tasks',
    dead_letter_routing_key='failed',
    dead_letter_queue_name='failed_tasks',
    max_retries=3,
    retry_delay=5,
    enable_console_logging=True,
    log_level='INFO'
)

# Setup additional queues dynamically
manager.setup_queue(
    queue_name='special_queue',
    exchange='task_exchange',
    routing_key='special.*'
)

# Get queue information
info = manager.get_queue_info('high_priority')
print(f"Messages in queue: {info['message_count']}")

# Health check
health = manager.health_check()
print(f"Status: {health['status']}")

Consumer Example

import json
from rabbitmq_easy import RabbitMQManager

def process_message(ch, method, properties, body):
    """Process incoming message"""
    try:
        data = json.loads(body)
        print(f"Processing: {data}")
        
        # Your business logic here
        
        # Acknowledge message
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error processing message: {e}")
        # Reject message (will go to dead letter queue if configured)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

# Setup consumer
with RabbitMQManager() as manager:
    manager.start_consuming('orders', process_message)

Dead Letter Queue Handling

def handle_failed_messages(ch, method, properties, body):
    """Handle messages that failed processing"""
    try:
        # Get failure information
        headers = properties.headers or {}
        death_info = headers.get('x-death', [])
        
        if death_info:
            failure_reason = death_info[0].get('reason')
            failure_count = death_info[0].get('count')
            print(f"Message failed {failure_count} times. Reason: {failure_reason}")
        
        # Reprocess or log the failure
        data = json.loads(body)
        print(f"Dead letter message: {data}")
        
        # Acknowledge to remove from DLQ
        ch.basic_ack(delivery_tag=method.delivery_tag)
        
    except Exception as e:
        print(f"Error handling dead letter: {e}")
        # Acknowledge to prevent infinite loops
        ch.basic_ack(delivery_tag=method.delivery_tag)

# Monitor failed messages
manager.start_consuming('failed_messages', handle_failed_messages)

Configuration Options

Parameter Environment Variable Default Description
host RABBITMQ_HOST localhost RabbitMQ server host
port RABBITMQ_PORT 5672 RabbitMQ server port
username RABBITMQ_USERNAME guest Username for authentication
password RABBITMQ_PASSWORD guest Password for authentication
queues RABBITMQ_QUEUES [] Comma-separated list of queues
routing_keys RABBITMQ_ROUTING_KEYS [] Comma-separated list of routing keys
exchange RABBITMQ_EXCHANGE '' Exchange name
dead_letter_exchange RABBITMQ_DEAD_LETTER_EXCHANGE {exchange}_dlx Dead letter exchange name
dead_letter_routing_key RABBITMQ_DEAD_LETTER_ROUTING_KEY dead_letter Dead letter routing key
max_retries RABBITMQ_MAX_RETRIES 5 Maximum connection retry attempts
retry_delay RABBITMQ_RETRY_DELAY 5 Delay between retry attempts
heartbeat RABBITMQ_HEARTBEAT 60 Connection heartbeat interval
enable_console_logging RABBITMQ_CONSOLE_LOGGING True Enable console logging
log_level RABBITMQ_LOG_LEVEL INFO Logging level

Queue and Exchange Operations

# Delete specific resources
manager.delete_queue("old_queue")
manager.delete_exchange("old_exchange")

# Delete only if empty/unused
manager.delete_queue("queue_name", if_empty=True)
manager.delete_exchange("exchange_name", if_unused=True)

# Purge messages without deleting queue
message_count = manager.purge_queue("queue_name")
print(f"Purged {message_count} messages")

# Clean up dead letter infrastructure
manager.cleanup_dead_letter_setup()

# Delete all resources created by this manager
results = manager.delete_all_setup_resources(confirm=True)

# Complete reset
manager.reset_manager(confirm=True)

Error Handling

The package includes custom exceptions for better error handling:

from rabbitmq_easy import RabbitMQConnectionError, RabbitMQConfigurationError

try:
    manager = RabbitMQManager(
        host='invalid-host',
        queues=['queue1', 'queue2'],
        routing_keys=['key1']  # Mismatch with queues count
    )
except RabbitMQConfigurationError as e:
    print(f"Configuration error: {e}")
except RabbitMQConnectionError as e:
    print(f"Connection error: {e}")

What Gets Created Automatically

When you initialize with:

manager = RabbitMQManager(
    exchange='orders',
    queues=['new_orders', 'pending_orders'],
    routing_keys=['orders.new', 'orders.pending']
)

RabbitMQ Easy automatically creates:

  1. orders exchange (main exchange)
  2. orders_dlx exchange (dead letter exchange)
  3. new_orders queue → bound to orders exchange with orders.new routing key
  4. pending_orders queue → bound to orders exchange with orders.pending routing key
  5. failed_messages queue → bound to orders_dlx exchange for error handling

All queues are configured with dead letter routing to capture failed messages automatically.

Best Practices

  1. Use Environment Variables: Store sensitive information like passwords in environment variables
  2. Context Managers: Use with statements for automatic cleanup
  3. Dead Letter Queues: Always configure dead letter queues for production
  4. Health Checks: Implement health checks in your applications
  5. Logging: Monitor logs for connection issues and errors
  6. Error Handling: Always handle exceptions in your consumers
  7. Resource Cleanup: Use the provided cleanup functions during development

Queue and Routing Key Validation

The number of queues must match the number of routing keys:

# Correct
manager = RabbitMQManager(
    queues=['queue1', 'queue2'],
    routing_keys=['key1', 'key2']  # Same count
)

# Will raise RabbitMQConfigurationError
manager = RabbitMQManager(
    queues=['queue1', 'queue2'],
    routing_keys=['key1']  # Mismatch!
)

Dead Letter Configuration

Don't configure dead letter queues to point to themselves:

# Don't do this - creates infinite loop
manager.setup_queue(
    'my_dlq',
    dead_letter_exchange='same_exchange'
)

# Do this instead
manager.setup_queue('my_dlq', dead_letter_exchange=None)

Monitoring and Health Checks

# Check connection health
health = manager.health_check()
if health['status'] == 'healthy':
    print("RabbitMQ connection is healthy")
else:
    print(f"RabbitMQ connection issues: {health['error']}")

# Get detailed queue information
info = manager.get_queue_info('orders')
print(f"Queue: {info['queue']}")
print(f"Messages: {info['message_count']}")
print(f"Consumers: {info['consumer_count']}")

Production Deployment

Docker Compose Example

version: '3.8'
services:
  app:
    build: .
    environment:
     - ~/etc/<org_name>/.env
    depends_on:
      - rabbitmq
  
  rabbitmq:
    image: rabbitmq:3-management
    environment:
     - ~/etc/<org_name>/.env

Kubernetes ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
data:
  RABBITMQ_HOST: "rabbitmq-service"
  RABBITMQ_PORT: "5672"
  RABBITMQ_EXCHANGE: "production"
  RABBITMQ_QUEUES: "orders,payments,notifications"
  RABBITMQ_ROUTING_KEYS: "orders.*,payments.*,notifications.*"

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Add tests for your changes
  5. Run the test suite (pytest)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

Changelog

See CHANGELOG.md for a detailed list of changes and version history.

Acknowledgments

  • Built on top of the excellent pika library
  • Inspired by the need for simpler RabbitMQ management in Python applications
  • Thanks to the RabbitMQ team for creating an amazing message broker

Made with ❤️ for the Python community

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rabbitmq_easy-1.0.5.tar.gz (23.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rabbitmq_easy-1.0.5-py3-none-any.whl (18.8 kB view details)

Uploaded Python 3

File details

Details for the file rabbitmq_easy-1.0.5.tar.gz.

File metadata

  • Download URL: rabbitmq_easy-1.0.5.tar.gz
  • Upload date:
  • Size: 23.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.0

File hashes

Hashes for rabbitmq_easy-1.0.5.tar.gz
Algorithm Hash digest
SHA256 07cb57d90ee665ba8166b53960f7ced96ea8a0c2a6f59c412cdf81e9e0556ff9
MD5 5dfeb82da6d1a88ee84fd6de41d1910b
BLAKE2b-256 8a632b434ed4b2c2eaed10d11f36998b4200a55ce49a8eba81560c6c931a91bb

See more details on using hashes here.

File details

Details for the file rabbitmq_easy-1.0.5-py3-none-any.whl.

File metadata

  • Download URL: rabbitmq_easy-1.0.5-py3-none-any.whl
  • Upload date:
  • Size: 18.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.0

File hashes

Hashes for rabbitmq_easy-1.0.5-py3-none-any.whl
Algorithm Hash digest
SHA256 013f2a8f0bbacefa5f67dd035b3f3f6ad6e01939cc5df60453c126ac1ef506e8
MD5 8027e332bff6f8d49cde267da3340535
BLAKE2b-256 0dc600f7518927ffa5a2bf173ff600a7290c88c99c5a85dd2456d35dfc3c6d2e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page