A flexible async task queue manager for Python applications
Project description
Kew Task Queue Manager
A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.
Features
- Multiple named queues with independent configurations
- Priority-based task scheduling with millisecond precision
- Redis-backed persistence for reliability
- Configurable worker pools per queue with strict concurrency control
- Built-in circuit breaker for fault tolerance
- Comprehensive task lifecycle management
- Proper semaphore-based worker slot management
- Race condition protection in concurrent processing
- Automatic task expiration (24-hour default)
- Detailed logging and monitoring
- Graceful shutdown handling
- Thread-safe operations
Installation
pip install kew
Quick Start
import asyncio
from kew import TaskQueueManager, QueueConfig, QueuePriority
async def example_task(x: int):
await asyncio.sleep(1)
return x * 2
async def main():
# Initialize the task queue manager with Redis connection
manager = TaskQueueManager(redis_url="redis://localhost:6379")
await manager.initialize()
# Create a high-priority queue with concurrent processing limits
await manager.create_queue(QueueConfig(
name="high_priority",
max_workers=4, # Strictly enforced concurrent task limit
max_size=1000,
priority=QueuePriority.HIGH
))
# Submit a task
task_info = await manager.submit_task(
task_id="task1",
queue_name="high_priority",
task_type="multiplication",
task_func=example_task,
priority=QueuePriority.HIGH,
x=5
)
# Check task status
await asyncio.sleep(2)
status = await manager.get_task_status("task1")
print(f"Task Result: {status.result}")
# Graceful shutdown
await manager.shutdown()
if __name__ == "__main__":
asyncio.run(main())
Queue Configuration
Creating Queues
from kew import QueueConfig, QueuePriority
# Create a high-priority queue with strictly enforced concurrent processing
await manager.create_queue(QueueConfig(
name="critical",
max_workers=4, # Maximum number of concurrent tasks
max_size=1000,
priority=QueuePriority.HIGH
))
Worker Pool Management
The queue manager now implements strict concurrency control:
- Uses semaphores to guarantee max_workers limit is respected
- Prevents task starvation through fair scheduling
- Properly releases worker slots after task completion
- Handles error cases with automatic worker slot cleanup
- Protects against race conditions in concurrent processing
Queue Priority Levels
QueuePriority.HIGH(1)QueuePriority.MEDIUM(2)QueuePriority.LOW(3)
Tasks within the same priority level are processed in FIFO order with millisecond precision.
Task Management
Submitting Tasks
task_info = await manager.submit_task(
task_id="unique_id",
queue_name="critical",
task_type="example",
task_func=my_async_function,
priority=QueuePriority.HIGH,
*args,
**kwargs
)
Monitoring Task Status
status = await manager.get_task_status("unique_id")
print(f"Status: {status.status}") # QUEUED, PROCESSING, COMPLETED, FAILED
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")
Queue Status Monitoring
status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}") # Shows current concurrent tasks
print(f"Circuit Breaker: {status['circuit_breaker_status']}")
Advanced Features
Concurrent Processing
Each queue now implements robust concurrent task processing:
- Strict enforcement of max_workers limit through semaphores
- Fair scheduling of tasks to prevent starvation
- Automatic cleanup of worker slots on task completion
- Protected against race conditions in high-concurrency scenarios
- Error handling with proper resource cleanup
Circuit Breaker
Each queue has a built-in circuit breaker that helps prevent cascade failures:
- Opens after 3 consecutive failures (configurable)
- Auto-resets after 60 seconds (configurable)
- Provides circuit state monitoring
- Integrates with concurrent processing controls
Task Expiration
Tasks automatically expire after 24 hours (configurable) to prevent resource leaks.
Redis Configuration
manager = TaskQueueManager(
redis_url="redis://username:password@hostname:6379/0",
cleanup_on_start=True # Optional: cleans up existing tasks on startup
)
Error Handling
The system handles various error scenarios:
TaskAlreadyExistsError: Raised when submitting a task with a duplicate IDTaskNotFoundError: Raised when querying a non-existent taskQueueNotFoundError: Raised when accessing an undefined queueQueueProcessorError: Raised for queue processing failures
API Reference
TaskQueueManager
Core Methods:
async initialize()async create_queue(config: QueueConfig)async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)async get_task_status(task_id)async get_queue_status(queue_name)async shutdown(wait=True, timeout=5.0)
QueueConfig
Configuration Parameters:
name: strmax_workers: int- Strictly enforced concurrent task limitmax_size: intpriority: QueuePriority
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file kew-0.1.8.tar.gz.
File metadata
- Download URL: kew-0.1.8.tar.gz
- Upload date:
- Size: 15.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7b6c8f44e3fa26c07caeabe3882eace30fbf6216ef8cd37e9430087eabc0c034
|
|
| MD5 |
4eef43320ff776bd0e244eab519fa1fe
|
|
| BLAKE2b-256 |
2487f54b6bf366ea3e761eed86ee5dfe9b88bb0deb61d594d986c6998168fcab
|
Provenance
The following attestation bundles were made for kew-0.1.8.tar.gz:
Publisher:
python-package.yml on justrach/kew
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
kew-0.1.8.tar.gz -
Subject digest:
7b6c8f44e3fa26c07caeabe3882eace30fbf6216ef8cd37e9430087eabc0c034 - Sigstore transparency entry: 868649551
- Sigstore integration time:
-
Permalink:
justrach/kew@c7e2d6b3d066ed72479d4060bce61876ea43ba97 -
Branch / Tag:
refs/tags/v0.1.8 - Owner: https://github.com/justrach
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-package.yml@c7e2d6b3d066ed72479d4060bce61876ea43ba97 -
Trigger Event:
release
-
Statement type:
File details
Details for the file kew-0.1.8-py3-none-any.whl.
File metadata
- Download URL: kew-0.1.8-py3-none-any.whl
- Upload date:
- Size: 13.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
93d9beee80dd754e8271c8645d4d0a1026486608db205b00d38ecc5f68b719df
|
|
| MD5 |
89ba6b60e594f1422215d91801f7dcb8
|
|
| BLAKE2b-256 |
e311810d19d5712ade04c25d467355970836c36d09e24c3cad0f16c66afb3914
|
Provenance
The following attestation bundles were made for kew-0.1.8-py3-none-any.whl:
Publisher:
python-package.yml on justrach/kew
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
kew-0.1.8-py3-none-any.whl -
Subject digest:
93d9beee80dd754e8271c8645d4d0a1026486608db205b00d38ecc5f68b719df - Sigstore transparency entry: 868649553
- Sigstore integration time:
-
Permalink:
justrach/kew@c7e2d6b3d066ed72479d4060bce61876ea43ba97 -
Branch / Tag:
refs/tags/v0.1.8 - Owner: https://github.com/justrach
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-package.yml@c7e2d6b3d066ed72479d4060bce61876ea43ba97 -
Trigger Event:
release
-
Statement type: