Skip to main content

A flexible async task queue manager for Python applications

Project description

Kew Task Queue Manager

A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.

Features

  • Multiple named queues with independent configurations
  • Priority-based task scheduling with millisecond precision
  • Redis-backed persistence for reliability
  • Configurable worker pools per queue with strict concurrency control
  • Built-in circuit breaker for fault tolerance
  • Comprehensive task lifecycle management
  • Proper semaphore-based worker slot management
  • Race condition protection in concurrent processing
  • Automatic task expiration (24-hour default)
  • Detailed logging and monitoring
  • Graceful shutdown handling
  • Thread-safe operations

Installation

pip install kew

Quick Start

import asyncio
from kew import TaskQueueManager, QueueConfig, QueuePriority

async def example_task(x: int):
    await asyncio.sleep(1)
    return x * 2

async def main():
    # Initialize the task queue manager with Redis connection
    manager = TaskQueueManager(redis_url="redis://localhost:6379")
    await manager.initialize()
    
    # Create a high-priority queue with concurrent processing limits
    await manager.create_queue(QueueConfig(
        name="high_priority",
        max_workers=4,  # Strictly enforced concurrent task limit
        max_size=1000,
        priority=QueuePriority.HIGH
    ))
    
    # Submit a task
    task_info = await manager.submit_task(
        task_id="task1",
        queue_name="high_priority",
        task_type="multiplication",
        task_func=example_task,
        priority=QueuePriority.HIGH,
        x=5
    )
    
    # Check task status
    await asyncio.sleep(2)
    status = await manager.get_task_status("task1")
    print(f"Task Result: {status.result}")
    
    # Graceful shutdown
    await manager.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Queue Configuration

Creating Queues

from kew import QueueConfig, QueuePriority

# Create a high-priority queue with strictly enforced concurrent processing
await manager.create_queue(QueueConfig(
    name="critical",
    max_workers=4,  # Maximum number of concurrent tasks
    max_size=1000,
    priority=QueuePriority.HIGH
))

Worker Pool Management

The queue manager now implements strict concurrency control:

  • Uses semaphores to guarantee max_workers limit is respected
  • Prevents task starvation through fair scheduling
  • Properly releases worker slots after task completion
  • Handles error cases with automatic worker slot cleanup
  • Protects against race conditions in concurrent processing

Queue Priority Levels

  • QueuePriority.HIGH (1)
  • QueuePriority.MEDIUM (2)
  • QueuePriority.LOW (3)

Tasks within the same priority level are processed in FIFO order with millisecond precision.

Task Management

Submitting Tasks

task_info = await manager.submit_task(
    task_id="unique_id",
    queue_name="critical",
    task_type="example",
    task_func=my_async_function,
    priority=QueuePriority.HIGH,
    *args,
    **kwargs
)

Monitoring Task Status

status = await manager.get_task_status("unique_id")
print(f"Status: {status.status}")  # QUEUED, PROCESSING, COMPLETED, FAILED
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")

Queue Status Monitoring

status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}")  # Shows current concurrent tasks
print(f"Circuit Breaker: {status['circuit_breaker_status']}")

Advanced Features

Concurrent Processing

Each queue now implements robust concurrent task processing:

  • Strict enforcement of max_workers limit through semaphores
  • Fair scheduling of tasks to prevent starvation
  • Automatic cleanup of worker slots on task completion
  • Protected against race conditions in high-concurrency scenarios
  • Error handling with proper resource cleanup

Circuit Breaker

Each queue has a built-in circuit breaker that helps prevent cascade failures:

  • Opens after 3 consecutive failures (configurable)
  • Auto-resets after 60 seconds (configurable)
  • Provides circuit state monitoring
  • Integrates with concurrent processing controls

Task Expiration

Tasks automatically expire after 24 hours (configurable) to prevent resource leaks.

Redis Configuration

manager = TaskQueueManager(
    redis_url="redis://username:password@hostname:6379/0",
    cleanup_on_start=True  # Optional: cleans up existing tasks on startup
)

Error Handling

The system handles various error scenarios:

  • TaskAlreadyExistsError: Raised when submitting a task with a duplicate ID
  • TaskNotFoundError: Raised when querying a non-existent task
  • QueueNotFoundError: Raised when accessing an undefined queue
  • QueueProcessorError: Raised for queue processing failures

API Reference

TaskQueueManager

Core Methods:

  • async initialize()
  • async create_queue(config: QueueConfig)
  • async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)
  • async get_task_status(task_id)
  • async get_queue_status(queue_name)
  • async shutdown(wait=True, timeout=5.0)

QueueConfig

Configuration Parameters:

  • name: str
  • max_workers: int - Strictly enforced concurrent task limit
  • max_size: int
  • priority: QueuePriority

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kew-0.1.8.tar.gz (15.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kew-0.1.8-py3-none-any.whl (13.5 kB view details)

Uploaded Python 3

File details

Details for the file kew-0.1.8.tar.gz.

File metadata

  • Download URL: kew-0.1.8.tar.gz
  • Upload date:
  • Size: 15.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for kew-0.1.8.tar.gz
Algorithm Hash digest
SHA256 7b6c8f44e3fa26c07caeabe3882eace30fbf6216ef8cd37e9430087eabc0c034
MD5 4eef43320ff776bd0e244eab519fa1fe
BLAKE2b-256 2487f54b6bf366ea3e761eed86ee5dfe9b88bb0deb61d594d986c6998168fcab

See more details on using hashes here.

Provenance

The following attestation bundles were made for kew-0.1.8.tar.gz:

Publisher: python-package.yml on justrach/kew

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file kew-0.1.8-py3-none-any.whl.

File metadata

  • Download URL: kew-0.1.8-py3-none-any.whl
  • Upload date:
  • Size: 13.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for kew-0.1.8-py3-none-any.whl
Algorithm Hash digest
SHA256 93d9beee80dd754e8271c8645d4d0a1026486608db205b00d38ecc5f68b719df
MD5 89ba6b60e594f1422215d91801f7dcb8
BLAKE2b-256 e311810d19d5712ade04c25d467355970836c36d09e24c3cad0f16c66afb3914

See more details on using hashes here.

Provenance

The following attestation bundles were made for kew-0.1.8-py3-none-any.whl:

Publisher: python-package.yml on justrach/kew

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page