Skip to main content

An async task runner with concurrency control, retries, and dynamic timeouts.

Project description

Robust Async Task Runner

Overview

This project provides a Python class, ResilientTaskRunner, designed to execute asynchronous tasks with enhanced robustness features. It manages concurrency, handles retries with exponential backoff and jitter, implements dynamic attempt timeouts based on historical performance, and provides detailed success/failure reporting.

Features

  • Concurrency Control: Limits the number of tasks running simultaneously using asyncio.Semaphore.
  • Automatic Retries: Automatically retries failed tasks up to a configurable maximum number of attempts.
  • Exponential Backoff & Jitter: Implements exponential backoff for retry delays, with added jitter to prevent thundering herd issues.
  • Attempt Timeouts: Each execution attempt has a timeout.
    • Dynamic Timeouts: Optionally adjusts attempt timeouts based on the performance (execution duration) of recently completed successful tasks. Uses a percentile of historical durations multiplied by a factor.
    • Timeout Backoff: The timeout duration for the next attempt is doubled after any failure (timeout or exception) within the current attempt.
  • Detailed Results: Returns separate lists of success and failure objects, including duration, attempts made, results, or error details.
  • Clean Logging: Uses Python's logging module with task-specific identifiers.

Configuration Parameters

The ResilientTaskRunner class is initialized with the following parameters:

  • max_concurrency (int, default: 10): Maximum number of tasks to run concurrently.
  • max_attempts (int, default: 3): Total maximum attempts per task (1 initial + max_attempts - 1 retries).
  • retry_delay_seconds (float, default: 1.0): Base delay in seconds before the first retry. Subsequent retries use exponential backoff (delay * 2^attempt_index).
  • default_attempt_timeout_seconds (Optional[float], default: 30.0): The timeout for each individual task attempt if dynamic timeouts are disabled or haven't gathered enough data.
  • enable_dynamic_timeouts (bool, default: True): If True, attempts to calculate attempt timeouts based on historical execution times.
  • min_completed_for_stats (int, default: 5): Minimum number of successful tasks needed before dynamic timeouts can be calculated.
  • completed_duration_history_size (int, default: 50): Maximum number of recent successful task durations to keep for calculating dynamic timeouts.
  • straggler_percentile (float, default: 90.0): The percentile of successful task durations used as the base for dynamic timeout calculation.
  • straggler_factor (float, default: 2.5): The multiplier applied to the percentile duration to get the dynamic attempt timeout.

Usage

  1. Import: from main import ResilientTaskRunner, asyncio
  2. Define Async Tasks: Create your asynchronous functions that the runner will execute.
  3. Instantiate Runner: Create an instance of ResilientTaskRunner, optionally overriding default configuration parameters.
    runner = ResilientTaskRunner(
        max_concurrency=5,
        max_attempts=4,
        default_attempt_timeout_seconds=10.0
    )
    
  4. Add Tasks: Use the add_task method to add tasks to the runner's queue.
    async def my_task(arg1, arg2):
        # ... do async work ...
        await asyncio.sleep(1)
        return f"Result: {arg1 + arg2}"
    
    runner.add_task(my_task, 10, 20)
    runner.add_task(another_async_func, arg_for_it)
    # ... add more tasks
    
  5. Run: Call the run method to start processing the tasks. This is an async method.
    async def main():
        # ... instantiate runner and add tasks ...
        successes, failures = await runner.run()
        print("--- Successes ---")
        for s in successes:
            print(f"  Task {s.task_id}: Result='{s.result}', Duration={s.duration:.2f}s, Attempts={s.total_attempts_made}")
        print("--- Failures ---")
        # ... print failure details ...
    
    if __name__ == "__main__":
        asyncio.run(main())
    
  6. Execute: Run the script containing your main function.
    python main.py
    
    (Or uv run python main.py if using uv)

Dependencies

This script requires only standard Python 3 libraries (asyncio, logging, time, random, statistics, collections, typing, dataclasses).

If using the provided pyproject.toml and uv.lock, you can manage the environment with uv.

# Sync environment (optional, if using uv)
uv sync

# Run the example
uv run python main.py

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

resilient_runner-0.1.0.tar.gz (6.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

resilient_runner-0.1.0-py3-none-any.whl (6.6 kB view details)

Uploaded Python 3

File details

Details for the file resilient_runner-0.1.0.tar.gz.

File metadata

  • Download URL: resilient_runner-0.1.0.tar.gz
  • Upload date:
  • Size: 6.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.6.17

File hashes

Hashes for resilient_runner-0.1.0.tar.gz
Algorithm Hash digest
SHA256 03ddf358b13b14197a229641511e1b579c28f0ce85fd9dd25235c55d51fc03cf
MD5 9028e4ee9075db33ccf5a1d96ca3b272
BLAKE2b-256 2a786d33978dafa8fae928e998b7df96285811af408487f1ba0c3e35416518d1

See more details on using hashes here.

File details

Details for the file resilient_runner-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for resilient_runner-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 9dc436fe22991c798a1b41560a7539514f58a85518b72d0e9be5b6dbcd081af7
MD5 8ce4461e4be6cdd41ffd88551ec9e361
BLAKE2b-256 9a65dbc27cc4d2d1ff94dfdf4a4fe18ed88b8c917a03d216d971c5aa11f63ab5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page