Skip to main content

Simple and lightweight queue synchronization between Python and Laravel using Redis

Project description

LaraQueue

Simple and lightweight queue synchronization between Python and Laravel using Redis. Process Laravel jobs in Python and vice versa.

Fork Notice: This package is a fork of the original python-laravel-queue by @sinanbekar. This version includes critical bug fixes, comprehensive tests, and updated compatibility with newer dependencies.

🚀 NEW in v1.0.0: Full Async Support with asyncio for high-performance applications!

NOTE: This package is now stable and production-ready with both synchronous and asynchronous APIs.

✨ New Features

🚀 Async Support (v1.0.0)

Full asyncio support for high loads:

  • Asynchronous processing - use AsyncQueue for maximum performance
  • Parallel processing - configurable number of concurrent tasks
  • AsyncIOEventEmitter - asynchronous event handlers
  • High performance - up to 50+ concurrent tasks
  • asyncio compatibility - full integration with Python async/await ecosystem
import asyncio
import aioredis
from lara_queue import AsyncQueue

async def main():
    # Create async Redis client
    redis_client = await aioredis.from_url("redis://localhost:6379")
    
    # Create async queue
    queue = AsyncQueue(
        client=redis_client,
        queue='async_worker',
        max_concurrent_jobs=20,  # 20 concurrent tasks
        enable_metrics=True
    )
    
    # Async handler
    @queue.handler
    async def process_email(data):
        job_data = data.get('data', {})
        await asyncio.sleep(0.1)  # Async work
        print(f"Email sent: {job_data.get('to')}")
    
    # Add tasks asynchronously
    for i in range(100):
        await queue.push('App\\Jobs\\EmailJob', {
            'to': f'user{i}@example.com',
            'subject': f'Email {i}'
        })
    
    # Start processing
    await queue.listen()

# Run
asyncio.run(main())

🛡️ Robust Error Handling (v0.0.3)

The package now includes a comprehensive error handling system:

  • Automatic reconnection to Redis when connection is lost
  • Retry logic with smart delays
  • Detailed logging of all operations and errors
  • Protection against invalid data - worker continues running when encountering problematic messages

🔄 Graceful Shutdown (v0.0.3)

Advanced signal handling for clean worker termination:

  • Signal handlers for SIGINT (Ctrl+C) and SIGTERM (kill)
  • Current job completion - waits for job to finish before stopping
  • Automatic registration - handlers are set up when you call listen()
  • Manual shutdown - programmatically trigger shutdown with queue.shutdown()
  • No job loss - ensures current job completes successfully

💀 Dead Letter Queue (v0.0.4)

Advanced job failure handling with retry mechanisms:

  • Automatic retry with exponential backoff (5s, 10s, 20s, 40s, max 60s)
  • Configurable max retries (default: 3 attempts)
  • Dead letter queue for permanently failed jobs
  • Job reprocessing from dead letter queue
  • Comprehensive failure tracking with error details and timestamps

🔄 Advanced Retry Mechanism (v0.0.5)

Powerful and flexible retry system with multiple strategies:

  • Multiple retry strategies: Exponential, Linear, Fixed, Custom
  • Configurable retry parameters: delays, max attempts, jitter
  • Exception-based retry control: retry only for specific error types
  • Retry statistics and monitoring: track success rates and performance
  • Runtime configuration updates: change retry settings without restart
  • Jitter support: prevent thundering herd problems

📊 Metrics & Monitoring (v0.0.5)

Comprehensive metrics collection and performance monitoring:

  • Real-time metrics: track processed, successful, and failed jobs
  • Performance analytics: average processing time, throughput, min/max times
  • Job type breakdown: metrics per job type with success rates
  • Error tracking: detailed error counts and types
  • Historical data: configurable history size for trend analysis
  • Memory efficient: automatic cleanup of old metrics data
# Create queue with Dead Letter Queue
queue = Queue(
    redis_client, 
    queue='email_worker',
    dead_letter_queue='email_failed',  # Custom DLQ name
    max_retries=3  # Retry failed jobs 3 times
)

# Get failed jobs
failed_jobs = queue.get_dead_letter_jobs(limit=100)

