Skip to main content

Python rate limiters backed by Redis

Project description

Python Redis Limiters

A library which regulates traffic, with respect to concurrency or time. It implements sync and async context managers for a semaphore- and a token bucket-implementation.

The rate limiters can be distributed using Redis, or run locally in-memory for single-process applications.

Redis-based limiters leverage Lua scripts on Redis, each operation is fully atomic. Both standalone Redis instances and clusters are supported with sync and async interfaces.

Local limiters provide thread-safe (for sync) and asyncio-safe (for async) in-memory rate limiting without requiring Redis.

We currently support Python 3.11, 3.12, and 3.13.

Features

Deployment Options

  • Local (In-Memory): Thread-safe and asyncio-safe rate limiting without Redis dependencies
    • Perfect for single-process applications, testing, and development
    • Zero external dependencies required
  • Redis-Based: Distributed rate limiting using Redis with Lua scripts
    • Atomic operations for accuracy in distributed systems
    • Supports both standalone Redis and Redis Cluster
    • Scales across multiple processes and servers

Async & Sync Support

  • Full support for both synchronous and asynchronous code
  • Context manager interface (with / async with)

Flexibility & Control

  • Factory Classes: SyncTokenBucket and AsyncTokenBucket automatically choose implementation based on connection
  • Explicit Classes: Direct access to SyncRedisTokenBucket, AsyncRedisTokenBucket, SyncLocalTokenBucket, AsyncLocalTokenBucket
  • Configurable Token Consumption: tokens_to_consume parameter for variable-cost operations
  • Customizable Behavior: Control capacity, refill rates, expiry, max sleep time, and initial state

Installation

Basic Installation (Local limiters only)

pip install redis-limiters

With Redis Support

pip install redis-limiters[redis]

Or install Redis separately:

pip install redis-limiters redis

Usage

Token bucket

The TokenBucket classes are useful if you're working with time-based rate limits. Say, you are allowed 100 requests per minute, for a given API token.

If the max_sleep limit is exceeded, a MaxSleepExceededError is raised. Setting max_sleep to 0.0 will sleep "endlessly" - this is also the default value. On the other hand expiry is how long the token bucket will persist in Redis without any activity (acquires or releases). You might need to adjust both to your requirements.

Using Local (In-Memory) Token Bucket

The local token bucket doesn't require Redis and runs entirely in-memory. Perfect for single-process applications or testing.

Note: The local token bucket implementation does not currently support expiry of bucket state. Buckets persist in memory for the lifetime of the process. If you are creating buckets dynamically (e.g., one bucket per user or per API key), this could lead to unbounded memory growth. Consider using the Redis-based implementation for applications with dynamic bucket creation, or ensure buckets are reused for the same resources.

Async version:

import asyncio

from httpx import AsyncClient

from redis_limiters import AsyncTokenBucket

# No Redis connection needed - runs in-memory
limiter = AsyncTokenBucket(
    name="foo",                # name of the resource you are limiting traffic for
    capacity=5,                # hold up to 5 tokens (default: 5.0)
    refill_frequency=1,        # add tokens every second (default: 1.0)
    refill_amount=1,           # add 1 token when refilling (default: 1.0)
    initial_tokens=None,       # start with full capacity (default: None, which uses capacity)
    max_sleep=30,              # raise an error if there are no free tokens for X seconds, 0 never expires (default: 30.0)
    tokens_to_consume=1,       # consume 1 token per request (default: 1.0)
)

async def get_foo():
    async with AsyncClient() as client:
        async with limiter:
            await client.get(...)

async def main():
    await asyncio.gather(
        get_foo() for i in range(100)
    )

Sync version:

import requests

from redis_limiters import SyncTokenBucket

limiter = SyncTokenBucket(
    name="foo",
    capacity=5,
    refill_frequency=1,
    refill_amount=1,
    tokens_to_consume=1,
)

def main():
    with limiter:
        requests.get(...)

