RabbitMQ Easy

A simple, robust RabbitMQ manager for Python applications with built-in connection management, retry logic, and dead letter queue support.

🚀 Features

  • 🔄 Automatic Connection Management: Built-in retry logic and connection recovery
  • 🛡️ Dead Letter Queue Support: Automatic setup of dead letter exchanges and queues
  • 📊 Comprehensive Logging: Both file and console logging with emoji indicators
  • 🔧 Environment Variable Support: Easy configuration through environment variables
  • 🧪 Idempotent Operations: Safe to run multiple times without errors
  • 📦 Context Manager Support: Proper resource cleanup
  • ⚡ Production Ready: Used in production environments
  • 🗑️ Resource Management: Built-in cleanup and deletion functions

📦 Installation

pip install rabbitmq-easy

🏃‍♂️ Quick Start

Basic Usage

from rabbitmq_easy import RabbitMQManager

# Simple setup
manager = RabbitMQManager(
    host='localhost',
    port=5672,
    username='guest',
    password='guest',
    queues=['orders', 'payments'],
    routing_keys=['orders.*', 'payments.*'],
    exchange='my_exchange'
)

# Publish a message
manager.publish_message('my_exchange', 'orders.new', '{"order_id": 123}')

# Use as context manager
with RabbitMQManager() as manager:  # Uses environment variables
    manager.publish_message('my_exchange', 'orders.new', '{"order_id": 123}')

Environment Variables Setup

Create a .env file:

RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_EXCHANGE=my_exchange
RABBITMQ_QUEUES=orders,payments,notifications
RABBITMQ_ROUTING_KEYS=orders.*,payments.*,notifications.*

Then use:

from rabbitmq_easy import create_rabbitmq_manager

# Auto-loads from environment variables
manager = create_rabbitmq_manager()
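
The list-valued variables above are comma-separated strings. The sketch below shows one way such values could be split into lists and paired by position. `parse_csv_env` is a hypothetical helper written for illustration, not part of rabbitmq_easy, and the library's own parsing may differ in details such as whitespace handling.

```python
import os


def parse_csv_env(name, environ=None):
    """Return the comma-separated value of `name` as a list of stripped items.

    Hypothetical helper for illustration; not part of rabbitmq_easy.
    """
    environ = os.environ if environ is None else environ
    raw = environ.get(name, "")
    # Drop empty items so a trailing comma or an unset variable yields no entries.
    return [item.strip() for item in raw.split(",") if item.strip()]


# Example values copied from the .env file above.
example_env = {
    "RABBITMQ_QUEUES": "orders,payments,notifications",
    "RABBITMQ_ROUTING_KEYS": "orders.*,payments.*,notifications.*",
}

queues = parse_csv_env("RABBITMQ_QUEUES", example_env)
routing_keys = parse_csv_env("RABBITMQ_ROUTING_KEYS", example_env)

# Queues and routing keys are matched by position: first with first, and so on.
pairs = list(zip(queues, routing_keys))
```

Passing a plain dictionary instead of `os.environ` keeps the helper easy to try out without touching the real environment.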

🔧 Advanced Usage

Custom Configuration

from rabbitmq_easy import RabbitMQManager

# Advanced configuration
manager = RabbitMQManager(
    host='rabbitmq.example.com',
    port=5672,
    username='user',
    password='pass',
    queues=['high_priority', 'low_priority'],
    routing_keys=['urgent.*', 'normal.*'],
    exchange='task_exchange',
    dead_letter_exchange='failed_tasks',
    dead_letter_routing_key='failed',
    max_retries=3,
    retry_delay=5,
    enable_console_logging=True,
    log_level='INFO'
)

# Setup additional queues dynamically
manager.setup_queue(
    queue_name='special_queue',
    exchange='task_exchange',
    routing_key='special.*'
)

# Get queue information
info = manager.get_queue_info('high_priority')
print(f"Messages in queue: {info['message_count']}")

# Health check
health = manager.health_check()
print(f"Status: {health['status']}")

Consumer Example

import json
from rabbitmq_easy import RabbitMQManager

def process_message(ch, method, properties, body):
    """Process incoming message"""
    try:
        data = json.loads(body)
        print(f"Processing: {data}")
        
        # Your business logic here
        
        # Acknowledge message
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error processing message: {e}")
        # Reject message (will go to dead letter queue if configured)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

# Setup consumer
with RabbitMQManager() as manager:
    manager.start_consuming('orders', process_message)
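
The ack/nack pattern above can be separated from the business logic so that the logic is testable without a broker. The following is a minimal sketch under that assumption: `make_callback`, `FakeChannel`, `FakeMethod`, and `require_order_id` are hypothetical names introduced here, and only `basic_ack` and `basic_nack` come from the pika channel API used in the example above.

```python
import json


def make_callback(handler):
    """Wrap `handler(data)` in a pika-style callback that acks or rejects.

    Hypothetical helper for illustration; not part of rabbitmq_easy.
    """
    def callback(ch, method, properties, body):
        try:
            handler(json.loads(body))
        except Exception as exc:
            print(f"Error processing message: {exc}")
            # requeue=False lets the broker dead-letter the message if configured.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback


class FakeChannel:
    """Test double that records ack/nack calls instead of talking to a broker."""

    def __init__(self):
        self.calls = []

    def basic_ack(self, delivery_tag):
        self.calls.append(("ack", delivery_tag))

    def basic_nack(self, delivery_tag, requeue=True):
        self.calls.append(("nack", delivery_tag, requeue))


class FakeMethod:
    """Stands in for the delivery metadata object passed to callbacks."""

    def __init__(self, delivery_tag):
        self.delivery_tag = delivery_tag


def require_order_id(data):
    """Example business logic: fail when the payload has no order_id."""
    if "order_id" not in data:
        raise ValueError("missing order_id")


channel = FakeChannel()
callback = make_callback(require_order_id)
callback(channel, FakeMethod(1), None, b'{"order_id": 123}')  # valid payload
callback(channel, FakeMethod(2), None, b"not json")           # malformed payload
```

With a real manager, the wrapped function would be passed in the same position as `process_message`, for example `manager.start_consuming('orders', make_callback(require_order_id))`.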

Dead Letter Queue Handling

import json
from rabbitmq_easy import RabbitMQManager

def handle_failed_messages(ch, method, properties, body):
    """Handle messages that failed processing"""
    try:
        # Get failure information
        headers = properties.headers or {}
        death_info = headers.get('x-death', [])
        
        if death_info:
            failure_reason = death_info[0].get('reason')
            failure_count = death_info[0].get('count')
            print(f"Message failed {failure_count} times. Reason: {failure_reason}")
        
        # Reprocess or log the failure
        data = json.loads(body)
        print(f"Dead letter message: {data}")
        
        # Acknowledge to remove from DLQ
        ch.basic_ack(delivery_tag=method.delivery_tag)
        
    except Exception as e:
        print(f"Error handling dead letter: {e}")
        # Acknowledge to prevent infinite loops
        ch.basic_ack(delivery_tag=method.delivery_tag)

# Monitor failed messages
with RabbitMQManager() as manager:
    manager.start_consuming('failed_messages', handle_failed_messages)
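
The handler above reads only the first `x-death` entry. RabbitMQ records one entry per queue and reason, most recent first, so a message that has been dead-lettered from several places carries several entries. The sketch below totals them. `summarize_x_death` is a hypothetical helper and the sample headers are invented for illustration.

```python
def summarize_x_death(headers):
    """Return (total_count, latest_reason, source_queues) from message headers.

    Hypothetical helper for illustration; not part of rabbitmq_easy.
    """
    deaths = (headers or {}).get("x-death") or []
    # Each entry carries its own count; sum them for the overall failure total.
    total = sum(int(entry.get("count", 0)) for entry in deaths)
    latest_reason = deaths[0].get("reason") if deaths else None
    queues = [entry.get("queue") for entry in deaths]
    return total, latest_reason, queues


# Invented sample data shaped like the x-death header.
sample_headers = {
    "x-death": [
        {"count": 2, "reason": "rejected", "queue": "orders", "exchange": "my_exchange"},
        {"count": 1, "reason": "expired", "queue": "payments", "exchange": "my_exchange"},
    ]
}

summary = summarize_x_death(sample_headers)
```

Inside a callback, the same function could be called as `summarize_x_death(properties.headers)`, since it tolerates `None`.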

⚙️ Configuration Options

| Parameter | Environment Variable | Default | Description |
|---|---|---|---|
| host | RABBITMQ_HOST | localhost | RabbitMQ server host |
| port | RABBITMQ_PORT | 5672 | RabbitMQ server port |
| username | RABBITMQ_USERNAME | guest | Username for authentication |
| password | RABBITMQ_PASSWORD | guest | Password for authentication |
| queues | RABBITMQ_QUEUES | [] | Comma-separated list of queues |
| routing_keys | RABBITMQ_ROUTING_KEYS | [] | Comma-separated list of routing keys |
| exchange | RABBITMQ_EXCHANGE | '' | Exchange name |
| dead_letter_exchange | RABBITMQ_DEAD_LETTER_EXCHANGE | {exchange}_dlx | Dead letter exchange name |
| dead_letter_routing_key | RABBITMQ_DEAD_LETTER_ROUTING_KEY | dead_letter | Dead letter routing key |
| max_retries | RABBITMQ_MAX_RETRIES | 5 | Maximum connection retry attempts |
| retry_delay | RABBITMQ_RETRY_DELAY | 5 | Delay between retry attempts |
| heartbeat | RABBITMQ_HEARTBEAT | 60 | Connection heartbeat interval |
| enable_console_logging | RABBITMQ_CONSOLE_LOGGING | True | Enable console logging |
| log_level | RABBITMQ_LOG_LEVEL | INFO | Logging level |
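
To illustrate how `max_retries` and `retry_delay` could interact, here is a generic retry loop. It is a sketch under stated assumptions, not the library's implementation: it treats `max_retries` as the total number of attempts and `retry_delay` as seconds, neither of which the table above specifies. `connect_with_retry` and `flaky_connect` are hypothetical names.

```python
import time


def connect_with_retry(connect, max_retries=5, retry_delay=5, sleep=time.sleep):
    """Call `connect()` until it succeeds or `max_retries` attempts are used.

    Hypothetical helper; assumes max_retries counts total attempts.
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return connect()
        except ConnectionError as exc:
            last_error = exc
            # Wait between attempts, but not after the final failure.
            if attempt < max_retries:
                sleep(retry_delay)
    raise ConnectionError(f"gave up after {max_retries} attempts") from last_error


# Demonstration with a stand-in connection that fails twice, then succeeds.
attempts = []
delays = []


def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("broker not ready")
    return "connected"


# `delays.append` replaces time.sleep so the demo records waits instead of sleeping.
result = connect_with_retry(flaky_connect, max_retries=5, retry_delay=5, sleep=delays.append)
```

Injecting the `sleep` function keeps the loop testable without real delays.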

🛠️ Resource Management

Queue and Exchange Operations

# Delete specific resources
manager.delete_queue("old_queue")
manager.delete_exchange("old_exchange")

# Delete only if empty/unused
manager.delete_queue("queue_name", if_empty=True)
manager.delete_exchange("exchange_name", if_unused=True)

# Purge messages without deleting queue
message_count = manager.purge_queue("queue_name")
print(f"Purged {message_count} messages")

# Clean up dead letter infrastructure
manager.cleanup_dead_letter_setup()

# Delete all resources created by this manager
results = manager.delete_all_setup_resources(confirm=True)

# Complete reset
manager.reset_manager(confirm=True)

❌ Error Handling

The package includes custom exceptions for better error handling:

from rabbitmq_easy import (
    RabbitMQManager,
    RabbitMQConnectionError,
    RabbitMQConfigurationError,
)

try:
    manager = RabbitMQManager(
        host='invalid-host',
        queues=['queue1', 'queue2'],
        routing_keys=['key1']  # Mismatch with queues count
    )
except RabbitMQConfigurationError as e:
    print(f"Configuration error: {e}")
except RabbitMQConnectionError as e:
    print(f"Connection error: {e}")

🏗️ What Gets Created Automatically

When you initialize with:

manager = RabbitMQManager(
    exchange='orders',
    queues=['new_orders', 'pending_orders'],
    routing_keys=['orders.new', 'orders.pending']
)

RabbitMQ Easy automatically creates:

  1. orders exchange (main exchange)
  2. orders_dlx exchange (dead letter exchange)
  3. new_orders queue → bound to orders exchange with orders.new routing key
  4. pending_orders queue → bound to orders exchange with orders.pending routing key
  5. failed_messages queue → bound to orders_dlx exchange for error handling

All queues are configured with dead letter routing to capture failed messages automatically.
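
The list above can be expressed as a small planning function that computes the declarations from the constructor arguments. This is a sketch, not the library's code: `plan_topology` is a hypothetical name, the defaults are taken from the configuration table and from the `failed_messages` queue named above, and `x-dead-letter-exchange` and `x-dead-letter-routing-key` are the standard RabbitMQ queue arguments for dead lettering. Whether the library sets exactly these arguments is an assumption.

```python
def plan_topology(exchange, queues, routing_keys,
                  dead_letter_exchange=None,
                  dead_letter_routing_key="dead_letter",
                  dead_letter_queue="failed_messages"):
    """Return the ordered declarations implied by the arguments.

    Hypothetical helper for illustration; not part of rabbitmq_easy.
    """
    dlx = dead_letter_exchange or f"{exchange}_dlx"
    steps = [("exchange", exchange), ("exchange", dlx)]

    # Standard RabbitMQ queue arguments that route rejected messages to the DLX.
    dl_args = {
        "x-dead-letter-exchange": dlx,
        "x-dead-letter-routing-key": dead_letter_routing_key,
    }
    for queue, key in zip(queues, routing_keys):
        steps.append(("queue", queue, dl_args))
        steps.append(("bind", queue, exchange, key))

    # The dead letter queue itself has no dead letter arguments, avoiding a loop.
    steps.append(("queue", dead_letter_queue, {}))
    steps.append(("bind", dead_letter_queue, dlx, dead_letter_routing_key))
    return steps


plan = plan_topology(
    exchange="orders",
    queues=["new_orders", "pending_orders"],
    routing_keys=["orders.new", "orders.pending"],
)
for step in plan:
    print(step)
```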

📋 Best Practices

  1. Use Environment Variables: Store sensitive information like passwords in environment variables
  2. Context Managers: Use with statements for automatic cleanup
  3. Dead Letter Queues: Always configure dead letter queues for production
  4. Health Checks: Implement health checks in your applications
  5. Logging: Monitor logs for connection issues and errors
  6. Error Handling: Always handle exceptions in your consumers
  7. Resource Cleanup: Use the provided cleanup functions during development

⚠️ Important Notes

Queue and Routing Key Validation

The number of queues must match the number of routing keys:

# ✅ Correct
manager = RabbitMQManager(
    queues=['queue1', 'queue2'],
    routing_keys=['key1', 'key2']  # Same count
)

# ❌ Will raise RabbitMQConfigurationError
manager = RabbitMQManager(
    queues=['queue1', 'queue2'],
    routing_keys=['key1']  # Mismatch!
)
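
When queue and routing key lists are assembled dynamically, the same rule can be checked before constructing the manager. The sketch below is a hypothetical pre-flight check: `check_queue_key_pairs` is not part of rabbitmq_easy, and it raises a plain `ValueError` rather than the library's `RabbitMQConfigurationError`.

```python
def check_queue_key_pairs(queues, routing_keys):
    """Return (queue, routing_key) pairs, or raise if the counts differ.

    Hypothetical helper for illustration; not part of rabbitmq_easy.
    """
    if len(queues) != len(routing_keys):
        raise ValueError(
            f"{len(queues)} queues but {len(routing_keys)} routing keys"
        )
    return list(zip(queues, routing_keys))


# Matching counts produce positional pairs.
pairs = check_queue_key_pairs(["queue1", "queue2"], ["key1", "key2"])

# A mismatch is reported before any connection is attempted.
try:
    check_queue_key_pairs(["queue1", "queue2"], ["key1"])
except ValueError as exc:
    mismatch_message = str(exc)
```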

Dead Letter Configuration

Don't give a dead letter queue a dead letter exchange that routes back into that same queue:

# ❌ Don't do this - creates infinite loop
manager.setup_queue(
    'my_dlq',
    dead_letter_exchange='same_exchange'
)

# ✅ Do this instead
manager.setup_queue('my_dlq', dead_letter_exchange=None)

🔍 Monitoring and Health Checks

# Check connection health
health = manager.health_check()
if health['status'] == 'healthy':
    print("✅ RabbitMQ connection is healthy")
else:
    print(f"❌ RabbitMQ connection issues: {health['error']}")

# Get detailed queue information
info = manager.get_queue_info('orders')
print(f"Queue: {info['queue']}")
print(f"Messages: {info['message_count']}")
print(f"Consumers: {info['consumer_count']}")
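
The two calls above can be combined into a single readiness check, for example behind an HTTP health endpoint. The sketch below is one possible shape: `is_ready`, `StubManager`, and the `max_backlog` threshold are hypothetical, and only the dictionary keys (`status`, `error`, `message_count`, `consumer_count`) come from the examples above.

```python
def is_ready(manager, queue, max_backlog=1000):
    """Return (ok, reason) based on connection health and queue depth.

    Hypothetical helper for illustration; not part of rabbitmq_easy.
    """
    health = manager.health_check()
    if health.get("status") != "healthy":
        return False, f"unhealthy: {health.get('error')}"

    info = manager.get_queue_info(queue)
    if info["message_count"] > max_backlog:
        return False, f"backlog {info['message_count']} > {max_backlog}"
    if info["consumer_count"] == 0:
        return False, "no consumers"
    return True, "ok"


class StubManager:
    """Stand-in exposing the two methods used above, for trying the check offline."""

    def __init__(self, status, message_count, consumer_count):
        self.status = status
        self.message_count = message_count
        self.consumer_count = consumer_count

    def health_check(self):
        return {"status": self.status, "error": None}

    def get_queue_info(self, queue):
        return {
            "queue": queue,
            "message_count": self.message_count,
            "consumer_count": self.consumer_count,
        }


healthy = is_ready(StubManager("healthy", 10, 2), "orders")
backlogged = is_ready(StubManager("healthy", 5000, 2), "orders")
idle = is_ready(StubManager("healthy", 0, 0), "orders")
```

A real manager instance can be passed in place of `StubManager`, provided its return values have the keys shown in the examples above.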

🚀 Production Deployment

Docker Compose Example

version: '3.8'
services:
  app:
    build: .
    env_file:
      - ~/etc/<org_name>/.env
    depends_on:
      - rabbitmq

  rabbitmq:
    image: rabbitmq:3-management
    env_file:
      - ~/etc/<org_name>/.env

Kubernetes ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
data:
  RABBITMQ_HOST: "rabbitmq-service"
  RABBITMQ_PORT: "5672"
  RABBITMQ_EXCHANGE: "production"
  RABBITMQ_QUEUES: "orders,payments,notifications"
  RABBITMQ_ROUTING_KEYS: "orders.*,payments.*,notifications.*"

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Add tests for your changes
  5. Run the test suite (pytest)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

📝 License

This project is licensed under the MIT License - see the LICENSE file for details.

📚 Changelog

See CHANGELOG.md for a detailed list of changes and version history.

🙏 Acknowledgments

  • Built on top of the excellent pika library
  • Inspired by the need for simpler RabbitMQ management in Python applications
  • Thanks to the RabbitMQ team for creating an amazing message broker

Made with ❤️ for the Python community
