Skip to main content

A collection of fast, modular rate & concurrency limiters for Python async and sync code.

Project description

LimitPal

PyPI version Tests Coverage Python versions

Your friendly Python resilient execution toolkit

A fast, modular resilient execution toolkit for Python with sync and async support. In-memory, zero dependencies, thread-safe.

Features

  • Resilience: combine retry, circuit breaker and rate-limiters in one executor
  • Composite limiters (combine multiple limiters for burst control)
  • Token Bucket and Leaky Bucket algorithms
  • Sync and async APIs for all the functionality
  • MockClock for deterministic tests
  • No external dependencies, Python ≥ 3.10

When to use LimitPal

Good fit

  • Building API clients that need fault tolerance (rate limiting + retry + circuit breaker)
  • Integrating with unreliable third-party services
  • Microservices communication with backpressure (blocking acquire)
  • Background job processing with rate control

Not a fit

  • Simple rate limiting without retry logic → use limits
  • Distributed rate limiting across servers → use a Redis-backed solution.

Comparison to other solutions

Feature LimitPal limits slowapi tenacity
Rate Limiting
Retry Logic
Circuit Breaker
Async Support
Distributed(at least for now 😊)

Installation

pip install limitpal

Or with uv:

uv add limitpal

Quick Start

ResilientExecutor (async/sync ready)

Combine Limiting + Retry + CircuitBreaker + BurstControl strategies in one executor

""" Async example """

from limitpal import AsyncResilientExecutor, AsyncTokenBucket, CircuitBreaker, RetryPolicy

# Async rate limiting for burst control.
limiter = AsyncTokenBucket(capacity=5, refill_rate=10)
# Async retries with the same policy.
retry = RetryPolicy(max_attempts=3, base_delay=0.2, backoff=2.0)
# Same breaker semantics in async workflows.
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5.0)

# Async executor wraps limiter + retry + breaker.
executor = AsyncResilientExecutor(
    limiter=limiter,
    retry_policy=retry,
    circuit_breaker=breaker,
)

# Your real-world async call.
async def call_api() -> str:
    return await request_external_service()

# Run with async protection.
result = await executor.run("user:123", call_api)

allow / acquire (same API in sync/async)

allow() is non-blocking: it answers “can I proceed right now?”.
acquire() waits for quota (or until timeout) and then proceeds.
Async versions have the same contract — only await differs.

from limitpal import TokenBucket

limiter = TokenBucket(capacity=2, refill_rate=1)

if limiter.allow("user:123"):
    process_request()
else:
    return "Rate limited"

limiter.acquire("user:123", timeout=2.0)  # wait until a token is available
process_request()
from limitpal import AsyncTokenBucket

limiter = AsyncTokenBucket(capacity=2, refill_rate=1)

if await limiter.allow("user:123"):
    await process_request()
else:
    return "Rate limited"

await limiter.acquire("user:123", timeout=2.0)
await process_request()

Key-based limiting (per-user, per-IP, per-tenant)

Limiters keep separate buckets per key. Use keys to isolate users, IPs, or any other dimension you need.

from limitpal import TokenBucket

limiter = TokenBucket(capacity=2, refill_rate=1)

# user:123 has its own bucket
limiter.allow("user:123")
limiter.allow("user:123")  # consumes user:123 quota
limiter.allow("user:123")  # likely False (rate limited)

# user:456 is independent
limiter.allow("user:456")  # allowed, separate bucket

Composite limiters (combine strategies)

Use this when you need both burst control and a smooth global throughput limit at the same time. All limiters must allow the request. (Sync/Async)

from limitpal import AsyncCompositeLimiter, AsyncLeakyBucket, AsyncTokenBucket

per_user = AsyncTokenBucket(capacity=10, refill_rate=5)
global_smooth = AsyncLeakyBucket(capacity=50, leak_rate=20)

limiter = AsyncCompositeLimiter([per_user, global_smooth])

if await limiter.allow("user:123"):
    await process_request()
else:
    return "Rate limited"

Algorithms

Token Bucket

  • Tokens refill at refill_rate per second
  • Each request consumes one token
  • Allows bursts up to capacity
  • Good for APIs that tolerate occasional spikes

Leaky Bucket

  • Requests queue up; they "leak" at leak_rate per second
  • New requests rejected when queue is full
  • Smooth, constant output rate
  • Good for steady throughput (background jobs, pipelines)

Choosing

Use case Algorithm
API with burst allowance Token Bucket
Smooth rate enforcement Leaky Bucket
Third-party API compliance Either

API Reference

TokenBucket / AsyncTokenBucket

TokenBucket(
    capacity: int,              # Max tokens (burst size)
    refill_rate: float,         # Tokens per second
    clock: Clock | None = None,
    ttl: float | None = None,   # Evict buckets after N seconds idle
    max_buckets: int | None = None,  # Max keys (LRU eviction)
    cleanup_interval: int = 100,
)
Method Description
allow(key) Non-blocking: returns True if token consumed
acquire(key, timeout) Block until token available
get_tokens(key) Current token count
reset(key) Reset bucket; key=None clears all