# Reprocess a failed job
queue.reprocess_dead_letter_job(failed_jobs[0])

# Clear all failed jobs
queue.clear_dead_letter_queue()

🔄 Advanced Retry Configuration

from lara_queue import Queue, RetryStrategy

# Exponential backoff strategy (default)
queue_exponential = Queue(
    redis_client,
    queue='email_worker',
    max_retries=5,
    retry_strategy=RetryStrategy.EXPONENTIAL,
    retry_delay=2,                    # Initial delay: 2s
    retry_max_delay=60,               # Max delay: 60s
    retry_backoff_multiplier=2.0,     # Multiply by 2 each time
    retry_jitter=True,                # Add randomness to prevent thundering herd
    retry_exceptions=[ValueError, ConnectionError]  # Only retry these exceptions
)

# Linear retry strategy
queue_linear = Queue(
    redis_client,
    queue='notification_worker',
    max_retries=4,
    retry_strategy=RetryStrategy.LINEAR,
    retry_delay=5,                    # Each retry: 5s, 10s, 15s, 20s
    retry_jitter=False                # No randomness for predictable delays
)

# Fixed delay strategy
queue_fixed = Queue(
    redis_client,
    queue='report_worker',
    max_retries=3,
    retry_strategy=RetryStrategy.FIXED,
    retry_delay=10,                   # Always 10 seconds between retries
    retry_jitter=True                 # Add some randomness
)

# Custom retry function
def fibonacci_retry_delay(attempt: int) -> int:
    """Fibonacci-based retry delay: 1, 1, 2, 3, 5, 8, 13..."""
    if attempt <= 1:
        return 1
    elif attempt == 2:
        return 1
    else:
        a, b = 1, 1
        for _ in range(attempt - 2):
            a, b = b, a + b
        return min(b, 20)  # Cap at 20 seconds

queue_custom = Queue(
    redis_client,
    queue='analytics_worker',
    max_retries=6,
    retry_strategy=RetryStrategy.CUSTOM,
    retry_custom_function=fibonacci_retry_delay,
    retry_exceptions=[Exception]      # Retry for all exceptions
)

# Monitor retry statistics
stats = queue_exponential.get_retry_statistics()
print(f"Total retries: {stats['total_retries']}")
print(f"Success rate: {stats['success_rate']:.1f}%")
print(f"Dead letter jobs: {stats['dead_letter_jobs']}")

# Update retry configuration at runtime
queue_exponential.update_retry_config(
    max_retries=7,
    retry_delay=1,
    retry_strategy=RetryStrategy.LINEAR
)

# Reset retry statistics
queue_exponential.reset_retry_statistics()

📊 Metrics Configuration

from lara_queue import Queue, MetricsCollector

# Create queue with metrics enabled
queue = Queue(
    redis_client,
    queue='monitored_worker',
    enable_metrics=True,              # Enable metrics collection
    metrics_history_size=1000         # Keep last 1000 jobs in history
)

# Get comprehensive metrics
metrics = queue.get_metrics()
print(f"Total processed: {metrics['general']['total_processed']}")
print(f"Success rate: {metrics['general']['success_rate']:.1f}%")
print(f"Throughput: {metrics['performance']['throughput_per_second']:.2f} jobs/sec")
print(f"Avg processing time: {metrics['performance']['avg_processing_time']:.3f}s")

# Get metrics for specific job type
email_metrics = queue.get_job_type_metrics('App\\Jobs\\EmailJob')
if email_metrics:
    print(f"Email jobs: {email_metrics['total']} total, {email_metrics['success_rate']:.1f}% success")

# Get recent job history
recent_jobs = queue.get_recent_jobs(limit=10)
for job in recent_jobs:
    status = "✅" if job['success'] else "❌"
    print(f"{status} {job['name']} - {job['processing_time']:.3f}s")

# Get performance summary
summary = queue.get_performance_summary()
print(f"Uptime: {summary['general']['uptime_seconds']:.1f}s")
print(f"Total retries: {summary['general']['total_retries']}")

# Reset metrics
queue.reset_metrics()

# Disable metrics for better performance
queue_no_metrics = Queue(
    redis_client,
    queue='high_performance_worker',
    enable_metrics=False  # Disable metrics collection
)

