
Rate limiting package


Rate Limiting Algorithms


This project adheres to Semantic Versioning.

Algorithms

| Algorithm | Sync | Async |
| --- | --- | --- |
| Leaky Bucket | Yes | Yes |
| Token Bucket | Yes | Yes |
| Generic Cell Rate Algorithm | Yes | Yes |
| LLM-Token | Yes | Yes |
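
As a rough illustration of one of the listed algorithms, the Generic Cell Rate Algorithm can be sketched by tracking a single "theoretical arrival time". This is a minimal sketch under stated assumptions, not limitor's implementation: the class name `GCRASketch`, its `allow` method, and the explicit `now` argument are all hypothetical and chosen only to keep the example deterministic.

```python
from dataclasses import dataclass


@dataclass
class GCRASketch:
    """Illustrative GCRA limiter (hypothetical; not limitor's implementation)."""

    capacity: int      # units allowed per period (also the burst size)
    seconds: float     # length of the period
    tat: float = 0.0   # theoretical arrival time of the next conforming request

    def allow(self, now: float, amount: int = 1) -> bool:
        interval = self.seconds / self.capacity  # emission interval per unit
        new_tat = max(self.tat, now) + amount * interval
        # Conforming only if the schedule does not run more than one period ahead.
        if new_tat - now > self.seconds:
            return False
        self.tat = new_tat
        return True
```

With `capacity=2` and `seconds=1.0`, two calls at the same instant are accepted as a burst, a third is rejected, and one more is accepted half a second later.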

[!NOTE]
Implementations are single-threaded and handle requests by blocking (or the equivalent), with burst capabilities. With asyncio, they use non-blocking cooperative multitasking, not preemptive multithreading.
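
To make the cooperative-multitasking point concrete, here is a small self-contained sketch. It does not use limitor: `AsyncTokenBucketSketch` and `demo` are hypothetical names, and the refill logic is one plausible token bucket, not the package's code. It shows that a coroutine waiting for capacity suspends with `await asyncio.sleep(...)`, so unrelated tasks on the same event loop keep running.

```python
import asyncio
import time


class AsyncTokenBucketSketch:
    """Illustrative async token bucket (hypothetical; not limitor's implementation)."""

    def __init__(self, capacity: int, seconds: float) -> None:
        self.capacity = capacity
        self.rate = capacity / seconds   # tokens refilled per second
        self.tokens = float(capacity)    # start full, so an initial burst is allowed
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()      # serializes waiters in arrival order

    def _refill(self) -> None:
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    async def acquire(self, amount: int = 1) -> None:
        if amount > self.capacity:
            raise ValueError("amount exceeds bucket capacity")
        async with self._lock:
            self._refill()
            while self.tokens < amount:
                # Sleeping yields to the event loop; other tasks keep running.
                await asyncio.sleep((amount - self.tokens) / self.rate)
                self._refill()
            self.tokens -= amount


async def demo() -> list[str]:
    bucket = AsyncTokenBucketSketch(capacity=2, seconds=0.2)
    events: list[str] = []

    async def limited(i: int) -> None:
        await bucket.acquire()
        events.append(f"request-{i}")

    async def background() -> None:
        events.append("background")

    # Three requests against a burst of two: the third must wait for a refill.
    tasks = [asyncio.create_task(limited(i)) for i in range(3)]
    tasks.append(asyncio.create_task(background()))
    await asyncio.gather(*tasks)
    return events
```

Running `asyncio.run(demo())` records the background task before the third request, because the third request is suspended while it waits for tokens.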

Development

Set up a uv-based virtual environment

# Install uv
# on macOS or Linux (Homebrew)
brew install uv
# OPTIONAL: or use the standalone installer
curl -LsSf https://astral.sh/uv/install.sh | sh

# Python versions are downloaded automatically as needed, or run: uv python install 3.12
# creates the environment in .venv (the default location), which is activated below
uv venv --python 3.12


# to activate the virtual environment
source .venv/bin/activate

# to deactivate the virtual environment
deactivate

Create lock file + requirements.txt

# after pyproject.toml is created
uv lock

uv export -o requirements.txt --quiet

Upgrade dependencies

# upgrade all dependencies (--upgrade works with either sync or lock)
uv sync --upgrade

or

# to upgrade a specific package
uv lock --upgrade-package requests

Usage

[!IMPORTANT] These are special use cases. General use cases are covered in the examples/ folder.

LLM Token-Based Rate Limiting

[!NOTE] This decorator assumes that the caller passes any necessary parameters. To make them optional, see limitor/__init__.py.

from functools import wraps
import random
import time
from typing import Callable

from limitor.base import SyncRateLimit
from limitor.configs import BucketConfig
from limitor.leaky_bucket.core import SyncLeakyBucket


def rate_limit(capacity: int = 10, seconds: float = 1, bucket_cls: type[SyncRateLimit] = SyncLeakyBucket) -> Callable:
    # one bucket is shared by every call to the decorated function
    bucket = bucket_cls(BucketConfig(capacity=capacity, seconds=seconds))

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # `amount` is read from keyword arguments only; it defaults to 1 otherwise
            amount = kwargs.get("amount", 1)
            bucket.acquire(amount=amount)
            return func(*args, **kwargs)

        return wrapper

    return decorator

# limit of 100,000 tokens per second

@rate_limit(capacity=100_000, seconds=1)
def process_request(amount=1):
    print(f"This is a rate-limited function: {time.strftime('%X')} - {amount} tokens")

for _ in range(100):
    # generate random prompt tokens between 5,000 and 30,000 for 100 sample requests
    llm_prompt_tokens = random.randint(5_000, 30_000)
    try:
        process_request(amount=llm_prompt_tokens)
    except Exception as error:
        print(f"Rate limit exceeded: {error}")
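
The decorator above charges whatever `amount` the caller passes as a keyword. One possible variation, sketched here under assumptions, derives the amount from the call's own arguments so callers do not have to supply it. Everything in this block is hypothetical: `rate_limit_by`, `amount_from`, `rough_token_count`, and the `RecordingBucket` stand-in (which only mimics the `acquire(amount=...)` call used above and records amounts instead of waiting).

```python
from functools import wraps
from typing import Any, Callable


class RecordingBucket:
    """Stand-in for a limiter: records requested amounts instead of waiting."""

    def __init__(self) -> None:
        self.acquired: list[int] = []

    def acquire(self, amount: int = 1) -> None:
        self.acquired.append(amount)


def rate_limit_by(bucket: Any, amount_from: Callable[..., int]) -> Callable:
    """Charge the bucket an amount computed from the wrapped call's arguments."""

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            bucket.acquire(amount=amount_from(*args, **kwargs))
            return func(*args, **kwargs)

        return wrapper

    return decorator


def rough_token_count(prompt: str) -> int:
    # Crude whitespace split; a real tokenizer would give different counts.
    return max(1, len(prompt.split()))


bucket = RecordingBucket()


@rate_limit_by(bucket, amount_from=rough_token_count)
def send_prompt(prompt: str) -> str:
    return prompt.upper()
```

Swapping `RecordingBucket` for a real limiter object with the same `acquire(amount=...)` method would apply the computed amount as the cost of each call.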

With User-Specific Rate Limits + Cache

from functools import wraps
import time
from typing import Optional

from cachetools import LRUCache, TTLCache

from limitor.base import SyncRateLimit
from limitor.configs import BucketConfig
from limitor.leaky_bucket.core import SyncLeakyBucket


def _get_user_cache(max_users, ttl):
    # TTLCache expires each entry `ttl` seconds after it was stored; LRUCache only evicts when full
    if ttl is not None:
        return TTLCache(maxsize=max_users, ttl=ttl)
    return LRUCache(maxsize=max_users)


def rate_limit_per_user(
    capacity=10,
    seconds=1,
    max_users=1000,
    ttl: Optional[float] = None,
    bucket_cls: type[SyncRateLimit] = SyncLeakyBucket,
):
    buckets = _get_user_cache(max_users, ttl)
    global_bucket = bucket_cls(BucketConfig(capacity=capacity, seconds=seconds))

    def decorator(func):
        # user_id is optional: if it is not set, the call falls back to the shared global
        # rate limiter, and the max_users / ttl parameters are ignored
        @wraps(func)
        def wrapper(*args, user_id=None, **kwargs):
            if user_id is None:
                bucket = global_bucket
            else:
                if user_id not in buckets:
                    buckets[user_id] = bucket_cls(BucketConfig(capacity=capacity, seconds=seconds))
                bucket = buckets[user_id]
            with bucket:
                return func(user_id, *args, **kwargs)

        return wrapper

    return decorator

@rate_limit_per_user(capacity=2, seconds=1, max_users=3, ttl=600)  # TTLCache: 10 min/user
def something_user(user_id):
    print(f"User {user_id} called at {time.strftime('%X')}")

for i in range(20):
    try:
        # alternate between two users: 1 on even iterations, 0 on odd ones
        user = 1 if i % 2 == 0 else 0
        something_user(user_id=user)
    except Exception as error:
        print(f"Rate limit exceeded: {error}")
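
A consequence of the bounded cache is worth spelling out: once more than `max_users` distinct users are tracked, the least recently used user's bucket is evicted, and that user starts again with a fresh bucket on the next call. The sketch below illustrates the eviction behavior with the standard library only. `LRUBucketStore` and its `factory` parameter are hypothetical names for illustration; the decorator above uses cachetools, not this class.

```python
from collections import OrderedDict


class LRUBucketStore:
    """Bounded per-user store with least-recently-used eviction (stdlib only)."""

    def __init__(self, max_users: int, factory) -> None:
        self.max_users = max_users
        self.factory = factory  # called to build a fresh per-user bucket
        self._items: OrderedDict = OrderedDict()

    def get(self, user_id):
        if user_id in self._items:
            self._items.move_to_end(user_id)  # mark as most recently used
        else:
            if len(self._items) >= self.max_users:
                self._items.popitem(last=False)  # evict the least recently used user
            self._items[user_id] = self.factory()
        return self._items[user_id]

    def __contains__(self, user_id) -> bool:
        return user_id in self._items
```

With `max_users=2`, touching "alice", "bob", "alice", then "carol" evicts "bob", so a later call for "bob" would build a new bucket and lose any accumulated limit state.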