LeakyBucket / AsyncLeakyBucket

LeakyBucket(
    capacity: int,              # Max queue size
    leak_rate: float,           # Requests processed per second
    clock: Clock | None = None,
    ttl: float | None = None,
    max_buckets: int | None = None,
    cleanup_interval: int = 100,
)
Method Description
allow(key) Non-blocking: returns True if queued
acquire(key, timeout) Block until space available
get_queue_size(key) Current queue size
get_wait_time(key) Seconds until space available
reset(key) Clear queue; key=None clears all

CompositeLimiter / AsyncCompositeLimiter

Combine multiple limiters. Operation allowed only if all allow.

from limitpal import CompositeLimiter, TokenBucket, LeakyBucket

limiter = CompositeLimiter([
    TokenBucket(capacity=10, refill_rate=20),  # burst
    LeakyBucket(capacity=20, leak_rate=5),     # steady rate
])

if limiter.allow("user:123"):
    process_request()
Method Description
allow(key) True if all limiters allow
acquire(key, timeout) Block until all allow
limiters Tuple of underlying limiters

Resilience

ResilientExecutor / AsyncResilientExecutor

API reference (all parameters are optional):

ResilientExecutor(
    limiter: SyncLimiter | None = None,
    retry_policy: RetryPolicy | None = None,
    circuit_breaker: CircuitBreaker | None = None,
    clock: Clock | None = None,
)
AsyncResilientExecutor(
    limiter: AsyncLimiter | None = None,
    retry_policy: RetryPolicy | None = None,
    circuit_breaker: CircuitBreaker | None = None,
    clock: Clock | None = None,
)

RetryPolicy

RetryPolicy(
    max_attempts: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
    backoff: float = 2.0,
    jitter: float = 0.0,
    retry_on: Iterable[type[Exception]] = (Exception,),
)

CircuitBreaker

CircuitBreaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 5.0,
    half_open_success_threshold: int = 1,
    clock: Clock | None = None,
)

Testing with MockClock

For deterministic tests:

from limitpal import TokenBucket, MockClock

def test_refill():
    clock = MockClock(start_time=0.0)
    limiter = TokenBucket(capacity=1, refill_rate=2.0, clock=clock)

    assert limiter.allow("test") is True
    assert limiter.allow("test") is False

    clock.advance(0.5)  # 1 token refilled at 2/sec
    assert limiter.allow("test") is True

MockClock methods: now(), advance(seconds), set_time(value), sleep(), sleep_async().


Exceptions

from limitpal import (
    RateLimitExceeded,    # acquire() timed out
    InvalidConfigError,   # bad constructor args
    CircuitBreakerOpen,   # circuit breaker blocking
    RetryExhausted,       # retries exhausted
)

# RateLimitExceeded
try:
    limiter.acquire("key", timeout=1.0)
except RateLimitExceeded as e:
    print(e.key, e.retry_after)

# InvalidConfigError
try:
    TokenBucket(capacity=-1, refill_rate=1.0)
except InvalidConfigError as e:
    print(e.parameter, e.value, e.reason)

Project Structure

limitpal/
├── base/         # SyncLimiter, AsyncLimiter interfaces
├── limiters/     # TokenBucket, LeakyBucket (sync + async)
├── composite/    # CompositeLimiter
├── resilience/   # RetryPolicy, CircuitBreaker, ResilientExecutor
├── time/         # Clock, MonotonicClock, MockClock
└── exceptions    # LimitPalError, RateLimitExceeded, etc.

Requirements

  • Python >= 3.10
  • No external dependencies


Documentation

Full documentation: limitpal.readthedocs.io (after connecting the repo to Read the Docs — sign in with GitHub, Import a Project, select the repo, leave defaults; RtD will use .readthedocs.yaml and build with pip install -e ".[docs]" + mkdocs build).

To build and serve the docs locally:

uv sync --group dev
mkdocs serve

Open http://127.0.0.1:8000 . To build static HTML: mkdocs build (output in site/).


License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

limitpal-0.1.1.tar.gz (102.0 kB view details)

Uploaded Source

File details

Details for the file limitpal-0.1.1.tar.gz.

File metadata

  • Download URL: limitpal-0.1.1.tar.gz
  • Upload date:
  • Size: 102.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.26 {"installer":{"name":"uv","version":"0.9.26","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for limitpal-0.1.1.tar.gz
Algorithm Hash digest
SHA256 91cc570942f061b3989d365f6485556b10c9da08c2c29fc802fe75770687dbbb
MD5 6586a74181d5fe5e1eb166b3ded60516
BLAKE2b-256 73e455ecf1f37778380dda98dc89b9213da0606b5c2482c3d46ba394b9996709

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page