Python Rate-Limiter using Leaky-Bucket Algorithm

PyrateLimiter

A fast, async-friendly rate limiter for Python — Leaky-Bucket algorithm with pluggable backends.

Documentation · Quickstart · Backends · Migrating from v3

[!NOTE] Upgrading from v3.x? The v4 API is simpler and has breaking changes — see the Migration Guide.


Features

  • 🪣 Leaky-bucket algorithm — smooth, well-understood rate limiting.
  • ⏱️ Multiple rates at once — e.g. 5/second and 1000/hour on the same key.
  • 🔑 Per-key limits — track different services, users, or resources independently.
  • 🧩 Pluggable backends — in-memory, SQLite, Redis (sync and async), Postgres, and multiprocessing.
  • Sync & async — the same API works in both; async paths never block the event loop.
  • 🎀 Direct calls or decorators: limiter.try_acquire(...) or @limiter.as_decorator(...).
  • 🚦 Blocking or non-blocking — wait for a permit (with optional timeout) or fail fast.

Installation

PyrateLimiter requires Python 3.10+.

pip install pyrate-limiter
# or
conda install --channel conda-forge pyrate-limiter

Optional backends pull in their own drivers:

pip install "pyrate-limiter[all]"   # redis + psycopg (Postgres) + filelock

Quickstart

Limit to 5 requests per 2 seconds:

from pyrate_limiter import Duration, Rate, Limiter

# A Limiter with a single rate, backed by an in-memory bucket
limiter = Limiter(Rate(5, Duration.SECOND * 2))

# Blocking (default): waits until a permit is available
for i in range(6):
    limiter.try_acquire("my-resource")
    print(f"acquired {i}")

# Non-blocking: returns False immediately when the bucket is full
if not limiter.try_acquire("my-resource", blocking=False):
    print("rate limited!")

Prefer a one-liner? The limiter_factory covers the common cases:

from pyrate_limiter import Duration, limiter_factory

limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=5, duration=Duration.SECOND)
limiter.try_acquire("my-resource")

How it works

flowchart TB
    user([Your application]):::ext
    user -->|"limit a key: try_acquire · try_acquire_async · @as_decorator"| limiter

    subgraph pyrate["PyrateLimiter"]
        direction TB
        limiter["Limiter<br/>public API"]:::api
        factory["BucketFactory<br/>routes each key to its bucket"]:::core
        leaker["Leaker<br/>background cleanup"]:::leak
        clock(["Clock<br/>time source"]):::clk

        limiter --> factory
        factory -->|routes to| backends
        leaker -.->|periodic leak| backends
        backends -->|reads time from| clock

        subgraph backends["Bucket backend — choose one"]
            direction LR
            mem["InMemory"]:::bkt
            sqlite["SQLite"]:::bkt
            redis["Redis<br/>sync · async"]:::bkt
            pg["Postgres"]:::bkt
            mp["Multiprocess"]:::bkt
        end
    end

    classDef api fill:#E5484D,color:#ffffff,stroke:#E5484D;
    classDef core fill:#242A33,color:#ffffff,stroke:#242A33;
    classDef bkt fill:#EEF2F6,color:#242A33,stroke:#CBD5E1;
    classDef clk fill:#ffffff,color:#242A33,stroke:#242A33;
    classDef leak fill:#F2C94C,color:#242A33,stroke:#E0B53C;
    classDef ext fill:#ffffff,color:#242A33,stroke:#9AA5B1;

The bucket analogy — this library implements the Leaky Bucket algorithm:

  • A bucket represents a fixed capacity (a service, an API quota, …).
  • The bucket fills as requests arrive and leaks at a constant rate — the permitted request rate.
  • When the bucket is full, new requests are delayed (blocking) or rejected (non-blocking).
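
To make the analogy concrete, here is a minimal, standard-library-only sketch of a single-rate bucket. It is an illustration under stated assumptions, not PyrateLimiter's implementation: the class name TinyBucket and its put method are hypothetical, and timestamps are passed in explicitly (in milliseconds) so the behaviour is deterministic.

```python
from collections import deque


class TinyBucket:
    """Hypothetical single-rate bucket: at most `limit` items per `interval_ms`."""

    def __init__(self, limit: int, interval_ms: int) -> None:
        self.limit = limit
        self.interval_ms = interval_ms
        self.items: deque[int] = deque()  # timestamps of admitted items, oldest first

    def put(self, now_ms: int) -> bool:
        """Admit an item if the bucket has room, otherwise reject it."""
        # Leak: forget items that have aged out of the window.
        while self.items and self.items[0] <= now_ms - self.interval_ms:
            self.items.popleft()
        if len(self.items) >= self.limit:
            return False  # bucket is full
        self.items.append(now_ms)
        return True


bucket = TinyBucket(limit=5, interval_ms=2000)
# Six items arrive within half a second; only five fit in the 2-second window.
results = [bucket.put(now_ms=t) for t in (0, 100, 200, 300, 400, 500)]
# By t=2000 the item admitted at t=0 has leaked out, so there is room again.
later = bucket.put(now_ms=2000)
```

A blocking caller would sleep and retry after a rejected put; a non-blocking caller would simply report the rejection.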

Core concepts

| Component | Role |
| --- | --- |
| Clock | Timestamps incoming items. Only needs now() -> int (sync or async). |
| Bucket | Stores timestamped items; enforces the rates; leak()s out expired items. |
| BucketFactory | Timestamps & routes each item to the right bucket; schedules background leaking. |
| Limiter | The friendly façade. Sync/async, blocking/non-blocking, direct call or decorator; thread-safe via RLock (and asyncio.Lock when async). |

For simple cases you only ever touch Limiter and Rate — the rest is wired up for you.

Defining rates & buckets

An API might allow 500/hour, 1000/day, and 10000/month. Express each as a Rate(limit, interval):

from pyrate_limiter import Duration, Rate

rates = [
    Rate(500, Duration.HOUR),       # 500 requests per hour
    Rate(1000, Duration.DAY),       # 1000 requests per day
    Rate(10000, Duration.WEEK * 4), # 10000 requests per 4 weeks (~1 month)
]

[!IMPORTANT] Rates must be ordered generous-to-tight: increasing interval, increasing limit, and a non-increasing limit/interval ratio. Ill-formed lists raise ValueError at construction. Check a list yourself with validate_rate_list(rates).
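
The three ordering rules can be sketched as a small check over plain (limit, interval) pairs. This is only an illustration of the rule as stated above, not the library's validate_rate_list; the function name is_well_ordered is hypothetical, intervals are written as milliseconds, and treating an empty list as ill-formed is an assumption.

```python
def is_well_ordered(rates: list[tuple[int, int]]) -> bool:
    """Check (limit, interval) pairs against the three ordering rules."""
    if not rates:
        return False  # assumption: an empty list counts as ill-formed
    for (prev_limit, prev_interval), (limit, interval) in zip(rates, rates[1:]):
        if interval <= prev_interval:      # intervals must increase
            return False
        if limit <= prev_limit:            # limits must increase
            return False
        # limit/interval must not increase; cross-multiply to avoid float division
        if limit * prev_interval > prev_limit * interval:
            return False
    return True


HOUR, DAY, WEEK = 3_600_000, 86_400_000, 604_800_000  # milliseconds

good = [(500, HOUR), (1000, DAY), (10000, WEEK * 4)]
bad_order = [(1000, DAY), (500, HOUR)]     # longer interval listed first
too_loose = [(5, 1000), (100, 2000)]       # 100 per 2 s is a higher rate than 5 per 1 s
```

The last case shows why the ratio rule exists: the second rate could never be reached because the first one always trips earlier, so the list is almost certainly a mistake.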

Pass the rates straight to a Limiter (uses an in-memory bucket), or build a specific bucket:

from pyrate_limiter import InMemoryBucket, Limiter

limiter = Limiter(rates)            # shortcut: in-memory bucket
# equivalent to:
limiter = Limiter(InMemoryBucket(rates))

limiter.try_acquire("hello world")

See Backends for Redis, SQLite, Postgres, and multiprocessing.

Everyday usage

Blocking, non-blocking & timeout

try_acquire blocks by default until a permit frees up:

from pyrate_limiter import Rate, Limiter, Duration

limiter = Limiter(Rate(3, Duration.SECOND))

for i in range(5):
    limiter.try_acquire("item")     # blocks when the bucket is full

Fail fast instead with blocking=False:

if not limiter.try_acquire("item", blocking=False):
    print("rate limited!")

In async code use try_acquire_async, optionally with a timeout (seconds):

acquired = await limiter.try_acquire_async("item", timeout=5)
if not acquired:
    print("timed out waiting for a permit")
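
The three behaviours (block, fail fast, give up after a timeout) all hinge on one number: how long until the bucket has room again. The sketch below shows one way to compute and act on it. It is a simplified model, not the library's code: wait_ms and decide are hypothetical names, times are in milliseconds, and deciding "timeout" up front when the wait exceeds the budget is an assumption.

```python
from typing import Optional


def wait_ms(timestamps: list[int], limit: int, interval_ms: int, now_ms: int) -> int:
    """Milliseconds until one more item fits; 0 means it fits right now."""
    in_window = [t for t in timestamps if t > now_ms - interval_ms]
    if len(in_window) < limit:
        return 0
    # The item that must expire first is the limit-th newest one.
    blocker = sorted(in_window)[-limit]
    return blocker + interval_ms - now_ms


def decide(wait: int, blocking: bool = True, timeout_ms: Optional[int] = None) -> str:
    """Map a wait time onto the three behaviours."""
    if wait == 0:
        return "acquire"
    if not blocking:
        return "reject"            # fail fast
    if timeout_ms is not None and wait > timeout_ms:
        return "timeout"           # would wait longer than allowed
    return "sleep-then-retry"


stamps = [0, 100, 200]             # three items already admitted
w = wait_ms(stamps, limit=3, interval_ms=1000, now_ms=300)
```

With a limit of 3 per second and three items admitted at 0, 100 and 200 ms, a caller arriving at 300 ms has to wait until the first item expires at 1000 ms.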

The buffer_ms Limiter parameter (default 50) adds a small slack to absorb clock drift:

from pyrate_limiter import Rate, Duration, InMemoryBucket, Limiter

bucket = InMemoryBucket([Rate(5, Duration.SECOND)])
limiter = Limiter(bucket, buffer_ms=100)

Weight

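Items can carry a weight (default 1). An item of weight W consumes W unit slots atomically: either all W fit or none do. Conceptually, a weighted item expands into W unit items that share one name and timestamp:

BigItem(weight=5, name="item") → 5 × item(weight=1, name="item", same timestamp)

Pass the weight when acquiring:

limiter.try_acquire("the-sun", weight=10)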
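
The all-or-nothing rule can be illustrated with a few lines of plain Python. This is a sketch of the idea only: put_weighted is a hypothetical helper, the bucket is a bare list of timestamps, and expiry of old items is ignored to keep the example short.

```python
def put_weighted(items: list[int], limit: int, weight: int, now_ms: int) -> bool:
    """Append `weight` unit items sharing one timestamp, or append nothing."""
    if len(items) + weight > limit:
        return False               # not enough room: leave the bucket untouched
    items.extend([now_ms] * weight)
    return True


slots: list[int] = []
first = put_weighted(slots, limit=10, weight=5, now_ms=0)    # 5 of 10 slots used
second = put_weighted(slots, limit=10, weight=6, now_ms=1)   # needs 6, only 5 free
third = put_weighted(slots, limit=10, weight=5, now_ms=2)    # exactly fills the bucket
```

Note that the rejected call leaves no partial entries behind, which is what "atomically" means here.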

Decorator

as_decorator wraps any sync or async function:

from pyrate_limiter import Rate, Duration, Limiter

limiter = Limiter(Rate(5, Duration.SECOND))

@limiter.as_decorator(name="api_call", weight=1)
def handle_something(*args, **kwargs):
    ...

@limiter.as_decorator(name="background_job", weight=2)
async def handle_something_async(*args, **kwargs):
    ...
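
For readers curious how one decorator can serve both sync and async functions, the sketch below shows a common pattern: inspect the wrapped function and return a matching wrapper. It is not the library's as_decorator; rate_limited and the acquire callback are hypothetical, and the callback here only records the request instead of waiting for a permit.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable


def rate_limited(acquire: Callable[[str, int], None], name: str, weight: int = 1):
    """Hypothetical decorator factory: call `acquire(name, weight)` before each call."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                acquire(name, weight)          # a real limiter would await here
                return await func(*args, **kwargs)
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            acquire(name, weight)
            return func(*args, **kwargs)
        return sync_wrapper

    return decorator


calls: list[tuple[str, int]] = []              # records every permit request

def record(name: str, weight: int) -> None:
    calls.append((name, weight))

@rate_limited(record, name="api_call")
def double(x: int) -> int:
    return 2 * x

@rate_limited(record, name="background_job", weight=2)
async def triple(x: int) -> int:
    return 3 * x

sync_result = double(4)
async_result = asyncio.run(triple(4))
```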

Context manager

Limiter releases its resources (background leak tasks, connections) on exit:

from pyrate_limiter import Rate, Duration, Limiter

with Limiter(Rate(5, Duration.SECOND)) as limiter:
    limiter.try_acquire("item")
# resources released here

# …or close manually
limiter = Limiter(Rate(5, Duration.SECOND))
try:
    limiter.try_acquire("item")
finally:
    limiter.close()

asyncio & event loops

Use try_acquire_async so waiting uses asyncio.sleep instead of blocking the loop. With a sync bucket, wrap it in BucketAsyncWrapper:

from pyrate_limiter import BucketAsyncWrapper, InMemoryBucket, Rate, Duration, Limiter

limiter = Limiter(BucketAsyncWrapper(InMemoryBucket([Rate(5, Duration.SECOND)])))
await limiter.try_acquire_async("item")

Backends

| Backend | Sync | Async | Persistent | Multi-process | Best for |
| --- | --- | --- | --- | --- | --- |
| InMemoryBucket | | (wrap) | | | single process, fastest |
| SQLiteBucket | | | | ✅ (file lock) | persistence / one host, many processes |
| RedisBucket | | | | | distributed across hosts |
| PostgresBucket | | | | | distributed, already on Postgres |
| MultiprocessBucket | | (wrap) | | | a single multiprocessing pool |
| BucketAsyncWrapper | | | | | make any sync bucket async-safe |

Every bucket takes a List[Rate].

InMemoryBucket

from pyrate_limiter import InMemoryBucket, Rate, Duration

bucket = InMemoryBucket([Rate(5, Duration.MINUTE * 2)])

RedisBucket

Stores items in a sorted set (key = item name, score = timestamp). Use the init classmethod — it works for sync and async clients (just await it for async):

from pyrate_limiter import RedisBucket, Rate, Duration

rates = [Rate(5, Duration.MINUTE * 2)]

# sync
from redis import ConnectionPool, Redis
redis_db = Redis(connection_pool=ConnectionPool.from_url("redis://localhost:6379"))
bucket = RedisBucket.init(rates, redis_db, "bucket-key")

# async
from redis.asyncio import ConnectionPool as AsyncPool, Redis as AsyncRedis
redis_db = AsyncRedis(connection_pool=AsyncPool.from_url("redis://localhost:6379"))
bucket = await RedisBucket.init(rates, redis_db, "bucket-key")

RedisBucket stores one sorted-set member per consumed unit for exact sliding-window checks. For high-volume, long-window limits such as daily or monthly quotas, the retained sorted set can grow large; use shorter windows or a coarser counter-based backend if predictable memory and latency are more important than exact per-item history.
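
A back-of-the-envelope estimate helps decide whether a long window is affordable. The helper below is a rough model, not something the library provides: retained_members is a hypothetical name, and it assumes one member per consumed unit, kept until it ages out of the window, with admissions never exceeding the limit.

```python
def retained_members(units_per_second: float, window_seconds: int, limit: int) -> int:
    """Rough steady-state count of sorted-set members for one rate.

    Assumes one member per consumed unit, kept until it ages out of the
    window, and that admissions never exceed `limit` per window.
    """
    return min(limit, int(units_per_second * window_seconds))


DAY = 86_400  # seconds

per_second = retained_members(units_per_second=50, window_seconds=1, limit=100)
per_day = retained_members(units_per_second=50, window_seconds=DAY, limit=5_000_000)
```

At the same traffic of 50 units per second, a one-second window keeps about 50 members while a daily quota keeps over four million, which is the growth the paragraph above warns about.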

SQLiteBucket

Persists state to SQLite (sync only):

from pyrate_limiter import SQLiteBucket, Rate, Duration, Limiter

rate = Rate(5, Duration.MINUTE)
# set use_file_lock=True to share one DB file across processes on a host
bucket = SQLiteBucket.init_from_file([rate], use_file_lock=False)
limiter = Limiter(bucket)

Signature: init_from_file(rates, table="rate_bucket", db_path=None, create_new_table=True, use_file_lock=False). With db_path=None the bucket uses a temp file; use_file_lock=True uses filelock for multi-process access on a single host.

PostgresBucket

Requires psycopg[pool] (install via the [all] extra). Sync only. Use the built-in PostgresClock, or a custom time source:

from pyrate_limiter import PostgresBucket, Rate, PostgresClock
from psycopg_pool import ConnectionPool

pool = ConnectionPool("postgresql://postgres:postgres@localhost:5432")
bucket = PostgresBucket(pool, "my_bucket_table", [Rate(3, 1000), Rate(4, 1500)])

MultiprocessBucket

Shares a ListProxy across a multiprocessing pool / ProcessPoolExecutor, guarded by a multiprocessing lock. See in_memory_multiprocess.py.

Under contention bucket.waiting estimates can be off, so prefer try_acquire(..., blocking=True) (the default) — the item keeps retrying instead of returning False on a transient miss.

BucketAsyncWrapper

Wraps a sync bucket so every method returns an awaitable, letting the Limiter use asyncio.sleep during delays. See asyncio & event loops.

Web request rate limiting

Drop-in helpers for the popular HTTP clients live in pyrate_limiter.extras:

AIOHTTP

from pyrate_limiter import Duration, limiter_factory
from pyrate_limiter.extras.aiohttp_limiter import RateLimitedSession

limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=2, duration=Duration.SECOND)
session = RateLimitedSession(limiter)

Full example: aiohttp_ratelimiter.py

HTTPX

import httpx
from pyrate_limiter import Duration, limiter_factory
from pyrate_limiter.extras.httpx_limiter import AsyncRateLimiterTransport, RateLimiterTransport

limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=1, duration=Duration.SECOND)

with httpx.Client(transport=RateLimiterTransport(limiter=limiter)) as client:
    client.get("https://example.com")

async with httpx.AsyncClient(transport=AsyncRateLimiterTransport(limiter=limiter)) as client:
    await client.get("https://example.com")

Full example: httpx_ratelimiter.py

Requests

from pyrate_limiter import Duration, limiter_factory
from pyrate_limiter.extras.requests_limiter import RateLimitedRequestsSession

limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=2, duration=Duration.SECOND)
session = RateLimitedRequestsSession(limiter)

Full example: requests_ratelimiter.py

Advanced usage

Custom routing with BucketFactory

When items must be routed to different buckets (per user, per endpoint, …), implement a BucketFactory. At minimum, define wrap_item and get:

from pyrate_limiter import (
    AbstractBucket, BucketFactory, RateItem, MonotonicClock,
    InMemoryBucket, Rate, Duration, Limiter,
)

class SingleRouteFactory(BucketFactory):
    def __init__(self, clock, bucket):
        self.clock = clock
        self.bucket = bucket
        self.schedule_leak(bucket)   # run background leaking for this bucket

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        return RateItem(name, self.clock.now(), weight=weight)

    def get(self, _item: RateItem) -> AbstractBucket:
        return self.bucket

To create buckets on demand, use self.create(bucket_class, *args, **kwargs) — it builds the bucket and schedules its leak:

class PerNameFactory(BucketFactory):
    def __init__(self, clock):
        self.clock = clock
        self.buckets = {}

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        return RateItem(name, self.clock.now(), weight=weight)

    def get(self, item: RateItem) -> AbstractBucket:
        if item.name not in self.buckets:
            self.buckets[item.name] = self.create(InMemoryBucket, [Rate(5, Duration.SECOND)])
        return self.buckets[item.name]

Then hand the factory to a Limiter:

limiter = Limiter(SingleRouteFactory(MonotonicClock(), InMemoryBucket([Rate(5, Duration.SECOND)])))
limiter.try_acquire("the-earth")
limiter.try_acquire("the-sun", weight=100)

Custom & distributed clocks

In v4 each bucket owns its time source via bucket.now() — the Limiter no longer takes a clock= parameter. To make distributed workers agree on "now" (e.g. a shared Redis/DB clock), either override now() on a bucket subclass (works on every backend, keeps leak consistent), or assign a clock to buckets that delegate to self._clock (e.g. InMemoryBucket, PostgresBucket):

from pyrate_limiter import AbstractClock, InMemoryBucket, RedisBucket, Rate, Duration

class RedisClock(AbstractClock):
    def __init__(self, redis):
        self.redis = redis

    def now(self) -> int:
        seconds, microseconds = self.redis.time()
        return seconds * 1000 + microseconds // 1000

# Option A — override now() (recommended)
class RedisTimeBucket(RedisBucket):
    def now(self) -> int:
        seconds, microseconds = self.redis.time()
        return seconds * 1000 + microseconds // 1000

# Option B — inject a clock into a bucket that uses self._clock
bucket = InMemoryBucket([Rate(5, Duration.SECOND)])
bucket._clock = RedisClock(redis_client)  # redis_client: an existing redis.Redis instance

Built-in clocks: MonotonicClock (default), MonotonicAsyncClock, PostgresClock, SQLiteClock.
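
The millisecond conversion used in both options above can be checked without a running Redis server. The sketch below isolates it in a function and feeds it from a stand-in object; redis_time_to_ms and FixedTimeSource are hypothetical names introduced for this example, and the stand-in only mimics the (seconds, microseconds) pair that the snippets above unpack from redis.time().

```python
def redis_time_to_ms(seconds: int, microseconds: int) -> int:
    """Convert a (seconds, microseconds) pair into integer milliseconds."""
    return seconds * 1000 + microseconds // 1000


class FixedTimeSource:
    """Hypothetical stand-in for a Redis client; only provides time()."""

    def __init__(self, seconds: int, microseconds: int) -> None:
        self._value = (seconds, microseconds)

    def time(self) -> tuple[int, int]:
        return self._value


source = FixedTimeSource(1_700_000_000, 123_456)
now_ms = redis_time_to_ms(*source.time())
```

Integer division truncates the sub-millisecond part, so the result stays an int as the clock interface expects.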

Leaking

Buckets shouldn't hold items forever. Each bucket implements leak(current_timestamp=None) to drop expired items, and BucketFactory.schedule_leak(bucket) runs that in the background (default interval 10 s):

factory.schedule_leak(bucket)   # background leak for this bucket

Change the interval (in milliseconds) via the leak_interval property:

class MyFactory(BucketFactory):
    def __init__(self, clock, buckets):
        self.clock = clock
        self.leak_interval = 5000          # leak every 5s
        for bucket in buckets:
            self.schedule_leak(bucket)
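
What a single leak pass does can be sketched in a few lines. This is an illustration, not the code of any built-in bucket: the free function leak is hypothetical, and it assumes the bucket keeps a sorted list of millisecond timestamps and may only forget an item once it is older than the longest configured interval.

```python
from bisect import bisect_right


def leak(timestamps: list[int], now_ms: int, longest_interval_ms: int) -> int:
    """Remove items older than the longest window from a sorted list, in place.

    Returns the number of items removed.
    """
    cutoff = now_ms - longest_interval_ms
    expired = bisect_right(timestamps, cutoff)   # items with timestamp <= cutoff
    del timestamps[:expired]
    return expired


items = [0, 400, 900, 1500, 2100]                # sorted timestamps in ms
removed = leak(items, now_ms=2500, longest_interval_ms=1000)
```

Because the list is sorted, the expired prefix is found with a binary search and removed in one slice, so a periodic background pass stays cheap even for large buckets.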

Concurrency

Locking is handled at the Limiter level. try_acquire takes a thread RLock; try_acquire_async takes a loop-local asyncio.Lock in front of the RLock; MultiprocessBucket adds a multiprocessing lock on top. (SQLiteBucket manages its own locking.)
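
Why a lock is needed at all is easiest to see on a toy bucket. The sketch below is not the Limiter's code: LockedCounterBucket is a hypothetical class that guards a check-then-increment with threading.RLock, the same kind of lock the paragraph above mentions.

```python
import threading


class LockedCounterBucket:
    """Hypothetical bucket: a check-then-increment guarded by a re-entrant lock."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.count = 0
        self._lock = threading.RLock()

    def try_put(self) -> bool:
        # Without the lock, two threads could both pass the check below
        # and push the count past the limit.
        with self._lock:
            if self.count >= self.limit:
                return False
            self.count += 1
            return True


bucket = LockedCounterBucket(limit=10)
outcomes: list[bool] = []

def worker() -> None:
    outcomes.append(bucket.try_put())   # list.append is atomic in CPython

threads = [threading.Thread(target=worker) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Fifty threads compete for ten permits, and the lock guarantees that exactly ten succeed regardless of scheduling.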

Custom backends

Implement pyrate_limiter.AbstractBucket to add your own backend. The test suite doubles as a conformance spec:

  1. Fork the repo.
  2. Implement your bucket against AbstractBucket.
  3. Add a factory function for your bucket to tests/conftest.py and wire it into the create_bucket fixture.
  4. Run the suite — if it passes, your backend is good to go.


Full docs at pyratelimiter.readthedocs.io · Contributing · Changelog
