Production-ready resilience patterns for Python: Circuit Breaker and Rate Limiter
Project description
flexinfer-resilience
Production-ready resilience patterns for Python applications.
Features
- Circuit Breaker - Prevents cascading failures by failing fast when services are down
- Rate Limiter - Token bucket rate limiting with exponential backoff
- Retry - Automatic retries with configurable backoff strategies (constant, linear, exponential, decorrelated jitter)
- Bulkhead - Concurrency isolation using semaphores to prevent resource overload
- Timeout - Time-bounded operations with fallback support
- Async-first - Built for asyncio with full async/await support
- Type hints - Complete type annotations for IDE support
- Zero dependencies - Uses only Python standard library (optional Redis support)
- Distributed State -
RedisCircuitBreakerfor sharing circuit state across multiple instances.
Installation
pip install flexinfer-resilience
Import path stays resilience:
from resilience import CircuitBreaker
Or install from GitLab:
pip install git+https://gitlab.flexinfer.ai/libs/py-resilience.git
Quick Start
Circuit Breaker
from resilience import CircuitBreaker, CircuitBreakerConfig
# Create a circuit breaker with custom config
config = CircuitBreakerConfig(
failure_threshold=5, # Open after 5 failures
recovery_timeout=30.0, # Wait 30s before half-open
success_threshold=2, # Close after 2 successes in half-open
)
breaker = CircuitBreaker("my-api", config)
# Use as context manager
async def call_api():
async with breaker:
return await external_api.fetch()
# Or use execute method
result = await breaker.execute(external_api.fetch)
# Or use as decorator
@breaker.decorate
async def decorated_call():
return await external_api.fetch()
Rate Limiter
from resilience import RateLimiter, RateLimitConfig
# Create a rate limiter
config = RateLimitConfig(
requests_per_second=10, # 10 requests per second
burst_size=20, # Allow bursts up to 20
)
limiter = RateLimiter(config)
# Acquire a token before making requests
async def rate_limited_call():
await limiter.acquire()
return await api.request()
# Or check without blocking
if await limiter.try_acquire():
await api.request()
else:
print("Rate limited, try again later")
Combined Usage with Retry
from resilience import (
CircuitBreaker,
RateLimiter,
execute_with_retry,
CircuitBreakerOpenError,
)
limiter = RateLimiter(RateLimitConfig(requests_per_second=5))
# Execute with automatic retry and rate limiting
result = await execute_with_retry(
fetch_data,
url="https://api.example.com",
rate_limiter=limiter,
max_retries=3,
retry_exceptions=(TimeoutError, ConnectionError),
)
API Reference
CircuitBreaker
class CircuitBreaker:
def __init__(self, name: str, config: CircuitBreakerConfig | None = None): ...
@property
def state(self) -> CircuitState: ... # CLOSED, OPEN, or HALF_OPEN
async def execute(self, func, *args, **kwargs) -> T: ...
async def __aenter__(self) -> CircuitBreaker: ...
async def __aexit__(self, ...): ...
def decorate(self, func) -> func: ...
def reset(self) -> None: ...
CircuitBreakerConfig
| Parameter | Type | Default | Description |
|---|---|---|---|
failure_threshold |
int | 5 | Failures before opening |
recovery_timeout |
float | 30.0 | Seconds before half-open |
half_open_max_calls |
int | 3 | Test calls in half-open |
success_threshold |
int | 2 | Successes to close |
excluded_exceptions |
tuple | () | Exceptions that don't count as failures |
RateLimiter
class RateLimiter:
def __init__(self, config: RateLimitConfig | None = None): ...
async def acquire(self, tokens: int = 1) -> float: ... # Returns wait time
async def try_acquire(self, tokens: int = 1) -> bool: ...
def backoff_delay(self, attempt: int) -> float: ...
def reset(self) -> None: ...
RateLimitConfig
| Parameter | Type | Default | Description |
|---|---|---|---|
requests_per_second |
float | 10.0 | Target request rate |
burst_size |
int | 2x rate | Maximum burst allowed |
max_retries |
int | 5 | Retry attempts |
base_delay |
float | 1.0 | Initial backoff delay |
max_delay |
float | 60.0 | Maximum backoff delay |
jitter |
bool | True | Add random jitter |
Registry Pattern
For managing multiple circuit breakers:
from resilience import CircuitBreakerRegistry
# Get or create circuit breakers by name (singleton)
registry = CircuitBreakerRegistry()
breaker = registry.get("api-service")
# Pre-configure breakers
registry.configure("api-service", CircuitBreakerConfig(failure_threshold=3))
# Get stats for all breakers
stats = registry.get_stats()
Circuit Breaker States
┌─────────────────────────────────────────┐
│ │
▼ │
┌─────────┐ failure_threshold ┌──────────┐ │
│ CLOSED │ ──────────────────▶ │ OPEN │ │
└─────────┘ └──────────┘ │
▲ │ │
│ │ recovery_timeout
│ success_threshold ▼ │
│ ┌───────────┐ │
└───────────────────────── │ HALF_OPEN │──┘
└───────────┘ failure
- CLOSED: Normal operation, requests pass through
- OPEN: Service failing, requests rejected immediately
- HALF_OPEN: Testing recovery with limited requests
License
MIT License - see LICENSE
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release.See tutorial on generating distribution archives.
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file flexinfer_resilience-0.2.2-py3-none-any.whl.
File metadata
- Download URL: flexinfer_resilience-0.2.2-py3-none-any.whl
- Upload date:
- Size: 21.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fa93885f0223858d773cbd3e06e7c2b3bb1d02c8d9a5dffd839420f138a53d4d
|
|
| MD5 |
e3e24d2644c0b04c8db63d5455d3187d
|
|
| BLAKE2b-256 |
b60625dfe2be6763784f2e1c8089cc3eace909f5c21d347df182bf755675cd55
|