🏷️ Type Hints (v0.0.4)

Complete type annotations for better IDE support and code safety:

  • Full type coverage for all methods and parameters
  • IDE autocompletion and type checking
  • Runtime type safety with proper annotations
  • Optional parameters with Optional[T] types
  • Generic types for collections and data structures
from typing import Dict, List, Any, Optional
from lara_queue import Queue

# Typed queue creation
queue: Queue = Queue(
    client=redis_client,
    queue='typed_worker',
    dead_letter_queue='typed_failed',
    max_retries=3
)

# Typed job processing
@queue.handler
def process_email(data: Dict[str, Any]) -> None:
    email_type: str = data.get('type', 'unknown')
    recipient: str = data.get('recipient', 'unknown')
    subject: Optional[str] = data.get('subject')
    
    # Type-safe processing
    if 'invalid' in recipient.lower():
        raise ValueError(f"Invalid email address: {recipient}")
    
    print(f"Email sent to {recipient}")

# Typed DLQ operations
failed_jobs: List[Dict[str, Any]] = queue.get_dead_letter_jobs(limit=100)
success: bool = queue.reprocess_dead_letter_job(failed_jobs[0])
cleared_count: int = queue.clear_dead_letter_queue()
import logging

# Enable logging for debugging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('lara_queue')
logger.setLevel(logging.DEBUG)

Installation

pip install LaraQueue

Usage

🚀 Async Usage (Recommended for High Performance)

For high-performance applications, use the async API:

import asyncio
import aioredis
from lara_queue import AsyncQueue, RetryStrategy

async def main():
    # Create async Redis client
    redis_client = await aioredis.from_url("redis://localhost:6379")
    
    # Create async queue with high performance settings
    queue = AsyncQueue(
        client=redis_client,
        queue='async_worker',
        max_concurrent_jobs=20,  # Process 20 jobs simultaneously
        enable_metrics=True,
        retry_strategy=RetryStrategy.EXPONENTIAL,
        max_retries=3
    )
    
    # Async job handler
    @queue.handler
    async def process_email(data):
        job_data = data.get('data', {})
        
        # Simulate async work (API calls, database operations, etc.)
        await asyncio.sleep(0.1)
        
        print(f"Email sent to: {job_data.get('to')}")
    
    # Add jobs asynchronously
    for i in range(100):
        await queue.push('App\\Jobs\\EmailJob', {
            'to': f'user{i}@example.com',
            'subject': f'Welcome Email {i}',
            'body': 'Welcome to our service!'
        })
    
    # Start processing
    await queue.listen()

# Run the async application
asyncio.run(main())

High-Performance Async Example

import asyncio
import aioredis
from lara_queue import AsyncQueue

async def high_performance_worker():
    redis_client = await aioredis.from_url("redis://localhost:6379")
    
    # High-performance queue configuration
    queue = AsyncQueue(
        client=redis_client,
        queue='high_perf_worker',
        max_concurrent_jobs=50,  # 50 concurrent jobs
        enable_metrics=True,
        metrics_history_size=10000
    )
    
    @queue.handler
    async def fast_processor(data):
        job_data = data.get('data', {})
        
        # Fast async processing
        await asyncio.sleep(0.05)  # 50ms processing time
        
        # Your business logic here
        result = await process_business_logic(job_data)
        return result
    
    # Process thousands of jobs efficiently
    await queue.listen()

async def process_business_logic(data):
    # Simulate business logic
    await asyncio.sleep(0.02)
    return f"Processed: {data.get('id')}"

# Run high-performance worker
asyncio.run(high_performance_worker())

Async with Laravel Integration

import asyncio
import aioredis
from lara_queue import AsyncQueue

