Skip to main content

Generic async resource pool with rotation, cooldown, and retry

Project description

rotapool

Async resource pool with inline health feedback, automatic cooldown, and retry — for API keys, proxies, GPU workers, or anything that can rate-limit you or go down.

Core idea

Most resource pools are passive — they hand out resources round-robin or at random, and rely on external health checks to detect and remove bad ones. rotapool closes that gap: every call through the pool is also a health probe. The pool learns from caller signals in real time and immediately adjusts which resources to offer — no external probers or manual updates needed.

Not every failure means the resource is bad — an HTTP 400 is your bug, but a 429 is the key's problem. You tell rotapool which is which by raising exceptions from inside your operation, and the pool reacts accordingly:

Signal Meaning
normal return / any other exception Resource is healthy
CooldownResource Temporarily overloaded (e.g. 429)
DisableResource Permanently unusable (e.g. revoked key)

rotapool handles the rest — picks the best resource, cools down bad ones, cancels doomed in-flight work, and retries automatically.

Install

pip install rotapool
# or
uv add rotapool

Requires Python 3.10+.

Quick start

Initialize the pool

from rotapool import CooldownResource, DisableResource, Pool, Resource

# Define your resources (e.g. API keys)
pool = Pool(
    # A list or dict of Resource objects. Dict keys are used as resource IDs.
    resources=[
        Resource(
            resource_id="key-1",                 # Unique identifier (used in logs, metrics, snapshot)
            value="sk-aaa",                      # The actual resource value (generic type T)
            # max_in_flight=None,                # Max concurrent usages per resource (None = unlimited)
        ),
        Resource(resource_id="key-2", value="sk-bbb"),
        Resource(resource_id="key-3", value="sk-ccc"),
    ],
    max_attempts=3,                              # Total retry budget per run() call (capped at len(resources))
    cooldown_table=(30.0, 120.0, 300.0, 600.0),  # Escalation: 1st=30s, 2nd=120s, 3rd=300s, 4th+=600s
)

Option 1: Use the decorator

# Resource rotation happens automatically.
# All parameters are optional and forward to pool.run() on every call.
@pool.rotated(
    max_attempts=None,       # Override the pool's max_attempts for this decorated function
    deadline=None,           # Absolute time.monotonic() deadline; None = no deadline
    retry_delay=0.5,         # Seconds to pause between failed attempts
    request_id=None,         # Opaque string attached to every Usage (e.g. HTTP request-id); auto-UUID if None
)
async def call_upstream(resource, url, payload):
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            url,
            headers={"Authorization": f"Bearer {resource.value}"},
            json=payload,
        )

    if resp.status_code == 429:
        raise CooldownResource(
            cooldown_seconds=parse_retry_after(resp.headers.get("retry-after")),
            reason="rate limited",
        )

    if resp.status_code == 401:
        raise DisableResource(reason="invalid key")

    return resp.json()

# Call it — the framework picks the best key and retries on failure
result = await call_upstream("https://api.example.com/v1/chat", {"prompt": "hi"})

Option 2: Direct run()

@pool.rotated() is a thin shim over pool.run(). Use run() directly when you want per-call overrides or when the call site can't be decorated:

async def call_upstream(resource, url, payload):
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            url,
            headers={"Authorization": f"Bearer {resource.value}"},
            json=payload,
        )

    if resp.status_code == 429:
        raise CooldownResource(reason="rate limited")
    if resp.status_code == 401:
        raise DisableResource(reason="invalid key")

    return resp.json()

# Operation receives the selected Resource as its first argument.
result = await pool.run(
    lambda resource: call_upstream(resource, "https://api.example.com/v1/chat", {"prompt": "hi"}),
    max_attempts=None,               # Override the pool's max_attempts for this call only
    deadline=time.monotonic() + 30,  # Absolute time.monotonic() deadline bounding total retry time
    retry_delay=0.5,                 # Seconds to pause between failed attempts
    request_id="req-abc",            # Opaque string attached to every Usage; auto-UUID when None
)

How it works

Selection

When multiple resources are healthy, the pool picks the one with:

  1. Fewest in-flight usages (load spreading)
  2. Oldest last_acquired_at (round-robin fairness)

Selection and usage registration are atomic under one lock acquisition.

Cooldown escalation

Each consecutive CooldownResource from the same resource escalates the cooldown:

Consecutive count Cooldown
1st 30s
2nd 120s
3rd 300s
4th+ 600s

You can override per-event: CooldownResource(cooldown_seconds=5) (e.g. from a Retry-After header). The counter resets on the next success.

Custom tables are supported per pool:

pool = Pool(
    resources=[...],
    cooldown_table=(10.0, 30.0, 60.0, 120.0),
)

In-flight cancellation (best-effort)