Using Redis-Based Token Bucket

For distributed rate limiting across multiple processes or servers, use the Redis-based implementation by providing a connection parameter.

Async version:

import asyncio

from httpx import AsyncClient
from redis.asyncio import Redis

from redis_limiters import AsyncTokenBucket

# With Redis connection - distributed across processes/servers
limiter = AsyncTokenBucket(
    connection=Redis.from_url("redis://localhost:6379"),  # Add Redis connection for distributed limiting
    name="foo",
    capacity=5,
    refill_frequency=1,
    refill_amount=1,
    max_sleep=30,
    expiry=60,                 # set expiry on Redis keys in seconds (default: 60)
    tokens_to_consume=1,
)

async def get_foo():
    async with AsyncClient() as client:
        async with limiter:
            await client.get(...)

async def main():
    await asyncio.gather(
        get_foo() for i in range(100)
    )

Sync version:

import requests
from redis import Redis

from redis_limiters import SyncTokenBucket


limiter = SyncTokenBucket(
    connection=Redis.from_url("redis://localhost:6379"),
    name="foo",
    capacity=5,
    refill_frequency=1,
    refill_amount=1,
    max_sleep=30,
    expiry=60,
    tokens_to_consume=1,
)

def main():
    with limiter:
        requests.get(...)

Semaphore

The semaphore classes are useful when you have concurrency restrictions; e.g., say you're allowed 5 active requests at the time for a given API token.

Note: Currently, only Redis-based semaphores are available. Local (in-memory) semaphore implementation is planned for a future release.

Beware that the client will block until the Semaphore is acquired, or the max_sleep limit is exceeded. If the max_sleep limit is exceeded, a MaxSleepExceededError is raised. Setting max_sleep to 0.0 will sleep "endlessly" - default is 30 seconds. On the other hand expiry is how long the semaphore will persist in Redis without any activity (acquires or releases). You might need to adjust both to your requirements.

Here's how you might use the async version:

import asyncio

from httpx import AsyncClient
from redis.asyncio import Redis

from redis_limiters import AsyncSemaphore

# All properties have defaults except name and connection
limiter = AsyncSemaphore(
    connection=Redis.from_url("redis://localhost:6379"),
    name="foo",    # name of the resource you are limiting traffic for
    capacity=5,    # allow 5 concurrent requests (default: 5)
    max_sleep=30,  # raise an error if it takes longer than 30 seconds to acquire the semaphore (default: 30.0)
    expiry=60,     # set expiry on the semaphore keys in Redis to prevent deadlocks (default: 60)
)

async def get_foo():
    async with AsyncClient() as client:
        async with limiter:
            await client.get(...)


async def main():
    await asyncio.gather(
        get_foo() for i in range(100)
    )

and here is how you might use the sync version:

import requests
from redis import Redis

from redis_limiters import SyncSemaphore


limiter = SyncSemaphore(
    connection=Redis.from_url("redis://localhost:6379"),
    name="foo",
    capacity=5,
    max_sleep=30,
    expiry=60,
)

def main():
    with limiter:
        requests.get(...)

Using Explicit Implementation Classes

For explicit control over which implementation to use, import the specific classes:

# Local implementations
from redis_limiters import SyncLocalTokenBucket, AsyncLocalTokenBucket

# Redis implementations
from redis_limiters import SyncRedisTokenBucket, AsyncRedisTokenBucket

# Use directly without factory logic
local_limiter = SyncLocalTokenBucket(name="api", capacity=10)
redis_limiter = SyncRedisTokenBucket(connection=redis_conn, name="api", capacity=10)

Consuming Multiple Tokens Per Request

You can control how many tokens are consumed per operation using the tokens_to_consume parameter. This is useful when different operations on the same api have different "costs". Note, how the "name" aka is the same between the limiters which will cause the tokens to be shared.

from redis_limiters import SyncTokenBucket

# Small requests consume 1 token
small_limiter = SyncTokenBucket(name="api", capacity=100, tokens_to_consume=1)

