
A Redis-based queue task manager for LLM processing with intelligent token budget management


LLM Queue Task Manager

A Redis-based queue system for managing LLM processing tasks with intelligent token budget management, priority queuing, and batch processing capabilities.

Requires Python 3.8+ and Redis. License: MIT.

🚀 Features

  • 🎯 Priority-based Queuing: Automatically categorizes tasks by estimated token usage (low/medium/long)
  • 💰 Token Budget Management: Prevents API quota exhaustion with intelligent rate limiting
  • ⚡ Batch Processing: Implements 3-2-1 cycle processing (3 low, 2 medium, 1 long priority tasks)
  • 🔄 Retry Logic: Automatic retry with exponential backoff for failed requests
  • 📊 Real-time Monitoring: Comprehensive metrics and status tracking
  • 🔌 Extensible: Easy to implement custom processors for different LLM providers
  • 🏗️ Production Ready: Built for high-throughput production environments
  • 🛡️ Error Resilient: Graceful error handling and recovery mechanisms

📦 Installation

Using uv (recommended)

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install the package
uv add llm-queue-task-manager

# With optional dependencies for specific providers
uv add "llm-queue-task-manager[aws]"      # For AWS Bedrock
uv add "llm-queue-task-manager[openai]"   # For OpenAI
uv add "llm-queue-task-manager[fastapi]"  # For FastAPI integration
uv add "llm-queue-task-manager[all]"      # All optional dependencies


Using pip

pip install llm-queue-task-manager

# With optional dependencies
pip install "llm-queue-task-manager[aws,openai,fastapi]"

🏃 Quick Start

1. Basic Usage

import asyncio
from llm_queue_manager import (
    RedisQueueManager, 
    RedisBatchProcessor, 
    QueuedRequest,
    BaseRequestProcessor
)

# Create a custom processor
class MyLLMProcessor(BaseRequestProcessor):
    async def process_request(self, request_data):
        # Your LLM processing logic here
        prompt = request_data.get('prompt', '')
        # ... process with your LLM provider
        return {"response": f"Processed: {prompt}"}

# Initialize components
processor = MyLLMProcessor()
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

# Start processing
async def main():
    await batch_processor.start_processing()
    
    # Submit a request
    request = QueuedRequest.create(
        request_data={'prompt': 'Hello, world!'},
        estimated_tokens=1000
    )
    
    request_id = await queue_manager.add_request(request)
    print(f"Request {request_id} queued")
    
    # Check status
    status = await queue_manager.get_request_status(request_id)
    print(f"Status: {status}")

asyncio.run(main())

2. AWS Bedrock Integration

from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest
from llm_queue_manager.processors.examples import BedrockProcessor

# Initialize with Bedrock processor
processor = BedrockProcessor(region_name='us-east-1')
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

async def process_claude_request():
    await batch_processor.start_processing()
    
    request = QueuedRequest.create(
        request_data={
            'messages': [
                {
                    'role': 'user',
                    'content': [{'text': 'Explain quantum computing'}]
                }
            ],
            'model': 'claude-3-sonnet',
            'max_tokens': 2000,
            'temperature': 0.7
        },
        estimated_tokens=10000
    )
    
    request_id = await queue_manager.add_request(request)
    return request_id

3. OpenAI Integration

from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest
from llm_queue_manager.processors.examples import OpenAIProcessor

# Initialize with OpenAI processor
processor = OpenAIProcessor(api_key="your-openai-api-key")
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

async def process_gpt_request():
    await batch_processor.start_processing()
    
    request = QueuedRequest.create(
        request_data={
            'messages': [
                {'role': 'user', 'content': 'Write a Python function for binary search'}
            ],
            'model': 'gpt-4',
            'max_tokens': 1000,
            'temperature': 0.3
        },
        estimated_tokens=5000
    )
    
    request_id = await queue_manager.add_request(request)
    return request_id

4. FastAPI Integration

from fastapi import FastAPI
from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest
from your_processor import YourLLMProcessor  # your own BaseRequestProcessor subclass

app = FastAPI()
processor = YourLLMProcessor()
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

@app.on_event("startup")
async def startup():
    await batch_processor.start_processing()

@app.post("/submit")
async def submit_request(prompt: str, max_tokens: int = 1000):
    request = QueuedRequest.create(
        request_data={'prompt': prompt, 'max_tokens': max_tokens},
        estimated_tokens=max_tokens * 2
    )
    
    request_id = await queue_manager.add_request(request)
    return {"request_id": request_id, "status": "queued"}

@app.get("/status/{request_id}")
async def get_status(request_id: str):
    return await queue_manager.get_request_status(request_id)

@app.get("/metrics")
async def get_metrics():
    return queue_manager.get_metrics()

⚙️ Configuration

Environment Variables

# Redis Configuration
REDIS_HOST=localhost                     # Redis host
REDIS_PORT=6379                         # Redis port
REDIS_DB=0                              # Redis database number
REDIS_PASSWORD=your_password            # Redis password (optional)
REDIS_MAX_CONNECTIONS=20                # Maximum Redis connections

# Queue Configuration
LLM_QUEUE_TOTAL_TPM=1000000            # Total tokens per minute
LLM_QUEUE_API_ALLOCATION_PERCENT=40    # Percentage for API allocation
LLM_QUEUE_BATCH_LOW_COUNT=3            # Low priority batch size
LLM_QUEUE_BATCH_MEDIUM_COUNT=2         # Medium priority batch size
LLM_QUEUE_BATCH_LONG_COUNT=1           # Long priority batch size
LLM_QUEUE_MAX_EMPTY_CYCLES=5           # Max empty cycles before longer wait
LLM_QUEUE_QUOTA_WAIT_DURATION=65       # Seconds to wait for quota reset
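The package is configured through the environment variables above. If your own service also wants to read or validate the same settings at startup (for example, to log the effective token budget), a minimal sketch follows. `QueueSettings` and `from_env` are hypothetical names, not part of the package's API; the defaults simply mirror the example values listed above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class QueueSettings:
    """Hypothetical typed view of the environment variables listed above."""

    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_db: int = 0
    redis_password: Optional[str] = None
    redis_max_connections: int = 20
    total_tpm: int = 1_000_000
    api_allocation_percent: int = 40
    batch_low_count: int = 3
    batch_medium_count: int = 2
    batch_long_count: int = 1
    max_empty_cycles: int = 5
    quota_wait_duration: int = 65  # seconds

    def __post_init__(self) -> None:
        # Reject values that cannot be a percentage
        if not 0 <= self.api_allocation_percent <= 100:
            raise ValueError("LLM_QUEUE_API_ALLOCATION_PERCENT must be between 0 and 100")

    @classmethod
    def from_env(cls, environ: Mapping[str, str] = os.environ) -> "QueueSettings":
        """Build settings from environment variables, falling back to the defaults."""

        def get_int(name: str, default: int) -> int:
            value = environ.get(name)
            # Unset or empty variables mean "use the default"
            return default if not value else int(value)

        return cls(
            redis_host=environ.get("REDIS_HOST") or cls.redis_host,
            redis_port=get_int("REDIS_PORT", cls.redis_port),
            redis_db=get_int("REDIS_DB", cls.redis_db),
            redis_password=environ.get("REDIS_PASSWORD") or None,
            redis_max_connections=get_int("REDIS_MAX_CONNECTIONS", cls.redis_max_connections),
            total_tpm=get_int("LLM_QUEUE_TOTAL_TPM", cls.total_tpm),
            api_allocation_percent=get_int("LLM_QUEUE_API_ALLOCATION_PERCENT", cls.api_allocation_percent),
            batch_low_count=get_int("LLM_QUEUE_BATCH_LOW_COUNT", cls.batch_low_count),
            batch_medium_count=get_int("LLM_QUEUE_BATCH_MEDIUM_COUNT", cls.batch_medium_count),
            batch_long_count=get_int("LLM_QUEUE_BATCH_LONG_COUNT", cls.batch_long_count),
            max_empty_cycles=get_int("LLM_QUEUE_MAX_EMPTY_CYCLES", cls.max_empty_cycles),
            quota_wait_duration=get_int("LLM_QUEUE_QUOTA_WAIT_DURATION", cls.quota_wait_duration),
        )
```

Passing a plain dict instead of `os.environ` keeps the loader easy to test, e.g. `QueueSettings.from_env({"LLM_QUEUE_TOTAL_TPM": "500000"})`.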

Redis Setup

# Using Docker
docker run -d --name redis -p 6379:6379 redis:latest

# Using Docker Compose (save as docker-compose.yml, then run: docker compose up -d)
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

volumes:
  redis_data:

🔧 Custom Processors

Create custom processors by extending BaseRequestProcessor:

from llm_queue_manager import BaseRequestProcessor
import httpx

class CustomLLMProcessor(BaseRequestProcessor):
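    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        # httpx defaults to a 5-second timeout, which is too short for most LLM completions
        self.client = httpx.AsyncClient(timeout=60.0)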
    
    def get_required_fields(self):
        return ['prompt', 'model']
    
    async def validate_request(self, request_data):
        # Custom validation logic
        if not request_data.get('prompt'):
            return False
        return await super().validate_request(request_data)
    
    async def process_request(self, request_data):
        # Custom processing logic
        response = await self.client.post(
            f"{self.base_url}/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "prompt": request_data['prompt'],
                "model": request_data.get('model', 'default'),
                "max_tokens": request_data.get('max_tokens', 1000)
            }
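        )
        # Raise on 4xx/5xx so failures reach handle_error instead of being returned as data
        response.raise_for_status()
        return response.json()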
    
    async def estimate_tokens(self, request_data):
        # Rough heuristic: about 4 characters per prompt token, plus the requested completion budget
        prompt = request_data.get('prompt', '')
        max_tokens = request_data.get('max_tokens', 1000)
        return len(prompt) // 4 + max_tokens
    
    async def handle_error(self, error, request_data):
        # Custom error handling
        if "rate_limit" in str(error).lower():
            return None  # Trigger retry
        return {"error": str(error)}

📊 Monitoring and Metrics

Queue Metrics

metrics = queue_manager.get_metrics()

print(f"Total in queue: {metrics['total_in_queue']}")
print(f"Success rate: {metrics['success_rate_percent']}%")
print(f"Token utilization: {metrics['token_usage']['utilization_percent']}%")

# Per-priority metrics
for priority in ['low', 'medium', 'long']:
    completed = metrics[f'{priority}_completed']
    failed = metrics[f'{priority}_failed']
    print(f"{priority.upper()}: {completed} completed, {failed} failed")
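The per-priority counters can be combined into an overall success rate. This sketch assumes the metric keys shown above (`low_completed`, `low_failed`, and so on) and computes completed divided by completed plus failed; whether the package's own `success_rate_percent` uses exactly this formula is an assumption, and `success_rate_percent()` here is a hypothetical helper.

```python
from typing import Any, Mapping

PRIORITIES = ("low", "medium", "long")


def success_rate_percent(metrics: Mapping[str, Any]) -> float:
    """Completed / (completed + failed) across all priorities, as a percentage."""
    completed = sum(int(metrics.get(f"{p}_completed", 0)) for p in PRIORITIES)
    failed = sum(int(metrics.get(f"{p}_failed", 0)) for p in PRIORITIES)
    finished = completed + failed
    if finished == 0:
        return 0.0  # nothing has finished yet
    return round(100.0 * completed / finished, 2)


# Illustrative input shaped like the metrics dict above
example = {"low_completed": 45, "low_failed": 5, "medium_completed": 18,
           "medium_failed": 2, "long_completed": 9, "long_failed": 1}
print(success_rate_percent(example))  # 90.0
```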

Processor Status

status = batch_processor.get_processor_status()

print(f"Processing: {status['is_processing']}")
print(f"Current cycle: {status['current_cycle_position']}/6")
print(f"Current task type: {status['current_task_type']}")
print(f"Health status: {status['health_status']}")
print(f"Processing rate: {status['processing_rate_per_minute']} req/min")
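`current_cycle_position` runs over the six slots of the 3-2-1 cycle. Assuming positions are 1-based (an assumption; check the value your processor actually reports), a position maps to a task type as sketched here; `task_type_for_position` and `next_position` are hypothetical helpers.

```python
# The six slots of one 3-2-1 cycle, in processing order
CYCLE_SLOTS = ("low",) * 3 + ("medium",) * 2 + ("long",)


def task_type_for_position(position: int) -> str:
    """Task type for a 1-based position in the cycle (assumed numbering)."""
    if not 1 <= position <= len(CYCLE_SLOTS):
        raise ValueError(f"position must be between 1 and {len(CYCLE_SLOTS)}, got {position}")
    return CYCLE_SLOTS[position - 1]


def next_position(position: int) -> int:
    """Advance one slot, wrapping from 6 back to 1."""
    return position % len(CYCLE_SLOTS) + 1


print([task_type_for_position(p) for p in range(1, 7)])
# ['low', 'low', 'low', 'medium', 'medium', 'long']
```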

Health Checks

# Test Redis connection
from llm_queue_manager.config import test_redis_connection
redis_healthy = test_redis_connection()

# Get system health
metrics = queue_manager.get_metrics()
health = metrics.get('system_health', {})
print(f"System status: {health.get('status', 'unknown')}")
print(f"Health score: {health.get('score', 0)}/100")

🏗️ Architecture

The system consists of several key components:

Core Components

  1. QueuedRequest: Data model for requests with automatic task type classification
  2. RedisQueueManager: Handles queue operations, token budgeting, and metrics
  3. RedisBatchProcessor: Implements the 3-2-1 batch processing cycle
  4. BaseRequestProcessor: Abstract base class for implementing custom LLM processors
  5. TokenEstimator: Estimates token usage for different request types

Processing Flow

Request Submission → Token Estimation → Priority Classification → Queue Assignment
                                                                          ↓
Response Delivery ← Result Processing ← LLM API Call ← Batch Processing ← Queue Processing

Priority System

Requests are automatically assigned one of three priority levels based on their estimated token count:

  • 🟢 Low Priority (< 15K tokens): Quick responses, simple prompts
  • 🟡 Medium Priority (15K-75K tokens): Standard requests, moderate complexity
  • 🔴 Long Priority (> 75K tokens): Complex prompts, detailed responses
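The thresholds above translate into a simple classifier. This is a sketch of the documented ranges, not the package's `QueuedRequest` code: read literally, anything below 15,000 tokens is low, anything above 75,000 is long, and everything from 15,000 to 75,000 inclusive is medium. `classify` is a hypothetical name.

```python
LOW_MAX_EXCLUSIVE = 15_000     # low: fewer than 15K tokens
MEDIUM_MAX_INCLUSIVE = 75_000  # medium: 15K to 75K; long: more than 75K


def classify(estimated_tokens: int) -> str:
    """Map an estimated token count to a priority level (sketch of the documented ranges)."""
    if estimated_tokens < 0:
        raise ValueError("estimated_tokens must be non-negative")
    if estimated_tokens < LOW_MAX_EXCLUSIVE:
        return "low"
    if estimated_tokens <= MEDIUM_MAX_INCLUSIVE:
        return "medium"
    return "long"


# The Quick Start estimates (1,000 and 10,000 tokens) both land in the low level
print(classify(1_000), classify(10_000), classify(50_000), classify(100_000))
# low low medium long
```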

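The 3-2-1 processing cycle interleaves the three levels, so short requests are not stuck behind long ones while long requests still get a slot in every cycle: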

  • 3 low priority requests
  • 2 medium priority requests
  • 1 long priority request
  • Repeat cycle
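One way to express the 3-2-1 cycle over three in-memory queues is sketched below. It is an illustration, not the package's `RedisBatchProcessor`: it assumes that when a queue runs dry, its remaining slots in that cycle are simply skipped, whereas the real processor may wait or backfill from another level. `run_cycle` is a hypothetical name.

```python
from collections import deque
from typing import Deque, Dict, List

# (task type, slots per cycle) in processing order
CYCLE = (("low", 3), ("medium", 2), ("long", 1))


def run_cycle(queues: Dict[str, Deque[str]]) -> List[str]:
    """Pop up to 3 low, 2 medium and 1 long request; empty slots are skipped."""
    picked: List[str] = []
    for task_type, slots in CYCLE:
        queue = queues[task_type]
        for _ in range(slots):
            if not queue:
                break  # assumption: skip rather than wait or backfill
            picked.append(queue.popleft())
    return picked


queues = {"low": deque(["l1", "l2", "l3", "l4"]),
          "medium": deque(["m1"]),
          "long": deque(["g1", "g2"])}
print(run_cycle(queues))  # ['l1', 'l2', 'l3', 'm1', 'g1']
print(run_cycle(queues))  # ['l4', 'g2']
```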

🛡️ Token Budget Management

The system implements intelligent token budget management:

  • 📊 Per-minute quotas: Tracks token usage per minute to prevent API limits
  • 🛡️ Safety buffers: Maintains 10% buffer to prevent quota exhaustion
  • ⏸️ Automatic throttling: Pauses processing when approaching limits
  • 🔄 Quota reset detection: Automatically resumes when quotas reset
  • 📈 Usage tracking: Comprehensive token usage analytics
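A per-minute budget with a safety buffer can be sketched as a fixed one-minute window. The package keeps its token accounting in Redis, which lets every worker share one budget; this single-process, in-memory version (`MinuteTokenBudget`, a hypothetical name) only illustrates the arithmetic. Assuming the 10% buffer is taken off the per-minute quota, a 1,000,000 TPM quota leaves 900,000 tokens per minute for requests.

```python
import time
from typing import Callable


class MinuteTokenBudget:
    """In-memory, fixed one-minute token window with a safety buffer (sketch)."""

    WINDOW_SECONDS = 60.0

    def __init__(self, tokens_per_minute: int, safety_buffer: float = 0.10,
                 clock: Callable[[], float] = time.monotonic) -> None:
        # Only (1 - buffer) of the quota is ever handed out
        self.limit = int(tokens_per_minute * (1 - safety_buffer))
        self.clock = clock
        self.window_start = clock()
        self.used = 0

    def _roll_window(self) -> None:
        # Quota reset detection: start a fresh window once a minute has passed
        now = self.clock()
        if now - self.window_start >= self.WINDOW_SECONDS:
            self.window_start = now
            self.used = 0

    def try_consume(self, tokens: int) -> bool:
        """Reserve tokens if they fit in the current window; otherwise refuse (throttle)."""
        self._roll_window()
        if self.used + tokens > self.limit:
            return False
        self.used += tokens
        return True

    def seconds_until_reset(self) -> float:
        """How long a throttled caller should wait before the window resets."""
        self._roll_window()
        return max(0.0, self.WINDOW_SECONDS - (self.clock() - self.window_start))
```

The injectable `clock` makes the window testable without sleeping; in production the default `time.monotonic` is used.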

🔄 Error Handling and Resilience

Retry Strategies

  • Exponential backoff: Increasing delays between retry attempts
  • Error classification: Different handling for different error types
  • Quota-aware retries: Special handling for rate limit errors
  • Maximum retry limits: Prevents infinite retry loops
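The retry strategy above can be sketched as an async helper. `retry_with_backoff` is a hypothetical name, and the defaults (3 retries, 1 s base delay doubling up to 30 s) are illustrative rather than the package's settings. Passing `retry_on` restricts retries to error types worth retrying; production code often also adds random jitter to each delay so that many workers do not retry in lockstep.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Await operation(), retrying failures with exponentially growing delays."""
    attempt = 0
    while True:
        try:
            return await operation()
        except retry_on:
            if attempt >= max_retries:
                raise  # retry limit reached: surface the last error
            # Delay doubles each attempt (1 s, 2 s, 4 s, ...) up to max_delay
            await sleep(min(max_delay, base_delay * (2 ** attempt)))
            attempt += 1
```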

Error Types

  • Rate Limiting: Automatic retry with backoff
  • Quota Exhaustion: Queue pausing until reset
  • Validation Errors: Request modification and retry
  • Authentication: Immediate failure with clear error
  • Server Errors: Retry with exponential backoff
  • Network Timeouts: Retry with connection pooling
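The error types above map naturally onto a small classifier. The status codes and message checks are assumptions chosen for illustration (429 for rate limiting, 401/403 for authentication, 400/422 for validation, 5xx for server errors, no status code for network timeouts, and a "quota" message for quota exhaustion); real providers differ in how they signal each case, so `classify_error` and `ErrorAction` are hypothetical.

```python
from enum import Enum
from typing import Optional


class ErrorAction(Enum):
    RETRY_WITH_BACKOFF = "retry_with_backoff"
    PAUSE_UNTIL_QUOTA_RESET = "pause_until_quota_reset"
    MODIFY_AND_RETRY = "modify_and_retry"
    FAIL_IMMEDIATELY = "fail_immediately"


def classify_error(status_code: Optional[int], message: str = "") -> ErrorAction:
    """Pick a handling strategy from an HTTP status code and error message (illustrative rules)."""
    text = message.lower()
    if "quota" in text:
        # Checked first: providers often report quota exhaustion with a 429 as well
        return ErrorAction.PAUSE_UNTIL_QUOTA_RESET
    if status_code == 429 or "rate limit" in text or "rate_limit" in text:
        return ErrorAction.RETRY_WITH_BACKOFF
    if status_code in (401, 403):
        return ErrorAction.FAIL_IMMEDIATELY
    if status_code in (400, 422):
        return ErrorAction.MODIFY_AND_RETRY
    if status_code is None or status_code >= 500:
        # No status code usually means a network error or timeout
        return ErrorAction.RETRY_WITH_BACKOFF
    return ErrorAction.FAIL_IMMEDIATELY
```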

📚 Examples

The package includes comprehensive examples:

  • examples/basic_usage.py: Simple echo processor demonstration
  • examples/fastapi_integration.py: Production FastAPI service
  • examples/bedrock_example.py: AWS Bedrock with Claude models
  • examples/openai_example.py: OpenAI GPT models with function calling

Run examples:

# Basic usage
python examples/basic_usage.py

# FastAPI service
python examples/fastapi_integration.py

# AWS Bedrock (requires AWS credentials)
python examples/bedrock_example.py

# OpenAI (requires OPENAI_API_KEY)
python examples/openai_example.py

🚀 Production Deployment

Docker Deployment

FROM python:3.11-slim

WORKDIR /app

# Install uv
RUN pip install uv

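# Install dependencies first so this layer is cached between code changes
# (an editable install cannot work here, because the source is not copied yet)
COPY pyproject.toml ./
RUN uv pip install --system -r pyproject.toml --extra all

# Copy application
COPY . .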

# Run application
CMD ["python", "-m", "your_app"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-queue-manager
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-queue-manager
  template:
    metadata:
      labels:
        app: llm-queue-manager
    spec:
      containers:
      - name: app
        image: your-registry/llm-queue-manager:latest
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: LLM_QUEUE_TOTAL_TPM
          value: "1000000"
        ports:
        - containerPort: 8000
        livenessProbe:
          httpGet:
            path: /v1/queue/health   # health endpoint your app must expose
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis   # assumes a separate Redis Deployment whose pods are labeled app: redis
  ports:
  - port: 6379
    targetPort: 6379

Monitoring Setup

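# Prometheus monitoring: add under scrape_configs in prometheus.yml.
# The endpoint must serve the Prometheus text exposition format, not JSON.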
- job_name: 'llm-queue-manager'
  static_configs:
  - targets: ['llm-queue-manager:8000']
  metrics_path: '/v1/queue/metrics'
  scrape_interval: 30s

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Setup

# Clone the repository
git clone https://github.com/AryamanGurjar/llm-queue-task-manager.git
cd llm-queue-task-manager

# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install dependencies
uv sync --all-extras --dev

# Install pre-commit hooks
uv run pre-commit install

# Run tests
uv run pytest

# Run examples
uv run python examples/basic_usage.py

Code Quality

# Format code
uv run black src/ examples/
uv run isort src/ examples/

# Type checking
uv run mypy src/

# Linting
uv run flake8 src/ examples/

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • Built with Redis for high-performance queuing
  • Inspired by production LLM processing challenges
  • Thanks to the open-source community for tools and libraries

📞 Support


Made with ❤️ for the LLM community


To build and publish the package with uv:

# Build the package
uv build

# Publish to PyPI (requires authentication)
uv publish

# Or publish to TestPyPI first
uv publish --publish-url https://test.pypi.org/legacy/



Download files

Source distribution: llm_queue_task_manager-0.1.1.tar.gz (37.7 kB)

Built distribution: llm_queue_task_manager-0.1.1-py3-none-any.whl (39.2 kB, Python 3)

File details

Details for the file llm_queue_task_manager-0.1.1.tar.gz:

  • Size: 37.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.0

Hashes for llm_queue_task_manager-0.1.1.tar.gz:

  • SHA256: 6b16ec1f2d313146805e932ad6545ffbfa9372cff75e99ca8595831a3195f2ef
  • MD5: 94d447d4c8937f9b4cf6d874faec53a7
  • BLAKE2b-256: 0e3ff95deec10f0515f0f5b267f29a586f7d8dad5749e722468f67d9195207cb

Hashes for llm_queue_task_manager-0.1.1-py3-none-any.whl:

  • SHA256: 8e0856d8b4608c8163953d81f06adca71ce839b4088cf296074fdf8e12c67df0
  • MD5: 4673357584be2b59bce7fcb213c785ce
  • BLAKE2b-256: 6163230acc9a7800856437f4fb6d8eee174e314b5806c89cddb2557c8775903c