When a resource receives a CooldownResource or DisableResource signal, the framework cancels younger in-flight usages on the same resource. Older usages are left alone — they may still succeed. This maximises throughput while avoiding doomed requests.

Cancellation is best-effort: it works when the operation returns a coroutine (the framework wraps it in an asyncio.Task) or an asyncio.Future (cancelled directly). For plain awaitables with no .cancel() handle, cancellation silently no-ops for that usage and it runs to natural completion. Within a coroutine, the underlying I/O is only truly aborted if the operation uses cancellation-aware async libs (httpx.AsyncClient, aiohttp).

Retry

pool.run() drives the retry loop. @pool.rotated() is a thin decorator shim over it. Attempts are capped at min(max_attempts, len(resources)) — more retries than resources is pointless.

Cancellation discrimination

The framework distinguishes external cancellation (client disconnect, shutdown — re-raised) from internal cancellation (resource failure — swallowed and retried) by checking usage.status. The cooldown/disable handler sets the status to "cancelled" under the pool lock before invoking .cancel() on the handle, so observing that status when CancelledError arrives reliably means "we cancelled ourselves." Works on any Python 3.10+.

API reference

rotapool.Pool[T]

pool = Pool(
    resources: list[Resource[T]] | dict[str, Resource[T]],
    # resources:       A list of Resource objects, or a dict mapping resource_id -> Resource.
    #                  Duplicate resource_ids in list form raise ValueError.

    max_attempts: int = 3,
    # max_attempts:    Total retry budget per run() call. Each attempt picks a fresh
    #                  resource. Effectively capped at len(resources) — once every
    #                  resource has been tried and none is eligible, run() raises
    #                  PoolExhausted rather than retrying any one twice.

    cooldown_table: tuple[float, ...] = (30.0, 120.0, 300.0, 600.0),
    # cooldown_table:  Escalation table indexed by consecutive_cooldown count.
    #                  1st cooldown → cooldown_table[0], 2nd → cooldown_table[1], etc.
    #                  Out-of-range values clamp to the last entry.
)
await pool.run(
    operation: Callable[[Resource[T]], Awaitable[R]],
    # operation:       Callable receiving the selected Resource and returning an
    #                  Awaitable. Raise CooldownResource or DisableResource to
    #                  signal resource health. Any other exception is treated as
    #                  "resource is fine" and propagates to the caller.
    #                  Accepted return types:
    #                    - coroutine          (typical async def)        -- cancellable
    #                    - asyncio.Future     (e.g. loop.create_future)  -- cancellable
    #                    - any Awaitable      (custom __await__)         -- best-effort
    #                  Returning a non-Awaitable raises TypeError at call time.

    *,                 # All following parameters are keyword-only.

    max_attempts: int | None = None,
    # max_attempts:    Per-call override of the pool's max_attempts. None = use pool default.

    deadline: float | None = None,
    # deadline:        Absolute time.monotonic() value bounding total time across
    #                  retries. Raises PoolExhausted if exceeded. None = no deadline.

    retry_delay: float = 0.5,
    # retry_delay:     Seconds to pause between failed attempts.

    request_id: str | None = None,
    # request_id:      Opaque string attached to every Usage created by this call.
    #                  Auto-generated UUID when None.
) -> R
@pool.rotated(
    max_attempts: int | None = None,     # Per-call override; None = use pool default
    deadline: float | None = None,       # Absolute time.monotonic() deadline
    retry_delay: float = 0.5,            # Pause between failed attempts
    request_id: str | None = None,       # Opaque string for Usage tracking
)
# Returns a decorator. The decorated function receives a Resource[T] as its
# first positional argument (injected by the wrapper), followed by caller args.
# Any callable returning an Awaitable is accepted (async def, sync function
# returning a coroutine / Future / awaitable). A callable that returns a
# non-Awaitable raises TypeError at call time.
pool.snapshot() -> dict[str, dict[str, Any]]
# Returns a point-in-time summary of every resource. Thread-safe without the lock.
# Example return value:
# {
#     "key-1": {
#         "status": "healthy",                  # "healthy" | "cooling_down" | "disabled"
#         "in_flight": 2,                       # Current in-flight usage count
#         "consecutive_cooldown": 0,            # Escalation counter
#         "cooldown_seconds_remaining": 0.0,    # Seconds until cooldown expires (0 if healthy)
#         "last_acquired_at": 12345.67,         # time.monotonic() of last acquire
#     },
#     ...
# }

rotapool.Resource[T]