# Large requests consume 5 tokens
large_limiter = SyncTokenBucket(name="api", capacity=100, tokens_to_consume=5)

with small_limiter:
    make_small_request()  # Consumes 1 token

with large_limiter:
    make_large_request()  # Consumes 5 tokens

Using them as a decorator

We don't ship decorators in the package, but if you would like to limit the rate at which a whole function is run, you can create your own, like this:

from redis_limiters import AsyncSemaphore
from redis.asyncio import Redis

# Define a decorator function
def limit(name, capacity, connection):
  def middle(f):
    async def inner(*args, **kwargs):
      async with AsyncSemaphore(connection=connection, name=name, capacity=capacity):
        return await f(*args, **kwargs)
    return inner
  return middle

# Or for local token buckets (no Redis needed)
from redis_limiters import AsyncTokenBucket

def rate_limit(name, capacity):
  def middle(f):
    async def inner(*args, **kwargs):
      async with AsyncTokenBucket(name=name, capacity=capacity):
        return await f(*args, **kwargs)
    return inner
  return middle


# Then pass the relevant limiter arguments like this
@limit(name="foo", capacity=5, connection=Redis.from_url("redis://localhost:6379"))
async def fetch_foo(id: UUID) -> Foo:
    ...

# Or with local rate limiting
@rate_limit(name="bar", capacity=10)
async def fetch_bar(id: UUID) -> Bar:
    ...

Contributing

Contributions are very welcome. Here's how to get started:

  • Clone the repo
  • Install uv and mise
  • Run mise run install to install dependencies If you prefer not to install mise, check the mise.toml file and run the commands manually.
  • Make your code changes, with tests
  • Run tests with mise run test or uv run pytest Note that you will need to first spin up the redis docker containers. This can be done with mise run test-setup (and shut down with mise run test-teardown) or the full cycle can be run with mise run test-full.
  • Commit your changes and open a PR

Publishing a new version

To publish a new version:

  • Update the package version in the pyproject.toml
  • Open Github releases
  • Press "Draft a new release"
  • Set a tag matching the new version (for example, v0.8.0)
  • Set the title matching the tag
  • Add some release notes, explaining what has changed
  • Publish

Once the release is published, our publish workflow should be triggered to push the new version to PyPI.

Acknowledgment:

This project was initially forked from redis-rate-limiters and was mainly created by Sondre Lillebø Gundersen link. It was no longer maintained and I since rewrote a lot of stuff as well as added a local version of the limiters and new functionality like the initial amount of tokens or how many tokens to consume at once.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

steindamm-0.7.0.tar.gz (13.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

steindamm-0.7.0-py3-none-any.whl (17.6 kB view details)

Uploaded Python 3

File details

Details for the file steindamm-0.7.0.tar.gz.

File metadata

  • Download URL: steindamm-0.7.0.tar.gz
  • Upload date:
  • Size: 13.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.4

File hashes

Hashes for steindamm-0.7.0.tar.gz
Algorithm Hash digest
SHA256 9858dcb6455f4abd333c2a7f2d4f0774cc0300b6b624e73929268c2cfc8d111f
MD5 c92f00c7a5b4678331348a63d112d3da
BLAKE2b-256 419e9d259b33d06379a97dfe149a195afb58d4074219164546a24a4ecc84e693

See more details on using hashes here.

File details

Details for the file steindamm-0.7.0-py3-none-any.whl.

File metadata

  • Download URL: steindamm-0.7.0-py3-none-any.whl
  • Upload date:
  • Size: 17.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.4

File hashes

Hashes for steindamm-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 597cf55dbbc3a89ef8d0cac37ece0345e9b96dba773e17da4cff73ca349da870
MD5 1fb1acde7acfa0d47672f1a21f5eaf1b
BLAKE2b-256 de666404766507c722334bbd4153ce47144fd6aa7f9019da0f05e825ca092059

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page