A Redis-based queue task manager for LLM processing with intelligent token budget management

LLM Queue Task Manager

A Redis-based queue system for managing LLM processing tasks with intelligent token budget management, priority queuing, and batch processing capabilities.

Python 3.8+ | License: MIT | Redis

🚀 Features

  • 🎯 Priority-based Queuing: Automatically categorizes tasks by estimated token usage (low/medium/long)
  • 💰 Token Budget Management: Prevents API quota exhaustion with intelligent rate limiting
  • ⚡ Batch Processing: Implements 3-2-1 cycle processing (3 low, 2 medium, 1 long priority tasks)
  • 🔄 Retry Logic: Automatic retry with exponential backoff for failed requests
  • 📊 Real-time Monitoring: Comprehensive metrics and status tracking
  • 🔌 Extensible: Easy to implement custom processors for different LLM providers
  • 🏗️ Production Ready: Built for high-throughput production environments
  • 🛡️ Error Resilient: Graceful error handling and recovery mechanisms

📦 Installation

Using uv (recommended)

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install the package
uv add llm-queue-task-manager

# With optional dependencies for specific providers
uv add "llm-queue-task-manager[aws]"      # For AWS Bedrock
uv add "llm-queue-task-manager[openai]"   # For OpenAI
uv add "llm-queue-task-manager[fastapi]"  # For FastAPI integration
uv add "llm-queue-task-manager[all]"      # All optional dependencies


Using pip

pip install llm-queue-task-manager

# With optional dependencies
pip install "llm-queue-task-manager[aws,openai,fastapi]"

🏃 Quick Start

1. Basic Usage

import asyncio
from llm_queue_manager import (
    RedisQueueManager, 
    RedisBatchProcessor, 
    QueuedRequest,
    BaseRequestProcessor
)

# Create a custom processor
class MyLLMProcessor(BaseRequestProcessor):
    async def process_request(self, request_data):
        # Your LLM processing logic here
        prompt = request_data.get('prompt', '')
        # ... process with your LLM provider
        return {"response": f"Processed: {prompt}"}

# Initialize components
processor = MyLLMProcessor()
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

# Start processing
async def main():
    await batch_processor.start_processing()
    
    # Submit a request
    request = QueuedRequest.create(
        request_data={'prompt': 'Hello, world!'},
        estimated_tokens=1000
    )
    
    request_id = await queue_manager.add_request(request)
    print(f"Request {request_id} queued")
    
    # Check status
    status = await queue_manager.get_request_status(request_id)
    print(f"Status: {status}")

asyncio.run(main())
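
Because requests are processed in the background, a caller usually has to poll get_request_status until the request finishes. The helper below is a minimal sketch of that pattern. It assumes the status is a dict with a 'status' key and that 'completed' and 'failed' are the terminal values; those names, and the wait_for_result helper itself, are illustrative rather than confirmed package API, so check the real status payload before relying on them.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict

# Assumed terminal state names; adjust to whatever the status payload really uses.
TERMINAL_STATES = {"completed", "failed"}


async def wait_for_result(
    get_status: Callable[[str], Awaitable[Dict[str, Any]]],
    request_id: str,
    poll_interval: float = 1.0,
    timeout: float = 60.0,
) -> Dict[str, Any]:
    """Poll get_status(request_id) until a terminal state or the timeout."""
    deadline = time.monotonic() + timeout
    while True:
        status = await get_status(request_id)
        if status.get("status") in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Request {request_id} did not finish in {timeout}s")
        await asyncio.sleep(poll_interval)
```

Inside main() above, this could be called as `await wait_for_result(queue_manager.get_request_status, request_id)`.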

2. AWS Bedrock Integration

from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest
from llm_queue_manager.processors.examples import BedrockProcessor

# Initialize with Bedrock processor
processor = BedrockProcessor(region_name='us-east-1')
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

async def process_claude_request():
    await batch_processor.start_processing()
    
    request = QueuedRequest.create(
        request_data={
            'messages': [
                {
                    'role': 'user',
                    'content': [{'text': 'Explain quantum computing'}]
                }
            ],
            'model': 'claude-3-sonnet',
            'max_tokens': 2000,
            'temperature': 0.7
        },
        estimated_tokens=10000
    )
    
    request_id = await queue_manager.add_request(request)
    return request_id

3. OpenAI Integration

from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest
from llm_queue_manager.processors.examples import OpenAIProcessor

# Initialize with OpenAI processor
processor = OpenAIProcessor(api_key="your-openai-api-key")
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

async def process_gpt_request():
    await batch_processor.start_processing()
    
    request = QueuedRequest.create(
        request_data={
            'messages': [
                {'role': 'user', 'content': 'Write a Python function for binary search'}
            ],
            'model': 'gpt-4',
            'max_tokens': 1000,
            'temperature': 0.3
        },
        estimated_tokens=5000
    )
    
    request_id = await queue_manager.add_request(request)
    return request_id

4. FastAPI Integration

from fastapi import FastAPI
from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest
from your_processor import YourLLMProcessor

app = FastAPI()
processor = YourLLMProcessor()
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

@app.on_event("startup")
async def startup():
    await batch_processor.start_processing()

@app.post("/submit")
async def submit_request(prompt: str, max_tokens: int = 1000):
    request = QueuedRequest.create(
        request_data={'prompt': prompt, 'max_tokens': max_tokens},
        estimated_tokens=max_tokens * 2
    )
    
    request_id = await queue_manager.add_request(request)
    return {"request_id": request_id, "status": "queued"}

@app.get("/status/{request_id}")
async def get_status(request_id: str):
    return await queue_manager.get_request_status(request_id)

@app.get("/metrics")
async def get_metrics():
    return queue_manager.get_metrics()

⚙️ Configuration

Environment Variables

# Redis Configuration
REDIS_HOST=localhost                     # Redis host
REDIS_PORT=6379                         # Redis port
REDIS_DB=0                              # Redis database number
REDIS_PASSWORD=your_password            # Redis password (optional)
REDIS_MAX_CONNECTIONS=20                # Maximum Redis connections

# Queue Configuration
LLM_QUEUE_TOTAL_TPM=1000000            # Total tokens per minute
LLM_QUEUE_API_ALLOCATION_PERCENT=40    # Percentage for API allocation
LLM_QUEUE_BATCH_LOW_COUNT=3            # Low priority batch size
LLM_QUEUE_BATCH_MEDIUM_COUNT=2         # Medium priority batch size
LLM_QUEUE_BATCH_LONG_COUNT=1           # Long priority batch size
LLM_QUEUE_MAX_EMPTY_CYCLES=5           # Max empty cycles before longer wait
LLM_QUEUE_QUOTA_WAIT_DURATION=65       # Seconds to wait for quota reset
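
To make the relationship between these settings concrete, here is a small sketch that reads the queue variables and derives two numbers from them: the per-minute token budget available to API calls and the length of one batch cycle. The QueueSettings class, the load_settings helper, and the reading of LLM_QUEUE_API_ALLOCATION_PERCENT as a share of LLM_QUEUE_TOTAL_TPM are assumptions for illustration; the package's own config module may interpret them differently. The fallback values are simply the example values listed above.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class QueueSettings:
    total_tpm: int
    api_allocation_percent: int
    batch_low_count: int
    batch_medium_count: int
    batch_long_count: int

    @property
    def api_tpm(self) -> int:
        # Assumed meaning: share of the total per-minute budget reserved for API calls.
        return self.total_tpm * self.api_allocation_percent // 100

    @property
    def cycle_length(self) -> int:
        # Number of requests handled in one full batch cycle.
        return self.batch_low_count + self.batch_medium_count + self.batch_long_count


def load_settings(env: Mapping[str, str] = os.environ) -> QueueSettings:
    """Read the LLM_QUEUE_* variables, falling back to the example values."""

    def get_int(name: str, default: int) -> int:
        return int(env.get(name, default))

    return QueueSettings(
        total_tpm=get_int("LLM_QUEUE_TOTAL_TPM", 1_000_000),
        api_allocation_percent=get_int("LLM_QUEUE_API_ALLOCATION_PERCENT", 40),
        batch_low_count=get_int("LLM_QUEUE_BATCH_LOW_COUNT", 3),
        batch_medium_count=get_int("LLM_QUEUE_BATCH_MEDIUM_COUNT", 2),
        batch_long_count=get_int("LLM_QUEUE_BATCH_LONG_COUNT", 1),
    )


settings = load_settings({})
print(settings.api_tpm, settings.cycle_length)  # 400000 6
```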

Redis Setup

# Using Docker
docker run -d --name redis -p 6379:6379 redis:latest

# Using Docker Compose
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

volumes:
  redis_data:

🔧 Custom Processors

Create custom processors by extending BaseRequestProcessor:

from llm_queue_manager import BaseRequestProcessor
import httpx

class CustomLLMProcessor(BaseRequestProcessor):
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        self.client = httpx.AsyncClient()
    
    def get_required_fields(self):
        return ['prompt', 'model']
    
    async def validate_request(self, request_data):
        # Custom validation logic
        if not request_data.get('prompt'):
            return False
        return await super().validate_request(request_data)
    
    async def process_request(self, request_data):
        # Custom processing logic
        response = await self.client.post(
            f"{self.base_url}/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "prompt": request_data['prompt'],
                "model": request_data.get('model', 'default'),
                "max_tokens": request_data.get('max_tokens', 1000)
            }
        )
        # Raise on 4xx/5xx responses rather than returning an error body as a result
        response.raise_for_status()
        return response.json()
    
    async def estimate_tokens(self, request_data):
        # Custom token estimation
        prompt = request_data.get('prompt', '')
        max_tokens = request_data.get('max_tokens', 1000)
        return len(prompt) // 4 + max_tokens
    
    async def handle_error(self, error, request_data):
        # Custom error handling
        if "rate_limit" in str(error).lower():
            return None  # Trigger retry
        return {"error": str(error)}
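
The estimate_tokens method above relies on a common rule of thumb of roughly four characters per token for English text, plus the full completion budget. The standalone function below repeats that arithmetic so the numbers are easy to check; it is an approximation, not a tokenizer count, and the function is a sketch rather than part of the package.

```python
def estimate_tokens(prompt: str, max_tokens: int = 1000) -> int:
    """Rough estimate: ~4 characters per prompt token plus the completion budget."""
    return len(prompt) // 4 + max_tokens


# "Explain quantum computing" has 25 characters: 25 // 4 = 6 prompt tokens,
# plus a 2000-token completion budget.
print(estimate_tokens("Explain quantum computing", max_tokens=2000))  # 2006
```

Because max_tokens is added in full, the estimate is an upper bound on output size; a short prompt with a large max_tokens can still land in a higher priority bucket.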

📊 Monitoring and Metrics

Queue Metrics

metrics = queue_manager.get_metrics()

print(f"Total in queue: {metrics['total_in_queue']}")
print(f"Success rate: {metrics['success_rate_percent']}%")
print(f"Token utilization: {metrics['token_usage']['utilization_percent']}%")

# Per-priority metrics
for priority in ['low', 'medium', 'long']:
    completed = metrics[f'{priority}_completed']
    failed = metrics[f'{priority}_failed']
    print(f"{priority.upper()}: {completed} completed, {failed} failed")
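
If an overall figure is needed from the per-priority counters, it can be derived as completed / (completed + failed). The sketch below shows one plausible derivation using the key names from the loop above; how the package actually computes success_rate_percent is not documented here, so treat this as an assumption.

```python
from typing import Mapping

PRIORITIES = ("low", "medium", "long")


def success_rate_percent(metrics: Mapping[str, int]) -> float:
    """Share of finished requests that completed, across all priorities."""
    completed = sum(metrics.get(f"{p}_completed", 0) for p in PRIORITIES)
    failed = sum(metrics.get(f"{p}_failed", 0) for p in PRIORITIES)
    finished = completed + failed
    if finished == 0:
        return 0.0  # nothing has finished yet, so there is no rate to report
    return round(100.0 * completed / finished, 2)


sample = {"low_completed": 8, "low_failed": 2, "medium_completed": 10, "medium_failed": 0}
print(success_rate_percent(sample))  # 90.0
```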

Processor Status

status = batch_processor.get_processor_status()

print(f"Processing: {status['is_processing']}")
print(f"Current cycle: {status['current_cycle_position']}/6")
print(f"Current task type: {status['current_task_type']}")
print(f"Health status: {status['health_status']}")
print(f"Processing rate: {status['processing_rate_per_minute']} req/min")

Health Checks

# Test Redis connection
from llm_queue_manager.config import test_redis_connection
redis_healthy = test_redis_connection()

# Get system health
metrics = queue_manager.get_metrics()
health = metrics.get('system_health', {})
print(f"System status: {health.get('status', 'unknown')}")
print(f"Health score: {health.get('score', 0)}/100")

🏗️ Architecture

The system consists of several key components:

Core Components

  1. QueuedRequest: Data model for requests with automatic task type classification
  2. RedisQueueManager: Handles queue operations, token budgeting, and metrics
  3. RedisBatchProcessor: Implements the 3-2-1 batch processing cycle
  4. BaseRequestProcessor: Abstract base class for implementing custom LLM processors
  5. TokenEstimator: Estimates token usage for different request types

Processing Flow

Request Submission → Token Estimation → Priority Classification → Queue Assignment
                                                                          ↓
Response Delivery ← Result Processing ← LLM API Call ← Batch Processing ← Queue Processing

Priority System

Requests are automatically categorized into three priority levels:

  • 🟢 Low Priority (< 15K tokens): Quick responses, simple prompts
  • 🟡 Medium Priority (15K-75K tokens): Standard requests, moderate complexity
  • 🔴 Long Priority (> 75K tokens): Complex prompts, detailed responses
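
The thresholds above can be written as a small classifier. This is a sketch, not the package's implementation: the function name is hypothetical, and placing exactly 15,000 and exactly 75,000 tokens in the medium bucket is an assumption, since the ranges above do not say which side the boundaries fall on.

```python
LOW_MAX_TOKENS = 15_000     # below this: low priority
MEDIUM_MAX_TOKENS = 75_000  # up to and including this: medium priority (assumed)


def classify_priority(estimated_tokens: int) -> str:
    """Map an estimated token count to 'low', 'medium', or 'long'."""
    if estimated_tokens < 0:
        raise ValueError("estimated_tokens must be non-negative")
    if estimated_tokens < LOW_MAX_TOKENS:
        return "low"
    if estimated_tokens <= MEDIUM_MAX_TOKENS:
        return "medium"
    return "long"


for tokens in (1_000, 15_000, 75_000, 75_001):
    print(tokens, classify_priority(tokens))
```

With these assumptions, the Quick Start requests (estimated at 1,000, 5,000, and 10,000 tokens) would all be classified as low priority.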

The 3-2-1 processing cycle ensures balanced throughput:

  • 3 low priority requests
  • 2 medium priority requests
  • 1 long priority request
  • Repeat cycle
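
The cycle can be illustrated with three in-memory queues. The sketch below uses plain deques instead of Redis, and it skips a slot when its queue is empty; that skipping behaviour is an assumption, as is the drain_in_cycle name.

```python
from collections import deque
from typing import Deque, Dict, Iterator, Tuple

# One full cycle: 3 low, 2 medium, 1 long (6 slots in total).
CYCLE = ("low",) * 3 + ("medium",) * 2 + ("long",)


def drain_in_cycle(queues: Dict[str, Deque[str]]) -> Iterator[Tuple[str, str]]:
    """Yield (priority, request_id) pairs in 3-2-1 order until all queues are empty."""
    while any(queues.get(p) for p in ("low", "medium", "long")):
        for priority in CYCLE:
            queue = queues.get(priority)
            if queue:  # assumed behaviour: an empty slot is simply skipped
                yield priority, queue.popleft()


queues = {
    "low": deque(["l1", "l2", "l3", "l4"]),
    "medium": deque(["m1"]),
    "long": deque(["g1", "g2"]),
}
order = [request_id for _, request_id in drain_in_cycle(queues)]
print(order)  # ['l1', 'l2', 'l3', 'm1', 'g1', 'l4', 'g2']
```

The point of the fixed ratio is that long requests are never starved by a steady stream of short ones: every cycle reserves one slot for them.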

🛡️ Token Budget Management

The system implements intelligent token budget management:

  • 📊 Per-minute quotas: Tracks token usage per minute to prevent API limits
  • 🛡️ Safety buffers: Maintains 10% buffer to prevent quota exhaustion
  • ⏸️ Automatic throttling: Pauses processing when approaching limits
  • 🔄 Quota reset detection: Automatically resumes when quotas reset
  • 📈 Usage tracking: Comprehensive token usage analytics
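
The quota, buffer, and throttling behaviour described above can be sketched with a fixed one-minute window. This is an in-memory stand-in with hypothetical names (MinuteTokenBudget, try_consume, seconds_until_reset); the package keeps its counters in Redis, and its exact windowing strategy is not documented here.

```python
import time
from typing import Callable


class MinuteTokenBudget:
    """Fixed-window per-minute token budget with a safety buffer."""

    def __init__(
        self,
        tokens_per_minute: int,
        safety_buffer_percent: int = 10,
        clock: Callable[[], float] = time.time,
    ) -> None:
        # Usable limit after holding back the safety buffer (10% by default).
        self.limit = tokens_per_minute * (100 - safety_buffer_percent) // 100
        self._clock = clock
        self._window = self._current_window()
        self._used = 0

    def _current_window(self) -> int:
        return int(self._clock() // 60)

    def try_consume(self, tokens: int) -> bool:
        """Reserve tokens if they fit in the current minute; otherwise return False."""
        window = self._current_window()
        if window != self._window:  # a new minute started, so the quota resets
            self._window = window
            self._used = 0
        if self._used + tokens > self.limit:
            return False
        self._used += tokens
        return True

    def seconds_until_reset(self) -> float:
        """Time left until the current one-minute window ends."""
        return 60 - (self._clock() % 60)
```

A processing loop would call try_consume with the request's estimated tokens and, when it returns False, sleep for roughly seconds_until_reset() before trying again.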

🔄 Error Handling and Resilience

Retry Strategies

  • Exponential backoff: Increasing delays between retry attempts
  • Error classification: Different handling for different error types
  • Quota-aware retries: Special handling for rate limit errors
  • Maximum retry limits: Prevents infinite retry loops
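
As a rough illustration of the first and last points, the sketch below doubles the delay on each attempt, caps it, and gives up after a fixed number of retries. The function names, default values, and the use of random jitter are assumptions for the example, not the package's actual retry settings.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay before retry number `attempt` (0-based): base * 2**attempt, capped."""
    return min(cap, base * (2 ** attempt))


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: bool = True,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Run `operation`, retrying up to `max_retries` times on matching errors."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    attempt = 0
    while True:
        try:
            return await operation()
        except retry_on:
            if attempt >= max_retries:
                raise  # retry limit reached: surface the last error
            delay = backoff_delay(attempt, base, cap)
            if jitter:
                # Randomize the wait so many clients do not retry in lockstep.
                delay = random.uniform(0, delay)
            await asyncio.sleep(delay)
            attempt += 1
```

With the defaults, the un-jittered delays are 1, 2, and 4 seconds before the fourth and final attempt.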

Error Types

  • Rate Limiting: Automatic retry with backoff
  • Quota Exhaustion: Queue pausing until reset
  • Validation Errors: Request modification and retry
  • Authentication: Immediate failure with clear error
  • Server Errors: Retry with exponential backoff
  • Network Timeouts: Retry with connection pooling
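
One way to picture this table is as a function from an error to an action. The sketch below is hypothetical throughout: the Action enum, the classify_error helper, and the mapping from HTTP status codes and message text to each error type are assumptions chosen for illustration, and real providers report these conditions in different ways.

```python
from enum import Enum
from typing import Optional


class Action(Enum):
    RETRY_WITH_BACKOFF = "retry_with_backoff"
    PAUSE_QUEUE = "pause_queue"
    MODIFY_AND_RETRY = "modify_and_retry"
    FAIL = "fail"


def classify_error(status_code: Optional[int], message: str = "") -> Action:
    """Pick a handling strategy; status_code is None when no response arrived."""
    text = message.lower()
    if "quota" in text:
        return Action.PAUSE_QUEUE          # quota exhaustion: wait for the reset
    if status_code == 429 or "rate_limit" in text or "rate limit" in text:
        return Action.RETRY_WITH_BACKOFF   # rate limiting
    if status_code in (401, 403):
        return Action.FAIL                 # authentication: fail immediately
    if status_code in (400, 422):
        return Action.MODIFY_AND_RETRY     # validation: fix the request first
    if status_code is None or status_code >= 500:
        return Action.RETRY_WITH_BACKOFF   # network timeout or server error
    return Action.FAIL                     # anything else is treated as permanent


print(classify_error(503).value)                # retry_with_backoff
print(classify_error(429, "Quota exceeded").value)  # pause_queue
```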

📚 Examples

The package includes comprehensive examples:

  • examples/basic_usage.py: Simple echo processor demonstration
  • examples/fastapi_integration.py: Production FastAPI service
  • examples/bedrock_example.py: AWS Bedrock with Claude models
  • examples/openai_example.py: OpenAI GPT models with function calling

Run examples:

# Basic usage
python examples/basic_usage.py

# FastAPI service
python examples/fastapi_integration.py

# AWS Bedrock (requires AWS credentials)
python examples/bedrock_example.py

# OpenAI (requires OPENAI_API_KEY)
python examples/openai_example.py

🚀 Production Deployment

Docker Deployment

FROM python:3.11-slim

WORKDIR /app

# Install uv
RUN pip install uv

# Copy the application first: an editable install needs the source tree,
# not just pyproject.toml
COPY . .

# Install the project with all optional dependencies
RUN uv pip install --system -e ".[all]"

# Run application
CMD ["python", "-m", "your_app"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-queue-manager
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-queue-manager
  template:
    metadata:
      labels:
        app: llm-queue-manager
    spec:
      containers:
      - name: app
        image: your-registry/llm-queue-manager:latest
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: LLM_QUEUE_TOTAL_TPM
          value: "1000000"
        ports:
        - containerPort: 8000
        livenessProbe:
          httpGet:
            path: /v1/queue/health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379

Monitoring Setup

# Prometheus monitoring
- job_name: 'llm-queue-manager'
  static_configs:
  - targets: ['llm-queue-manager:8000']
  metrics_path: '/v1/queue/metrics'
  scrape_interval: 30s

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Setup

# Clone the repository
git clone https://github.com/AryamanGurjar/llm-queue-task-manager.git
cd llm-queue-task-manager

# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install dependencies
uv sync --all-extras --dev

# Install pre-commit hooks
uv run pre-commit install

# Run tests
uv run pytest

# Run examples
uv run python examples/basic_usage.py

Code Quality

# Format code
uv run black src/ examples/
uv run isort src/ examples/

# Type checking
uv run mypy src/

# Linting
uv run flake8 src/ examples/

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • Built with Redis for high-performance queuing
  • Inspired by production LLM processing challenges
  • Thanks to the open-source community for tools and libraries

📞 Support


Made with ❤️ for the LLM community