async def laravel_async_integration():
    redis_client = await aioredis.from_url("redis://localhost:6379")
    
    # Queue for processing Laravel jobs
    queue = AsyncQueue(
        client=redis_client,
        queue='python_worker',  # Queue name Laravel sends to
        max_concurrent_jobs=10
    )
    
    @queue.handler
    async def handle_laravel_email(data):
        job_data = data.get('data', {})
        
        # Process Laravel email job
        await send_email_async(
            to=job_data.get('to'),
            subject=job_data.get('subject'),
            body=job_data.get('body')
        )
    
    @queue.handler
    async def handle_laravel_notification(data):
        job_data = data.get('data', {})
        
        # Process Laravel notification
        await send_notification_async(
            user_id=job_data.get('user_id'),
            message=job_data.get('message')
        )
    
    # Send jobs to Laravel
    laravel_queue = AsyncQueue(
        client=redis_client,
        queue='laravel_worker'  # Queue name Laravel listens to
    )
    
    await laravel_queue.push('App\\Jobs\\UpdateUserJob', {
        'user_id': 123,
        'data': {'last_login': time.time()}
    })
    
    # Start processing
    await queue.listen()

async def send_email_async(to, subject, body):
    # Your async email sending logic
    await asyncio.sleep(0.1)
    print(f"Email sent to {to}")

async def send_notification_async(user_id, message):
    # Your async notification logic
    await asyncio.sleep(0.05)
    print(f"Notification sent to user {user_id}")

# Run Laravel integration
asyncio.run(laravel_async_integration())

Synchronous Usage (Legacy)

Listen for jobs in Python

from lara_queue import Queue
from redis import Redis

r = Redis(host='localhost', port=6379, db=0)
queue_python = Queue(r, queue='python')

@queue_python.handler
def handle(data):
    name = data['name']  # job name
    job_data = data['data']  # job data
    print('Processing: ' + job_data['a'] + ' ' + job_data['b'] + ' ' + job_data['c'])

queue_python.listen()

Send jobs from Laravel

<?php
$job = new \App\Jobs\TestJob('hi', 'send to', 'python');
dispatch($job)->onQueue('python');

Send jobs to Laravel from Python

from lara_queue import Queue
from redis import Redis

r = Redis(host='localhost', port=6379, db=0)
queue_laravel = Queue(r, queue='laravel')
queue_laravel.push('App\\Jobs\\TestJob', {'a': 'hello', 'b': 'send to', 'c': 'laravel'})

TestJob in Laravel

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $a, $b, $c;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($a, $b, $c)
    {
        $this->a = $a;
        $this->b = $b;
        $this->c = $c;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        Log::info('TEST: ' . $this->a . ' ' . $this->b . ' ' . $this->c);
    }
}

Process jobs in Laravel

You need to :listen (or :work) the preferred queue name to handle jobs sent from Python in Laravel.

php artisan queue:listen --queue=laravel

Graceful Shutdown Example

import logging
from lara_queue import Queue
from redis import Redis

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

r = Redis(host='localhost', port=6379, db=0)
queue = Queue(r, queue='python_worker')

@queue.handler
def handle_job(data):
    logger.info(f"Processing job: {data['name']}")
    # Simulate some work
    import time
    time.sleep(5)
    logger.info("Job completed!")

logger.info("Worker starting...")
logger.info("Press Ctrl+C to trigger graceful shutdown")
logger.info("Current job will complete before stopping")

try:
    queue.listen()  # Signal handlers auto-registered
except KeyboardInterrupt:
    logger.info("Worker stopped gracefully")

Manual Shutdown Example

queue = Queue(r, queue='test')

@queue.handler
def handle_job(data):
    # Process job
    process_data(data)
    
    # Trigger shutdown programmatically
    if should_stop():
        queue.shutdown()

queue.listen()

Error Handling Example

from lara_queue import Queue
from redis import Redis
from redis.exceptions import ConnectionError

try:
    r = Redis(host='localhost', port=6379, db=0)
    queue = Queue(r, queue='python_worker')
    
    @queue.handler
    def handle_job(data):
        print(f"Processing job: {data['name']}")
    
    queue.listen()  # Worker is now resilient to Redis errors!
    
except ConnectionError as e:
    print(f"Failed to connect to Redis: {e}")
except KeyboardInterrupt:
    print("Worker stopped gracefully")

Retry Strategy Recommendations

Strategy Use Case Example
Exponential Network/DB temporary failures API calls, database connections
Linear Predictable resource limits Rate-limited APIs, queue backpressure
Fixed Simple retry scenarios File processing, simple validations
Custom Complex business logic Fibonacci delays, circuit breaker patterns

