Python Rate-Limiter using Leaky-Bucket Algorithm
Project description
PyrateLimiter
A fast, async-friendly rate limiter for Python — Leaky-Bucket algorithm with pluggable backends.
Documentation · Quickstart · Backends · Migrating from v3
[!NOTE] Upgrading from v3.x? The v4 API is simpler and has breaking changes — see the Migration Guide.
Contents
- Features
- Installation
- Quickstart
- How it works
- Core concepts
- Defining rates & buckets
- Everyday usage
- Backends
- Web request rate limiting
- Advanced usage
- Examples
Features
- 🪣 Leaky-bucket algorithm — smooth, well-understood rate limiting.
- ⏱️ Multiple rates at once — e.g. 5/second and 1000/hour on the same key.
- 🔑 Per-key limits — track different services, users, or resources independently.
- 🧩 Pluggable backends — in-memory, SQLite, Redis (sync and async), Postgres, and multiprocessing.
- ⚡ Sync & async — the same API works in both; async paths never block the event loop.
- 🎀 Direct calls or decorators —
limiter.try_acquire(...)or@limiter.as_decorator(...). - 🚦 Blocking or non-blocking — wait for a permit (with optional timeout) or fail fast.
Installation
PyrateLimiter requires Python 3.10+.
pip install pyrate-limiter
# or
conda install --channel conda-forge pyrate-limiter
Optional backends pull in their own drivers:
pip install "pyrate-limiter[all]" # redis + psycopg (Postgres) + filelock
Quickstart
Limit to 5 requests per 2 seconds:
from pyrate_limiter import Duration, Rate, Limiter
# A Limiter with a single rate, backed by an in-memory bucket
limiter = Limiter(Rate(5, Duration.SECOND * 2))
# Blocking (default): waits until a permit is available
for i in range(6):
limiter.try_acquire("my-resource")
print(f"acquired {i}")
# Non-blocking: returns False immediately when the bucket is full
if not limiter.try_acquire("my-resource", blocking=False):
print("rate limited!")
Prefer a one-liner? The limiter_factory covers the common cases:
from pyrate_limiter import Duration, limiter_factory
limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=5, duration=Duration.SECOND)
limiter.try_acquire("my-resource")
How it works
flowchart TB
user([Your application]):::ext
user -->|"limit a key: try_acquire · try_acquire_async · @as_decorator"| limiter
subgraph pyrate["PyrateLimiter"]
direction TB
limiter["Limiter<br/>public API"]:::api
factory["BucketFactory<br/>routes each key to its bucket"]:::core
leaker["Leaker<br/>background cleanup"]:::leak
clock(["Clock<br/>time source"]):::clk
limiter --> factory
factory -->|routes to| backends
leaker -.->|periodic leak| backends
backends -->|reads time from| clock
subgraph backends["Bucket backend — choose one"]
direction LR
mem["InMemory"]:::bkt
sqlite["SQLite"]:::bkt
redis["Redis<br/>sync · async"]:::bkt
pg["Postgres"]:::bkt
mp["Multiprocess"]:::bkt
end
end
classDef api fill:#E5484D,color:#ffffff,stroke:#E5484D;
classDef core fill:#242A33,color:#ffffff,stroke:#242A33;
classDef bkt fill:#EEF2F6,color:#242A33,stroke:#CBD5E1;
classDef clk fill:#ffffff,color:#242A33,stroke:#242A33;
classDef leak fill:#F2C94C,color:#242A33,stroke:#E0B53C;
classDef ext fill:#ffffff,color:#242A33,stroke:#9AA5B1;
The bucket analogy — this library implements the Leaky Bucket algorithm:
- A bucket represents a fixed capacity (a service, an API quota, …).
- The bucket fills as requests arrive and leaks at a constant rate — the permitted request rate.
- When the bucket is full, new requests are delayed (blocking) or rejected (non-blocking).
Core concepts
| Component | Role |
|---|---|
Clock |
Timestamps incoming items. Only needs now() -> int (sync or async). |
Bucket |
Stores timestamped items; enforces the rates; leak()s out expired items. |
BucketFactory |
Timestamps & routes each item to the right bucket; schedules background leaking. |
Limiter |
The friendly façade. Sync/async, blocking/non-blocking, direct call or decorator; thread-safe via RLock (and asyncio.Lock when async). |
For simple cases you only ever touch Limiter and Rate — the rest is wired up for you.
Defining rates & buckets
An API might allow 500/hour, 1000/day, and 10000/month. Express each as a Rate(limit, interval):
from pyrate_limiter import Duration, Rate
rates = [
Rate(500, Duration.HOUR), # 500 requests per hour
Rate(1000, Duration.DAY), # 1000 requests per day
Rate(10000, Duration.WEEK * 4), # ~10000 requests per month
]
[!IMPORTANT] Rates must be ordered generous-to-tight: increasing interval, increasing limit, and a non-increasing
limit/intervalratio. Ill-formed lists raiseValueErrorat construction. Check a list yourself withvalidate_rate_list(rates).
Pass the rates straight to a Limiter (uses an in-memory bucket), or build a specific bucket:
from pyrate_limiter import InMemoryBucket, Limiter
limiter = Limiter(rates) # shortcut: in-memory bucket
# equivalent to:
limiter = Limiter(InMemoryBucket(rates))
limiter.try_acquire("hello world")
See Backends for Redis, SQLite, Postgres, and multiprocessing.
Everyday usage
Blocking, non-blocking & timeout
try_acquire blocks by default until a permit frees up:
from pyrate_limiter import Rate, Limiter, Duration
limiter = Limiter(Rate(3, Duration.SECOND))
for i in range(5):
limiter.try_acquire("item") # blocks when the bucket is full
Fail fast instead with blocking=False:
if not limiter.try_acquire("item", blocking=False):
print("rate limited!")
In async code use try_acquire_async, optionally with a timeout (seconds):
acquired = await limiter.try_acquire_async("item", timeout=5)
if not acquired:
print("timed out waiting for a permit")
The buffer_ms Limiter parameter (default 50) adds a small slack to absorb clock drift:
from pyrate_limiter import Rate, Duration, InMemoryBucket, Limiter
bucket = InMemoryBucket([Rate(5, Duration.SECOND)])
limiter = Limiter(bucket, buffer_ms=100)
Weight
Items can carry weight (default 1). An item of weight W consumes W unit-slots atomically — either all W fit or none do:
BigItem(weight=5, name="item") → 5 × item(weight=1, name="item", same timestamp)
limiter.try_acquire("the-sun", weight=10)
Decorator
as_decorator wraps any sync or async function:
from pyrate_limiter import Rate, Duration, Limiter
limiter = Limiter(Rate(5, Duration.SECOND))
@limiter.as_decorator(name="api_call", weight=1)
def handle_something(*args, **kwargs):
...
@limiter.as_decorator(name="background_job", weight=2)
async def handle_something_async(*args, **kwargs):
...
Context manager
Limiter releases its resources (background leak tasks, connections) on exit:
from pyrate_limiter import Rate, Duration, Limiter
with Limiter(Rate(5, Duration.SECOND)) as limiter:
limiter.try_acquire("item")
# resources released here
# …or close manually
limiter = Limiter(Rate(5, Duration.SECOND))
try:
limiter.try_acquire("item")
finally:
limiter.close()
asyncio & event loops
Use try_acquire_async so waiting uses asyncio.sleep instead of blocking the loop. With a sync bucket, wrap it in BucketAsyncWrapper:
from pyrate_limiter import BucketAsyncWrapper, InMemoryBucket, Rate, Duration, Limiter
limiter = Limiter(BucketAsyncWrapper(InMemoryBucket([Rate(5, Duration.SECOND)])))
await limiter.try_acquire_async("item")
Backends
| Backend | Sync | Async | Persistent | Multi-process | Best for |
|---|---|---|---|---|---|
| InMemoryBucket | ✅ | (wrap) | ❌ | ❌ | single process, fastest |
| SQLiteBucket | ✅ | ❌ | ✅ | ✅ (file lock) | persistence / one host, many processes |
| RedisBucket | ✅ | ✅ | ✅ | ✅ | distributed across hosts |
| PostgresBucket | ✅ | ❌ | ✅ | ✅ | distributed, already on Postgres |
| MultiprocessBucket | ✅ | (wrap) | ❌ | ✅ | a single multiprocessing pool |
| BucketAsyncWrapper | — | ✅ | — | — | make any sync bucket async-safe |
Every bucket takes a List[Rate].
InMemoryBucket
from pyrate_limiter import InMemoryBucket, Rate, Duration
bucket = InMemoryBucket([Rate(5, Duration.MINUTE * 2)])
RedisBucket
Stores items in a sorted set (key = item name, score = timestamp). Use the init classmethod — it works for sync and async clients (just await it for async):
from pyrate_limiter import RedisBucket, Rate, Duration
rates = [Rate(5, Duration.MINUTE * 2)]
# sync
from redis import ConnectionPool, Redis
redis_db = Redis(connection_pool=ConnectionPool.from_url("redis://localhost:6379"))
bucket = RedisBucket.init(rates, redis_db, "bucket-key")
# async
from redis.asyncio import ConnectionPool as AsyncPool, Redis as AsyncRedis
redis_db = AsyncRedis(connection_pool=AsyncPool.from_url("redis://localhost:6379"))
bucket = await RedisBucket.init(rates, redis_db, "bucket-key")
SQLiteBucket
Persists state to SQLite (sync only):
from pyrate_limiter import SQLiteBucket, Rate, Duration, Limiter
rate = Rate(5, Duration.MINUTE)
# set use_file_lock=True to share one DB file across processes on a host
bucket = SQLiteBucket.init_from_file([rate], use_file_lock=False)
limiter = Limiter(bucket)
init_from_file(rates, table="rate_bucket", db_path=None, create_new_table=True, use_file_lock=False) — db_path=None uses a temp file; use_file_lock=True uses filelock for multi-process access on a single host.
PostgresBucket
Requires psycopg[pool] (install via the [all] extra). Sync only. Use the built-in PostgresClock, or a custom time source:
from pyrate_limiter import PostgresBucket, Rate, PostgresClock
from psycopg_pool import ConnectionPool
pool = ConnectionPool("postgresql://postgres:postgres@localhost:5432")
bucket = PostgresBucket(pool, "my_bucket_table", [Rate(3, 1000), Rate(4, 1500)])
MultiprocessBucket
Shares a ListProxy across a multiprocessing pool / ProcessPoolExecutor, guarded by a multiprocessing lock. See in_memory_multiprocess.py.
Under contention
bucket.waitingestimates can be off, so prefertry_acquire(..., blocking=True)(the default) — the item keeps retrying instead of returningFalseon a transient miss.
BucketAsyncWrapper
Wraps a sync bucket so every method returns an awaitable, letting the Limiter use asyncio.sleep during delays. See asyncio & event loops.
Web request rate limiting
Drop-in helpers for the popular HTTP clients live in pyrate_limiter.extras:
AIOHTTP
from pyrate_limiter import Duration, limiter_factory
from pyrate_limiter.extras.aiohttp_limiter import RateLimitedSession
limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=2, duration=Duration.SECOND)
session = RateLimitedSession(limiter)
HTTPX
import httpx
from pyrate_limiter import Duration, limiter_factory
from pyrate_limiter.extras.httpx_limiter import AsyncRateLimiterTransport, RateLimiterTransport
limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=1, duration=Duration.SECOND)
with httpx.Client(transport=RateLimiterTransport(limiter=limiter)) as client:
client.get("https://example.com")
async with httpx.AsyncClient(transport=AsyncRateLimiterTransport(limiter=limiter)) as client:
await client.get("https://example.com")
Requests
from pyrate_limiter import Duration, limiter_factory
from pyrate_limiter.extras.requests_limiter import RateLimitedRequestsSession
limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=2, duration=Duration.SECOND)
session = RateLimitedRequestsSession(limiter)
Advanced usage
Custom routing with BucketFactory
When items must be routed to different buckets (per user, per endpoint, …), implement a BucketFactory. At minimum, define wrap_item and get:
from pyrate_limiter import (
AbstractBucket, BucketFactory, RateItem, MonotonicClock,
InMemoryBucket, Rate, Duration, Limiter,
)
class SingleRouteFactory(BucketFactory):
def __init__(self, clock, bucket):
self.clock = clock
self.bucket = bucket
self.schedule_leak(bucket) # run background leaking for this bucket
def wrap_item(self, name: str, weight: int = 1) -> RateItem:
return RateItem(name, self.clock.now(), weight=weight)
def get(self, _item: RateItem) -> AbstractBucket:
return self.bucket
To create buckets on demand, use self.create(bucket_class, *args, **kwargs) — it builds the bucket and schedules its leak:
class PerNameFactory(BucketFactory):
def __init__(self, clock):
self.clock = clock
self.buckets = {}
def wrap_item(self, name: str, weight: int = 1) -> RateItem:
return RateItem(name, self.clock.now(), weight=weight)
def get(self, item: RateItem) -> AbstractBucket:
if item.name not in self.buckets:
self.buckets[item.name] = self.create(InMemoryBucket, [Rate(5, Duration.SECOND)])
return self.buckets[item.name]
Then hand the factory to a Limiter:
limiter = Limiter(SingleRouteFactory(MonotonicClock(), InMemoryBucket([Rate(5, Duration.SECOND)])))
limiter.try_acquire("the-earth")
limiter.try_acquire("the-sun", weight=100)
Custom & distributed clocks
In v4 each bucket owns its time source via bucket.now() — the Limiter no longer takes a clock= parameter. To make distributed workers agree on "now" (e.g. a shared Redis/DB clock), either override now() on a bucket subclass (works on every backend, keeps leak consistent), or assign a clock to buckets that delegate to self._clock (e.g. InMemoryBucket, PostgresBucket):
from pyrate_limiter import AbstractClock, InMemoryBucket, RedisBucket, Rate, Duration
class RedisClock(AbstractClock):
def __init__(self, redis):
self.redis = redis
def now(self) -> int:
seconds, microseconds = self.redis.time()
return seconds * 1000 + microseconds // 1000
# Option A — override now() (recommended)
class RedisTimeBucket(RedisBucket):
def now(self) -> int:
seconds, microseconds = self.redis.time()
return seconds * 1000 + microseconds // 1000
# Option B — inject a clock into a bucket that uses self._clock
bucket = InMemoryBucket([Rate(5, Duration.SECOND)])
bucket._clock = RedisClock(redis_client)
Built-in clocks: MonotonicClock (default), MonotonicAsyncClock, PostgresClock, SQLiteClock.
Leaking
Buckets shouldn't hold items forever. Each bucket implements leak(current_timestamp=None) to drop expired items, and BucketFactory.schedule_leak(bucket) runs that in the background (default interval 10 s):
factory.schedule_leak(bucket) # background leak for this bucket
Change the interval (in milliseconds) via the leak_interval property:
class MyFactory(BucketFactory):
def __init__(self, clock, buckets):
self.clock = clock
self.leak_interval = 5000 # leak every 5s
for bucket in buckets:
self.schedule_leak(bucket)
Concurrency
Locking is handled at the Limiter level. try_acquire takes a thread RLock; try_acquire_async takes a loop-local asyncio.Lock in front of the RLock; MultiprocessBucket adds a multiprocessing lock on top. (SQLiteBucket manages its own locking.)
Custom backends
Implement pyrate_limiter.AbstractBucket to add your own backend. The test suite doubles as a conformance spec:
- Fork the repo.
- Implement your bucket against
AbstractBucket. - Add a
create_buckettotests/conftest.pyand wire it into thecreate_bucketfixture. - Run the suite — if it passes, your backend is good to go.
Examples
- asyncio_ratelimit.py — rate-limiting asyncio tasks
- asyncio_decorator.py — the decorator with async functions
- httpx_ratelimiter.py — HTTPX, sync / async / multiprocess
- in_memory_multiprocess.py — multiprocessing with an in-memory bucket
- sqlite_filelock_multiprocess.py — multiprocessing with SQLite + a file lock
Full docs at pyratelimiter.readthedocs.io · Contributing · Changelog
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pyrate_limiter-4.3.1.tar.gz.
File metadata
- Download URL: pyrate_limiter-4.3.1.tar.gz
- Upload date:
- Size: 83.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0575f6b595c2351e3b5778ae1d8d4c523e38446532de66df7a0795bb315bd702
|
|
| MD5 |
f37326e0a99c9f6e3ecc517afe5f002b
|
|
| BLAKE2b-256 |
38f329d1f5c0478100a7847f0a58b918125fc8798d7a24bd658f8925d9f119b4
|
Provenance
The following attestation bundles were made for pyrate_limiter-4.3.1.tar.gz:
Publisher:
build_test.yml on vutran1710/PyrateLimiter
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pyrate_limiter-4.3.1.tar.gz -
Subject digest:
0575f6b595c2351e3b5778ae1d8d4c523e38446532de66df7a0795bb315bd702 - Sigstore transparency entry: 1809715656
- Sigstore integration time:
-
Permalink:
vutran1710/PyrateLimiter@95af1613b12aeadf685897ed437e144dfa93fb3f -
Branch / Tag:
refs/tags/v4.3.1 - Owner: https://github.com/vutran1710
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
build_test.yml@95af1613b12aeadf685897ed437e144dfa93fb3f -
Trigger Event:
push
-
Statement type:
File details
Details for the file pyrate_limiter-4.3.1-py3-none-any.whl.
File metadata
- Download URL: pyrate_limiter-4.3.1-py3-none-any.whl
- Upload date:
- Size: 37.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
75b9d5974ccec835d2c86faefc44b871eb5b42036614179d959bd895e81cf1f3
|
|
| MD5 |
79ee2a7a62470dbb59be0b87d8b0383d
|
|
| BLAKE2b-256 |
33d2d852ab9e10ed4b9a8c36c03d7e3149c0caaf9c0ae1f608cd19de9d026754
|
Provenance
The following attestation bundles were made for pyrate_limiter-4.3.1-py3-none-any.whl:
Publisher:
build_test.yml on vutran1710/PyrateLimiter
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pyrate_limiter-4.3.1-py3-none-any.whl -
Subject digest:
75b9d5974ccec835d2c86faefc44b871eb5b42036614179d959bd895e81cf1f3 - Sigstore transparency entry: 1809715786
- Sigstore integration time:
-
Permalink:
vutran1710/PyrateLimiter@95af1613b12aeadf685897ed437e144dfa93fb3f -
Branch / Tag:
refs/tags/v4.3.1 - Owner: https://github.com/vutran1710
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
build_test.yml@95af1613b12aeadf685897ed437e144dfa93fb3f -
Trigger Event:
push
-
Statement type: