A simple, robust RabbitMQ manager for Python applications

RabbitMQ Easy

A simple, robust RabbitMQ manager for Python applications with built-in connection management, retry logic, and dead letter queue support.

🚀 Features

  • 🔄 Automatic Connection Management: Built-in retry logic and connection recovery
  • 🛡️ Dead Letter Queue Support: Automatic setup of dead letter exchanges and queues
  • 📊 Comprehensive Logging: Both file and console logging with emoji indicators
  • 🔧 Environment Variable Support: Easy configuration through environment variables
  • 🧪 Idempotent Operations: Safe to run multiple times without errors
  • 📦 Context Manager Support: Proper resource cleanup
  • ⚡ Production Ready: Used in production environments
  • 🗑️ Resource Management: Built-in cleanup and deletion functions

📦 Installation

pip install rabbitmq-easy

🏃‍♂️ Quick Start

Basic Usage

from rabbitmq_easy import RabbitMQManager

# Simple setup
manager = RabbitMQManager(
    host='localhost',
    port=5672,
    username='admin',
    password='password',
    queues=['queue1', 'queue2'],
    routing_keys=['queue1.*', 'queue2.*'],
    exchange='my_exchange'
)

# Publish a message
manager.publish_message('my_exchange', 'queue1.new', '{"order_id": 123}')

# Use as context manager
with RabbitMQManager() as manager:  # Uses environment variables
    manager.publish_message('my_exchange', 'queue1.new', '{"order_id": 123}')
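
The routing keys above use the `*` wildcard, which only has meaning on a topic exchange. This README does not state which exchange type the manager declares, so the sketch below assumes a topic exchange and mimics RabbitMQ's matching rules: `*` matches exactly one dot-separated word and `#` matches zero or more. `topic_matches` is a hypothetical helper for illustration only and is not part of rabbitmq_easy.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches a topic-exchange binding pattern."""

    def match(p: list, k: list) -> bool:
        if not p:
            # Pattern exhausted: match only if the key is exhausted too
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero or more words
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            # '*' consumes exactly one word; a literal must equal it
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


print(topic_matches("queue1.*", "queue1.new"))         # True
print(topic_matches("queue1.*", "queue1.new.urgent"))  # False
print(topic_matches("queue2.*.*", "queue2.eu.new"))    # True
```

Under these rules, a message published with `queue1.new` reaches a queue bound with `queue1.*`, while a key with a third word such as `queue1.new.urgent` does not.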

Environment Variables Setup

Create a .env file:

RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=admin
RABBITMQ_PASSWORD=password
RABBITMQ_EXCHANGE=my_exchange
RABBITMQ_QUEUES=queue1,queue2
RABBITMQ_ROUTING_KEYS=queue1.*,queue2.*.*

Then use:

from rabbitmq_easy import create_rabbitmq_manager

# Auto-loads from environment variables
manager = create_rabbitmq_manager()
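
Because `RABBITMQ_QUEUES` and `RABBITMQ_ROUTING_KEYS` are comma-separated strings, they have to be split into lists at some point. The following is a minimal sketch of one way to do that with the standard library. `split_env_list` is a hypothetical helper, and the whitespace handling shown here is an assumption that may not match how rabbitmq_easy parses these values internally.

```python
import os


def split_env_list(name: str, environ=os.environ) -> list:
    """Split a comma-separated environment variable into a list of names."""
    raw = environ.get(name, "")
    # Strip whitespace around each item and drop empty entries
    return [item.strip() for item in raw.split(",") if item.strip()]


# A plain dict stands in for os.environ so the example has no side effects
env = {
    "RABBITMQ_QUEUES": "queue1, queue2",
    "RABBITMQ_ROUTING_KEYS": "queue1.*,queue2.*.*",
}
queues = split_env_list("RABBITMQ_QUEUES", env)
routing_keys = split_env_list("RABBITMQ_ROUTING_KEYS", env)
print(queues)
print(routing_keys)
```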

🔧 Advanced Usage

Custom Configuration

from rabbitmq_easy import RabbitMQManager

# Advanced configuration
manager = RabbitMQManager(
    host='rabbitmq.example.com',
    port=5672,
    username='user',
    password='pass',
    queues=['high_priority', 'low_priority'],
    routing_keys=['urgent.*', 'normal.*'],
    exchange='task_exchange',
    dead_letter_exchange='failed_tasks',
    dead_letter_routing_key='failed',
    max_retries=3,
    retry_delay=5,
    enable_console_logging=True,
    log_level='INFO'
)

# Setup additional queues dynamically
manager.setup_queue(
    queue_name='special_queue',
    exchange='task_exchange',
    routing_key='special.*'
)

# Get queue information
info = manager.get_queue_info('high_priority')
print(f"Messages in queue: {info['message_count']}")

# Health check
health = manager.health_check()
print(f"Status: {health['status']}")

Consumer Example

import json
from rabbitmq_easy import RabbitMQManager

def process_message(ch, method, properties, body):
    """Process incoming message"""
    try:
        data = json.loads(body)
        print(f"Processing: {data}")
        
        # Your business logic here
        
        # Acknowledge message
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error processing message: {e}")
        # Reject message (will go to dead letter queue if configured)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

# Setup consumer
with RabbitMQManager() as manager:
    manager.start_consuming('queue1', process_message)

Dead Letter Queue Handling

import json

def handle_failed_messages(ch, method, properties, body):
    """Handle messages that failed processing"""
    try:
        # Get failure information
        headers = properties.headers or {}
        death_info = headers.get('x-death', [])
        
        if death_info:
            failure_reason = death_info[0].get('reason')
            failure_count = death_info[0].get('count')
            print(f"Message failed {failure_count} times. Reason: {failure_reason}")
        
        # Reprocess or log the failure
        data = json.loads(body)
        print(f"Dead letter message: {data}")
        
        # Acknowledge to remove from DLQ
        ch.basic_ack(delivery_tag=method.delivery_tag)
        
    except Exception as e:
        print(f"Error handling dead letter: {e}")
        # Acknowledge to prevent infinite loops
        ch.basic_ack(delivery_tag=method.delivery_tag)

# Monitor failed messages
manager.start_consuming('failed_messages', handle_failed_messages)
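
The handler above reads only the first `x-death` entry. RabbitMQ records one entry per queue and reason, so a message that was dead-lettered from more than one queue carries several entries. The sketch below summarizes all of them; `summarize_x_death` is a hypothetical helper, and the sample headers are made up to show the shape RabbitMQ typically produces.

```python
def summarize_x_death(headers):
    """Return (total_count, [(queue, reason, count), ...]) from message headers."""
    entries = (headers or {}).get("x-death") or []
    details = [
        (entry.get("queue"), entry.get("reason"), int(entry.get("count", 0)))
        for entry in entries
    ]
    total = sum(count for _, _, count in details)
    return total, details


# Illustrative headers only; real values come from properties.headers
sample_headers = {
    "x-death": [
        {"queue": "queue1", "reason": "rejected", "count": 2,
         "exchange": "my_exchange", "routing-keys": ["queue1.new"]},
    ]
}
total, details = summarize_x_death(sample_headers)
print(total, details)
```

Inside a consumer callback, this would be called as `summarize_x_death(properties.headers)`.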

⚙️ Configuration Options

Parameter                Environment Variable              Default         Description
host                     RABBITMQ_HOST                     localhost       RabbitMQ server host
port                     RABBITMQ_PORT                     5672            RabbitMQ server port
username                 RABBITMQ_USERNAME                 guest           Username for authentication
password                 RABBITMQ_PASSWORD                 guest           Password for authentication
queues                   RABBITMQ_QUEUES                   []              Comma-separated list of queues
routing_keys             RABBITMQ_ROUTING_KEYS             []              Comma-separated list of routing keys
exchange                 RABBITMQ_EXCHANGE                 ''              Exchange name
dead_letter_exchange     RABBITMQ_DEAD_LETTER_EXCHANGE     {exchange}_dlx  Dead letter exchange name
dead_letter_routing_key  RABBITMQ_DEAD_LETTER_ROUTING_KEY  dead_letter     Dead letter routing key
max_retries              RABBITMQ_MAX_RETRIES              5               Maximum connection retry attempts
retry_delay              RABBITMQ_RETRY_DELAY              5               Delay between retry attempts
heartbeat                RABBITMQ_HEARTBEAT                60              Connection heartbeat interval
enable_console_logging   RABBITMQ_CONSOLE_LOGGING          True            Enable console logging
log_level                RABBITMQ_LOG_LEVEL                INFO            Logging level
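
One plausible reading of this table is a three-step lookup: use the explicit argument if given, otherwise the environment variable, otherwise the default. The sketch below shows that lookup with type conversion for numeric and boolean values. `resolve_setting`, the precedence order, and the accepted boolean spellings are all assumptions for illustration; the library's actual behavior may differ.

```python
import os

# Spellings treated as True (an assumption, not taken from the library)
_TRUE = {"1", "true", "yes", "on"}


def resolve_setting(explicit, env_name, default, cast=str, environ=os.environ):
    """Pick the explicit value, else the environment variable, else the default."""
    if explicit is not None:
        return explicit
    raw = environ.get(env_name)
    if raw is None or raw == "":
        return default
    if cast is bool:
        return raw.strip().lower() in _TRUE
    return cast(raw)


# A plain dict stands in for os.environ
env = {"RABBITMQ_PORT": "5673", "RABBITMQ_CONSOLE_LOGGING": "false"}
port = resolve_setting(None, "RABBITMQ_PORT", 5672, int, env)
retries = resolve_setting(None, "RABBITMQ_MAX_RETRIES", 5, int, env)
console = resolve_setting(None, "RABBITMQ_CONSOLE_LOGGING", True, bool, env)
host = resolve_setting("rabbitmq.example.com", "RABBITMQ_HOST", "localhost", str, env)
print(port, retries, console, host)
```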

🛠️ Resource Management

Queue and Exchange Operations

# Delete specific resources
manager.delete_queue("old_queue")
manager.delete_exchange("old_exchange")

# Delete only if empty/unused
manager.delete_queue("queue_name", if_empty=True)
manager.delete_exchange("exchange_name", if_unused=True)

# Purge messages without deleting queue
message_count = manager.purge_queue("queue_name")
print(f"Purged {message_count} messages")

# Clean up dead letter infrastructure
manager.cleanup_dead_letter_setup()

# Delete all resources created by this manager
results = manager.delete_all_setup_resources(confirm=True)

# Complete reset
manager.reset_manager(confirm=True)

❌ Error Handling

The package includes custom exceptions for better error handling:

from rabbitmq_easy import RabbitMQManager, RabbitMQConnectionError, RabbitMQConfigurationError

try:
    manager = RabbitMQManager(
        host='invalid-host',
        queues=['queue1', 'queue2'],
        routing_keys=['key1']  # Mismatch with queues count
    )
except RabbitMQConfigurationError as e:
    print(f"Configuration error: {e}")
except RabbitMQConnectionError as e:
    print(f"Connection error: {e}")

🏗️ What Gets Created Automatically

When you initialize with:

manager = RabbitMQManager(
    exchange='queue1',
    queues=['new_queue1', 'pending_queue1'],
    routing_keys=['queue1.new', 'queue1.pending']
)

RabbitMQ Easy automatically creates:

  1. queue1 exchange (main exchange)
  2. queue1_dlx exchange (dead letter exchange)
  3. new_queue1 queue → bound to queue1 exchange with queue1.new routing key
  4. pending_queue1 queue → bound to queue1 exchange with queue1.pending routing key
  5. failed_messages queue → bound to queue1_dlx exchange for error handling

All queues are configured with dead letter routing to capture failed messages automatically.
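
To make the list above concrete, here is a rough sketch of the equivalent declarations on a pika-style channel. It is not the library's implementation: `declare_topology` is a hypothetical function, and the `topic` exchange type and `durable=True` flags are assumptions, since this README does not state them. `RecordingChannel` is a stand-in that records calls so the example runs without a broker.

```python
def declare_topology(channel, exchange, queues, routing_keys,
                     dlx=None, dlq="failed_messages", dl_routing_key="dead_letter"):
    """Declare exchanges, queues and bindings on a pika-style channel."""
    dlx = dlx or f"{exchange}_dlx"
    # Exchange type and durability are assumptions, not taken from the README
    channel.exchange_declare(exchange=exchange, exchange_type="topic", durable=True)
    channel.exchange_declare(exchange=dlx, exchange_type="topic", durable=True)
    # The dead letter queue gets no dead letter arguments of its own
    channel.queue_declare(queue=dlq, durable=True)
    channel.queue_bind(queue=dlq, exchange=dlx, routing_key=dl_routing_key)
    dead_letter_args = {
        "x-dead-letter-exchange": dlx,
        "x-dead-letter-routing-key": dl_routing_key,
    }
    for queue, key in zip(queues, routing_keys):
        channel.queue_declare(queue=queue, durable=True, arguments=dead_letter_args)
        channel.queue_bind(queue=queue, exchange=exchange, routing_key=key)


class RecordingChannel:
    """Stand-in for a pika channel that records calls instead of contacting a broker."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def record(**kwargs):
            self.calls.append((name, kwargs))
        return record


channel = RecordingChannel()
declare_topology(channel, "queue1",
                 ["new_queue1", "pending_queue1"],
                 ["queue1.new", "queue1.pending"])
for name, kwargs in channel.calls:
    print(name, kwargs)
```

With a real connection, the same function could be handed a channel from pika's `BlockingConnection` instead of the recording stub.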

📋 Best Practices

  1. Use Environment Variables: Store sensitive information like passwords in environment variables
  2. Context Managers: Use with statements for automatic cleanup
  3. Dead Letter Queues: Always configure dead letter queues for production
  4. Health Checks: Implement health checks in your applications
  5. Logging: Monitor logs for connection issues and errors
  6. Error Handling: Always handle exceptions in your consumers
  7. Resource Cleanup: Use the provided cleanup functions during development

⚠️ Important Notes

Queue and Routing Key Validation

The number of queues must match the number of routing keys:

# ✅ Correct
manager = RabbitMQManager(
    queues=['queue1', 'queue2'],
    routing_keys=['key1', 'key2']  # Same count
)

# ❌ Will raise RabbitMQConfigurationError
manager = RabbitMQManager(
    queues=['queue1', 'queue2'],
    routing_keys=['key1']  # Mismatch!
)
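
The count check exists because, judging by the examples in this README, each queue is paired with the routing key at the same position. A minimal sketch of that pairing and validation is shown below. `pair_queues_with_keys` is a hypothetical helper, and `ConfigurationError` is a local stand-in for the library's `RabbitMQConfigurationError` so the example runs without the package installed.

```python
class ConfigurationError(ValueError):
    """Local stand-in for the library's RabbitMQConfigurationError."""


def pair_queues_with_keys(queues, routing_keys):
    """Return (queue, routing_key) pairs, or raise if the counts differ."""
    if len(queues) != len(routing_keys):
        raise ConfigurationError(
            f"{len(queues)} queues but {len(routing_keys)} routing keys"
        )
    return list(zip(queues, routing_keys))


pairs = pair_queues_with_keys(["queue1", "queue2"], ["key1", "key2"])
print(pairs)

try:
    pair_queues_with_keys(["queue1", "queue2"], ["key1"])
except ConfigurationError as exc:
    message = str(exc)
    print(f"Configuration error: {message}")
```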

Dead Letter Configuration

Don't configure dead letter queues to point to themselves:

# ❌ Don't do this - creates infinite loop
manager.setup_queue(
    'my_dlq',
    dead_letter_exchange='same_exchange'
)

# ✅ Do this instead
manager.setup_queue('my_dlq', dead_letter_exchange=None)

🔍 Monitoring and Health Checks

# Check connection health
health = manager.health_check()
if health['status'] == 'healthy':
    print("✅ RabbitMQ connection is healthy")
else:
    print(f"❌ RabbitMQ connection issues: {health['error']}")

# Get detailed queue information
info = manager.get_queue_info('queue1')
print(f"Queue: {info['queue']}")
print(f"Messages: {info['message_count']}")
print(f"Consumers: {info['consumer_count']}")
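
For startup or readiness probes, it can help to poll the health check a few times before giving up. The sketch below wraps any callable that returns a dict with a `status` key, as `health_check()` does in the example above. `wait_until_healthy` is a hypothetical helper, and the `'unhealthy'` status value in the fake responses is an assumption; only `'healthy'` appears in this README.

```python
import time


def wait_until_healthy(check, attempts=5, delay=1.0, sleep=time.sleep):
    """Call check() until it reports 'healthy' or the attempts run out."""
    last = {}
    for attempt in range(1, attempts + 1):
        last = check()
        if last.get("status") == "healthy":
            return True, last
        if attempt < attempts:
            # Wait before the next attempt, but not after the final one
            sleep(delay)
    return False, last


# Fake responses stand in for a real broker: one failure, then success
responses = iter([
    {"status": "unhealthy", "error": "connection refused"},
    {"status": "healthy"},
])
ok, last = wait_until_healthy(lambda: next(responses), attempts=3, delay=0)
print(ok, last)

# With a real manager this would be: wait_until_healthy(manager.health_check)
```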

🚀 Production Deployment

Docker Compose Example

version: '3.8'
services:
  app:
    build: .
    environment:
      RABBITMQ_HOST: rabbitmq
      RABBITMQ_USERNAME: admin
      RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD}
      RABBITMQ_EXCHANGE: production
      RABBITMQ_QUEUES: queue1,queue2
      RABBITMQ_ROUTING_KEYS: queue1.*,queue2.*.*
    depends_on:
      - rabbitmq
  
  rabbitmq:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}

Kubernetes ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
data:
  RABBITMQ_HOST: "rabbitmq-service"
  RABBITMQ_PORT: "5672"
  RABBITMQ_EXCHANGE: "production"
  RABBITMQ_QUEUES: "queue1,queue2"
  RABBITMQ_ROUTING_KEYS: "queue1.*,queue2.*.*"

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Add tests for your changes
  5. Run the test suite (pytest)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

📝 License

This project is licensed under the MIT License - see the LICENSE file for details.

🐛 Support

📚 Changelog

See CHANGELOG.md for a detailed list of changes and version history.

🙏 Acknowledgments

  • Built on top of the excellent pika library
  • Inspired by the need for simpler RabbitMQ management in Python applications
  • Thanks to the RabbitMQ team for creating an amazing message broker

Made with ❤️ for the Python community
