A flexible async task queue manager for Python applications

Kew Task Queue Manager

A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.

Features

  • Multiple named queues with independent configurations
  • Priority-based task scheduling with millisecond precision
  • Redis-backed persistence for reliability
  • Configurable worker pools per queue with strict concurrency control
  • Built-in circuit breaker for fault tolerance
  • Comprehensive task lifecycle management
  • Proper semaphore-based worker slot management
  • Race condition protection in concurrent processing
  • Automatic task expiration (24-hour default)
  • Detailed logging and monitoring
  • Graceful shutdown handling
  • Thread-safe operations

Installation

pip install kew

Quick Start

import asyncio
from kew import TaskQueueManager, QueueConfig, QueuePriority

async def example_task(x: int):
    await asyncio.sleep(1)
    return x * 2

async def main():
    # Initialize the task queue manager with Redis connection
    manager = TaskQueueManager(redis_url="redis://localhost:6379")
    await manager.initialize()
    
    # Create a high-priority queue with concurrent processing limits
    await manager.create_queue(QueueConfig(
        name="high_priority",
        max_workers=4,  # Strictly enforced concurrent task limit
        max_size=1000,
        priority=QueuePriority.HIGH
    ))
    
    # Submit a task
    task_info = await manager.submit_task(
        task_id="task1",
        queue_name="high_priority",
        task_type="multiplication",
        task_func=example_task,
        priority=QueuePriority.HIGH,
        x=5
    )
    
    # Give the worker time to finish (the task sleeps for 1 second), then check its status
    await asyncio.sleep(2)
    status = await manager.get_task_status("task1")
    print(f"Task Result: {status.result}")
    
    # Graceful shutdown
    await manager.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Queue Configuration

Creating Queues

from kew import QueueConfig, QueuePriority

# Create a high-priority queue with strictly enforced concurrent processing
await manager.create_queue(QueueConfig(
    name="critical",
    max_workers=4,  # Maximum number of concurrent tasks
    max_size=1000,
    priority=QueuePriority.HIGH
))

Worker Pool Management

The queue manager enforces strict concurrency control for each queue:

  • Uses semaphores to guarantee max_workers limit is respected
  • Prevents task starvation through fair scheduling
  • Properly releases worker slots after task completion
  • Handles error cases with automatic worker slot cleanup
  • Protects against race conditions in concurrent processing
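
The semaphore-based slot handling described above can be illustrated without Kew or Redis. The following is a minimal standard-library sketch, not Kew's actual implementation: `run_with_limit`, `double`, and `demo` are hypothetical names introduced only for this example. It shows how an `asyncio.Semaphore` caps the number of tasks in flight and how the slot is released even when a task raises.

```python
import asyncio


async def run_with_limit(jobs, max_workers):
    """Run coroutine factories with at most max_workers in flight.

    Returns (results, peak), where peak is the highest number of jobs
    that were running at the same time.
    """
    semaphore = asyncio.Semaphore(max_workers)
    active = 0
    peak = 0

    async def guarded(job):
        nonlocal active, peak
        async with semaphore:  # the slot is released even if job() raises
            active += 1
            peak = max(peak, active)
            try:
                return await job()
            finally:
                active -= 1

    # return_exceptions=True keeps one failing job from cancelling the rest
    results = await asyncio.gather(
        *(guarded(job) for job in jobs), return_exceptions=True
    )
    return results, peak


async def double(x):
    await asyncio.sleep(0.01)
    return x * 2


async def demo():
    # Ten jobs, but never more than four running concurrently
    jobs = [lambda x=x: double(x) for x in range(10)]
    return await run_with_limit(jobs, max_workers=4)


if __name__ == "__main__":
    results, peak = asyncio.run(demo())
    print(results, peak)
```

Using `async with` rather than manual `acquire()` and `release()` calls is what makes the cleanup automatic on error paths.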

Queue Priority Levels

  • QueuePriority.HIGH (1)
  • QueuePriority.MEDIUM (2)
  • QueuePriority.LOW (3)

Tasks within the same priority level are processed in FIFO order with millisecond precision.
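
One way to picture this ordering is a heap keyed on the priority value first and the submission time in milliseconds second. The sketch below is an illustration under that assumption, not a description of how Kew stores tasks in Redis; `PriorityBuffer` and `drain` are hypothetical names, and the constants simply mirror the numeric values listed above.

```python
import heapq
import itertools

# Same numeric values as the QueuePriority levels listed above
HIGH, MEDIUM, LOW = 1, 2, 3


class PriorityBuffer:
    """Hypothetical in-memory buffer: lowest priority value first, then FIFO."""

    def __init__(self):
        self._heap = []
        # Tie-breaker for tasks submitted within the same millisecond
        self._counter = itertools.count()

    def push(self, task_id, priority, submitted_ms):
        entry = (priority, submitted_ms, next(self._counter), task_id)
        heapq.heappush(self._heap, entry)

    def pop(self):
        return heapq.heappop(self._heap)[-1]

    def __len__(self):
        return len(self._heap)


def drain(buffer):
    """Pop every task ID in processing order."""
    return [buffer.pop() for _ in range(len(buffer))]


if __name__ == "__main__":
    buffer = PriorityBuffer()
    buffer.push("low-a", LOW, submitted_ms=1000)
    buffer.push("high-b", HIGH, submitted_ms=1002)
    buffer.push("high-a", HIGH, submitted_ms=1001)
    buffer.push("medium-a", MEDIUM, submitted_ms=1000)
    print(drain(buffer))  # ['high-a', 'high-b', 'medium-a', 'low-a']
```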

Task Management

Submitting Tasks

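task_info = await manager.submit_task(
    task_id="unique_id",
    queue_name="critical",
    task_type="example",
    task_func=my_async_function,
    priority=QueuePriority.HIGH,
    *args,     # extra positional arguments for my_async_function
    **kwargs   # extra keyword arguments for my_async_function (e.g. x=5 in the Quick Start)
)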

Monitoring Task Status

status = await manager.get_task_status("unique_id")
print(f"Status: {status.status}")  # QUEUED, PROCESSING, COMPLETED, FAILED
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")
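
A fixed sleep before reading the status, as in the Quick Start, can be fragile when task duration varies. The helper below is a generic polling sketch using only the standard library; `wait_for` is a hypothetical name and not part of Kew. It repeatedly calls an async `fetch` function until a caller-supplied predicate accepts the value or a timeout elapses.

```python
import asyncio


async def wait_for(fetch, is_done, interval=0.1, timeout=5.0):
    """Poll fetch() until is_done(value) is true, or raise TimeoutError."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        value = await fetch()
        if is_done(value):
            return value
        if loop.time() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        await asyncio.sleep(interval)


async def demo():
    # Stand-in for a status source that moves through three states
    states = iter(["QUEUED", "PROCESSING", "COMPLETED"])

    async def fetch():
        return next(states)

    return await wait_for(
        fetch, lambda s: s in {"COMPLETED", "FAILED"}, interval=0.001
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))  # COMPLETED
```

With Kew, `fetch` could be something like `lambda: manager.get_task_status("unique_id")`, with `is_done` inspecting `status.status`. How status values are represented is not shown here, so the predicate would need adjusting to the actual type.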

Queue Status Monitoring

status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}")  # Shows current concurrent tasks
print(f"Circuit Breaker: {status['circuit_breaker_status']}")

Advanced Features

Concurrent Processing

Each queue processes tasks concurrently with the following guarantees:

  • Strict enforcement of max_workers limit through semaphores
  • Fair scheduling of tasks to prevent starvation
  • Automatic cleanup of worker slots on task completion
  • Protection against race conditions in high-concurrency scenarios
  • Error handling with proper resource cleanup

Circuit Breaker

Each queue has a built-in circuit breaker that helps prevent cascade failures:

  • Opens after 3 consecutive failures (configurable)
  • Auto-resets after 60 seconds (configurable)
  • Provides circuit state monitoring
  • Integrates with concurrent processing controls
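
For readers unfamiliar with the pattern, here is a minimal circuit breaker sketch using the defaults quoted above (3 consecutive failures, 60-second reset). It is an illustration only: the `CircuitBreaker` class and its method names are assumptions made for this example and are not Kew's internal API.

```python
import time


class CircuitBreaker:
    """Hypothetical breaker: opens after N consecutive failures, then auto-resets."""

    def __init__(self, max_failures=3, reset_timeout=60.0, clock=time.monotonic):
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self._clock = clock  # injectable so the timeout can be tested without waiting
        self._failures = 0
        self._opened_at = None

    def record_success(self):
        # Any success breaks the run of consecutive failures
        self._failures = 0

    def record_failure(self):
        self._failures += 1
        if self._failures >= self.max_failures and self._opened_at is None:
            self._opened_at = self._clock()

    def state(self):
        """Return "open" or "closed", resetting once the timeout has elapsed."""
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            self._failures = 0
            self._opened_at = None
            return "closed"
        return "open"


if __name__ == "__main__":
    breaker = CircuitBreaker()
    for _ in range(3):
        breaker.record_failure()
    print(breaker.state())  # open
```

A caller would check `state()` before dispatching a task and skip or defer the work while the breaker is open.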

Task Expiration

Tasks automatically expire after 24 hours (configurable) to prevent resource leaks.
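
The idea behind expiration can be sketched with an in-memory store that attaches an expiry timestamp to each record. This is an assumption-laden illustration, not Kew's mechanism: `ExpiringTaskStore` and `DEFAULT_TTL_SECONDS` are hypothetical names, and how Kew expires task data in Redis is not described here.

```python
import time

DEFAULT_TTL_SECONDS = 24 * 60 * 60  # 24 hours = 86,400 seconds


class ExpiringTaskStore:
    """Hypothetical in-memory store that forgets task records after a TTL."""

    def __init__(self, ttl_seconds=DEFAULT_TTL_SECONDS, clock=time.time):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without waiting
        self._records = {}   # task_id -> (expires_at, record)

    def put(self, task_id, record):
        self._records[task_id] = (self._clock() + self._ttl, record)

    def get(self, task_id):
        """Return the record, or None if it is missing or has expired."""
        entry = self._records.get(task_id)
        if entry is None:
            return None
        expires_at, record = entry
        if self._clock() >= expires_at:
            del self._records[task_id]  # lazy cleanup on access
            return None
        return record

    def purge_expired(self):
        """Drop every expired record and return how many were removed."""
        now = self._clock()
        expired = [
            task_id
            for task_id, (expires_at, _) in self._records.items()
            if now >= expires_at
        ]
        for task_id in expired:
            del self._records[task_id]
        return len(expired)
```

Without some form of expiry, records for finished tasks would accumulate indefinitely, which is the resource leak the default guards against.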

Redis Configuration

manager = TaskQueueManager(
    redis_url="redis://username:password@hostname:6379/0",
    cleanup_on_start=True  # Optional: cleans up existing tasks on startup
)
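
The `redis_url` string packs several settings into one value. As a reading aid, the sketch below splits such a URL with the standard library; `describe_redis_url` is a hypothetical helper, and treating the path segment as the database index is the common Redis URL convention rather than something stated in this document.

```python
from urllib.parse import urlsplit


def describe_redis_url(url):
    """Break a redis:// URL into its components for inspection."""
    parts = urlsplit(url)
    db_text = parts.path.lstrip("/")
    return {
        "scheme": parts.scheme,
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        # Assumption: the path segment is the database index, defaulting to 0
        "db": int(db_text) if db_text else 0,
    }


if __name__ == "__main__":
    print(describe_redis_url("redis://username:password@hostname:6379/0"))
```

Checking a URL this way before passing it to the manager can catch a mistyped port or credentials early.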

Error Handling

The system handles various error scenarios:

  • TaskAlreadyExistsError: Raised when submitting a task with a duplicate ID
  • TaskNotFoundError: Raised when querying a non-existent task
  • QueueNotFoundError: Raised when accessing an undefined queue
  • QueueProcessorError: Raised for queue processing failures

API Reference

TaskQueueManager

Core Methods:

  • async initialize()
  • async create_queue(config: QueueConfig)
  • async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)
  • async get_task_status(task_id)
  • async get_queue_status(queue_name)
  • async shutdown(wait=True, timeout=5.0)

QueueConfig

Configuration Parameters:

  • name: str
  • max_workers: int - Strictly enforced concurrent task limit
  • max_size: int
  • priority: QueuePriority

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
