Generic async resource pool with health-aware selection, cooldown, and retry
Project description
rotapool
Async resource pool with inline health feedback, automatic cooldown, and retry — for API keys, proxies, GPU workers, or anything that can rate-limit you or go down.
Designed for AI coding agents,
rotapoolexposes machine-readable usage notes via agent-readable: the operation contract, do/don't rules, anti-patterns, and failure modes. Teach your coding agent the agent-readable protocol once —npx skills add zydo/skills --skill agent-readable— and it will discover and read these docs on its own before generating code that uses
Pool.
Core idea
Most resource pools are passive — they hand out resources round-robin or at random, and rely on external health checks to detect and remove bad ones. rotapool closes that gap: every call through the pool is also a health probe. The pool learns from caller signals in real time and immediately adjusts which resources to offer — no external probers or manual updates needed.
Not every failure means the resource is bad — an HTTP 400 is your bug, but a 429 is the key's problem. You tell rotapool which is which by raising exceptions from inside your operation, and the pool reacts accordingly:
| Signal | Meaning |
|---|---|
| normal return / any other exception | Resource is healthy |
CooldownResource |
Temporarily overloaded (e.g. 429) |
DisableResource |
Permanently unusable (e.g. revoked key) |
rotapool handles the rest — picks the best resource, cools down bad ones, cancels doomed in-flight work, and retries automatically.
Install
pip install rotapool
# or
uv add rotapool
Requires Python 3.10+. Zero runtime dependencies; pip install "rotapool[agent]" adds the optional agent-readable integration.
Quick start
Initialize the pool
from rotapool import CooldownResource, DisableResource, Pool, Resource
# Define your resources (e.g. API keys)
pool = Pool(
# A list of Resource objects, or a dict whose keys match each resource_id.
resources=[
Resource(
resource_id="key-1", # Unique identifier (used in logs, metrics, snapshot)
value="sk-aaa", # The actual resource value (generic type T)
# max_in_flight=None, # Max concurrent usages per resource (None = unlimited)
),
Resource(resource_id="key-2", value="sk-bbb"),
Resource(resource_id="key-3", value="sk-ccc"),
],
max_attempts=3, # Total retry budget per run() call (capped at len(resources))
cooldown_table=(30.0, 120.0, 300.0, 600.0), # Escalation: 1st=30s, 2nd=120s, 3rd=300s, 4th+=600s
)
Option 1: Use the decorator
# Resource selection happens automatically per the pool's strategy (round_robin by default).
# All parameters are optional and forward to pool.run() on every call.
@pool.use(
max_attempts=None, # Override the pool's max_attempts for this decorated function
deadline=None, # Absolute time.monotonic() deadline; None = no deadline
retry_delay=0.5, # Base pause between failed attempts (jittered ±50%)
wait_for_cooldown=False, # Wait out the earliest cooldown instead of failing fast
)
async def call_upstream(resource, url, payload):
async with httpx.AsyncClient() as client:
resp = await client.post(
url,
headers={"Authorization": f"Bearer {resource.value}"},
json=payload,
)
if resp.status_code == 429:
raise CooldownResource(
cooldown_seconds=parse_retry_after(resp.headers.get("retry-after")),
reason="rate limited",
)
if resp.status_code == 401:
raise DisableResource(reason="invalid key")
return resp.json()
# Call it — the framework picks the best key and retries on failure
result = await call_upstream("https://api.example.com/v1/chat", {"prompt": "hi"})
Option 2: Direct run()
@pool.use() is a thin shim over pool.run(), but it only accepts the policy knobs that are safe to fix at decoration time (max_attempts, deadline, retry_delay, wait_for_cooldown). Anything that needs to vary per call must go through run() directly — most notably request_id, which is meant to correlate with caller-side context (e.g. an inbound HTTP request id) and would be wrong to bake into the decorator. Use run() directly when you want per-call overrides or when the call site can't be decorated:
async def call_upstream(resource, url, payload):
async with httpx.AsyncClient() as client:
resp = await client.post(
url,
headers={"Authorization": f"Bearer {resource.value}"},
json=payload,
)
if resp.status_code == 429:
raise CooldownResource(reason="rate limited")
if resp.status_code == 401:
raise DisableResource(reason="invalid key")
return resp.json()
# Operation receives the selected Resource as its first argument.
result = await pool.run(
lambda resource: call_upstream(resource, "https://api.example.com/v1/chat", {"prompt": "hi"}),
max_attempts=None, # Override the pool's max_attempts for this call only
deadline=time.monotonic() + 30, # Gates the start of each attempt (not in-flight work); None = no deadline
retry_delay=0.5, # Base pause between failed attempts (jittered ±50%)
wait_for_cooldown=False, # Wait out the earliest cooldown instead of failing fast
request_id="req-abc", # Opaque string attached to every Usage; auto-UUID when None
)
How it works
Selection
Among resources that are not disabled, not cooling down, and not at max_in_flight, the pool picks one according to its strategy:
"round_robin"(default) — fewest in-flight usages first, then oldestlast_acquired_at. Best-effort fairness across resources; the pool can't predict how long a usage will hold a slot, so this only balances by acquisition time."primary_backup"— return the first eligible resource in the original list/dict order. Later resources are only used when earlier ones are cooling down, disabled, or at capacity. Resource ordering is load-bearing under this strategy.
Selection and usage registration are atomic under one lock acquisition.
Example: primary_backup for a paid-tier primary with a free-tier fallback
Send every request to the paid key first; only spill over to the free key when the primary is rate-limited (cooling down) or the paid quota is exhausted (disabled). Resource ordering in the list is the priority ranking — the pool will never reach for free-fallback while paid-primary is still healthy and below capacity.
pool = Pool(
resources=[
Resource(resource_id="paid-primary", value="sk-paid-..."),
Resource(resource_id="free-fallback", value="sk-free-..."),
],
strategy="primary_backup",
)
Example: primary_backup with a capacity cap to fan out under bursts
Combine max_in_flight on the primary with primary_backup to get "use the primary up to N concurrent calls, then overflow to the next tier." Useful when the primary is fastest/cheapest but has a hard concurrency limit you don't want to breach.
pool = Pool(
resources=[
Resource(resource_id="region-us", value=us_client, max_in_flight=8),
Resource(resource_id="region-eu", value=eu_client, max_in_flight=8),
Resource(resource_id="region-asia", value=asia_client),
],
strategy="primary_backup",
)
# 1st–8th concurrent calls -> region-us
# 9th–16th -> region-eu (us is at capacity)
# 17th+ -> region-asia (eu is at capacity)
Cooldown escalation
Each consecutive CooldownResource from the same resource escalates the cooldown:
| Consecutive count | Cooldown |
|---|---|
| 1st | 30s |
| 2nd | 120s |
| 3rd | 300s |
| 4th+ | 600s |
You can override per-event: CooldownResource(cooldown_seconds=5) (e.g. from a Retry-After header). The counter resets on the next success.
Custom tables are supported per pool:
pool = Pool(
resources=[...],
cooldown_table=(10.0, 30.0, 60.0, 120.0),
)
In-flight cancellation (best-effort)
When a resource receives a CooldownResource or DisableResource signal, the framework cancels younger in-flight usages on the same resource. Older usages are left alone — they may still succeed. This maximises throughput while avoiding doomed requests.
Cancellation is best-effort: it works when the operation returns a coroutine (the framework wraps it in an asyncio.Task) or an asyncio.Future (cancelled directly). For plain awaitables with no .cancel() handle, cancellation silently no-ops for that usage and it runs to natural completion. Within a coroutine, the underlying I/O is only truly aborted if the operation uses cancellation-aware async libs (httpx.AsyncClient, aiohttp).
Retry
pool.run() drives the retry loop. @pool.use() is a thin decorator shim over it. Attempts are capped at min(max_attempts, len(resources)) — more retries than resources is pointless.
The pause between attempts is jittered: retry_delay * uniform(0.5, 1.5), mean retry_delay. Without jitter, concurrent calls that hit the same cooldown would all wake at the same instant and stampede the next eligible resource.
By default, run() fails fast: when no resource is eligible at the start of an attempt, it raises PoolExhausted immediately — even if a deadline would outlive the cooldowns. Pass wait_for_cooldown=True to instead sleep until the earliest cooldown_until among cooling resources and select again. Only a cooldown gives a known wake-up time, so this never waits on resources that are disabled or at max_in_flight — if nothing is cooling, PoolExhausted raises as usual. With a deadline, the wait only happens when the earliest cooldown ends before it; otherwise PoolExhausted raises immediately rather than sleeping out a wait that provably cannot help.
The wake-up is jittered too: each waiter sleeps an extra retry_delay * uniform(0, 1) past the expiry (capped by deadline), so concurrent waiters don't all fire at the recovered resource in the same instant. As with the retry pause, retry_delay=0 disables the jitter.
Waiters also react to admin calls: pool.enable() wakes them so they can acquire the now-eligible resource immediately, and pool.disable() wakes them so they can re-evaluate (and fail fast) instead of sleeping out a cooldown that no longer matters.
Admin control
pool.enable(resource_id) and pool.disable(resource_id) give operators write access to resource lifecycle state — the counterpart to snapshot():
disable()removes a resource from selection untilenable()is called. Unlike an operation raisingDisableResource, in-flight usages are not cancelled — admin disable is policy, not failure evidence, so running work (which may already have upstream side effects) finishes naturally.enable()returns a resource to selection: it clears both the disabled state and any active cooldown, and resetsconsecutive_cooldownto 0 — enable means "the operator says this resource is usable now" (e.g. a rotated key), so if the operator is wrong, escalation restarts from the firstcooldown_tableslot rather than resuming where it left off.
Both are async (they take the pool lock), idempotent, raise KeyError for an unknown resource_id, and wake any run(wait_for_cooldown=True) sleepers so they re-evaluate immediately.
Cancellation discrimination
The framework distinguishes external cancellation (client disconnect, shutdown — re-raised) from internal cancellation (resource failure — swallowed and retried) by checking usage.status. The cooldown/disable handler sets the status to "cancelled" under the pool lock before invoking .cancel() on the handle, so observing that status when CancelledError arrives means "we cancelled ourselves" — except for the one-tick edge case described in the cancellation gotcha. Works on any Python 3.10+.
API reference
rotapool.Pool[T]
pool = Pool(
resources: list[Resource[T]] | dict[str, Resource[T]],
# resources: A list of Resource objects, or a dict mapping resource_id -> Resource.
# Duplicate resource_ids in list form raise ValueError, as does a
# dict key that does not match its Resource's resource_id.
max_attempts: int = 3,
# max_attempts: Total retry budget per run() call. Each attempt picks among
# currently eligible resources — one that triggered cooldown or
# disable stays ineligible while that state lasts (a zero-second
# cooldown can make it eligible again immediately). Effectively
# capped at len(resources); raises PoolExhausted once spent.
cooldown_table: tuple[float, ...] = (30.0, 120.0, 300.0, 600.0),
# cooldown_table: Escalation table indexed by consecutive_cooldown count.
# 1st cooldown → cooldown_table[0], 2nd → cooldown_table[1], etc.
# Out-of-range values clamp to the last entry.
strategy: Literal["round_robin", "primary_backup"] = "round_robin",
# strategy: Selection policy among eligible resources. "round_robin"
# (default) balances by fewest in-flight, then oldest
# last_acquired_at. "primary_backup" returns the first eligible
# resource in list/dict order (ordering is the priority ranking).
# Pool-level by design; not overridable per call. See "Selection".
)
await pool.run(
operation: Callable[[Resource[T]], Awaitable[R]],
# operation: Callable receiving the selected Resource and returning an
# Awaitable. Raise CooldownResource or DisableResource to
# signal resource health. Any other exception is treated as
# "resource is fine" and propagates to the caller.
# Accepted return types:
# - coroutine (typical async def) -- cancellable
# - asyncio.Future (e.g. loop.create_future) -- cancellable
# - any Awaitable (custom __await__) -- best-effort
# Returning a non-Awaitable raises TypeError at call time.
*, # All following parameters are keyword-only.
max_attempts: int | None = None,
# max_attempts: Per-call override of the pool's max_attempts. None = use pool default.
deadline: float | None = None,
# deadline: Absolute time.monotonic() value that gates when each attempt may
# start and caps the inter-attempt pause. Does NOT interrupt an
# in-flight operation, so a single long call can overrun it. Raises
# PoolExhausted when a new attempt would start past it. None = none.
retry_delay: float = 0.5,
# retry_delay: Base pause (seconds) between failed attempts. Must be >= 0.
# The actual pause is jittered to retry_delay * uniform(0.5, 1.5)
# (mean stays retry_delay) so concurrent callers don't retry in
# lockstep and stampede the next eligible resource.
wait_for_cooldown: bool = False,
# wait_for_cooldown: When no resource is eligible at the start of an attempt,
# sleep until the earliest cooldown_until among cooling resources
# and select again, instead of raising PoolExhausted immediately.
# Never waits on disabled or saturated resources (no known wake-up
# time); raises immediately when the earliest cooldown ends at or
# after the deadline. The wake-up is jittered by an extra
# retry_delay * uniform(0, 1), capped by the deadline, to avoid
# waiter stampedes at the expiry instant. False = fail fast (default).
request_id: str | None = None,
# request_id: Opaque string attached to every Usage created by this call.
# Auto-generated UUID when None.
) -> R
@pool.use(
max_attempts: int | None = None, # Per-call override; None = use pool default
deadline: float | None = None, # Absolute time.monotonic() deadline
retry_delay: float = 0.5, # Base pause between failed attempts (jittered ±50%)
wait_for_cooldown: bool = False, # Wait out the earliest cooldown instead of failing fast
)
# Returns a decorator. The decorated function receives a Resource[T] as its
# first positional argument (injected by the wrapper), followed by caller args.
# Any callable returning an Awaitable is accepted (async def, sync function
# returning a coroutine / Future / awaitable). A callable that returns a
# non-Awaitable raises TypeError at call time.
pool.snapshot() -> dict[str, dict[str, Any]]
# Returns a point-in-time summary of every resource. Thread-safe without the lock.
# A resource whose cooldown has expired is reported as "healthy" even though the
# stored status only flips on the next acquire.
# Example return value:
# {
# "key-1": {
# "status": "healthy", # "healthy" | "cooling_down" | "disabled"
# "in_flight": 2, # Current in-flight usage count
# "consecutive_cooldown": 0, # Escalation counter
# "cooldown_seconds_remaining": 0.0, # Seconds until cooldown expires (0 if healthy)
# "last_acquired_at": 12345.67, # time.monotonic() of last acquire
# },
# ...
# }
await pool.enable(resource_id: str) -> None
# Administratively return a resource to selection. Clears both the disabled state
# and any active cooldown, and resets consecutive_cooldown to 0 (a later failure
# escalates from the first cooldown_table slot). Idempotent on a healthy resource.
# Raises KeyError for an unknown resource_id.
await pool.disable(resource_id: str) -> None
# Administratively remove a resource from selection until enable(). In-flight
# usages are NOT cancelled (unlike an operation raising DisableResource) — they
# run to natural completion. Idempotent on an already-disabled resource.
# Raises KeyError for an unknown resource_id.
rotapool.Resource[T]
resource = Resource(
resource_id: str,
# resource_id: Unique identifier for this resource. Must be non-empty.
value: T,
# value: The actual resource object (API key, proxy URL, etc.).
max_in_flight: int | None = None,
# max_in_flight: Maximum concurrent usages. None = unlimited, 1 = exclusive.
# Must be >= 1 or None.
status: Literal["healthy", "cooling_down", "disabled"] = "healthy",
# status: Current health. Managed by the framework — do not set
# manually.
cooldown_until: float = 0.0,
# cooldown_until: time.monotonic() deadline when status is "cooling_down".
# Managed by the framework — do not set manually.
last_acquired_at: float = 0.0,
# last_acquired_at: time.monotonic() of most recent acquire. Affects selection
# order (oldest first). Managed by the framework.
consecutive_cooldown: int = 0,
# consecutive_cooldown: Number of consecutive CooldownResource signals. Indexes into
# the pool's cooldown_table. Resets to 0 on next success.
# Managed by the framework — do not set manually.
)
Exceptions
| Exception | Who raises it | Meaning |
|---|---|---|
CooldownResource |
Your operation | Resource temporarily over capacity |
DisableResource |
Your operation | Resource permanently bad (only pool.enable() brings it back) |
PoolExhausted |
Framework | No eligible resource, max attempts reached, deadline passed, or (with wait_for_cooldown) waiting cannot beat the deadline |
raise CooldownResource(
cooldown_seconds: float | None = None,
# Explicit cooldown duration (e.g. from Retry-After header). Must be >= 0
# (negative and NaN raise ValueError at construction).
# None = use the pool's cooldown_table based on consecutive_cooldown count.
reason: str | None = None,
# Free-form string surfaced in the exception message and logs.
)
raise DisableResource(
reason: str | None = None,
# Free-form string surfaced in the exception message and logs.
)
Resource types
rotapool is generic — T can be anything:
# API keys (string bearer tokens)
Resource(resource_id="key-1", value="sk-...")
# HTTP proxies
Resource(resource_id="proxy-1", value="http://proxy:8080", max_in_flight=10)
# Browser sessions (exclusive)
Resource(resource_id="session-1", value=<webdriver>, max_in_flight=1)
# GPU workers
Resource(resource_id="gpu-0", value="cuda:0", max_in_flight=1)
Operation shapes
pool.run and @pool.use accept any callable that returns an Awaitable. The framework picks the cancellation strategy at runtime based on what the callable returns:
# 1. async def -- the typical case. Cancellation is full-strength: the
# framework wraps the coroutine in a Task and cancels younger siblings
# via task.cancel() on resource failure.
@pool.use()
async def call_async(resource, payload):
async with httpx.AsyncClient() as client:
return await client.post(url, json=payload,
headers={"Authorization": f"Bearer {resource.value}"})
# 2. Sync function returning a coroutine -- previously rejected, now accepted.
# Useful when you want to construct the coroutine yourself or thread args.
@pool.use()
def call_returning_coro(resource, payload):
return some_async_helper(resource.value, payload) # returns a coroutine
# 3. Sync function returning an asyncio.Future -- accepted and cancellable
# via Future.cancel(). Useful for executor wrappers.
@pool.use()
def call_in_thread(resource, payload):
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, blocking_request, resource.value, payload)
# 4. Anything returning a plain Awaitable (custom __await__) is also accepted,
# but with no cancel handle: younger sibling cancellation silently no-ops
# for this usage and it runs to natural completion (best-effort).
A callable that returns a non-Awaitable (e.g. a plain int) raises TypeError at call time. The resource is marked healthy (your bug, not the resource's) and the error propagates to the caller.
Pitfalls
Anti-pattern: doing the real work outside run()
The pool only sees what happens inside the operation. Returning a client or handle from run() and using it afterwards means every later failure is invisible — the attempt is already recorded as success and the cooldown counter was reset.
# WRONG — the actual API call is outside the pool's view.
client = await pool.run(lambda r: build_client(r.value))
response = await client.get("/things") # invisible to pool
# RIGHT — the call lives inside the operation, so 429s reach the pool.
async def fetch(resource):
client = build_client(resource.value)
try:
return await client.get("/things")
except RateLimited as e:
raise CooldownResource(cooldown_seconds=e.retry_after)
response = await pool.run(fetch)
Return only plain values (bytes, dict, dataclass) from operations. For N backend calls, make N run() invocations.
Don't
- Don't raise
CooldownResourcefor business errors (404, validation failures). The next resource will return the same error and burn the retry budget for nothing — these belong in normal exceptions or return values. - Don't catch and swallow exceptions inside the operation. The pool needs to see
CooldownResource/DisableResourceto update health; swallowing them turns rate limits into invisible successes. - Don't mutate
Resourcefields from outside the pool.status,cooldown_until,last_acquired_at, andconsecutive_cooldownare framework-owned lifecycle state. For administrative control, useawait pool.enable(id)/await pool.disable(id)instead. - Don't share one
Poolacross asyncio event loops. The internal lock binds to the loop where it was first awaited; reusing the pool from a different loop is undefined behaviour.
Gotcha: cancellation only hits younger siblings
When a resource raises CooldownResource or DisableResource, the framework cancels younger in-flight usages on that resource and retries them elsewhere. Older usages are left to run to completion — they may already have side effects upstream that you can't unwind.
asyncio.CancelledError from this sibling cancellation is swallowed by the framework and the affected usages retry on a fresh resource; only outer caller cancellation propagates back to the caller.
One known edge: if an outer cancellation lands in the same event-loop tick as an internal sibling cancellation, only one CancelledError is delivered and it is classified as internal — the external cancel is absorbed for that attempt and run() retries. This is a deliberate trade-off for Python 3.10 compatibility (3.11+ Task.cancelling() could disambiguate). The window is a single tick; a caller that must stop can simply cancel again.
Testing
# pip (>= 25.1 for --group)
pip install -e . --group dev
pytest
# uv
uv sync --all-extras
uv run pytest
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rotapool-0.2.0.tar.gz.
File metadata
- Download URL: rotapool-0.2.0.tar.gz
- Upload date:
- Size: 59.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bd91d06d2cc566090cadc10209d77fe263590a4d6366b90a6e4b3bb5fc39f12f
|
|
| MD5 |
a7ca2451e5c87773a48ec0bf44de1c20
|
|
| BLAKE2b-256 |
ae8f8a8c8f4159197993f5f643fc694813845fc2879efd4ca9ce85d6146e9b3c
|
Provenance
The following attestation bundles were made for rotapool-0.2.0.tar.gz:
Publisher:
publish.yml on zydo/rotapool
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
rotapool-0.2.0.tar.gz -
Subject digest:
bd91d06d2cc566090cadc10209d77fe263590a4d6366b90a6e4b3bb5fc39f12f - Sigstore transparency entry: 1796319365
- Sigstore integration time:
-
Permalink:
zydo/rotapool@9c2f308060dd808db229b88cc6d4692053734214 -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/zydo
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@9c2f308060dd808db229b88cc6d4692053734214 -
Trigger Event:
push
-
Statement type:
File details
Details for the file rotapool-0.2.0-py3-none-any.whl.
File metadata
- Download URL: rotapool-0.2.0-py3-none-any.whl
- Upload date:
- Size: 23.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6ee7d1a56dbad22b8ba1577aad89fc2d1abbba81940e0869663e04e336cad63c
|
|
| MD5 |
987a0bbe85e96ff324c2beb6309c5487
|
|
| BLAKE2b-256 |
73f3973706efc2e74f08be5153a40abee77ff9dfeaf93e8058c7cea8092de9b8
|
Provenance
The following attestation bundles were made for rotapool-0.2.0-py3-none-any.whl:
Publisher:
publish.yml on zydo/rotapool
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
rotapool-0.2.0-py3-none-any.whl -
Subject digest:
6ee7d1a56dbad22b8ba1577aad89fc2d1abbba81940e0869663e04e336cad63c - Sigstore transparency entry: 1796319633
- Sigstore integration time:
-
Permalink:
zydo/rotapool@9c2f308060dd808db229b88cc6d4692053734214 -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/zydo
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@9c2f308060dd808db229b88cc6d4692053734214 -
Trigger Event:
push
-
Statement type: