LaraQueue
Simple and lightweight queue synchronization between Python and Laravel using Redis. Process Laravel jobs in Python and vice versa.
Fork Notice: This package is a fork of the original python-laravel-queue by @sinanbekar. This version includes critical bug fixes, comprehensive tests, and updated compatibility with newer dependencies.
🚀 NEW in v1.0.0: Full Async Support with asyncio for high-performance applications!
NOTE: This package is now stable and production-ready with both synchronous and asynchronous APIs.
✨ New Features
🚀 Async Support (v1.0.0)
Full asyncio support for high loads:
- Asynchronous processing - use AsyncQueue for maximum performance
- Parallel processing - configurable number of concurrent tasks
- AsyncIOEventEmitter - asynchronous event handlers
- High performance - 50 or more concurrent tasks
- asyncio compatibility - full integration with Python async/await ecosystem
import asyncio
import aioredis
from lara_queue import AsyncQueue

async def main():
    # Create async Redis client
    redis_client = await aioredis.from_url("redis://localhost:6379")

    # Create async queue
    queue = AsyncQueue(
        client=redis_client,
        queue='async_worker',
        max_concurrent_jobs=20,  # 20 concurrent tasks
        enable_metrics=True
    )

    # Async handler
    @queue.handler
    async def process_email(data):
        job_data = data.get('data', {})
        await asyncio.sleep(0.1)  # Async work
        print(f"Email sent: {job_data.get('to')}")

    # Add tasks asynchronously
    for i in range(100):
        await queue.push('App\\Jobs\\EmailJob', {
            'to': f'user{i}@example.com',
            'subject': f'Email {i}'
        })

    # Start processing
    await queue.listen()

# Run
asyncio.run(main())
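One way to picture a limit such as max_concurrent_jobs is a semaphore that caps how many handlers run at once. The sketch below is only an illustration of that idea using the standard library; run_bounded and demo are hypothetical names, and this is not necessarily how AsyncQueue is implemented internally.

```python
import asyncio


async def run_bounded(jobs, handler, max_concurrent_jobs=20):
    """Run handler(job) for every job without exceeding the concurrency limit."""
    semaphore = asyncio.Semaphore(max_concurrent_jobs)

    async def guarded(job):
        async with semaphore:  # waits here while the limit is reached
            return await handler(job)

    # gather() returns results in the same order as the input jobs
    return await asyncio.gather(*(guarded(job) for job in jobs))


async def demo():
    running = 0
    peak = 0

    async def handler(job):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for real async work
        running -= 1
        return job * 2

    results = await run_bounded(range(10), handler, max_concurrent_jobs=3)
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```

The peak value never exceeds the configured limit, which is the property a concurrency setting is meant to guarantee.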
🛡️ Robust Error Handling (v0.0.3)
The package now includes a comprehensive error handling system:
- Automatic reconnection to Redis when connection is lost
- Retry logic with smart delays
- Detailed logging of all operations and errors
- Protection against invalid data - worker continues running when encountering problematic messages
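The last point, continuing past problematic messages, can be sketched as a loop that logs and skips anything it cannot decode or handle. This is a minimal illustration that assumes JSON-encoded messages; decode_payload and process_all are hypothetical helpers, not part of the LaraQueue API.

```python
import json
import logging

logger = logging.getLogger("worker_sketch")


def decode_payload(raw):
    """Return the decoded payload dict, or None when the message is unusable."""
    try:
        payload = json.loads(raw)
    except (TypeError, ValueError) as exc:  # JSONDecodeError subclasses ValueError
        logger.warning("Skipping undecodable message: %s", exc)
        return None
    if not isinstance(payload, dict):
        logger.warning("Skipping non-object payload: %r", payload)
        return None
    return payload


def process_all(raw_messages, handler):
    """Process every decodable message; bad ones are logged and skipped."""
    processed = 0
    for raw in raw_messages:
        payload = decode_payload(raw)
        if payload is None:
            continue
        try:
            handler(payload)
            processed += 1
        except Exception:
            # A failing handler must not take the whole worker down
            logger.exception("Handler failed; continuing with next message")
    return processed


seen = []
count = process_all(['{"job": "a"}', 'not json', '[1, 2]', '{"job": "b"}'], seen.append)
print(count, seen)
```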
🔄 Graceful Shutdown (v0.0.3)
Advanced signal handling for clean worker termination:
- Signal handlers for SIGINT (Ctrl+C) and SIGTERM (kill)
- Current job completion - waits for job to finish before stopping
- Automatic registration - handlers are set up when you call listen()
- Manual shutdown - programmatically trigger shutdown with queue.shutdown()
- No job loss - ensures current job completes successfully
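The pattern behind this behaviour is usually a signal handler that only sets a flag, with the worker loop checking that flag between jobs. The sketch below illustrates the idea with the standard signal module; StoppableWorker is a hypothetical class and makes no claim about LaraQueue's internals.

```python
import signal


class StoppableWorker:
    """Hypothetical worker loop that stops only between jobs."""

    def __init__(self, jobs):
        self.jobs = list(jobs)
        self.completed = []
        self.stop_requested = False

    def request_stop(self, signum=None, frame=None):
        # Signal handlers receive (signum, frame); only set a flag here
        self.stop_requested = True

    def install_signal_handlers(self):
        # signal.signal() must be called from the main thread
        signal.signal(signal.SIGINT, self.request_stop)
        signal.signal(signal.SIGTERM, self.request_stop)

    def run(self, handler):
        while self.jobs and not self.stop_requested:
            job = self.jobs.pop(0)
            handler(job)  # the current job always runs to completion
            self.completed.append(job)


worker = StoppableWorker(["a", "b", "c"])


def handler(job):
    if job == "b":
        worker.request_stop()  # simulates Ctrl+C arriving mid-job


worker.run(handler)
print(worker.completed, worker.jobs)
```

Job "b" finishes even though the stop request arrives while it is running, and job "c" stays queued for the next worker.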
💀 Dead Letter Queue (v0.0.4)
Advanced job failure handling with retry mechanisms:
- Automatic retry with exponential backoff (5s, 10s, 20s, 40s, max 60s)
- Configurable max retries (default: 3 attempts)
- Dead letter queue for permanently failed jobs
- Job reprocessing from dead letter queue
- Comprehensive failure tracking with error details and timestamps
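The delay sequence quoted above (5s, 10s, 20s, 40s, capped at 60s) follows from doubling a 5-second base delay and clamping the result. A minimal sketch of that arithmetic, where backoff_delay and its parameter names are illustrative rather than part of the library:

```python
def backoff_delay(attempt, base_delay=5, multiplier=2, max_delay=60):
    """Delay in seconds before retry number `attempt` (1-based)."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(base_delay * multiplier ** (attempt - 1), max_delay)


schedule = [backoff_delay(n) for n in range(1, 6)]
print(schedule)  # [5, 10, 20, 40, 60]
```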
🔄 Advanced Retry Mechanism (v0.0.5)
Powerful and flexible retry system with multiple strategies:
- Multiple retry strategies: Exponential, Linear, Fixed, Custom
- Configurable retry parameters: delays, max attempts, jitter
- Exception-based retry control: retry only for specific error types
- Retry statistics and monitoring: track success rates and performance
- Runtime configuration updates: change retry settings without restart
- Jitter support: prevent thundering herd problems
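To make the strategies concrete, the sketch below computes the delay for each built-in strategy, applies optional jitter, and filters retries by exception type. The function names, the string strategy keys, and the jitter range (50% to 100% of the computed delay) are assumptions chosen for illustration; the library's actual formulas may differ.

```python
import random


def compute_delay(strategy, attempt, delay, max_delay=60.0,
                  multiplier=2.0, jitter=False, rng=random):
    """Delay before retry number `attempt` (1-based) for a given strategy."""
    if strategy == "exponential":
        value = delay * multiplier ** (attempt - 1)
    elif strategy == "linear":
        value = delay * attempt
    elif strategy == "fixed":
        value = delay
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    value = min(value, max_delay)
    if jitter:
        # Assumed jitter range: spread retries over 50%-100% of the delay
        value = rng.uniform(0.5 * value, value)
    return value


def should_retry(exc, retry_exceptions):
    """Retry only when the exception matches one of the listed types."""
    return isinstance(exc, tuple(retry_exceptions))


linear = [compute_delay("linear", n, 5) for n in range(1, 5)]
exponential = [compute_delay("exponential", n, 2) for n in range(1, 7)]
print(linear, exponential)
```

With jitter enabled, workers that failed at the same moment retry at slightly different times, which is what prevents the thundering herd effect.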
📊 Metrics & Monitoring (v0.0.5)
Comprehensive metrics collection and performance monitoring:
- Real-time metrics: track processed, successful, and failed jobs
- Performance analytics: average processing time, throughput, min/max times
- Job type breakdown: metrics per job type with success rates
- Error tracking: detailed error counts and types
- Historical data: configurable history size for trend analysis
- Memory efficient: automatic cleanup of old metrics data
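A bounded history with automatic cleanup can be modelled with collections.deque and a maxlen, which drops the oldest entry whenever a new one is appended to a full buffer. The MetricsSketch class below is a hypothetical illustration of that approach, not the library's MetricsCollector.

```python
from collections import deque


class MetricsSketch:
    """Hypothetical collector: running totals plus a bounded job history."""

    def __init__(self, history_size=1000):
        # Once full, the deque discards the oldest entry on every append
        self.history = deque(maxlen=history_size)
        self.total = 0
        self.successful = 0

    def record(self, name, processing_time, success):
        self.total += 1
        if success:
            self.successful += 1
        self.history.append(
            {"name": name, "processing_time": processing_time, "success": success}
        )

    def summary(self):
        times = [entry["processing_time"] for entry in self.history]
        return {
            "total_processed": self.total,
            "success_rate": 100.0 * self.successful / self.total if self.total else 0.0,
            "avg_processing_time": sum(times) / len(times) if times else 0.0,
            "min_processing_time": min(times) if times else 0.0,
            "max_processing_time": max(times) if times else 0.0,
        }


metrics = MetricsSketch(history_size=3)
for i, ok in enumerate([True, True, False, True]):
    metrics.record("App\\Jobs\\EmailJob", 0.1 * (i + 1), ok)
summary = metrics.summary()
print(summary)
```

The totals cover every job ever recorded, while timing statistics are computed only over the most recent history_size entries, so memory use stays constant.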
# Create queue with Dead Letter Queue
queue = Queue(
    redis_client,
    queue='email_worker',
    dead_letter_queue='email_failed',  # Custom DLQ name
    max_retries=3                      # Retry failed jobs 3 times
)

# Get failed jobs
failed_jobs = queue.get_dead_letter_jobs(limit=100)

# Reprocess the first failed job, if there is one
if failed_jobs:
    queue.reprocess_dead_letter_job(failed_jobs[0])

# Clear all failed jobs
queue.clear_dead_letter_queue()
🔄 Advanced Retry Configuration
from lara_queue import Queue, RetryStrategy

# Exponential backoff strategy (default)
queue_exponential = Queue(
    redis_client,
    queue='email_worker',
    max_retries=5,
    retry_strategy=RetryStrategy.EXPONENTIAL,
    retry_delay=2,                 # Initial delay: 2s
    retry_max_delay=60,            # Max delay: 60s
    retry_backoff_multiplier=2.0,  # Multiply by 2 each time
    retry_jitter=True,             # Add randomness to prevent thundering herd
    retry_exceptions=[ValueError, ConnectionError]  # Only retry these exceptions
)

# Linear retry strategy
queue_linear = Queue(
    redis_client,
    queue='notification_worker',
    max_retries=4,
    retry_strategy=RetryStrategy.LINEAR,
    retry_delay=5,      # Each retry: 5s, 10s, 15s, 20s
    retry_jitter=False  # No randomness for predictable delays
)

# Fixed delay strategy
queue_fixed = Queue(
    redis_client,
    queue='report_worker',
    max_retries=3,
    retry_strategy=RetryStrategy.FIXED,
    retry_delay=10,    # Always 10 seconds between retries
    retry_jitter=True  # Add some randomness
)
# Custom retry function
def fibonacci_retry_delay(attempt: int) -> int:
    """Fibonacci-based retry delay: 1, 1, 2, 3, 5, 8, 13..."""
    if attempt <= 2:
        return 1
    a, b = 1, 1
    for _ in range(attempt - 2):
        a, b = b, a + b
    return min(b, 20)  # Cap at 20 seconds

queue_custom = Queue(
    redis_client,
    queue='analytics_worker',
    max_retries=6,
    retry_strategy=RetryStrategy.CUSTOM,
    retry_custom_function=fibonacci_retry_delay,
    retry_exceptions=[Exception]  # Retry for all exceptions
)
# Monitor retry statistics
stats = queue_exponential.get_retry_statistics()
print(f"Total retries: {stats['total_retries']}")
print(f"Success rate: {stats['success_rate']:.1f}%")
print(f"Dead letter jobs: {stats['dead_letter_jobs']}")

# Update retry configuration at runtime
queue_exponential.update_retry_config(
    max_retries=7,
    retry_delay=1,
    retry_strategy=RetryStrategy.LINEAR
)

# Reset retry statistics
queue_exponential.reset_retry_statistics()
📊 Metrics Configuration
from lara_queue import Queue, MetricsCollector

# Create queue with metrics enabled
queue = Queue(
    redis_client,
    queue='monitored_worker',
    enable_metrics=True,       # Enable metrics collection
    metrics_history_size=1000  # Keep last 1000 jobs in history
)

# Get comprehensive metrics
metrics = queue.get_metrics()
print(f"Total processed: {metrics['general']['total_processed']}")
print(f"Success rate: {metrics['general']['success_rate']:.1f}%")
print(f"Throughput: {metrics['performance']['throughput_per_second']:.2f} jobs/sec")
print(f"Avg processing time: {metrics['performance']['avg_processing_time']:.3f}s")

# Get metrics for specific job type
email_metrics = queue.get_job_type_metrics('App\\Jobs\\EmailJob')
if email_metrics:
    print(f"Email jobs: {email_metrics['total']} total, {email_metrics['success_rate']:.1f}% success")

# Get recent job history
recent_jobs = queue.get_recent_jobs(limit=10)
for job in recent_jobs:
    status = "✅" if job['success'] else "❌"
    print(f"{status} {job['name']} - {job['processing_time']:.3f}s")

# Get performance summary
summary = queue.get_performance_summary()
print(f"Uptime: {summary['general']['uptime_seconds']:.1f}s")
print(f"Total retries: {summary['general']['total_retries']}")

# Reset metrics
queue.reset_metrics()

# Disable metrics for better performance
queue_no_metrics = Queue(
    redis_client,
    queue='high_performance_worker',
    enable_metrics=False  # Disable metrics collection
)
🏷️ Type Hints (v0.0.4)
Complete type annotations for better IDE support and code safety:
- Full type coverage for all methods and parameters
- IDE autocompletion and type checking
- Type safety through annotations that a static checker such as mypy can verify
- Optional parameters with Optional[T] types
- Generic types for collections and data structures
from typing import Dict, List, Any, Optional
from lara_queue import Queue

# Typed queue creation
queue: Queue = Queue(
    client=redis_client,
    queue='typed_worker',
    dead_letter_queue='typed_failed',
    max_retries=3
)

# Typed job processing
@queue.handler
def process_email(data: Dict[str, Any]) -> None:
    email_type: str = data.get('type', 'unknown')
    recipient: str = data.get('recipient', 'unknown')
    subject: Optional[str] = data.get('subject')

    # Type-safe processing
    if 'invalid' in recipient.lower():
        raise ValueError(f"Invalid email address: {recipient}")
    print(f"Email sent to {recipient}")

# Typed DLQ operations
failed_jobs: List[Dict[str, Any]] = queue.get_dead_letter_jobs(limit=100)
if failed_jobs:  # Guard against an empty dead letter queue
    success: bool = queue.reprocess_dead_letter_job(failed_jobs[0])
cleared_count: int = queue.clear_dead_letter_queue()
import logging
# Enable logging for debugging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('lara_queue')
logger.setLevel(logging.DEBUG)
Installation
pip install LaraQueue
Usage
🚀 Async Usage (Recommended for High Performance)
For high-performance applications, use the async API:
import asyncio
import aioredis
from lara_queue import AsyncQueue, RetryStrategy

async def main():
    # Create async Redis client
    redis_client = await aioredis.from_url("redis://localhost:6379")

    # Create async queue with high performance settings
    queue = AsyncQueue(
        client=redis_client,
        queue='async_worker',
        max_concurrent_jobs=20,  # Process 20 jobs simultaneously
        enable_metrics=True,
        retry_strategy=RetryStrategy.EXPONENTIAL,
        max_retries=3
    )

    # Async job handler
    @queue.handler
    async def process_email(data):
        job_data = data.get('data', {})
        # Simulate async work (API calls, database operations, etc.)
        await asyncio.sleep(0.1)
        print(f"Email sent to: {job_data.get('to')}")

    # Add jobs asynchronously
    for i in range(100):
        await queue.push('App\\Jobs\\EmailJob', {
            'to': f'user{i}@example.com',
            'subject': f'Welcome Email {i}',
            'body': 'Welcome to our service!'
        })

    # Start processing
    await queue.listen()

# Run the async application
asyncio.run(main())
High-Performance Async Example
import asyncio
import aioredis
from lara_queue import AsyncQueue

async def high_performance_worker():
    redis_client = await aioredis.from_url("redis://localhost:6379")

    # High-performance queue configuration
    queue = AsyncQueue(
        client=redis_client,
        queue='high_perf_worker',
        max_concurrent_jobs=50,  # 50 concurrent jobs
        enable_metrics=True,
        metrics_history_size=10000
    )

    @queue.handler
    async def fast_processor(data):
        job_data = data.get('data', {})
        # Fast async processing
        await asyncio.sleep(0.05)  # 50ms processing time
        # Your business logic here
        result = await process_business_logic(job_data)
        return result

    # Process thousands of jobs efficiently
    await queue.listen()

async def process_business_logic(data):
    # Simulate business logic
    await asyncio.sleep(0.02)
    return f"Processed: {data.get('id')}"

# Run high-performance worker
asyncio.run(high_performance_worker())
Async with Laravel Integration
import asyncio
import time  # Needed for time.time() below
import aioredis
from lara_queue import AsyncQueue

async def laravel_async_integration():
    redis_client = await aioredis.from_url("redis://localhost:6379")

    # Queue for processing Laravel jobs
    queue = AsyncQueue(
        client=redis_client,
        queue='python_worker',  # Queue name Laravel sends to
        max_concurrent_jobs=10
    )

    @queue.handler
    async def handle_laravel_email(data):
        job_data = data.get('data', {})
        # Process Laravel email job
        await send_email_async(
            to=job_data.get('to'),
            subject=job_data.get('subject'),
            body=job_data.get('body')
        )

    @queue.handler
    async def handle_laravel_notification(data):
        job_data = data.get('data', {})
        # Process Laravel notification
        await send_notification_async(
            user_id=job_data.get('user_id'),
            message=job_data.get('message')
        )

    # Send jobs to Laravel
    laravel_queue = AsyncQueue(
        client=redis_client,
        queue='laravel_worker'  # Queue name Laravel listens to
    )
    await laravel_queue.push('App\\Jobs\\UpdateUserJob', {
        'user_id': 123,
        'data': {'last_login': time.time()}
    })

    # Start processing
    await queue.listen()

async def send_email_async(to, subject, body):
    # Your async email sending logic
    await asyncio.sleep(0.1)
    print(f"Email sent to {to}")

async def send_notification_async(user_id, message):
    # Your async notification logic
    await asyncio.sleep(0.05)
    print(f"Notification sent to user {user_id}")

# Run Laravel integration
asyncio.run(laravel_async_integration())
Synchronous Usage (Legacy)
Listen for jobs in Python
from lara_queue import Queue
from redis import Redis

r = Redis(host='localhost', port=6379, db=0)
queue_python = Queue(r, queue='python')

@queue_python.handler
def handle(data):
    name = data['name']      # job name
    job_data = data['data']  # job data
    print('Processing: ' + job_data['a'] + ' ' + job_data['b'] + ' ' + job_data['c'])

queue_python.listen()
Send jobs from Laravel
<?php
$job = new \App\Jobs\TestJob('hi', 'send to', 'python');
dispatch($job)->onQueue('python');
Send jobs to Laravel from Python
from lara_queue import Queue
from redis import Redis
r = Redis(host='localhost', port=6379, db=0)
queue_laravel = Queue(r, queue='laravel')
queue_laravel.push('App\\Jobs\\TestJob', {'a': 'hello', 'b': 'send to', 'c': 'laravel'})
TestJob in Laravel
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $a, $b, $c;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($a, $b, $c)
    {
        $this->a = $a;
        $this->b = $b;
        $this->c = $c;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        Log::info('TEST: ' . $this->a . ' ' . $this->b . ' ' . $this->c);
    }
}
Process jobs in Laravel
To handle jobs sent from Python, run queue:listen (or queue:work) in Laravel on the queue name you chose.
php artisan queue:listen --queue=laravel
Graceful Shutdown Example
import logging
import time

from lara_queue import Queue
from redis import Redis

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

r = Redis(host='localhost', port=6379, db=0)
queue = Queue(r, queue='python_worker')

@queue.handler
def handle_job(data):
    logger.info(f"Processing job: {data['name']}")
    # Simulate some work
    time.sleep(5)
    logger.info("Job completed!")

logger.info("Worker starting...")
logger.info("Press Ctrl+C to trigger graceful shutdown")
logger.info("Current job will complete before stopping")

try:
    queue.listen()  # Signal handlers auto-registered
except KeyboardInterrupt:
    logger.info("Worker stopped gracefully")
Manual Shutdown Example
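# r is a Redis client; process_data() and should_stop() are
# placeholders for your own application code
queue = Queue(r, queue='test')

@queue.handler
def handle_job(data):
    # Process job
    process_data(data)
    # Trigger shutdown programmatically
    if should_stop():
        queue.shutdown()

queue.listen()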
Error Handling Example
from lara_queue import Queue
from redis import Redis
from redis.exceptions import ConnectionError

try:
    r = Redis(host='localhost', port=6379, db=0)
    queue = Queue(r, queue='python_worker')

    @queue.handler
    def handle_job(data):
        print(f"Processing job: {data['name']}")

    queue.listen()  # Worker is now resilient to Redis errors!
except ConnectionError as e:
    print(f"Failed to connect to Redis: {e}")
except KeyboardInterrupt:
    print("Worker stopped gracefully")
Retry Strategy Recommendations
| Strategy | Use Case | Example |
|---|---|---|
| Exponential | Network/DB temporary failures | API calls, database connections |
| Linear | Predictable resource limits | Rate-limited APIs, queue backpressure |
| Fixed | Simple retry scenarios | File processing, simple validations |
| Custom | Complex business logic | Fibonacci delays, circuit breaker patterns |
Best Practices:
- Use retry_jitter=True to prevent thundering herd problems
- Set retry_exceptions to only retry recoverable errors
- Monitor retry statistics to optimize your retry strategy
- Use dead letter queues for permanently failed jobs
- Set a retry_max_delay limit to prevent excessive wait times
Features
- ✅ Async Support (v1.0.0) - Full asyncio support for high-performance applications
- ✅ Concurrent Processing - Configurable concurrent job processing (50 or more jobs)
- ✅ Redis driver support - Queue communication between Python and Laravel
- ✅ Bidirectional job processing - Send and receive jobs in both directions
- ✅ PHP object serialization - Compatible with Laravel's job serialization format
- ✅ Event-driven architecture - Simple decorator-based job handlers (sync & async)
- ✅ Automatic reconnection - Resilient to network issues
- ✅ Comprehensive error handling - Detailed logging and error recovery
- ✅ Graceful shutdown - Signal handling (SIGINT, SIGTERM) with job completion
- ✅ Advanced retry mechanisms - Multiple strategies with full configurability
- ✅ Retry statistics and monitoring - Track performance and success rates
- ✅ Comprehensive metrics collection - Real-time performance monitoring
- ✅ Production ready - Battle-tested with extensive test coverage
- ✅ Tested - 100+ unit and integration tests included (sync + async)
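The decorator-based handler style listed above can be understood as a small registry: the decorator stores the function, and the worker calls every stored function for each incoming job. The sketch below is a simplified stand-in with hypothetical names (HandlerRegistry, dispatch); the package itself mentions an event emitter, so its real mechanism may differ.

```python
class HandlerRegistry:
    """Hypothetical registry that mimics a decorator-based handler API."""

    def __init__(self):
        self._handlers = []

    def handler(self, func):
        # Register the function and return it unchanged so it stays callable
        self._handlers.append(func)
        return func

    def dispatch(self, payload):
        # Call every registered handler with the job payload
        for func in self._handlers:
            func(payload)


registry = HandlerRegistry()
received = []


@registry.handler
def on_job(data):
    received.append((data["name"], data["data"]["a"]))


registry.dispatch({"name": "App\\Jobs\\TestJob", "data": {"a": "hello"}})
print(received)
```

Note that in this model every registered handler sees every job, so a handler that cares about one job type should check data["name"] before acting.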
Requirements
- Python 3.7+
- Redis 4.0+
- Laravel 8+ (for Laravel side)
- aioredis 2.0+ (for async support)
Performance Recommendations
Async vs Sync Performance
| Feature | Sync Queue | Async Queue | Performance Gain |
|---|---|---|---|
| Concurrent Jobs | 1 | 1-50+ | 10-50x faster |
| Throughput | ~100 jobs/sec | ~1000+ jobs/sec | 10x+ faster |
| Memory Usage | Lower | Slightly higher | ~20% more |
| CPU Usage | Higher | Lower | ~30% less |
| I/O Efficiency | Blocking | Non-blocking | Much better |
Recommended Settings
# High Performance Async Configuration
queue = AsyncQueue(
    client=redis_client,
    queue='high_perf',
    max_concurrent_jobs=20,  # Adjust based on your system
    enable_metrics=True,
    retry_strategy=RetryStrategy.EXPONENTIAL,
    max_retries=3
)

# For CPU-intensive tasks
queue = AsyncQueue(
    client=redis_client,
    queue='cpu_intensive',
    max_concurrent_jobs=4,  # Match CPU cores
    enable_metrics=True
)

# For I/O-intensive tasks (API calls, DB operations)
queue = AsyncQueue(
    client=redis_client,
    queue='io_intensive',
    max_concurrent_jobs=50,  # High concurrency
    enable_metrics=True
)
Development
# Install development dependencies
pip install -e .
pip install -r requirements-dev.txt
# Run tests
pytest tests/ -v
# Run async tests
pytest tests/test_async_queue.py -v
# Run specific test file
pytest tests/test_error_handling.py -v
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
MIT License - see LICENSE file for details.
Credits
- Original package: python-laravel-queue by @sinanbekar
- This fork maintained with critical bug fixes and improvements