A flexible async task queue manager for Python applications

Kew Task Queue Manager

A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.

Features

  • Multiple named queues with independent configurations
  • Priority-based task scheduling with millisecond precision
  • Redis-backed persistence for reliability
  • Configurable worker pools per queue with strict concurrency control
  • Built-in circuit breaker for fault tolerance
  • Comprehensive task lifecycle management
  • Semaphore-based worker slot management
  • Race condition protection in concurrent processing
  • Automatic task expiration (24-hour default)
  • Detailed logging and monitoring
  • Graceful shutdown handling
  • Thread-safe operations

Installation

pip install kew

Quick Start

import asyncio
from kew import TaskQueueManager, QueueConfig, QueuePriority

async def example_task(x: int):
    await asyncio.sleep(1)
    return x * 2

async def main():
    # Initialize the task queue manager with Redis connection
    manager = TaskQueueManager(redis_url="redis://localhost:6379")
    await manager.initialize()
    
    # Create a high-priority queue with concurrent processing limits
    await manager.create_queue(QueueConfig(
        name="high_priority",
        max_workers=4,  # Strictly enforced concurrent task limit
        max_size=1000,
        priority=QueuePriority.HIGH
    ))
    
    # Submit a task
    task_info = await manager.submit_task(
        task_id="task1",
        queue_name="high_priority",
        task_type="multiplication",
        task_func=example_task,
        priority=QueuePriority.HIGH,
        x=5
    )
    
    # Give the task time to finish (example_task sleeps for 1 second),
    # then read its status and result
    await asyncio.sleep(2)
    status = await manager.get_task_status("task1")
    print(f"Task Result: {status.result}")
    
    # Graceful shutdown
    await manager.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Queue Configuration

Creating Queues

from kew import QueueConfig, QueuePriority

# Create a high-priority queue with strictly enforced concurrent processing
await manager.create_queue(QueueConfig(
    name="critical",
    max_workers=4,  # Maximum number of concurrent tasks
    max_size=1000,
    priority=QueuePriority.HIGH
))

Worker Pool Management

The queue manager enforces strict concurrency control on each queue:

  • Uses semaphores to guarantee max_workers limit is respected
  • Prevents task starvation through fair scheduling
  • Properly releases worker slots after task completion
  • Handles error cases with automatic worker slot cleanup
  • Protects against race conditions in concurrent processing
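
The slot handling described above can be sketched with a plain asyncio.Semaphore. This is a minimal illustration of the technique, not kew's internal code; run_with_limit and its peak counter are hypothetical names introduced only for this example.

```python
import asyncio


async def run_with_limit(coros, max_workers: int):
    """Run coroutines concurrently, never more than max_workers at a time.

    Returns (results, peak), where peak is the highest number of
    coroutines that were running simultaneously.
    """
    semaphore = asyncio.Semaphore(max_workers)
    active = 0
    peak = 0

    async def worker(coro):
        nonlocal active, peak
        async with semaphore:      # take a worker slot (waits if none is free)
            active += 1
            peak = max(peak, active)
            try:
                return await coro
            finally:
                active -= 1        # runs on success and on error alike
        # leaving the "async with" block releases the slot, even after an exception

    # return_exceptions=True keeps one failing task from cancelling the rest
    results = await asyncio.gather(
        *(worker(c) for c in coros), return_exceptions=True
    )
    return results, peak
```

Because the slot is acquired with "async with", a task that raises still gives its slot back, which is the cleanup behaviour the list above describes.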

Queue Priority Levels

  • QueuePriority.HIGH (1)
  • QueuePriority.MEDIUM (2)
  • QueuePriority.LOW (3)

Within a priority level, tasks are processed in FIFO order; submission times are compared at millisecond resolution.
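
One way to get this ordering is to sort tasks by the pair (priority value, submission time in milliseconds). The in-memory sketch below uses heapq to show the idea. It is an assumption about how such ordering can be built, not a description of kew's Redis layout, and Priority and PriorityFifoQueue are hypothetical stand-ins.

```python
import heapq
import itertools
import time
from enum import IntEnum


class Priority(IntEnum):
    """Stand-in mirroring the values listed above (lower runs first)."""
    HIGH = 1
    MEDIUM = 2
    LOW = 3


class PriorityFifoQueue:
    """Pops the lowest priority value first, oldest submission first within it."""

    def __init__(self):
        self._heap = []
        # Tie-breaker for tasks submitted within the same millisecond
        self._seq = itertools.count()

    def push(self, task_id, priority, now_ms=None):
        if now_ms is None:
            now_ms = int(time.time() * 1000)  # millisecond timestamp
        heapq.heappush(
            self._heap, (int(priority), now_ms, next(self._seq), task_id)
        )

    def pop(self):
        # The heap is ordered by the whole tuple; return only the task ID
        return heapq.heappop(self._heap)[3]

    def __len__(self):
        return len(self._heap)
```

A HIGH task submitted later still runs before a LOW task submitted earlier, while two HIGH tasks run in the order they arrived.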

Task Management

Submitting Tasks

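# my_async_function is any async callable defined by your application.
# Pass its arguments as keywords: positional *args placed after the keyword
# arguments below would collide with task_id and raise a TypeError.
task_info = await manager.submit_task(
    task_id="unique_id",          # duplicate IDs raise TaskAlreadyExistsError
    queue_name="critical",        # unknown queues raise QueueNotFoundError
    task_type="example",
    task_func=my_async_function,
    priority=QueuePriority.HIGH,
    **kwargs                      # forwarded to my_async_function
)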

Monitoring Task Status

status = await manager.get_task_status("unique_id")
print(f"Status: {status.status}")  # QUEUED, PROCESSING, COMPLETED, FAILED
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")
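
Rather than sleeping for a fixed time before reading the status, a caller can poll until the task reaches a terminal state. The helper below is a sketch under assumptions: wait_for_task is a hypothetical name, it takes the status lookup as a parameter so it does not depend on kew, and it accepts the status field as either a plain string or an enum member.

```python
import asyncio


async def wait_for_task(get_status, task_id, timeout=10.0, interval=0.1):
    """Poll get_status(task_id) until the task is COMPLETED or FAILED.

    get_status is any async callable returning an object with a .status
    attribute. Raises asyncio.TimeoutError if no terminal state is seen
    within `timeout` seconds.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status(task_id)
        # Enum members expose .name; plain strings fall back to str()
        state = getattr(status.status, "name", str(status.status)).upper()
        if state in ("COMPLETED", "FAILED"):
            return status
        if loop.time() >= deadline:
            raise asyncio.TimeoutError(
                f"task {task_id!r} still {state} after {timeout}s"
            )
        await asyncio.sleep(interval)
```

With a manager in scope, usage would look like: status = await wait_for_task(manager.get_task_status, "unique_id").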

Queue Status Monitoring

status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}")  # Shows current concurrent tasks
print(f"Circuit Breaker: {status['circuit_breaker_status']}")

Advanced Features

Concurrent Processing

Each queue processes tasks concurrently with the following guarantees:

  • Strict enforcement of max_workers limit through semaphores
  • Fair scheduling of tasks to prevent starvation
  • Automatic cleanup of worker slots on task completion
  • Protected against race conditions in high-concurrency scenarios
  • Error handling with proper resource cleanup

Circuit Breaker

Each queue has a built-in circuit breaker that helps prevent cascade failures:

  • Opens after 3 consecutive failures (configurable)
  • Auto-resets after 60 seconds (configurable)
  • Provides circuit state monitoring
  • Integrates with concurrent processing controls
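
The behaviour in this list follows the general circuit breaker pattern, which can be sketched in a few lines. The class below is an illustration, not kew's implementation: the names CircuitBreaker, max_failures and reset_timeout are assumptions, and the "half-open" trial state is a common variant of the pattern that this document does not confirm for kew.

```python
import time


class CircuitBreaker:
    """Opens after max_failures consecutive failures, retries after reset_timeout."""

    def __init__(self, max_failures=3, reset_timeout=60.0, clock=time.monotonic):
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self._clock = clock          # injectable so tests need not wait 60 s
        self._failures = 0
        self._opened_at = None       # None means the circuit is closed

    @property
    def state(self):
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half-open"       # timeout elapsed: allow a trial call
        return "open"

    def allow(self):
        """Return True if a task may run right now."""
        return self.state != "open"

    def record_success(self):
        # Any success clears the failure streak and closes the circuit
        self._failures = 0
        self._opened_at = None

    def record_failure(self):
        self._failures += 1
        if self._failures >= self.max_failures:
            # Open, or re-open after a failed trial call
            self._opened_at = self._clock()
```

A worker would call allow() before running a task and then record_success() or record_failure() depending on the outcome.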

Task Expiration

Tasks automatically expire after 24 hours (configurable) to prevent resource leaks.

Redis Configuration

manager = TaskQueueManager(
    redis_url="redis://username:password@hostname:6379/0",
    cleanup_on_start=True  # Optional: cleans up existing tasks on startup
)

Error Handling

The system handles various error scenarios:

  • TaskAlreadyExistsError: Raised when submitting a task with a duplicate ID
  • TaskNotFoundError: Raised when querying a non-existent task
  • QueueNotFoundError: Raised when accessing an undefined queue
  • QueueProcessorError: Raised for queue processing failures
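
A common way to use these exceptions is to treat a duplicate ID as "already submitted" while letting configuration mistakes surface. The sketch below defines local stand-in exception classes so that it runs without kew installed; in real code you would import the classes of the same name from kew (the import path is not shown in this document). submit_once is a hypothetical helper name.

```python
import asyncio


# Local stand-ins for illustration only; use kew's own classes in real code.
class TaskAlreadyExistsError(Exception):
    pass


class QueueNotFoundError(Exception):
    pass


async def submit_once(submit, **task_kwargs):
    """Submit a task, tolerating a duplicate task ID.

    `submit` is an async callable such as manager.submit_task.
    Returns True if the task was newly submitted and False if a task
    with the same ID already existed. Other errors, including
    QueueNotFoundError, propagate because they point to a setup problem.
    """
    try:
        await submit(**task_kwargs)
        return True
    except TaskAlreadyExistsError:
        return False
```

This makes retries of the submitting code safe: a second call with the same task_id reports False instead of failing.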

API Reference

TaskQueueManager

Core Methods:

  • async initialize()
  • async create_queue(config: QueueConfig)
  • async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)
  • async get_task_status(task_id)
  • async get_queue_status(queue_name)
  • async shutdown(wait=True, timeout=5.0)

QueueConfig

Configuration Parameters:

  • name: str
  • max_workers: int - Strictly enforced concurrent task limit
  • max_size: int
  • priority: QueuePriority

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
