A flexible async task queue manager for Python applications
Kew Task Queue Manager
A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.
Features
- Multiple named queues with independent configurations
- Priority-based task scheduling with millisecond precision
- Redis-backed persistence for reliability
- Configurable worker pools per queue
- Built-in circuit breaker for fault tolerance
- Comprehensive task lifecycle management
- Automatic task expiration (24-hour default)
- Detailed logging and monitoring
- Graceful shutdown handling
- Thread-safe operations
Installation
pip install kew
Quick Start
import asyncio
from kew import TaskQueueManager, QueueConfig, QueuePriority

async def example_task(x: int):
    await asyncio.sleep(1)
    return x * 2

async def main():
    # Initialize the task queue manager with Redis connection
    manager = TaskQueueManager(redis_url="redis://localhost:6379")
    await manager.initialize()
    # Create a high-priority queue
    await manager.create_queue(QueueConfig(
        name="high_priority",
        max_workers=4,
        max_size=1000,
        priority=QueuePriority.HIGH
    ))
    # Submit a task
    task_info = await manager.submit_task(
        task_id="task1",
        queue_name="high_priority",
        task_type="multiplication",
        task_func=example_task,
        priority=QueuePriority.HIGH,
        x=5
    )
    # Check task status
    await asyncio.sleep(2)
    status = await manager.get_task_status("task1")
    print(f"Task Result: {status.result}")
    # Graceful shutdown
    await manager.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
Queue Configuration
Creating Queues
from kew import QueueConfig, QueuePriority
# Create a high-priority queue with 4 workers
await manager.create_queue(QueueConfig(
    name="critical",
    max_workers=4,
    max_size=1000,
    priority=QueuePriority.HIGH
))
Queue Priority Levels
- QueuePriority.HIGH (1)
- QueuePriority.MEDIUM (2)
- QueuePriority.LOW (3)
Tasks within the same priority level are processed in FIFO order, with submission order tracked to millisecond precision.
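The ordering rule above can be illustrated without Redis. The following is a minimal in-memory sketch, not Kew's actual implementation: `Priority` and `PriorityFifo` are hypothetical stand-ins, and the sketch assumes that a lower priority value is served first and that ties are broken by submission time in milliseconds.

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):
    """Hypothetical stand-in for QueuePriority; lower value is assumed more urgent."""
    HIGH = 1
    MEDIUM = 2
    LOW = 3


class PriorityFifo:
    """In-memory queue ordered by (priority, submit time in ms, sequence)."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker for identical timestamps

    def push(self, task_id, priority, submitted_ms):
        entry = (int(priority), submitted_ms, next(self._seq), task_id)
        heapq.heappush(self._heap, entry)

    def pop(self):
        # The smallest tuple wins: best priority first, then earliest submission.
        return heapq.heappop(self._heap)[-1]

    def __len__(self):
        return len(self._heap)


queue = PriorityFifo()
queue.push("low-early", Priority.LOW, submitted_ms=1_000)
queue.push("high-late", Priority.HIGH, submitted_ms=1_002)
queue.push("high-early", Priority.HIGH, submitted_ms=1_001)

order = [queue.pop() for _ in range(len(queue))]
print(order)  # ['high-early', 'high-late', 'low-early']
```

Both HIGH tasks run before the LOW task even though the LOW task was submitted first, and the two HIGH tasks keep their submission order.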
Task Management
Submitting Tasks
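# The five leading arguments are passed positionally so that *args can follow them;
# *args and **kwargs are forwarded to my_async_function.
task_info = await manager.submit_task(
    "unique_id",          # task_id
    "critical",           # queue_name
    "example",            # task_type
    my_async_function,    # task_func
    QueuePriority.HIGH,   # priority
    *args,
    **kwargs
)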
Monitoring Task Status
status = await manager.get_task_status("unique_id")
print(f"Status: {status.status}") # QUEUED, PROCESSING, COMPLETED, FAILED
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")
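Instead of sleeping for a fixed time before reading the status, a caller can poll until the task reaches a terminal state. The helper below is a sketch built only on `get_task_status` and the `status.status` field shown above; `wait_for_task`, `state_name`, and `FakeManager` are hypothetical names, and the sketch assumes `status.status` is either a string or an enum member whose name is one of the four listed states.

```python
import asyncio
from types import SimpleNamespace

TERMINAL_STATES = {"COMPLETED", "FAILED"}


def state_name(status):
    """Normalize status.status (enum member or string) to an upper-case name."""
    raw = status.status
    return str(getattr(raw, "name", raw)).upper()


async def wait_for_task(manager, task_id, timeout=10.0, interval=0.1):
    """Poll get_task_status until the task finishes or the timeout elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await manager.get_task_status(task_id)
        if state_name(status) in TERMINAL_STATES:
            return status
        if loop.time() >= deadline:
            raise TimeoutError(f"task {task_id!r} did not finish within {timeout}s")
        await asyncio.sleep(interval)


class FakeManager:
    """Stub used only to exercise the helper without Redis."""

    def __init__(self):
        self._states = iter(["QUEUED", "PROCESSING", "COMPLETED"])

    async def get_task_status(self, task_id):
        return SimpleNamespace(status=next(self._states), result=10)


final = asyncio.run(wait_for_task(FakeManager(), "task1", interval=0.01))
print(final.status, final.result)  # COMPLETED 10
```

With a real manager, the same helper would be called as `await wait_for_task(manager, "unique_id")` inside a running event loop.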
Queue Status Monitoring
status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}")
print(f"Circuit Breaker: {status['circuit_breaker_status']}")
Advanced Features
Circuit Breaker
Each queue has a built-in circuit breaker that helps prevent cascading failures:
- Opens after 3 consecutive failures (configurable)
- Auto-resets after 60 seconds (configurable)
- Provides circuit state monitoring
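The behavior listed above can be sketched as a small state machine. This is an illustration of the general circuit breaker pattern, not Kew's internal class: the `CircuitBreaker` name, its parameters, and the injectable `clock` are assumptions, with defaults chosen to match the 3-failure and 60-second figures above.

```python
import time


class CircuitBreaker:
    """Opens after max_failures consecutive failures; resets after reset_timeout seconds."""

    def __init__(self, max_failures=3, reset_timeout=60.0, clock=time.monotonic):
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def record_success(self):
        # Any success breaks the run of consecutive failures.
        self._failures = 0

    def record_failure(self):
        self._failures += 1
        if self._failures >= self.max_failures:
            self._opened_at = self._clock()

    def state(self):
        """Return 'open' or 'closed', auto-resetting once the timeout has elapsed."""
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            self._opened_at = None
            self._failures = 0
            return "closed"
        return "open"


# A fake clock keeps the demonstration instant and deterministic.
now = [0.0]
breaker = CircuitBreaker(clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
state_after_failures = breaker.state()

now[0] = 60.0  # advance the fake clock past the reset timeout
state_after_timeout = breaker.state()
print(state_after_failures, state_after_timeout)  # open closed
```

While the circuit is open, a worker would skip or reject new work for that queue rather than keep calling a failing dependency.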
Task Expiration
Tasks automatically expire after 24 hours (configurable) to prevent resource leaks.
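To make the expiry rule concrete, here is a minimal in-memory sketch of time-based expiration. It is not how Kew stores tasks (a Redis-backed store would more likely rely on key TTLs); `ExpiringTaskStore` and `DEFAULT_TTL_SECONDS` are hypothetical names used only for illustration.

```python
import time

DEFAULT_TTL_SECONDS = 24 * 60 * 60  # the 24-hour default mentioned above


class ExpiringTaskStore:
    """Keeps task records and drops them once their time-to-live has passed."""

    def __init__(self, ttl=DEFAULT_TTL_SECONDS, clock=time.time):
        self._ttl = ttl
        self._clock = clock
        self._records = {}  # task_id -> (expires_at, record)

    def put(self, task_id, record):
        self._records[task_id] = (self._clock() + self._ttl, record)

    def get(self, task_id):
        entry = self._records.get(task_id)
        if entry is None:
            return None
        expires_at, record = entry
        if self._clock() >= expires_at:
            del self._records[task_id]  # lazily drop the expired record
            return None
        return record


# A fake clock lets the example jump forward 24 hours instantly.
now = [0.0]
store = ExpiringTaskStore(clock=lambda: now[0])
store.put("task1", {"status": "COMPLETED", "result": 10})
before = store.get("task1")

now[0] = DEFAULT_TTL_SECONDS
after = store.get("task1")
print(before, after)  # {'status': 'COMPLETED', 'result': 10} None
```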
Redis Configuration
manager = TaskQueueManager(
    redis_url="redis://username:password@hostname:6379/0",
    cleanup_on_start=True  # Optional: cleans up existing tasks on startup
)
Error Handling
Kew raises the following exceptions:
- TaskAlreadyExistsError: Raised when submitting a task with a duplicate ID
- TaskNotFoundError: Raised when querying a non-existent task
- QueueNotFoundError: Raised when accessing an undefined queue
- QueueProcessorError: Raised for queue processing failures
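One way a caller might use these exceptions is to make submission idempotent, treating a duplicate ID as "already submitted". The sketch below is self-contained so it runs without Redis: the two exception classes are local stand-ins (this README does not show where Kew exports them), and `submit_idempotent` and `FakeManager` are hypothetical helpers.

```python
import asyncio


# Local stand-ins so the sketch runs on its own; real code would import
# the exceptions of the same names from kew instead.
class TaskAlreadyExistsError(Exception):
    pass


class QueueNotFoundError(Exception):
    pass


async def submit_idempotent(manager, task_id, queue_name, task_type,
                            task_func, priority, **kwargs):
    """Submit a task and report the outcome instead of propagating known errors."""
    try:
        await manager.submit_task(
            task_id=task_id, queue_name=queue_name, task_type=task_type,
            task_func=task_func, priority=priority, **kwargs,
        )
        return "submitted"
    except TaskAlreadyExistsError:
        return "duplicate"
    except QueueNotFoundError:
        return "missing_queue"


class FakeManager:
    """Stub that mimics the documented error conditions in memory."""

    def __init__(self, queues):
        self._queues = set(queues)
        self._seen = set()

    async def submit_task(self, task_id, queue_name, task_type,
                          task_func, priority, **kwargs):
        if queue_name not in self._queues:
            raise QueueNotFoundError(queue_name)
        if task_id in self._seen:
            raise TaskAlreadyExistsError(task_id)
        self._seen.add(task_id)


async def noop():
    return None


async def demo():
    manager = FakeManager(queues={"critical"})
    common = ("example", noop, 1)  # task_type, task_func, priority
    return [
        await submit_idempotent(manager, "t1", "critical", *common),
        await submit_idempotent(manager, "t1", "critical", *common),
        await submit_idempotent(manager, "t2", "unknown", *common),
    ]


outcomes = asyncio.run(demo())
print(outcomes)  # ['submitted', 'duplicate', 'missing_queue']
```

Whether a duplicate should count as success depends on the application; the point is that each documented exception can be caught and mapped to an explicit outcome.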
API Reference
TaskQueueManager
Core Methods:
async initialize()
async create_queue(config: QueueConfig)
async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)
async get_task_status(task_id)
async get_queue_status(queue_name)
async shutdown(wait=True, timeout=5.0)
QueueConfig
Configuration Parameters:
name: str
max_workers: int
max_size: int
priority: QueuePriority
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the MIT License - see the LICENSE file for details.