Distributed token bucket and semaphore

Project description

Steindamm

A library that regulates traffic with respect to concurrency or time. It provides sync and async context managers for semaphore and token bucket implementations.

The rate limiters can be distributed using Redis, or run locally in-memory for single-process applications.

Redis-based limiters leverage Lua scripts on Redis, so each operation is fully atomic. Both standalone Redis instances and Redis Clusters are supported, with sync and async interfaces.

Local limiters provide thread-safe (for sync) and asyncio-safe (for async) in-memory rate limiting without requiring Redis.

We currently support Python 3.11, 3.12, and 3.13.

Features

Deployment Options

  • Local (In-Memory): Thread-safe and asyncio-safe rate limiting without Redis dependencies
    • Perfect for single-process applications, testing, and development
    • Zero external dependencies required
  • Redis-Based: Distributed rate limiting using Redis with Lua scripts
    • Atomic operations for accuracy in distributed systems
    • Supports both standalone Redis and Redis Cluster
    • Scales across multiple processes and servers

Async & Sync Support

  • Full support for both synchronous and asynchronous code
  • Context manager interface (with / async with)

Flexibility & Control

  • Factory Classes: SyncTokenBucket, AsyncTokenBucket, SyncSemaphore, and AsyncSemaphore automatically choose implementation based on connection
  • Explicit Classes: Direct access to SyncRedisTokenBucket, AsyncRedisTokenBucket, SyncLocalTokenBucket, AsyncLocalTokenBucket, SyncRedisSemaphore, AsyncRedisSemaphore, SyncLocalSemaphore, and AsyncLocalSemaphore
  • Configurable Token Consumption: tokens_to_consume parameter for variable-cost operations
    • Set at initialization or override dynamically per request: with bucket(5):
  • Customizable Behavior: Control capacity, refill rates, expiry, max sleep time, and initial state

Installation

Basic Installation (Local limiters only)

pip install steindamm

With Redis Support

pip install steindamm[redis]

Or install Redis separately:

pip install steindamm redis

Usage

Token bucket

The TokenBucket classes are useful when you're working with time-based rate limits; say you're allowed 100 requests per minute for a given API token.
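Conceptually, a token bucket holds up to capacity tokens, adds refill_amount tokens every refill_frequency seconds, and lets a request proceed once enough tokens are available. A minimal, library-independent sketch of that arithmetic (illustrative only, not the library's actual implementation):

```python
import time

class MiniTokenBucket:
    """Illustrative sketch of token bucket refill math (not the library's code)."""

    def __init__(self, capacity: float, refill_frequency: float, refill_amount: float):
        self.capacity = capacity
        self.refill_frequency = refill_frequency
        self.refill_amount = refill_amount
        self.tokens = capacity  # start full
        self.last_refill = time.monotonic()

    def try_acquire(self, tokens_to_consume: float = 1.0) -> bool:
        """Refill based on elapsed time, then consume if enough tokens remain."""
        now = time.monotonic()
        intervals = int((now - self.last_refill) // self.refill_frequency)
        if intervals:
            self.tokens = min(self.capacity, self.tokens + intervals * self.refill_amount)
            self.last_refill += intervals * self.refill_frequency
        if self.tokens >= tokens_to_consume:
            self.tokens -= tokens_to_consume
            return True
        return False

# "100 requests per minute" expressed as bucket parameters
bucket = MiniTokenBucket(capacity=100, refill_frequency=60, refill_amount=100)
```

The library's limiters sleep until tokens become available rather than returning False, but the underlying bookkeeping follows the same idea.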

Exception Handling:

  • If the max_sleep limit is exceeded, a MaxSleepExceededError is raised. Setting max_sleep to 0.0 will sleep "endlessly"; the default is 30 seconds.
  • If a non-refilling bucket (with refill_amount=0 and refill_frequency=0) runs out of tokens, a NoTokensAvailableError is raised.
  • expiry controls how long the token bucket persists in Redis without any activity (acquires or releases). You may need to adjust it to your requirements.
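The max_sleep behavior can be pictured as a simple guard on the computed wait time (an illustrative sketch that assumes a wait_time already derived from the refill schedule; this is not the library's actual code):

```python
class MaxSleepExceededError(Exception):
    """Stand-in for the library's exception of the same name."""

def check_max_sleep(wait_time: float, max_sleep: float) -> float:
    """Return the time to sleep, or raise if it exceeds the configured limit.

    A max_sleep of 0.0 means "no limit": the caller sleeps as long as needed.
    """
    if max_sleep != 0.0 and wait_time > max_sleep:
        raise MaxSleepExceededError(
            f"Scheduled to sleep {wait_time}s, exceeding max_sleep of {max_sleep}s"
        )
    return wait_time
```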

Using Local (In-Memory) Token Bucket

The local token bucket doesn't require Redis and runs entirely in-memory. Perfect for single-process applications or testing.

Note: The local token bucket implementation does not currently support expiry of bucket state. Buckets persist in memory for the lifetime of the process. If you are creating buckets dynamically (e.g., one bucket per user or per API key), this could lead to unbounded memory growth. Consider using the Redis-based implementation for applications with dynamic bucket creation, or ensure buckets are reused for the same resources.

Async version:

import asyncio

from httpx import AsyncClient

from steindamm import AsyncTokenBucket

# No Redis connection needed - runs in-memory
limiter = AsyncTokenBucket(
    name="foo",                # name of the resource you are limiting traffic for
    capacity=5,                # hold up to 5 tokens (default: 5.0)
    refill_frequency=1,        # add tokens every second (default: 1.0)
    refill_amount=1,           # add 1 token when refilling (default: 1.0)
    initial_tokens=None,       # start with full capacity (default: None, which uses capacity)
    max_sleep=30,              # raise an error if no tokens are free within 30 seconds; 0.0 sleeps indefinitely (default: 30.0)
    tokens_to_consume=1,       # consume 1 token per request (default: 1.0)
    window_start_time=None,    # align the first window start to a specific past datetime (default: None, no alignment)
)

async def get_foo():
    async with AsyncClient() as client:
        async with limiter:
            await client.get(...)

async def main():
    await asyncio.gather(
        *(get_foo() for _ in range(100))
    )

Sync version:

import requests

from steindamm import SyncTokenBucket

limiter = SyncTokenBucket(
    name="foo",
    capacity=5,
    refill_frequency=1,
    refill_amount=1,
    tokens_to_consume=1,
)

def main():
    with limiter:
        requests.get(...)

Using Redis-Based Token Bucket

For distributed rate limiting across multiple processes or servers, use the Redis-based implementation by providing a connection parameter.

Async version:

import asyncio

from httpx import AsyncClient
from redis.asyncio import Redis

from steindamm import AsyncTokenBucket

# With Redis connection - distributed across processes/servers
limiter = AsyncTokenBucket(
    connection=Redis.from_url("redis://localhost:6379"),  # Add Redis connection for distributed limiting
    name="foo",
    capacity=5,
    refill_frequency=1,
    refill_amount=1,
    max_sleep=30,
    expiry=60,                 # set expiry on Redis keys in seconds (default: 60)
    tokens_to_consume=1,
    window_start_time=None,    # align the first window start to a specific past datetime (default: None, no alignment)
)

async def get_foo():
    async with AsyncClient() as client:
        async with limiter:
            await client.get(...)

async def main():
    await asyncio.gather(
        *(get_foo() for _ in range(100))
    )

Sync version:

import requests
from redis import Redis

from steindamm import SyncTokenBucket


limiter = SyncTokenBucket(
    connection=Redis.from_url("redis://localhost:6379"),
    name="foo",
    capacity=5,
    refill_frequency=1,
    refill_amount=1,
    max_sleep=30,
    expiry=60,
    tokens_to_consume=1,
)

def main():
    with limiter:
        requests.get(...)

Using Explicit Implementation Classes

For explicit control over which implementation to use, import the specific classes:

# Local implementations
from steindamm import SyncLocalTokenBucket, AsyncLocalTokenBucket

# Redis implementations
from steindamm import SyncRedisTokenBucket, AsyncRedisTokenBucket

# Use directly without factory logic
local_limiter = SyncLocalTokenBucket(name="api", capacity=10)
redis_limiter = SyncRedisTokenBucket(connection=redis_conn, name="api", capacity=10)

Consuming Multiple Tokens Per Request

You can control how many tokens are consumed per operation using the tokens_to_consume parameter. This can be any non-negative number, including 0 for zero-cost operations. There are two ways to do this:

1. Set at initialization time:

from steindamm import SyncTokenBucket

# Small requests consume 1 token
small_limiter = SyncTokenBucket(name="api", capacity=100, tokens_to_consume=1)

# Large requests consume 5 tokens
large_limiter = SyncTokenBucket(name="api", capacity=100, tokens_to_consume=5)

with small_limiter:
    make_small_request()  # Consumes 1 token

with large_limiter:
    make_large_request()  # Consumes 5 tokens

Note: Limiters that share the same name (the bucket key) also share the same token pool. If operations with different "costs" use the same name, they draw from a common budget.

2. Pass dynamically when using the context manager:

You can override the tokens_to_consume value on a per-request basis by calling the bucket instance:

from steindamm import SyncTokenBucket

# Create a single bucket with default tokens_to_consume=1
limiter = SyncTokenBucket(name="api", capacity=100, tokens_to_consume=1)

with limiter:
    make_small_request()  # Uses default: consumes 1 token

with limiter(5):
    make_large_request()  # Dynamically consumes 5 tokens

with limiter(10):
    make_extra_large_request()  # Dynamically consumes 10 tokens

with limiter(0):
    make_free_request()  # Dynamically consumes 0 tokens - free operation

This works for all token bucket implementations (sync/async, Redis/local):

from steindamm import AsyncTokenBucket

async_limiter = AsyncTokenBucket(name="api", capacity=100)

async with async_limiter:
    await make_small_request()  # Default: 1 token

async with async_limiter(5):
    await make_large_request()  # Dynamically: 5 tokens

Non-Refilling Token Buckets (Fixed Quota)

You can create a token bucket that never refills by setting both refill_amount and refill_frequency to 0. This is useful for implementing fixed quotas:

from steindamm import SyncTokenBucket, NoTokensAvailableError

# Create a bucket with a fixed quota of 100 tokens
limiter = SyncTokenBucket(
    name="api-quota",
    capacity=100,
    refill_frequency=0,  # Never refill
    refill_amount=0,     # Never refill
)

try:
    # Consume tokens until exhausted
    for i in range(150):
        with limiter:
            make_api_call()
except NoTokensAvailableError as e:
    print(f"Quota exhausted: {e}")

Key points:

  • Both refill_amount and refill_frequency must be 0 (raises ValidationError if only one is 0)
  • Once tokens are exhausted, the bucket raises NoTokensAvailableError instead of waiting
  • You can use initial_tokens, but since there are no refills, setting capacity is usually enough
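The "both or neither" rule can be sketched as a simple validation check (illustrative only; the library raises ValidationError, while this sketch uses a plain ValueError):

```python
def is_non_refilling(refill_amount: float, refill_frequency: float) -> bool:
    """Return True for a non-refilling (fixed-quota) bucket.

    Raises if only one of the two values is zero, mirroring the rule that
    refill_amount and refill_frequency must both be 0, or both non-zero.
    """
    if (refill_amount == 0) != (refill_frequency == 0):
        raise ValueError(
            "refill_amount and refill_frequency must both be 0, or both non-zero"
        )
    return refill_amount == 0
```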

Aligning Token Bucket Windows to Specific Times - Fixed Window Algorithm

By default, token buckets start their refill cycle when they are first used: the window opens as soon as the first client makes a request. You can, however, align bucket windows to specific timestamps using the window_start_time parameter. This is useful for scenarios like:

  • Aligning rate limits to calendar boundaries (e.g., start of hour, day, or week)
  • Synchronizing multiple instances to share the same refill schedule
  • Implementing fixed-window rate limits with specific start times

Example: Align to the start of the current hour

from datetime import datetime, timezone
from steindamm import SyncTokenBucket

# Calculate the start of the current hour
now = datetime.now(timezone.utc)
hour_start = now.replace(minute=0, second=0, microsecond=0)

# Create a bucket that refills at the top of each hour
limiter = SyncTokenBucket(
    name="hourly-api",
    capacity=100,
    refill_frequency=3600,     # 1 hour in seconds
    refill_amount=100,         # Refill all tokens at once
    window_start_time=hour_start # Align to the start of the hour
)

with limiter:
    make_api_call()

Example: Align to midnight for daily limits

from datetime import datetime, timezone

# Calculate midnight of the current day
now = datetime.now(timezone.utc)
midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)

limiter = SyncTokenBucket(
    name="daily-api",
    capacity=1000,
    refill_frequency=86400,    # 24 hours in seconds
    refill_amount=1000,        # Daily quota
    window_start_time=midnight
)

Important notes:

  • window_start_time must be a datetime object in the past (raises ValueError if in the future)
  • Works with both local and Redis-based token buckets
  • All instances sharing the same name need to use the same window_start_time to avoid undefined behavior
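The effect of window_start_time can be reproduced with plain datetime arithmetic: the current window index is just the number of whole refill_frequency intervals elapsed since the start time (an illustrative sketch, not the library's code):

```python
from datetime import datetime, timedelta, timezone

def current_window(window_start_time: datetime, refill_frequency: float, now: datetime):
    """Return (window_index, window_start) for the window containing `now`."""
    if window_start_time > now:
        raise ValueError("window_start_time must be in the past")
    elapsed = (now - window_start_time).total_seconds()
    index = int(elapsed // refill_frequency)
    start = window_start_time + timedelta(seconds=index * refill_frequency)
    return index, start

# With hourly windows aligned to midnight, 10:35 falls in window 10 (10:00-11:00)
midnight = datetime(2024, 1, 1, tzinfo=timezone.utc)
now = datetime(2024, 1, 1, 10, 35, tzinfo=timezone.utc)
index, start = current_window(midnight, 3600, now)
```

Because every instance computes the same window boundaries from the same start time, instances that share a name and a window_start_time stay in lockstep.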

Using them as a decorator

We don't ship decorators in the package, but if you would like to limit the rate at which a whole function is run, you can create your own, like this:

from functools import wraps

from redis.asyncio import Redis

from steindamm import AsyncSemaphore

# Define a decorator function
def limit(name, capacity, connection):
    def middle(f):
        @wraps(f)
        async def inner(*args, **kwargs):
            async with AsyncSemaphore(connection=connection, name=name, capacity=capacity):
                return await f(*args, **kwargs)
        return inner
    return middle

# Or for local token buckets (no Redis needed)
from steindamm import AsyncTokenBucket

def rate_limit(name, capacity):
    def middle(f):
        @wraps(f)
        async def inner(*args, **kwargs):
            async with AsyncTokenBucket(name=name, capacity=capacity):
                return await f(*args, **kwargs)
        return inner
    return middle


# Then pass the relevant limiter arguments like this
@limit(name="foo", capacity=5, connection=Redis.from_url("redis://localhost:6379"))
async def fetch_foo(id: UUID) -> Foo:
    ...

# Or with local rate limiting
@rate_limit(name="bar", capacity=10)
async def fetch_bar(id: UUID) -> Bar:
    ...

Semaphore

The semaphore classes are useful when you have concurrency restrictions; e.g., say you're allowed 5 active requests at a time for a given API token.

Local semaphore implementations are backed by Python concurrency primitives: threading.BoundedSemaphore for sync code and asyncio.BoundedSemaphore for async code.
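For reference, the local sync semaphore behaves much like wrapping a threading.BoundedSemaphore in a context manager (a simplified sketch without the max_sleep handling):

```python
import threading

class MiniLocalSemaphore:
    """Simplified sketch of a local sync semaphore (not the library's code)."""

    def __init__(self, name: str, capacity: int = 5):
        self.name = name
        self._semaphore = threading.BoundedSemaphore(capacity)

    def __enter__(self):
        self._semaphore.acquire()  # blocks until a slot is free
        return self

    def __exit__(self, *exc_info):
        self._semaphore.release()

limiter = MiniLocalSemaphore(name="foo", capacity=5)

with limiter:
    pass  # at most 5 threads inside this block at once
```

The real implementation adds the max_sleep timeout and error handling on top of this pattern.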

Beware that the client will block until the semaphore is acquired or the max_sleep limit is exceeded, in which case a MaxSleepExceededError is raised. Setting max_sleep to 0.0 will sleep "endlessly"; the default is 30 seconds.

For Redis-backed semaphores, expiry controls how long the semaphore keys will persist in Redis without any activity (acquires or releases). You might need to adjust it to your requirements.

Note: The local semaphore implementation does not currently support expiry of semaphore state. Local semaphores persist in memory for the lifetime of the process. If you are creating semaphores dynamically (for example, one semaphore per user or per API key), this could lead to unbounded memory growth. Consider using the Redis-based implementation for applications with dynamic semaphore creation, or ensure semaphores are reused for the same resources.

Here's how you might use the local async version:

import asyncio

from httpx import AsyncClient

from steindamm import AsyncSemaphore

limiter = AsyncSemaphore(
    name="foo",
    capacity=5,
    max_sleep=30,
)

async def get_foo():
    async with AsyncClient() as client:
        async with limiter:
            await client.get(...)


async def main():
    await asyncio.gather(
        *(get_foo() for _ in range(100))
    )

And here's the Redis-backed async version:

import asyncio

from httpx import AsyncClient
from redis.asyncio import Redis

from steindamm import AsyncSemaphore

# All properties have defaults except name and connection
limiter = AsyncSemaphore(
    connection=Redis.from_url("redis://localhost:6379"),
    name="foo",    # name of the resource you are limiting traffic for
    capacity=5,    # allow 5 concurrent requests (default: 5)
    max_sleep=30,  # raise an error if it takes longer than 30 seconds to acquire the semaphore (default: 30.0)
    expiry=60,     # set expiry on the semaphore keys in Redis to prevent deadlocks (default: 60)
)

async def get_foo():
    async with AsyncClient() as client:
        async with limiter:
            await client.get(...)


async def main():
    await asyncio.gather(
        *(get_foo() for _ in range(100))
    )

And here is how you might use the local sync version:

import requests

from steindamm import SyncSemaphore


limiter = SyncSemaphore(
    name="foo",
    capacity=5,
    max_sleep=30,
)

def main():
    with limiter:
        requests.get(...)

And here is the Redis-backed sync version:

import requests
from redis import Redis

from steindamm import SyncSemaphore


limiter = SyncSemaphore(
    connection=Redis.from_url("redis://localhost:6379"),
    name="foo",
    capacity=5,
    max_sleep=30,
    expiry=60,
)

def main():
    with limiter:
        requests.get(...)

Contributing

Contributions are very welcome. Here's how to get started:

  • Clone the repo
  • Install mise en place
  • Run mise trust to trust the mise.toml file
  • Run mise run install to install dependencies. If you prefer not to install mise, check the mise.toml file and run the commands manually; this also requires installing uv.
  • Make your code changes, with tests
  • Run tests with mise run test or uv run pytest. Note that you first need to spin up the Redis Docker containers: use mise run test-setup (and mise run test-teardown to shut them down), or run the full cycle with mise run test-full.
  • Commit your changes and open a PR

Publishing a new version

To publish a new version:

  • Update the package version in the pyproject.toml
  • Open GitHub releases
  • Press "Draft a new release"
  • Set a tag matching the new version (for example, v0.8.0)
  • Set the title matching the tag
  • Add some release notes, explaining what has changed
  • Publish

Once the release is published, our publish workflow should be triggered to push the new version to PyPI.

Acknowledgment

This project was initially forked from redis-rate-limiters, created mainly by Sondre Lillebø Gundersen. The original project was no longer maintained; since then, much of the code has been rewritten, and new functionality has been added, such as local (in-memory) versions of the limiters, a configurable initial token count, and the ability to consume multiple tokens at once.

Project details


Download files

Download the file for your platform.

Source Distribution

steindamm-0.10.0.tar.gz (18.0 kB)

Uploaded Source

Built Distribution

steindamm-0.10.0-py3-none-any.whl (25.2 kB)

Uploaded Python 3

File details

Details for the file steindamm-0.10.0.tar.gz.

File metadata

  • Download URL: steindamm-0.10.0.tar.gz
  • Size: 18.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.7

File hashes

Hashes for steindamm-0.10.0.tar.gz
  • SHA256: 4b5b1547ad9bce0f8c415f8cb804b8392f971d0874edbfc2850e8a9fd1cd8fd9
  • MD5: 083379cf7e996e1790e0aa294aef7c76
  • BLAKE2b-256: 8359d0308b1bf6cd565376f1cf2c5517c4d14f8786c1170dfefeecb6be15f17d


File details

Details for the file steindamm-0.10.0-py3-none-any.whl.

File metadata

  • Download URL: steindamm-0.10.0-py3-none-any.whl
  • Size: 25.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.7

File hashes

Hashes for steindamm-0.10.0-py3-none-any.whl
  • SHA256: af64766212532a61128d54558e922c6e3a2b1f6eb48341e567d0e19bb8e65910
  • MD5: 486cad76f479cceea354647f97dc0107
  • BLAKE2b-256: 121791dff4db306da502cb92bbc4b95ddacdfdd2d25163989211e17570a2fd0e

