BullMQ manager logic
Project description
TREDOPS BULLMQ UTILS
A complete utility library for BullMQ management and administration, designed to simplify message queue operations in enterprise-level Python applications.
🎯 Objectives
This library provides a comprehensive set of tools to:
- Simplify BullMQ management with intuitive Python interfaces
- Monitor and administer queues in real-time
- Implement backup/restore systems for operational continuity
- Handle priorities, delays, and job cancellations efficiently
- Integrate with MongoDB for persistence and auditing
📦 Main Components
🔧 BullMQ Consumer (consumer_bullmq.py)
- Objective: Simplified and robust consumer for processing messages
- Features:
- Safe signal handling (SIGINT/SIGTERM)
- Automatic queue configuration
- Integrated logging and error handling
- Reduced from 200+ lines to 78 lines of clean code
📤 BullMQ Sender (bullmq_sender.py)
- Objective: Message sending with advanced features
- Features:
- Priorities: 1-10 system (default 5)
- Delays: Seconds delay before processing
- Custom Job IDs: For tracking and relationships
- Async/sync versions: Compatibility with different contexts
❌ Job Cancellation (bullmq_cancel_execution.py)
- Objective: Cancel jobs before they are processed
- Features:
- Individual and batch cancellation
- Intelligent search by job_id across multiple states
- Synchronous and asynchronous versions
- Handles jobs in states: waiting, delayed, prioritized
👁️ Monitoring and Visibility (bullmq_visibility.py)
- Objective: Real-time monitoring of queue status
- Features:
- Per-queue statistics: Job count by state
- System health: Detection of saturated or problematic queues
- Global summary: Consolidated view of all queues
- Automatic recommendations: Alerts and optimization suggestions
💾 Backup and Restore (bullmaq_backup.py)
- Objective: Complete backup/restore system with MongoDB
- Features:
- Scalable structure: One MongoDB document per job
- Complete backup: Captures all job states (waiting, delayed, active, completed, failed)
- Selective restore: By queue, state, or specific backup
- Backup management: List, delete, and recreate backups
- MongoDB integration:
bullmqJobBackupcollection for persistence
📋 Backup Data Structure
Each job is stored as an individual document in MongoDB:
{
"backup_id": "backup_20260105_120000",
"createAt": "2026-01-05T12:00:00.000Z",
"updateAt": "2026-01-05T12:00:00.000Z",
"worker": "processing_queue",
"job_id": "job_123",
"job_name": "process_order",
"job_state": "waiting",
"message": {
"order_id": 12345,
"customer": "John Doe"
},
"delay": 0,
"priority": 5,
"attempts": 0
}
🚀 Use Cases
1. Message Processing
from bullmq_sender import bullmq_sender
from consumer_bullmq import consumer_bullmq
# Send message with high priority and delay
job_id = bullmq_sender(
worker="urgent_tasks",
priority=9,
delay_sec=10,
message={"task": "process_payment", "amount": 1000},
job_id="payment_12345"
)
# Process messages
consumer_bullmq(worker="urgent_tasks", function_process=payment_processor)
2. Backup and Restore
from bullmaq_backup import bullmq_backup, bullmq_restore, bullmq_delete_backup
# Create complete backup
result = bullmq_backup(
db=mongodb_connection,
queue_names=["orders", "payments", "notifications"],
backup_name="daily_backup_20260105"
)
# Restore from backup
restore_result = bullmq_restore(
db=mongodb_connection,
backup_id="daily_backup_20260105",
restore_states=["waiting", "delayed"] # Only pending jobs
)
3. Monitoring and Cancellation
from bullmq_visibility import bullmq_visibility_summary
from bullmq_cancel_execution import bullmq_cancel_execution
# System monitoring
summary = bullmq_visibility_summary(["orders", "payments"])
print(f"Total jobs: {summary['total_jobs']}")
# Cancel specific job
cancel_result = bullmq_cancel_execution(
job_id="payment_12345",
queue_name="payments"
)
🔧 Configuration
Environment Variables
REDIS_HOST: Redis URL (default:redis://192.168.1.163:6379)- MongoDB configuration through
tredops_eval.connect
Dependencies
bullmq: BullMQ client for Pythonpymongo: MongoDB driverasyncio: For asynchronous operations
🧪 Testing
The project includes comprehensive tests that verify:
- ✅ Functionality with real BullMQ
- ✅ Integration with real MongoDB
- ✅ Correct backup document structure
- ✅ Restore and cancellation operations
- ✅ Error handling and edge cases
# Run tests
pytest tredops_strategy_worker/bullmaq_backup_test.py -v
📊 Advantages
- 🚀 Scalability: Individual documents per job, supports thousands of messages
- 🔒 Reliability: Backup/restore system for operational continuity
- 📈 Observability: Real-time monitoring and detailed metrics
- ⚡ Performance: Optimized operations with aggregation pipelines
- 🔧 Flexibility: Configurable APIs for different use cases
- 🛡️ Robustness: Complete error handling and automatic recovery
Developed for TREDOPS - Enterprise BullMQ management system
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file bullmq_py-1.2.6.tar.gz.
File metadata
- Download URL: bullmq_py-1.2.6.tar.gz
- Upload date:
- Size: 53.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7c37601919b22de0518675f23ef901e4b70cab55c2a0f18a27c7019cfc8032ff
|
|
| MD5 |
91cdc97107a4494d8c2ad685b4e2ee57
|
|
| BLAKE2b-256 |
3c8b3e5df1d97b25c6071ee4ba665cf2cbaf37a1394d6f7fbfe785626c6a4f66
|
File details
Details for the file bullmq_py-1.2.6-py3-none-any.whl.
File metadata
- Download URL: bullmq_py-1.2.6-py3-none-any.whl
- Upload date:
- Size: 68.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b50ec722fc061ccf94edfb17ddb503f038ead724a307399cf20126c2b5045fad
|
|
| MD5 |
3b7fde622da8bf978cc2d48e3cd1a1fc
|
|
| BLAKE2b-256 |
6d0a4b4b4da42e113824ddcb5bbe9c68bfa9d52e86bfa916781639456e21855e
|