An asynchronous task manager that handles concurrent task execution with automatic retries, status monitoring, and graceful cancellation.
AsyncWeaver
AsyncWeaver is a Python library for managing asynchronous tasks with retry capabilities and status monitoring. It provides a way to queue, execute, and track multiple concurrent tasks while handling failures and retries automatically.
Features
- Concurrent task execution with configurable number of workers
- Automatic task retries with customizable delay
- Task status monitoring
- Support for task cancellation (individual or bulk)
- Callback support for task completion, failure, and cancellation
- Custom task ID generation
- Type hints for better IDE support
Installation
pip install async-weave
Basic Usage
import asyncio
from async_weave import AsyncWeaver
async def example():
    # Create a manager with 5 workers, max 3 retries, and a 5-minute retry delay
    async with AsyncWeaver(worker_count=5, max_retries=3, retry_delay=300) as manager:
        # Define an async task
        async def my_task(param):
            # Your task logic here
            return f"Processed {param}"

        # Add the task to the queue
        task_id = await manager.add_task(my_task, "data")

        # Wait for the result
        result = await manager.get_task_result(task_id)
        print(f"Task completed with result: {result}")

# Run the example
asyncio.run(example())
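To make the max_retries and retry_delay settings more concrete, here is a minimal standard-library sketch of a retry-with-delay loop. It is not AsyncWeaver's implementation: run_with_retries is a hypothetical helper, and it assumes max_retries counts the attempts made after the first failure, which the library may define differently.

```python
import asyncio


async def run_with_retries(func, *args, max_retries=3, retry_delay=0.01):
    """Await func(*args); on failure, sleep retry_delay seconds and try again.

    Hypothetical helper: makes at most max_retries + 1 attempts, then
    re-raises the last error.
    """
    attempt = 0
    while True:
        try:
            return await func(*args)
        except Exception:
            # CancelledError is a BaseException, so cancellation is not retried.
            if attempt >= max_retries:
                raise
            attempt += 1
            await asyncio.sleep(retry_delay)


async def demo_retries():
    calls = 0

    async def flaky(param):
        # Fails on the first two calls, succeeds on the third.
        nonlocal calls
        calls += 1
        if calls < 3:
            raise RuntimeError("temporary failure")
        return f"Processed {param}"

    result = await run_with_retries(flaky, "data")
    return result, calls
```

The short retry_delay here only keeps the demo fast; a production delay would more likely be seconds or minutes, as in the example above.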
Advanced Features
Custom Task IDs
task_id = await manager.add_task(my_task, task_id="custom_id_123")
Callbacks
Each callback receives the task_id as its first argument. By default the task_id is a str; if you pass a custom task_id or a custom ID generator, it has whatever type you supplied.
async def example_with_callbacks():
    async with AsyncWeaver() as manager:
        def on_complete(task_id: str, result):
            print(f"Task completed with: {result}")

        def on_error(task_id: str, error):
            print(f"Task failed with: {error}")

        def on_cancel(task_id: str):
            print("Task was cancelled")

        await manager.add_task(
            my_task,
            on_complete=on_complete,
            on_error=on_error,
            on_cancel=on_cancel
        )
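One way to picture how the three callbacks map onto a task's outcome is the standard-library sketch below. It is an illustration only: run_with_callbacks is a hypothetical wrapper, not part of AsyncWeaver, and the library may invoke callbacks at different points (for example, only after retries are exhausted).

```python
import asyncio


async def run_with_callbacks(task_id, func, *args,
                             on_complete=None, on_error=None, on_cancel=None):
    """Await func(*args) and report the outcome through optional callbacks."""
    try:
        result = await func(*args)
    except asyncio.CancelledError:
        if on_cancel is not None:
            on_cancel(task_id)
        raise  # keep the cancellation visible to the caller
    except Exception as error:
        if on_error is not None:
            on_error(task_id, error)
        return None
    if on_complete is not None:
        on_complete(task_id, result)
    return result


async def demo_callbacks():
    events = []

    async def ok():
        return "done"

    async def bad():
        raise ValueError("boom")

    def on_complete(task_id, result):
        events.append(("complete", task_id, result))

    def on_error(task_id, error):
        events.append(("error", task_id, str(error)))

    await run_with_callbacks("ok-1", ok, on_complete=on_complete, on_error=on_error)
    await run_with_callbacks("bad-1", bad, on_complete=on_complete, on_error=on_error)
    return events
```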
Task Cancellation
# Cancel a specific task
await manager.cancel_task(task_id)
# Cancel all running tasks
await manager.cancel_all_tasks()
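For background, cancellation in plain asyncio works by raising asyncio.CancelledError inside the running coroutine, which gives the task a chance to clean up. The sketch below uses only the standard library and assumes nothing about how AsyncWeaver implements cancel_task; slow_job and demo_cancel are hypothetical names.

```python
import asyncio


async def slow_job(log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Release resources here, then re-raise so the task ends as cancelled.
        log.append("cleanup")
        raise


async def demo_cancel():
    log = []
    task = asyncio.create_task(slow_job(log))
    await asyncio.sleep(0)  # yield once so the task starts running
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()
```

A task function that swallows CancelledError without re-raising it would keep running, so long-running tasks are generally written to let it propagate.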
Custom ID Generator
import time

def custom_id_generator():
    return f"task_{time.time()}"

manager = AsyncWeaver(id_generator=custom_id_generator)
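Timestamp-based IDs can in principle collide when two tasks are added within the clock's resolution. As an alternative sketch, the zero-argument generators below use a counter or a UUID instead. Both function names are hypothetical, and this assumes only what the example above shows: that id_generator is called with no arguments and returns the ID.

```python
import itertools
import uuid


def make_sequential_id_generator(prefix="task"):
    """Return a zero-argument function yielding prefix_1, prefix_2, ..."""
    counter = itertools.count(1)

    def generate():
        return f"{prefix}_{next(counter)}"

    return generate


def uuid_id_generator():
    """Return a random ID that does not depend on the clock."""
    return f"task_{uuid.uuid4().hex}"
```

Either function could then be passed as id_generator in the same way as custom_id_generator.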
Configuration Options
- worker_count: Number of concurrent workers (default: 10)
- max_retries: Maximum number of retry attempts for failed tasks (default: 1)
- retry_delay: Time to wait before retrying failed tasks, in seconds (default: 300)
- monitor_interval: Interval for monitoring task status, in seconds (default: 60)
- id_generator: Custom function for generating task IDs
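To illustrate what a worker_count limit means in practice, the sketch below runs jobs through a fixed number of workers pulling from an asyncio.Queue. It is a generic standard-library pattern, not AsyncWeaver's internals; run_pool and demo_pool are hypothetical names.

```python
import asyncio


async def run_pool(jobs, worker_count=10):
    """Run (func, arg) jobs with at most worker_count running at once."""
    queue = asyncio.Queue()
    for job_id, (func, arg) in enumerate(jobs):
        queue.put_nowait((job_id, func, arg))
    results = {}

    async def worker():
        while True:
            job_id, func, arg = await queue.get()
            try:
                results[job_id] = await func(arg)
            except Exception as error:
                results[job_id] = error  # record the failure, keep the worker alive
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(worker_count)]
    await queue.join()  # returns once every queued job has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


async def demo_pool():
    active = 0
    peak = 0

    async def job(n):
        # Track how many jobs run at the same time.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return n * 2

    results = await run_pool([(job, n) for n in range(6)], worker_count=2)
    return results, peak
```

With two workers, no more than two jobs are ever in flight, regardless of how many are queued.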
Task States
Tasks can be in the following states:
- PENDING: Task is queued but not yet started
- IN_PROGRESS: Task is currently running
- COMPLETED: Task finished successfully
- FAILED: Task failed and exceeded retry attempts
- CANCELLED: Task was cancelled
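The states above can be modelled as an enum in which the last three are terminal. This is a sketch for reasoning about the lifecycle only; TaskState and is_finished are hypothetical names, and the library's own state type may be named and shaped differently.

```python
from enum import Enum


class TaskState(Enum):
    PENDING = "PENDING"
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# States from which a task does not move on.
TERMINAL_STATES = frozenset(
    {TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELLED}
)


def is_finished(state):
    """Return True if the task has reached a terminal state."""
    return state in TERMINAL_STATES
```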
Waiting for Task Completion
To wait for all tasks to complete, use the wait_for_completion method:
# Wait indefinitely for all tasks to complete
await manager.wait_for_completion()
# Wait with a timeout
await manager.wait_for_completion(timeout=60) # Wait up to 60 seconds
For individual tasks, you can wait for specific results:
# Wait for a specific task to complete
result = await manager.get_task_result(task_id, timeout=30)
Note: When shutting down with shutdown(), you can control whether to wait for running tasks to complete:
# Wait for completion during shutdown
await manager.shutdown(timeout=60, cancel_running=False)
# Cancel running tasks during shutdown
await manager.shutdown(cancel_running=True)
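The combination of a timeout with cancelling whatever is still running can be sketched with asyncio.wait from the standard library. This only illustrates the general pattern; it does not describe what wait_for_completion or shutdown do internally, and wait_then_cancel is a hypothetical helper.

```python
import asyncio


async def wait_then_cancel(tasks, timeout):
    """Wait up to timeout seconds, then cancel whatever is still running."""
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # Let cancelled tasks finish unwinding before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return done, pending


async def demo_wait():
    fast = asyncio.create_task(asyncio.sleep(0, result="fast"))
    slow = asyncio.create_task(asyncio.sleep(10, result="slow"))
    done, pending = await wait_then_cancel([fast, slow], timeout=0.05)
    return [t.result() for t in done], [t.cancelled() for t in pending]
```

Unlike asyncio.wait_for, asyncio.wait does not raise on timeout; it returns the unfinished tasks so the caller decides whether to keep waiting or cancel them.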
Testing
The project includes a comprehensive test suite. To run the tests:
pytest tests/
License
[Insert your license here]
File details
Details for the file async_weave-0.1.1.tar.gz.
File metadata
- Download URL: async_weave-0.1.1.tar.gz
- Upload date:
- Size: 10.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.13
File hashes
Algorithm | Hash digest
---|---
SHA256 | 4950feb1e48ff5776636db01c1040bb6c61c0701d4d8c8fe7ce3af01a89e0f9d
MD5 | b53d08155937505dced52d1281e117eb
BLAKE2b-256 | ad8c601e22187e25fa2e5b6bc15816ef8ca5cdf785b4d1ba6c2f67fe4ef60494
File details
Details for the file async_weave-0.1.1-py3-none-any.whl.
File metadata
- Download URL: async_weave-0.1.1-py3-none-any.whl
- Upload date:
- Size: 6.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.13
File hashes
Algorithm | Hash digest
---|---
SHA256 | 537697e80ea5591807e445977d3b86dc308178e2c146f866a9f45666c6a3d7c3
MD5 | b72b328003efcb4b41c9ee94fb04eb1f
BLAKE2b-256 | 4899ac12f004cb3359593c045bc2f403a71de4937f6e256eab5f434e7923b977