Best Practices:

  • Use jitter=True to prevent thundering herd problems
  • Set retry_exceptions to only retry recoverable errors
  • Monitor retry statistics to optimize your retry strategy
  • Use dead letter queues for permanently failed jobs
  • Consider max_delay limits to prevent excessive wait times

Features

  • Async Support (v1.0.0) - Full asyncio support for high-performance applications
  • Concurrent Processing - Configurable concurrent job processing (up to 50+ jobs)
  • Redis driver support - Queue communication between Python and Laravel
  • Bidirectional job processing - Send and receive jobs in both directions
  • PHP object serialization - Compatible with Laravel's job serialization format
  • Event-driven architecture - Simple decorator-based job handlers (sync & async)
  • Automatic reconnection - Resilient to network issues
  • Comprehensive error handling - Detailed logging and error recovery
  • Graceful shutdown - Signal handling (SIGINT, SIGTERM) with job completion
  • Advanced retry mechanisms - Multiple strategies with full configurability
  • Retry statistics and monitoring - Track performance and success rates
  • Comprehensive metrics collection - Real-time performance monitoring
  • Production ready - Battle-tested with extensive test coverage
  • Tested - 100+ unit and integration tests included (sync + async)

Requirements

  • Python 3.7+
  • Redis 4.0+
  • Laravel 8+ (for Laravel side)
  • aioredis 2.0+ (for async support)

Performance Recommendations

Async vs Sync Performance

Feature Sync Queue Async Queue Performance Gain
Concurrent Jobs 1 1-50+ 10-50x faster
Throughput ~100 jobs/sec ~1000+ jobs/sec 10x+ faster
Memory Usage Lower Slightly higher ~20% more
CPU Usage Higher Lower ~30% less
I/O Efficiency Blocking Non-blocking Much better

Recommended Settings

# High Performance Async Configuration
queue = AsyncQueue(
    client=redis_client,
    queue='high_perf',
    max_concurrent_jobs=20,  # Adjust based on your system
    enable_metrics=True,
    retry_strategy=RetryStrategy.EXPONENTIAL,
    max_retries=3
)

# For CPU-intensive tasks
queue = AsyncQueue(
    client=redis_client,
    queue='cpu_intensive',
    max_concurrent_jobs=4,  # Match CPU cores
    enable_metrics=True
)

# For I/O-intensive tasks (API calls, DB operations)
queue = AsyncQueue(
    client=redis_client,
    queue='io_intensive',
    max_concurrent_jobs=50,  # High concurrency
    enable_metrics=True
)

Development

# Install development dependencies
pip install -e .
pip install -r requirements-dev.txt

# Run tests
pytest tests/ -v

# Run async tests
pytest tests/test_async_queue.py -v

# Run specific test file
pytest tests/test_error_handling.py -v

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT License - see LICENSE file for details.

Credits

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

laraqueue-1.0.1.tar.gz (83.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

laraqueue-1.0.1-py3-none-any.whl (28.9 kB view details)

Uploaded Python 3

File details

Details for the file laraqueue-1.0.1.tar.gz.

File metadata

  • Download URL: laraqueue-1.0.1.tar.gz
  • Upload date:
  • Size: 83.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.5

File hashes

Hashes for laraqueue-1.0.1.tar.gz
Algorithm Hash digest
SHA256 8bb15ad4e2b3fcc79b9a382485764555d9e0bc56fb3dbacf97729a89fd0f7796
MD5 f73ed732bbf969c4c18e7507dd5a8923
BLAKE2b-256 5a68c1ec7a5ab9728dd8dadaa3f1e1b7a5c821263d6bf623cc87f097b8e58e2f

See more details on using hashes here.

File details

Details for the file laraqueue-1.0.1-py3-none-any.whl.

File metadata

  • Download URL: laraqueue-1.0.1-py3-none-any.whl
  • Upload date:
  • Size: 28.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.5

File hashes

Hashes for laraqueue-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 102d191262649cf34035087a1da7b4c0044dc4051c3ece0c0bb1e3a68720e11d
MD5 77c342beef50a1c103b66bc44f1a5c84
BLAKE2b-256 5e100998c20b68a6080a8c2612a63428e115a0c9282a173e4e8dfa7f49a5eec0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page