
BullMQ manager logic

TREDOPS BULLMQ UTILS

A complete utility library for BullMQ management and administration, designed to simplify message queue operations in enterprise-level Python applications.

🎯 Objectives

This library provides a comprehensive set of tools to:

  • Simplify BullMQ management with intuitive Python interfaces
  • Monitor and administer queues in real-time
  • Implement backup/restore systems for operational continuity
  • Handle priorities, delays, and job cancellations efficiently
  • Integrate with MongoDB for persistence and auditing

📦 Main Components

🔧 BullMQ Consumer (consumer_bullmq.py)

  • Objective: Simplified and robust consumer for processing messages
  • Features:
    • Safe signal handling (SIGINT/SIGTERM)
    • Automatic queue configuration
    • Integrated logging and error handling
    • Reduced from 200+ lines to 78 lines of clean code

📤 BullMQ Sender (bullmq_sender.py)

  • Objective: Message sending with advanced features
  • Features:
    • Priorities: Integer scale from 1 to 10 (default 5)
    • Delays: Delay in seconds before a job becomes eligible for processing
    • Custom Job IDs: For tracking and relationships
    • Async/sync versions: Compatibility with different contexts
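
To make the priority and delay rules concrete, here is a small validation sketch. `build_job_options` is a hypothetical helper, not a function of this library. It assumes the sender converts seconds to the milliseconds that BullMQ uses for its `delay` option, and that priority and job ID are passed through as `priority` and `jobId`. How the 1-10 scale maps onto BullMQ's own priority ordering is not stated in this description, so no mapping is attempted here.

```python
def build_job_options(priority=5, delay_sec=0, job_id=None):
    """Validate sender arguments and build a job-options dict.

    Hypothetical helper: key names follow BullMQ job options, but
    whether bullmq_sender builds its options this way is an assumption.
    """
    is_int = isinstance(priority, int) and not isinstance(priority, bool)
    if not is_int or not 1 <= priority <= 10:
        raise ValueError("priority must be an integer from 1 to 10")
    if delay_sec < 0:
        raise ValueError("delay_sec must not be negative")

    # BullMQ expresses delays in milliseconds.
    options = {"priority": priority, "delay": int(delay_sec * 1000)}
    if job_id is not None:
        options["jobId"] = str(job_id)
    return options
```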

Job Cancellation (bullmq_cancel_execution.py)

  • Objective: Cancel jobs before they are processed
  • Features:
    • Individual and batch cancellation
    • Intelligent search by job_id across multiple states
    • Synchronous and asynchronous versions
    • Handles jobs in states: waiting, delayed, prioritized
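
The search across states can be illustrated with an in-memory model. In this sketch a plain dict of lists stands in for the Redis-backed queue, and `find_cancellable` and `cancel_many` are hypothetical names. It shows the lookup order and the batch result shape only, not the library's implementation.

```python
# Only jobs that have not started yet can be cancelled.
CANCELLABLE_STATES = ("waiting", "delayed", "prioritized")


def find_cancellable(jobs_by_state, job_id):
    """Return the cancellable state holding job_id, or None."""
    for state in CANCELLABLE_STATES:
        if job_id in jobs_by_state.get(state, ()):
            return state
    return None


def cancel_many(jobs_by_state, job_ids):
    """Remove each cancellable job and report what happened to each ID."""
    result = {"cancelled": [], "not_found": []}
    for job_id in job_ids:
        state = find_cancellable(jobs_by_state, job_id)
        if state is None:
            # Unknown ID, or the job is already active or finished.
            result["not_found"].append(job_id)
        else:
            jobs_by_state[state].remove(job_id)
            result["cancelled"].append(job_id)
    return result
```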

👁️ Monitoring and Visibility (bullmq_visibility.py)

  • Objective: Real-time monitoring of queue status
  • Features:
    • Per-queue statistics: Job count by state
    • System health: Detection of saturated or problematic queues
    • Global summary: Consolidated view of all queues
    • Automatic recommendations: Alerts and optimization suggestions
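
As an illustration of the health checks described above, the sketch below classifies one queue from its per-state job counts. The function name `assess_queue`, the thresholds, and the alert labels are all assumptions chosen for the example; the library's real criteria are not documented here.

```python
def assess_queue(counts, max_pending=1000, max_failed_ratio=0.10):
    """Classify a queue from a mapping of state name to job count.

    Hypothetical thresholds: more than max_pending unstarted jobs means
    "saturated"; a failed share above max_failed_ratio of finished jobs
    means "high_failure_rate".
    """
    pending = sum(counts.get(s, 0) for s in ("waiting", "delayed", "prioritized"))
    failed = counts.get("failed", 0)
    finished = counts.get("completed", 0) + failed
    failed_ratio = failed / finished if finished else 0.0

    alerts = []
    if pending > max_pending:
        alerts.append("saturated")
    if failed_ratio > max_failed_ratio:
        alerts.append("high_failure_rate")

    return {
        "total_jobs": sum(counts.values()),
        "pending": pending,
        "failed_ratio": failed_ratio,
        "status": "degraded" if alerts else "healthy",
        "alerts": alerts,
    }
```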

💾 Backup and Restore (bullmaq_backup.py)

  • Objective: Complete backup/restore system with MongoDB
  • Features:
    • Scalable structure: One MongoDB document per job
    • Complete backup: Captures all job states (waiting, delayed, active, completed, failed)
    • Selective restore: By queue, state, or specific backup
    • Backup management: List, delete, and recreate backups
    • MongoDB integration: bullmqJobBackup collection for persistence

📋 Backup Data Structure

Each job is stored as an individual document in MongoDB:

{
  "backup_id": "backup_20260105_120000",
  "createAt": "2026-01-05T12:00:00.000Z",
  "updateAt": "2026-01-05T12:00:00.000Z",
  "worker": "processing_queue",
  "job_id": "job_123",
  "job_name": "process_order",
  "job_state": "waiting",
  "message": {
    "order_id": 12345,
    "customer": "John Doe"
  },
  "delay": 0,
  "priority": 5,
  "attempts": 0
}
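
A document of this shape could be assembled along the following lines. This is a sketch under assumptions: `make_backup_id` and `make_backup_document` are hypothetical helpers, the input `job` dict with `id`, `name`, `state`, and `data` keys is invented for the example, and the timestamps are written as strings to match the sample above, although the library may store native dates.

```python
from datetime import datetime, timezone


def _iso_millis(moment):
    """Format an aware datetime as UTC ISO 8601 with millisecond precision."""
    utc = moment.astimezone(timezone.utc)
    return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z"


def make_backup_id(moment):
    """Derive an ID such as backup_20260105_120000 from an aware datetime."""
    return moment.astimezone(timezone.utc).strftime("backup_%Y%m%d_%H%M%S")


def make_backup_document(backup_id, worker, job, moment):
    """Build one MongoDB document for one job (hypothetical input shape)."""
    stamp = _iso_millis(moment)
    return {
        "backup_id": backup_id,
        "createAt": stamp,
        "updateAt": stamp,
        "worker": worker,
        "job_id": job["id"],
        "job_name": job["name"],
        "job_state": job["state"],
        "message": job.get("data", {}),
        "delay": job.get("delay", 0),
        "priority": job.get("priority", 5),
        "attempts": job.get("attempts", 0),
    }
```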

🚀 Use Cases

1. Message Processing

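from bullmq_sender import bullmq_sender
from consumer_bullmq import consumer_bullmq


def payment_processor(message):
    # Hypothetical handler: the callback signature that consumer_bullmq
    # expects is an assumption, not something this description documents.
    print(f"Processing {message}")


# Send a high-priority message that becomes available after 10 seconds
job_id = bullmq_sender(
    worker="urgent_tasks",
    priority=9,
    delay_sec=10,
    message={"task": "process_payment", "amount": 1000},
    job_id="payment_12345"
)

# Process messages from the same queue
consumer_bullmq(worker="urgent_tasks", function_process=payment_processor)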

2. Backup and Restore

from bullmaq_backup import bullmq_backup, bullmq_restore, bullmq_delete_backup

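# mongodb_connection is a MongoDB database handle created elsewhere
# (see Configuration); it is not defined in this snippet.
# Create a complete backup of three queues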
result = bullmq_backup(
    db=mongodb_connection,
    queue_names=["orders", "payments", "notifications"],
    backup_name="daily_backup_20260105"
)

# Restore from backup
restore_result = bullmq_restore(
    db=mongodb_connection,
    backup_id="daily_backup_20260105",
    restore_states=["waiting", "delayed"]  # Only pending jobs
)
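
Because each job is a separate document, a selective restore amounts to a filtered read of the backup collection. The sketch below builds such a MongoDB filter. `build_restore_filter` is a hypothetical helper, and whether `bullmq_restore` queries this way internally is an assumption; the field names come from the backup document structure shown earlier.

```python
def build_restore_filter(backup_id, queue_names=None, restore_states=None):
    """Build a MongoDB query selecting the backed-up jobs to restore.

    Hypothetical helper: omitting queue_names or restore_states means
    "do not filter on that field".
    """
    query = {"backup_id": backup_id}
    if queue_names:
        query["worker"] = {"$in": list(queue_names)}
    if restore_states:
        query["job_state"] = {"$in": list(restore_states)}
    return query
```

With pymongo, a filter like this could be passed to `find` on the bullmqJobBackup collection, for example `db["bullmqJobBackup"].find(query)`.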

3. Monitoring and Cancellation

from bullmq_visibility import bullmq_visibility_summary
from bullmq_cancel_execution import bullmq_cancel_execution

# System monitoring
summary = bullmq_visibility_summary(["orders", "payments"])
print(f"Total jobs: {summary['total_jobs']}")

# Cancel specific job
cancel_result = bullmq_cancel_execution(
    job_id="payment_12345",
    queue_name="payments"
)

🔧 Configuration

Environment Variables

  • REDIS_HOST: Redis connection URL, despite the name a full URL rather than a bare hostname (default: redis://192.168.1.163:6379)
  • MongoDB configuration through tredops_eval.connect
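
Reading the Redis setting with its documented default might look like this. It is a sketch using only the standard library: `redis_settings` is a hypothetical name, and the validation rules are assumptions rather than the library's behavior.

```python
import os
from urllib.parse import urlparse

# Default taken from the configuration list above.
DEFAULT_REDIS_URL = "redis://192.168.1.163:6379"


def redis_settings(env=None):
    """Resolve the Redis URL from REDIS_HOST, falling back to the default.

    env is any mapping of environment variables; os.environ is used
    when it is omitted. Hypothetical helper for illustration.
    """
    env = os.environ if env is None else env
    url = env.get("REDIS_HOST") or DEFAULT_REDIS_URL
    parsed = urlparse(url)
    if parsed.scheme not in ("redis", "rediss") or not parsed.hostname:
        raise ValueError(f"REDIS_HOST must be a redis:// URL, got {url!r}")
    # 6379 is the standard Redis port when the URL does not name one.
    return {"url": url, "host": parsed.hostname, "port": parsed.port or 6379}
```

Since the default points at a private network address, deployments elsewhere will normally need to set REDIS_HOST explicitly.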

Dependencies

  • bullmq: BullMQ client for Python
  • pymongo: MongoDB driver
  • asyncio: Python standard-library module used for asynchronous operations (no separate installation needed)

🧪 Testing

The project includes comprehensive tests that verify:

  • ✅ Functionality with real BullMQ
  • ✅ Integration with real MongoDB
  • ✅ Correct backup document structure
  • ✅ Restore and cancellation operations
  • ✅ Error handling and edge cases

# Run tests
pytest tredops_strategy_worker/bullmaq_backup_test.py -v

📊 Advantages

  • 🚀 Scalability: Individual documents per job, supports thousands of messages
  • 🔒 Reliability: Backup/restore system for operational continuity
  • 📈 Observability: Real-time monitoring and detailed metrics
  • ⚡ Performance: Optimized operations with aggregation pipelines
  • 🔧 Flexibility: Configurable APIs for different use cases
  • 🛡️ Robustness: Complete error handling and automatic recovery
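
As an example of the kind of aggregation pipeline mentioned above, the sketch below counts the jobs in one backup per queue and state. `backup_summary_pipeline` is a hypothetical helper and the pipeline is an assumption about how such a summary could be computed, not the library's own query; the field names come from the backup document structure.

```python
def backup_summary_pipeline(backup_id):
    """Build a MongoDB aggregation that counts jobs per queue and state."""
    return [
        # Restrict the scan to a single backup.
        {"$match": {"backup_id": backup_id}},
        # One output row per (queue, state) pair with its job count.
        {
            "$group": {
                "_id": {"worker": "$worker", "job_state": "$job_state"},
                "count": {"$sum": 1},
            }
        },
        # Stable, readable ordering of the result rows.
        {"$sort": {"_id.worker": 1, "_id.job_state": 1}},
    ]
```

With pymongo the result could be obtained through `db["bullmqJobBackup"].aggregate(backup_summary_pipeline("daily_backup_20260105"))`, so the counting happens inside MongoDB rather than in Python.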

Developed for TREDOPS - Enterprise BullMQ management system
