An async task runner with concurrency control, retries, and dynamic timeouts.
Robust Async Task Runner
Overview
This project provides a Python class, ResilientTaskRunner, designed to execute asynchronous tasks with enhanced robustness features. It manages concurrency, handles retries with exponential backoff and jitter, implements dynamic attempt timeouts based on historical performance, and provides detailed success/failure reporting.
Features
- Concurrency Control: Limits the number of tasks running simultaneously using asyncio.Semaphore.
- Automatic Retries: Automatically retries failed tasks up to a configurable maximum number of attempts.
- Exponential Backoff & Jitter: Implements exponential backoff for retry delays, with added jitter to prevent thundering herd issues.
- Attempt Timeouts: Each execution attempt has a timeout.
- Dynamic Timeouts: Optionally adjusts attempt timeouts based on the performance (execution duration) of recently completed successful tasks. Uses a percentile of historical durations multiplied by a factor.
- Timeout Backoff: After an attempt fails (by timeout or exception), the timeout for the task's next attempt is doubled.
- Detailed Results: Returns separate lists of success and failure objects, including duration, attempts made, results, or error details.
- Clean Logging: Uses Python's logging module with task-specific identifiers.
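As a rough illustration of how concurrency control, attempt timeouts, and timeout backoff can fit together, here is a minimal sketch. It is not the code of ResilientTaskRunner: the function names run_attempts and demo are hypothetical, the retry delay between attempts is left out for brevity, and holding the semaphore across all attempts of a task is an assumption.

```python
import asyncio


async def run_attempts(task_fn, semaphore, max_attempts=3, attempt_timeout=0.1):
    """Hypothetical helper: run task_fn under a semaphore, one timeout per attempt."""
    timeout = attempt_timeout
    last_error = None
    async with semaphore:  # only as many tasks as the semaphore allows get past here
        for attempt in range(1, max_attempts + 1):
            try:
                # wait_for cancels the attempt if it exceeds the current timeout
                result = await asyncio.wait_for(task_fn(), timeout=timeout)
                return ("ok", result, attempt)
            except Exception as exc:  # covers timeouts and task exceptions
                last_error = exc
                timeout *= 2  # timeout backoff: the next attempt gets twice as long
    return ("failed", repr(last_error), max_attempts)


async def demo():
    semaphore = asyncio.Semaphore(2)  # assumed concurrency limit for the demo
    running = 0
    peak = 0

    async def slow_task():
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        try:
            await asyncio.sleep(0.15)  # longer than the first timeout, shorter than the second
            return "done"
        finally:
            running -= 1

    outcomes = await asyncio.gather(
        *(run_attempts(slow_task, semaphore) for _ in range(5))
    )
    return outcomes, peak


if __name__ == "__main__":
    outcomes, peak = asyncio.run(demo())
    print(outcomes, "peak concurrency:", peak)
```

In this demo each task times out on its first attempt (0.1 s) and finishes on the second (0.2 s), while the semaphore keeps at most two tasks in flight.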
Configuration Parameters
The ResilientTaskRunner class is initialized with the following parameters:
- max_concurrency (int, default: 10): Maximum number of tasks to run concurrently.
- max_attempts (int, default: 3): Total maximum attempts per task (1 initial + max_attempts - 1 retries).
- retry_delay_seconds (float, default: 1.0): Base delay in seconds before the first retry. Subsequent retries use exponential backoff (delay * 2^attempt_index).
- default_attempt_timeout_seconds (Optional[float], default: 30.0): The timeout for each individual task attempt if dynamic timeouts are disabled or haven't gathered enough data.
- enable_dynamic_timeouts (bool, default: True): If True, attempts to calculate attempt timeouts based on historical execution times.
- min_completed_for_stats (int, default: 5): Minimum number of successful tasks needed before dynamic timeouts can be calculated.
- completed_duration_history_size (int, default: 50): Maximum number of recent successful task durations to keep for calculating dynamic timeouts.
- straggler_percentile (float, default: 90.0): The percentile of successful task durations used as the base for dynamic timeout calculation.
- straggler_factor (float, default: 2.5): The multiplier applied to the percentile duration to get the dynamic attempt timeout.
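To make the retry delay and dynamic timeout parameters more concrete, the following sketch shows one way the two calculations could be done. The function names retry_delay and dynamic_timeout are hypothetical, as are the 10% additive jitter, the assumption that attempt_index is 0 for the first retry, and the use of statistics.quantiles with the inclusive method; the actual runner may compute these differently.

```python
import random
import statistics
from collections import deque


def retry_delay(base_delay, attempt_index, jitter_fraction=0.1):
    """Exponential backoff (base_delay * 2**attempt_index) plus random jitter.

    The jitter form (up to jitter_fraction of the delay, added on top) is an assumption.
    """
    delay = base_delay * (2 ** attempt_index)
    return delay + random.uniform(0, jitter_fraction * delay)


def dynamic_timeout(durations, default_timeout=30.0, min_completed=5,
                    percentile=90.0, factor=2.5):
    """Percentile of recent successful durations times a factor.

    Falls back to default_timeout until min_completed durations are available.
    """
    if len(durations) < max(min_completed, 2):  # quantiles needs at least 2 points
        return default_timeout
    cut_points = statistics.quantiles(durations, n=100, method="inclusive")
    index = min(max(int(percentile), 1), 99) - 1  # cut_points[k - 1] is the k-th percentile
    return cut_points[index] * factor


if __name__ == "__main__":
    # A bounded history, mirroring completed_duration_history_size=50
    history = deque(maxlen=50)
    print(dynamic_timeout(history))  # not enough data yet: default timeout
    history.extend(float(x) for x in range(1, 12))  # durations 1.0 .. 11.0 seconds
    print(dynamic_timeout(history))
    print([round(retry_delay(1.0, i), 2) for i in range(3)])
```

With the defaults above, the base delays before jitter would be 1, 2, and 4 seconds for the first three retries, and a task history whose 90th percentile is 10 seconds would yield a 25-second attempt timeout.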
Usage
- Import: Import asyncio and the runner class.

      import asyncio
      from main import ResilientTaskRunner

- Define Async Tasks: Create your asynchronous functions that the runner will execute.
- Instantiate Runner: Create an instance of ResilientTaskRunner, optionally overriding default configuration parameters.

      runner = ResilientTaskRunner(
          max_concurrency=5,
          max_attempts=4,
          default_attempt_timeout_seconds=10.0,
      )

- Add Tasks: Use the add_task method to add tasks to the runner's queue.

      async def my_task(arg1, arg2):
          # ... do async work ...
          await asyncio.sleep(1)
          return f"Result: {arg1 + arg2}"

      runner.add_task(my_task, 10, 20)
      runner.add_task(another_async_func, arg_for_it)
      # ... add more tasks

- Run: Call the run method to start processing the tasks. This is an async method.

      async def main():
          # ... instantiate runner and add tasks ...
          successes, failures = await runner.run()

          print("--- Successes ---")
          for s in successes:
              print(f" Task {s.task_id}: Result='{s.result}', Duration={s.duration:.2f}s, Attempts={s.total_attempts_made}")

          print("--- Failures ---")
          # ... print failure details ...

      if __name__ == "__main__":
          asyncio.run(main())

- Execute: Run the script containing your main function.

      python main.py

  Or, if using uv:

      uv run python main.py
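For readers who want to see what the result handling in the Run step might look like end to end, here is a stand-alone sketch that does not import the runner. The success fields (task_id, result, duration, total_attempts_made) come from the usage example above; the class names TaskSuccess and TaskFailure, the failure fields, and the helper format_report are hypothetical and may not match the package.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class TaskSuccess:  # hypothetical class name; fields taken from the usage example
    task_id: Any
    result: Any
    duration: float
    total_attempts_made: int


@dataclass
class TaskFailure:  # hypothetical class name and fields
    task_id: Any
    error: str
    duration: float
    total_attempts_made: int


def format_report(successes: List[TaskSuccess], failures: List[TaskFailure]) -> List[str]:
    """Build report lines from separate success and failure lists."""
    lines = ["--- Successes ---"]
    for s in successes:
        lines.append(
            f" Task {s.task_id}: Result='{s.result}', "
            f"Duration={s.duration:.2f}s, Attempts={s.total_attempts_made}"
        )
    lines.append("--- Failures ---")
    for f in failures:
        lines.append(
            f" Task {f.task_id}: Error='{f.error}', "
            f"Duration={f.duration:.2f}s, Attempts={f.total_attempts_made}"
        )
    return lines


if __name__ == "__main__":
    report = format_report(
        [TaskSuccess(task_id=1, result="Result: 30", duration=1.0, total_attempts_made=1)],
        [TaskFailure(task_id=2, error="TimeoutError", duration=3.5, total_attempts_made=3)],
    )
    print("\n".join(report))
```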
Dependencies
This script requires only standard Python 3 libraries (asyncio, logging, time, random, statistics, collections, typing, dataclasses).
If using the provided pyproject.toml and uv.lock, you can manage the environment with uv.

    # Sync environment (optional, if using uv)
    uv sync
    # Run the example
    uv run python main.py

Download files
File details
Details for the file resilient_runner-0.1.0.tar.gz.
File metadata
- Download URL: resilient_runner-0.1.0.tar.gz
- Upload date:
- Size: 6.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.17
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 03ddf358b13b14197a229641511e1b579c28f0ce85fd9dd25235c55d51fc03cf |
| MD5 | 9028e4ee9075db33ccf5a1d96ca3b272 |
| BLAKE2b-256 | 2a786d33978dafa8fae928e998b7df96285811af408487f1ba0c3e35416518d1 |
File details
Details for the file resilient_runner-0.1.0-py3-none-any.whl.
File metadata
- Download URL: resilient_runner-0.1.0-py3-none-any.whl
- Upload date:
- Size: 6.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.17
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9dc436fe22991c798a1b41560a7539514f58a85518b72d0e9be5b6dbcd081af7 |
| MD5 | 8ce4461e4be6cdd41ffd88551ec9e361 |
| BLAKE2b-256 | 9a65dbc27cc4d2d1ff94dfdf4a4fe18ed88b8c917a03d216d971c5aa11f63ab5 |