Simple Multi-Resource Rate Limiting That Saves Unused Tokens. Rate limit API requests across different resources and workers without wasting your quota. Reserve tokens upfront, get refunds for what you don't use, and avoid over-limiting.
token-throttle
Multi-resource rate limiting for LLM APIs. Reserve tokens before you call, refund what you don't use, stay under the limit across workers.
Works with any LLM provider and any client library — token-throttle limits the rate, not the client.
pip install "token-throttle[redis,tiktoken]>=1.1.0,<1.2.0" # OpenAI + Redis (recommended)
pip install "token-throttle[redis]>=1.1.0,<1.2.0" # Any provider + Redis
pip install "token-throttle>=1.1.0,<1.2.0" # Any provider + in-memory
Quickstart
OpenAI (built-in helpers)
from openai import AsyncOpenAI
from token_throttle import create_openai_redis_rate_limiter

client = AsyncOpenAI()
# redis_client is a Redis client you have already created elsewhere
limiter = create_openai_redis_rate_limiter(
    redis_client, rpm=10_000, tpm=2_000_000,
)

# 1. Reserve capacity (blocks until available)
request = dict(model="gpt-4.1", messages=[{"role": "user", "content": "Hi"}])
reservation = await limiter.acquire_capacity_for_request(**request, extra_usage=None)

# 2. Make the API call
response = await client.chat.completions.create(**request)

# 3. Refund unused tokens
await limiter.refund_capacity_from_response(reservation, response)
Any provider (manual usage)
from token_throttle import RateLimiter, Quota, UsageQuotas, RedisBackendBuilder
from token_throttle import PerModelConfig

limiter = RateLimiter(
    lambda model: PerModelConfig(
        quotas=UsageQuotas([
            Quota(metric="requests", limit=1_000, per_seconds=60),
            Quota(metric="input_tokens", limit=80_000, per_seconds=60),
            Quota(metric="output_tokens", limit=20_000, per_seconds=60),
        ]),
    ),
    backend=RedisBackendBuilder(redis_client),
)

# Works with Anthropic, Gemini, local models, or anything else
reservation = await limiter.acquire_capacity(
    model="claude-sonnet-4-20250514",
    usage={"requests": 1, "input_tokens": 500, "output_tokens": 4_000},
)
response = await call_your_llm(...)  # Use whatever client you want
await limiter.refund_capacity(
    actual_usage={"requests": 1, "input_tokens": 480, "output_tokens": 1_200},
    reservation=reservation,
)
# Unused 2,800 output tokens returned to the pool
Why token-throttle
The problem: You're running parallel LLM calls (batch processing, agents, multiple services sharing a key). Simple rate limiters waste throughput because they reserve worst-case tokens and never give them back. You hit 429s or crawl at half capacity.
The solution: Reserve before you call, refund after. Actual usage is tracked, not estimated maximums.
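To make the throughput difference concrete, here is a back-of-the-envelope sketch using the numbers from the manual-usage example above (20,000 output tokens per minute, 4,000 reserved per call, 1,200 actually used). The helper `requests_per_window` is hypothetical and idealized: it ignores the short period during which a reservation is held before the refund arrives.

```python
def requests_per_window(limit: int, reserved: int, actual: int, refund: bool) -> int:
    """Estimate how many calls fit in one window's budget.

    Without refunds every call permanently costs its worst-case reservation;
    with refunds the steady-state cost is the actual usage.
    """
    cost_per_call = actual if refund else reserved
    return limit // cost_per_call


without_refund = requests_per_window(20_000, 4_000, 1_200, refund=False)
with_refund = requests_per_window(20_000, 4_000, 1_200, refund=True)
print(without_refund, with_refund)  # 5 16
```

Under these assumptions, refunding roughly triples the number of calls that fit in the same output-token budget.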
| Feature | Details |
|---|---|
| Multi-resource limits | Limit requests, tokens, input/output tokens — simultaneously, each with its own quota |
| Multiple time windows | e.g., 1,000 req/min AND 10,000 req/day on the same resource |
| Reserve & refund | Reserve max expected usage upfront, refund the difference after the call completes |
| Distributed | Redis backend with atomic locks — safe across workers and processes |
| Per-model quotas | Different limits per model via model_family; the built-in OpenAI helper auto-groups date-suffixed variants (e.g. gpt-4o-20241203 → gpt-4o) |
| Pluggable | Bring your own backend (ships with Redis and in-memory). Sync and async APIs |
| Observability | Callbacks for wait-start, wait-end, consume, refund, and missing-state events |
How it works
token-throttle implements a token bucket algorithm (capacity refills linearly over time, capped at the quota limit).
- Acquire — blocks until enough capacity is available, then atomically reserves it
- Call — make your API request with any client
- Refund — report actual usage; unused tokens return to the pool immediately
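The acquire and refund steps above can be sketched with a single in-memory bucket. This is a minimal illustration of the token bucket idea, not token-throttle's actual implementation: the `TokenBucket` class, its method names, and the injectable `clock` are all hypothetical, and the real library blocks and coordinates across workers rather than returning `False`.

```python
import time
from typing import Callable


class TokenBucket:
    """Single bucket: capacity refills linearly and is capped at `limit`."""

    def __init__(self, limit: float, per_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.rate = limit / per_seconds  # units restored per second
        self.clock = clock
        self.available = float(limit)
        self.updated = clock()

    def _refill(self) -> None:
        now = self.clock()
        gained = (now - self.updated) * self.rate
        self.available = min(self.limit, self.available + gained)
        self.updated = now

    def try_reserve(self, amount: float) -> bool:
        """Reserve `amount` if available; a blocking limiter would wait instead."""
        self._refill()
        if amount > self.available:
            return False
        self.available -= amount
        return True

    def refund(self, reserved: float, actual: float) -> None:
        """Return the unused part of a reservation to the pool."""
        self._refill()
        unused = max(0.0, reserved - actual)
        self.available = min(self.limit, self.available + unused)


# Deterministic demo with a fake clock.
now = [0.0]
bucket = TokenBucket(limit=20_000, per_seconds=60, clock=lambda: now[0])
ok = bucket.try_reserve(4_000)                 # reserve the worst case
bucket.refund(reserved=4_000, actual=1_200)    # 2,800 unused units come back
after_refund = bucket.available
now[0] = 30.0                                  # half a window later
refilled = bucket.try_reserve(20_000)          # bucket is full again (capped)
```

The cap in `_refill` and `refund` keeps a burst of refunds from pushing capacity above the quota limit.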
The Redis backend uses sorted locking to prevent deadlocks when acquiring multiple resource buckets simultaneously.
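The idea behind sorted locking can be shown with in-process `asyncio.Lock` objects: if every worker acquires its locks in the same global order, no two workers can each hold a lock the other is waiting for. This is only an analogy under stated assumptions; `lock_all` is a hypothetical helper, and the library's Redis locks are not shown in this README.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def lock_all(locks: dict, keys: list):
    """Acquire the locks for `keys` in sorted order, release in reverse."""
    ordered = sorted(set(keys))
    acquired = []
    try:
        for key in ordered:
            await locks[key].acquire()
            acquired.append(key)
        yield ordered
    finally:
        for key in reversed(acquired):
            locks[key].release()


async def demo() -> list:
    locks = {name: asyncio.Lock() for name in ("tokens", "requests")}

    async def worker(keys: list) -> list:
        async with lock_all(locks, keys) as order:
            await asyncio.sleep(0)  # yield while holding both locks
            return order

    # The two workers ask for the same buckets in opposite order.
    return await asyncio.gather(
        worker(["tokens", "requests"]),
        worker(["requests", "tokens"]),
    )


results = asyncio.run(demo())
```

Both workers end up locking `requests` before `tokens`, so the opposite request orders cannot deadlock.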
Configuration
Quotas
from token_throttle import Quota, UsageQuotas, SecondsIn

quotas = UsageQuotas([
    Quota(metric="requests", limit=2_000, per_seconds=SecondsIn.MINUTE),
    Quota(metric="tokens", limit=3_000_000, per_seconds=SecondsIn.MINUTE),
    Quota(metric="requests", limit=10_000_000, per_seconds=SecondsIn.DAY),
])
per_seconds accepts integer seconds. Use SecondsIn.MINUTE (60), SecondsIn.HOUR (3600), SecondsIn.DAY (86400), or any integer.
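When one metric has several time windows, it can help to check which window limits sustained throughput. The sketch below assumes linear refill, so each quota restores `limit / per_seconds` units per second. `Window` and `sustained_rate` are hypothetical names used only for this arithmetic, not part of token-throttle.

```python
from typing import NamedTuple


class Window(NamedTuple):
    metric: str
    limit: int
    per_seconds: int


def sustained_rate(windows: list, metric: str) -> float:
    """Lowest long-run units-per-second rate across all windows of a metric."""
    return min(w.limit / w.per_seconds for w in windows if w.metric == metric)


windows = [
    Window("requests", 2_000, 60),
    Window("tokens", 3_000_000, 60),
    Window("requests", 10_000_000, 86_400),
]
rate = sustained_rate(windows, "requests")
binding = min(
    (w for w in windows if w.metric == "requests"),
    key=lambda w: w.limit / w.per_seconds,
)
```

With these numbers the per-minute request quota (about 33 requests per second) is tighter than the per-day one (about 116 per second), so the daily window only matters if its limit is lowered.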
Per-model configuration
from token_throttle import PerModelConfig, Quota, RateLimiter, RedisBackendBuilder, UsageQuotas
# OpenAIUsageCounter and openai_model_family_getter are the built-in OpenAI helpers;
# their import path is not shown in this README.

def get_config(model_name: str) -> PerModelConfig:
    if model_name.startswith("gpt"):
        return PerModelConfig(
            quotas=UsageQuotas([
                Quota(metric="requests", limit=10_000, per_seconds=60),
                Quota(metric="tokens", limit=2_000_000, per_seconds=60),
            ]),
            usage_counter=OpenAIUsageCounter(),  # auto-counts tokens from messages
            model_family=openai_model_family_getter(model_name),
        )
    # ... other providers

limiter = RateLimiter(get_config, backend=RedisBackendBuilder(redis_client))
Backends
# Distributed (multiple workers/processes)
from token_throttle import RedisBackendBuilder
backend = RedisBackendBuilder(redis_client)
# Single process (no Redis needed)
from token_throttle import MemoryBackendBuilder
backend = MemoryBackendBuilder()
Both backends are available in sync (SyncRedisBackendBuilder, SyncMemoryBackendBuilder) and async variants.
Dynamic rate limits
Adjust bucket limits at runtime without rebuilding the limiter. This is useful for adaptive rate limiting (e.g., reacting to x-ratelimit-* response headers):
# After at least one acquire/record call for this model:
await limiter.set_max_capacity(
    model="gpt-4o",
    metric="tokens",
    per_seconds=60,
    value=5000,
)
For Redis backends the new limit is written to Redis, so all processes sharing the same Redis see the change within ~1 second.
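As a sketch of the header-driven case, the helper below extracts limits from response headers so they can be fed to `set_max_capacity`. It assumes the `x-ratelimit-limit-requests` and `x-ratelimit-limit-tokens` header names that OpenAI documents, and it assumes those limits are per minute; other providers may use different names or windows. `limits_from_headers` is a hypothetical helper, not part of token-throttle.

```python
from collections.abc import Mapping


def limits_from_headers(headers: Mapping) -> dict:
    """Map x-ratelimit-limit-<metric> headers to {metric: int limit}."""
    prefix = "x-ratelimit-limit-"
    limits = {}
    for name, value in headers.items():
        key = name.lower()  # header names are case-insensitive
        if not key.startswith(prefix):
            continue
        try:
            limits[key[len(prefix):]] = int(value)
        except ValueError:
            continue  # skip values that are not plain integers
    return limits


limits = limits_from_headers({
    "x-ratelimit-limit-requests": "10000",
    "X-RateLimit-Limit-Tokens": "2000000",
    "x-ratelimit-remaining-tokens": "1999000",
})

# Applying the parsed limits (assumes per-minute windows, i.e. per_seconds=60):
# for metric, value in limits.items():
#     await limiter.set_max_capacity(
#         model="gpt-4o", metric=metric, per_seconds=60, value=value,
#     )
```

The metric names produced here (`requests`, `tokens`) only line up with your quotas if you named your metrics the same way.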
Timeout
By default, acquire_capacity blocks until enough capacity is available.
Use timeout to fail fast or cap the wait:
# Non-blocking: check if capacity is available without waiting
try:
    reservation = await limiter.acquire_capacity(
        model="gpt-4o",
        usage={"requests": 1, "tokens": 500},
        timeout=0,  # Fail immediately if no capacity
    )
except TimeoutError:
    # Handle: retry later, use cheaper model, skip, etc.
    pass

# Bounded wait: wait up to 5 seconds
reservation = await limiter.acquire_capacity(
    model="gpt-4o",
    usage={"requests": 1, "tokens": 500},
    timeout=5.0,  # Raise TimeoutError after 5s
)
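One way to act on the "use cheaper model" option is to try a list of models in order with `timeout=0` and take the first that has capacity. The sketch below is an assumption-laden illustration: `acquire_first_available` is a hypothetical helper, and `fake_acquire` is a stand-in with the same keyword arguments as `limiter.acquire_capacity` so the example runs without a limiter.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Sequence


async def acquire_first_available(
    acquire: Callable[..., Awaitable[Any]],
    models: Sequence[str],
    usage: dict,
) -> Optional[tuple]:
    """Return (model, reservation) for the first model with free capacity."""
    for model in models:
        try:
            reservation = await acquire(model=model, usage=usage, timeout=0)
        except TimeoutError:
            continue  # no capacity right now; try the next model
        return model, reservation
    return None  # every model is at its limit


async def fake_acquire(*, model: str, usage: dict, timeout: float) -> str:
    """Stand-in for limiter.acquire_capacity: pretend gpt-4o is saturated."""
    if model == "gpt-4o":
        raise TimeoutError
    return f"reservation-for-{model}"


choice = asyncio.run(
    acquire_first_available(
        fake_acquire,
        ["gpt-4o", "gpt-4o-mini"],
        {"requests": 1, "tokens": 500},
    )
)
```

In real use you would pass `limiter.acquire_capacity` as `acquire` and still refund the reservation after the call completes.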
Sync API
from token_throttle import SyncRateLimiter, SyncMemoryBackendBuilder
limiter = SyncRateLimiter(get_config, backend=SyncMemoryBackendBuilder())
reservation = limiter.acquire_capacity(model="gpt-4.1", usage={"requests": 1, "tokens": 500})
response = call_llm_sync(...)
limiter.refund_capacity(actual_usage={"requests": 1, "tokens": 320}, reservation=reservation)
Links
- Originally a rewrite of openlimit