A flexible async task queue manager for Python applications

Kew Task Queue Manager

A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.

Features

  • Multiple named queues with independent configurations
  • Priority-based task scheduling with millisecond precision
  • Redis-backed persistence for reliability
  • Configurable worker pools per queue
  • Built-in circuit breaker for fault tolerance
  • Comprehensive task lifecycle management
  • Automatic task expiration (24-hour default)
  • Detailed logging and monitoring
  • Graceful shutdown handling
  • Thread-safe operations

Installation

pip install kew

Quick Start

import asyncio
from kew import TaskQueueManager, QueueConfig, QueuePriority

async def example_task(x: int):
    await asyncio.sleep(1)
    return x * 2

async def main():
    # Initialize the task queue manager with Redis connection
    manager = TaskQueueManager(redis_url="redis://localhost:6379")
    await manager.initialize()
    
    # Create a high-priority queue
    await manager.create_queue(QueueConfig(
        name="high_priority",
        max_workers=4,
        max_size=1000,
        priority=QueuePriority.HIGH
    ))
    
    # Submit a task
    task_info = await manager.submit_task(
        task_id="task1",
        queue_name="high_priority",
        task_type="multiplication",
        task_func=example_task,
        priority=QueuePriority.HIGH,
        x=5
    )
    
    # Check task status
    await asyncio.sleep(2)
    status = await manager.get_task_status("task1")
    print(f"Task Result: {status.result}")
    
    # Graceful shutdown
    await manager.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Queue Configuration

Creating Queues

from kew import QueueConfig, QueuePriority

# Create a high-priority queue with 4 workers
await manager.create_queue(QueueConfig(
    name="critical",
    max_workers=4,
    max_size=1000,
    priority=QueuePriority.HIGH
))

Queue Priority Levels

  • QueuePriority.HIGH (1)
  • QueuePriority.MEDIUM (2)
  • QueuePriority.LOW (3)

Lower numeric values are processed first. Tasks within the same priority level are processed in FIFO order, with submission times compared at millisecond precision.
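The ordering rule above can be illustrated without Redis. The following is a minimal sketch, not kew's implementation: it assumes a heap keyed on the priority value, then the submission time in milliseconds, then a sequence number as a tie-breaker. The names `PriorityFifoQueue`, `push`, and `pop` are hypothetical.

```python
import heapq
import itertools
from typing import Any, List, Tuple


class PriorityFifoQueue:
    """Illustrative in-memory queue: lowest priority value first, then FIFO."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, int, Any]] = []
        # Sequence number breaks ties when two tasks share a millisecond.
        self._seq = itertools.count()

    def push(self, item: Any, priority: int, submitted_ms: int) -> None:
        heapq.heappush(self._heap, (priority, submitted_ms, next(self._seq), item))

    def pop(self) -> Any:
        return heapq.heappop(self._heap)[-1]

    def __len__(self) -> int:
        return len(self._heap)


queue = PriorityFifoQueue()
queue.push("low-early", priority=3, submitted_ms=1_000)
queue.push("high-late", priority=1, submitted_ms=1_002)
queue.push("high-early", priority=1, submitted_ms=1_001)

# Drain the queue: HIGH (1) tasks come out first, oldest first.
order = [queue.pop() for _ in range(len(queue))]
print(order)
```

A composite key keeps the comparison cheap and makes the FIFO guarantee explicit even when two submissions land in the same millisecond.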

Task Management

Submitting Tasks

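task_info = await manager.submit_task(
    task_id="unique_id",           # must be unique; duplicates raise TaskAlreadyExistsError
    queue_name="critical",
    task_type="example",
    task_func=my_async_function,   # the coroutine function to run
    priority=QueuePriority.HIGH,
    *args,                         # positional arguments forwarded to task_func
    **kwargs                       # keyword arguments forwarded to task_func
)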

Monitoring Task Status

status = await manager.get_task_status("unique_id")
print(f"Status: {status.status}")  # QUEUED, PROCESSING, COMPLETED, FAILED
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")
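Rather than sleeping for a fixed time before reading a status, a caller can poll until the task reaches a terminal state. The helper below is a generic sketch and not part of kew: `poll_until`, its parameters, and the fake status source in the demo are all hypothetical, and the state names are taken from the comment above.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable


async def poll_until(
    fetch: Callable[[], Awaitable[Any]],
    is_done: Callable[[Any], bool],
    interval: float = 0.1,
    timeout: float = 5.0,
) -> Any:
    """Await fetch() repeatedly until is_done(result) is true or timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        result = await fetch()
        if is_done(result):
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError("condition not met before timeout")
        await asyncio.sleep(interval)


async def _demo() -> str:
    # Fake status source standing in for a real status lookup.
    states = iter(["QUEUED", "PROCESSING", "COMPLETED"])

    async def fake_fetch() -> str:
        return next(states)

    return await poll_until(
        fake_fetch,
        lambda state: state in {"COMPLETED", "FAILED"},
        interval=0.01,
    )


final_state = asyncio.run(_demo())
print(final_state)
```

With kew, `fetch` could be `lambda: manager.get_task_status("unique_id")` and `is_done` could inspect `status.status`. Whether that attribute compares equal to plain strings or to an enum member is an assumption to check against the installed version.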

Queue Status Monitoring

status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}")
print(f"Circuit Breaker: {status['circuit_breaker_status']}")

Advanced Features

Circuit Breaker

Each queue has a built-in circuit breaker that helps prevent cascading failures:

  • Opens after 3 consecutive failures (configurable)
  • Auto-resets after 60 seconds (configurable)
  • Provides circuit state monitoring
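The behavior listed above can be sketched as a small state machine. This is an illustration of the general circuit breaker pattern using the stated defaults (3 failures, 60 seconds), not kew's own class: the name `CircuitBreaker`, its methods, and the injectable `clock` are assumptions made for the example.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after max_failures consecutive failures; resets after reset_timeout."""

    def __init__(
        self,
        max_failures: int = 3,
        reset_timeout: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def record_success(self) -> None:
        # A success breaks the run of consecutive failures.
        self._failures = 0

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.max_failures:
            self._opened_at = self._clock()

    def is_open(self) -> bool:
        if self._opened_at is None:
            return False
        if self._clock() - self._opened_at >= self.reset_timeout:
            # Timeout elapsed: close the circuit and start counting afresh.
            self._failures = 0
            self._opened_at = None
            return False
        return True


# Demo with a fake clock so no real waiting is needed.
now = [0.0]
breaker = CircuitBreaker(clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
open_after_failures = breaker.is_open()

now[0] = 61.0
open_after_timeout = breaker.is_open()
print(open_after_failures, open_after_timeout)
```

Injecting the clock keeps the reset logic testable without sleeping for a full minute.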

Task Expiration

Tasks automatically expire after 24 hours (configurable) to prevent resource leaks.
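Redis supports per-key time-to-live natively, so expiry of this kind is commonly delegated to the server. Whether kew does so is an assumption. The in-memory sketch below only illustrates the idea of a 24-hour TTL, and `ExpiringStore` and its methods are hypothetical names.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

DEFAULT_TTL_SECONDS = 24 * 60 * 60  # 24 hours, the default stated above


class ExpiringStore:
    """Illustrative key-value store whose entries vanish after a TTL."""

    def __init__(self, clock: Callable[[], float] = time.time) -> None:
        self._clock = clock
        self._items: Dict[str, Tuple[float, Any]] = {}

    def put(self, key: str, value: Any, ttl: float = DEFAULT_TTL_SECONDS) -> None:
        self._items[key] = (self._clock() + ttl, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Lazily drop the expired entry on access.
            del self._items[key]
            return None
        return value


# Demo with a fake clock.
now = [0.0]
store = ExpiringStore(clock=lambda: now[0])
store.put("task1", {"result": 10})
before_expiry = store.get("task1")

now[0] = float(DEFAULT_TTL_SECONDS)
after_expiry = store.get("task1")
print(before_expiry, after_expiry)
```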

Redis Configuration

manager = TaskQueueManager(
    redis_url="redis://username:password@hostname:6379/0",
    cleanup_on_start=True  # Optional: cleans up existing tasks on startup
)
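Credentials that contain characters such as `@` or `/` need percent-encoding before they are placed in a URL of this form. The helper below is a sketch with a hypothetical name, `build_redis_url`. It assumes the URL is parsed by a client that percent-decodes credentials, as redis-py's `from_url` does.

```python
from typing import Optional
from urllib.parse import quote


def build_redis_url(
    host: str,
    port: int = 6379,
    db: int = 0,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> str:
    """Assemble a redis:// URL, percent-encoding the credentials."""
    auth = ""
    if password is not None:
        user = quote(username or "", safe="")
        auth = f"{user}:{quote(password, safe='')}@"
    return f"redis://{auth}{host}:{port}/{db}"


url = build_redis_url("hostname", username="username", password="p@ss/word")
plain_url = build_redis_url("localhost")
print(url)
print(plain_url)
```

The resulting string can be passed as `redis_url` when constructing the manager.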

Error Handling

The library raises the following exceptions:

  • TaskAlreadyExistsError: Raised when submitting a task with a duplicate ID
  • TaskNotFoundError: Raised when querying a non-existent task
  • QueueNotFoundError: Raised when accessing an undefined queue
  • QueueProcessorError: Raised for queue processing failures
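One common way to deal with a duplicate ID is to treat submission as idempotent and fall back to the existing task. The sketch below runs without kew or Redis, so it defines its own stand-ins: the `TaskAlreadyExistsError` class here only mirrors the name listed above, and `StubManager` and `submit_or_fetch` are hypothetical. With the real library, the exception would be imported from kew instead.

```python
import asyncio
from typing import Any, Dict


class TaskAlreadyExistsError(Exception):
    """Stand-in for kew's exception so this sketch runs on its own."""


class StubManager:
    """Hypothetical in-memory stand-in exposing only the two calls used below."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Dict[str, Any]] = {}

    async def submit_task(self, task_id: str, **fields: Any) -> Dict[str, Any]:
        if task_id in self._tasks:
            raise TaskAlreadyExistsError(task_id)
        self._tasks[task_id] = {"task_id": task_id, **fields}
        return self._tasks[task_id]

    async def get_task_status(self, task_id: str) -> Dict[str, Any]:
        return self._tasks[task_id]


async def submit_or_fetch(manager: Any, task_id: str, **fields: Any) -> Any:
    """Submit a task; if the ID is already taken, return the existing record."""
    try:
        return await manager.submit_task(task_id=task_id, **fields)
    except TaskAlreadyExistsError:
        return await manager.get_task_status(task_id)


async def _demo():
    manager = StubManager()
    first = await submit_or_fetch(manager, "task1", task_type="example")
    second = await submit_or_fetch(manager, "task1", task_type="other")
    return first, second


first, second = asyncio.run(_demo())
print(second["task_type"])
```

The second call does not overwrite the first task. The caller simply gets back the record that already exists.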

API Reference

TaskQueueManager

Core Methods:

  • async initialize()
  • async create_queue(config: QueueConfig)
  • async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)
  • async get_task_status(task_id)
  • async get_queue_status(queue_name)
  • async shutdown(wait=True, timeout=5.0)

QueueConfig

Configuration Parameters:

  • name: str
  • max_workers: int
  • max_size: int
  • priority: QueuePriority

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