resource = Resource(
    resource_id: str,
    # resource_id:          Unique identifier for this resource.

    value: T,
    # value:                The actual resource object (API key, proxy URL, etc.).

    max_in_flight: int | None = None,
    # max_in_flight:        Maximum concurrent usages. None = unlimited, 1 = exclusive.

    status: str = "healthy",
    # status:               Current health: "healthy", "cooling_down", or "disabled".
    #                       Managed by the framework — do not set manually.

    cooldown_until: float = 0.0,
    # cooldown_until:       time.monotonic() deadline when status is "cooling_down".
    #                       Managed by the framework — do not set manually.

    last_acquired_at: float = 0.0,
    # last_acquired_at:     time.monotonic() of most recent acquire. Affects selection
    #                       order (oldest first). Managed by the framework.

    consecutive_cooldown: int = 0,
    # consecutive_cooldown: Number of consecutive CooldownResource signals. Indexes into
    #                       the pool's cooldown_table. Resets to 0 on next success.
    #                       Managed by the framework — do not set manually.
)

Exceptions

Exception Who raises it Meaning
CooldownResource Your operation Resource temporarily over capacity
DisableResource Your operation Resource permanently bad
PoolExhausted Framework No eligible resource, max attempts reached, or deadline passed
raise CooldownResource(
    cooldown_seconds: float | None = None,
    # Explicit cooldown duration (e.g. from Retry-After header).
    # None = use the pool's cooldown_table based on consecutive_cooldown count.

    reason: str | None = None,
    # Free-form string surfaced in the exception message and logs.
)
raise DisableResource(
    reason: str | None = None,
    # Free-form string surfaced in the exception message and logs.
)

Resource types

rotapool is generic — T can be anything:

# API keys (string bearer tokens)
Resource(resource_id="key-1", value="sk-...")

# HTTP proxies
Resource(resource_id="proxy-1", value="http://proxy:8080", max_in_flight=10)

# Browser sessions (exclusive)
Resource(resource_id="session-1", value=<webdriver>, max_in_flight=1)

# GPU workers
Resource(resource_id="gpu-0", value="cuda:0", max_in_flight=1)

Operation shapes

pool.run and @pool.rotated accept any callable that returns an Awaitable. The framework picks the cancellation strategy at runtime based on what the callable returns:

# 1. async def -- the typical case. Cancellation is full-strength: the
#    framework wraps the coroutine in a Task and cancels younger siblings
#    via task.cancel() on resource failure.
@pool.rotated()
async def call_async(resource, payload):
    async with httpx.AsyncClient() as client:
        return await client.post(url, json=payload,
                                 headers={"Authorization": f"Bearer {resource.value}"})

# 2. Sync function returning a coroutine -- previously rejected, now accepted.
#    Useful when you want to construct the coroutine yourself or thread args.
@pool.rotated()
def call_returning_coro(resource, payload):
    return some_async_helper(resource.value, payload)  # returns a coroutine

# 3. Sync function returning an asyncio.Future -- accepted and cancellable
#    via Future.cancel(). Useful for executor wrappers.
@pool.rotated()
def call_in_thread(resource, payload):
    loop = asyncio.get_running_loop()
    return loop.run_in_executor(None, blocking_request, resource.value, payload)

# 4. Anything returning a plain Awaitable (custom __await__) is also accepted,
#    but with no cancel handle: younger sibling cancellation silently no-ops
#    for this usage and it runs to natural completion (best-effort).

A callable that returns a non-Awaitable (e.g. a plain int) raises TypeError at call time. The resource is marked healthy (your bug, not the resource's) and the error propagates to the caller.

Testing

# pip
pip install -e ".[dev]"
pytest

# uv
uv sync --all-extras
uv run pytest

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rotapool-0.1.0.tar.gz (47.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rotapool-0.1.0-py3-none-any.whl (14.1 kB view details)

Uploaded Python 3

File details

Details for the file rotapool-0.1.0.tar.gz.

File metadata

  • Download URL: rotapool-0.1.0.tar.gz
  • Upload date:
  • Size: 47.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.10 {"installer":{"name":"uv","version":"0.11.10","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for rotapool-0.1.0.tar.gz
Algorithm Hash digest
SHA256 5f2896011d902a09185edb9a97b194c63f7dd5332d18c8a8916108ae9bf667f1
MD5 b8a4350b9855f63bd1ba1cf699e2fe88
BLAKE2b-256 4baee4def2f2d865196c4fc1faf1d5ed3cb07053bc3c0e9594b044a6247e3913

See more details on using hashes here.

File details

Details for the file rotapool-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: rotapool-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 14.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.10 {"installer":{"name":"uv","version":"0.11.10","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for rotapool-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 176ab213a263c550c1894fd09c6a0de8d9a7f6daa08332988b2cdac466b4c9ad
MD5 3d4963058058cc03f4d2dfcd7a7c6217
BLAKE2b-256 95bd6bb6122e74c7b26a1b608b5393e68c1e62311c5d145edb159cacb98da069

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page