Async resilience patterns for Python - Circuit breakers, rate limiting, and more
aioresilience - Fault Tolerance Library for Asyncio
Introduction
aioresilience is a comprehensive fault tolerance library for Python's asyncio ecosystem, providing 9 resilience patterns with a powerful event system for monitoring.
Core Capabilities:
- 9 Resilience Patterns: Circuit Breaker, Retry, Timeout, Bulkhead, Fallback, Rate Limiter, Load Shedder, Backpressure, and Adaptive Concurrency
- Event-Driven Observability: Local and global event handlers for comprehensive monitoring
- Decorator & Context Manager APIs: Flexible integration styles - use decorators, context managers, or direct calls
- Composable: Stack multiple patterns on any async function
- Framework Integrations: First-class support for FastAPI, Sanic, and aiohttp
aioresilience requires Python 3.9+.
from aioresilience import CircuitBreaker, RateLimiter, LoadShedder, circuit_breaker, with_load_shedding
# Create a CircuitBreaker with default configuration
circuit = CircuitBreaker(name="backendService", failure_threshold=5)
# Create a RateLimiter with local in-memory storage
rate_limiter = RateLimiter(name="backendService")
# Create a LoadShedder with default configuration
load_shedder = LoadShedder(max_requests=1000)
# Example: Your backend service call
async def call_external_api():
    # Simulated API call
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()
# Decorate your function with Circuit Breaker and Load Shedding
@circuit_breaker("backendService", failure_threshold=5)
@with_load_shedding(load_shedder, priority="normal")
async def decorated_call(user_id: str):
    # Check rate limit
    if await rate_limiter.check_rate_limit(user_id, "100/minute"):
        return await call_external_api()
    else:
        raise Exception("Rate limit exceeded")
# Execute the decorated function and handle exceptions
try:
    result = await decorated_call("user_123")
except Exception as e:
    result = "Fallback value"
# Or call directly through the circuit breaker
result = await circuit.call(call_external_api)
Note: With aioresilience you don't have to go all-in; you can pick only the patterns you need.
Features
- 9 Resilience Patterns - Circuit Breaker, Retry, Timeout, Bulkhead, Fallback, Rate Limiter, Load Shedder, Backpressure, Adaptive Concurrency
- Event System - Comprehensive observability with local and global event handlers
- Flexible Logging - Silent by default, supports any logging framework (loguru, structlog, etc.)
- Async-First - Built for asyncio from the ground up
- Decorator & Context Manager - Flexible API styles
- Type Hints - Full PEP 484 type annotations
- Composable - Stack multiple patterns on any function
- Framework Integrations - FastAPI, Sanic, aiohttp middleware
- Optional Dependencies - Use only what you need
Documentation
Complete documentation is available in this README and through Python docstrings.
Installation
Basic Installation
pip install aioresilience
With Optional Features
# Redis-based distributed rate limiting
pip install aioresilience[redis]
# System metrics monitoring (CPU/memory)
pip install aioresilience[system]
# Framework integrations
pip install aioresilience[fastapi] # FastAPI/Starlette
pip install aioresilience[sanic] # Sanic
pip install aioresilience[aiohttp] # aiohttp
pip install aioresilience[integrations] # All frameworks
# Development dependencies
pip install aioresilience[dev]
# Everything
pip install aioresilience[all]
Overview
aioresilience provides the following resilience patterns, plus an event system for observability:
- Circuit Breaker - Circuit breaking with state management
- Rate Limiter - Rate limiting (local and distributed)
- Load Shedder - Load shedding (basic and system-aware)
- Backpressure Manager - Backpressure management with water marks
- Adaptive Concurrency Limiter - Adaptive concurrency limiting with AIMD algorithm
- Retry Policy - Retry with exponential/linear/constant backoff
- Timeout Manager - Timeout and deadline management
- Bulkhead - Resource isolation and concurrency limiting
- Fallback Handler - Graceful degradation with fallback values
- Event System - Local and global event handlers for observability
Framework Integrations
Seamless integration with popular async Python web frameworks:
- FastAPI / Starlette - Middleware and dependency injection
- Sanic - Async-native middleware and decorators
- aiohttp - Handler decorators and middleware
See INTEGRATIONS.md for detailed integration guides.
Note: All core modules are included in the base package. Use optional dependencies to enable additional features.
Tip: For all features, install with pip install aioresilience[all].
Resilience Patterns
| Name | How Does It Work? | Description |
|---|---|---|
| Circuit Breaker | Temporarily blocks possible failures | When a system is seriously struggling, failing fast is better than making clients wait. Prevents cascading failures by monitoring error rates and opening the circuit when thresholds are exceeded. |
| Retry | Automatic retry with backoff | Automatically retry failed operations with configurable strategies (exponential, linear, constant) and jitter to prevent thundering herd. |
| Timeout | Time-bound operations | Set maximum execution time for operations. Supports both relative timeouts and absolute deadlines. |
| Bulkhead | Isolate resources | Limit concurrent access to resources to prevent resource exhaustion. Isolates failures to specific resource pools. |
| Fallback | Graceful degradation | Provide alternative responses when primary operation fails. Supports static values, callables, and chained fallback strategies. |
| Rate Limiter | Limits executions per time period | Control the rate of incoming requests with configurable windows (second, minute, hour, day). Supports both local (in-memory) and distributed (Redis) backends. |
| Load Shedder | Rejects requests under high load | Protect your system by rejecting new requests when load exceeds thresholds. Supports request-count-based and system-metric-based (CPU/memory) shedding. |
| Backpressure Manager | Controls flow in async pipelines | Signal upstream components to slow down when downstream is overloaded. Uses water marks (high/low) for graceful flow control. |
| Adaptive Concurrency | Auto-adjusts concurrency limits | Dynamically adjust concurrency based on success rate using AIMD algorithm. Similar to TCP congestion control - additive increase, multiplicative decrease. |
The table above is inspired by Polly: resilience policies and resilience4j.
Logging Configuration
aioresilience follows Python library best practices with silent logging by default (NullHandler). This gives you complete control over how errors and operational logs are handled.
Default Behavior (Silent)
By default, aioresilience emits no logs:
from aioresilience import CircuitBreaker
# No logs are emitted - library is silent by default
circuit = CircuitBreaker("api")
Enable Standard Python Logging
Configure standard Python logging for aioresilience:
import logging
from aioresilience import CircuitBreaker, configure_logging
# Enable logging with DEBUG level
configure_logging(logging.DEBUG)
# Now you'll see logs from aioresilience
circuit = CircuitBreaker("api")
Custom Logging Framework Integration
Use any logging framework (loguru, structlog, etc.) with the error handler:
With Loguru
from loguru import logger
from aioresilience import set_error_handler
# Route aioresilience errors to loguru
set_error_handler(
lambda name, exc, ctx: logger.error(
f"[{name}] {exc.__class__.__name__}: {exc}",
**ctx
)
)
With Structlog
import structlog
from aioresilience import set_error_handler
log = structlog.get_logger()
set_error_handler(
lambda name, exc, ctx: log.error(
"aioresilience_error",
module=name,
exception_type=exc.__class__.__name__,
exception=str(exc),
**ctx
)
)
Custom Format
from aioresilience import configure_logging
import logging
configure_logging(
level=logging.INFO,
format_string='%(asctime)s [%(name)s] %(levelname)s: %(message)s'
)
Disable Logging
from aioresilience import disable_logging
# Explicitly disable all logging (already default)
disable_logging()
Check Logging Status
from aioresilience import is_logging_enabled
if is_logging_enabled():
    print("Logging is configured")
else:
    print("Logging is silent")
Logging API Reference
| Function | Description |
|---|---|
| configure_logging(level, handler, format_string) | Enable standard Python logging |
| set_error_handler(handler) | Set custom error handler for any framework |
| disable_logging() | Reset to silent state (NullHandler) |
| is_logging_enabled() | Check if logging is configured |
Usage Examples
Circuit Breaker
The following example shows how to decorate an async function with a Circuit Breaker and how to handle state transitions.
import asyncio
import httpx
from aioresilience import CircuitBreaker, circuit_breaker
# Simulates a Backend Service
class BackendService:
    async def do_something(self):
        # Simulate API call
        async with httpx.AsyncClient() as client:
            response = await client.get("https://api.example.com/data")
            return response.json()
backend_service = BackendService()
# Create a CircuitBreaker with custom configuration
circuit = CircuitBreaker(
    name="backendName",
    failure_threshold=5,     # Open after 5 consecutive failures
    recovery_timeout=60.0,   # Wait 60 seconds before trying half-open
    success_threshold=2      # Need 2 successes to close from half-open
)
# Guard your call to BackendService.do_something() manually
async def call_backend():
    if await circuit.can_execute():
        try:
            result = await circuit.call(backend_service.do_something)
            return result
        except Exception:
            # Circuit breaker automatically tracks the failure
            raise
    else:
        raise Exception("Circuit breaker is OPEN")
# Or use the decorator pattern
@circuit_breaker("backendName", failure_threshold=5)
async def decorated_backend_call():
    return await backend_service.do_something()
# Execute with fallback
async def call_with_fallback():
    try:
        result = await decorated_backend_call()
        return result
    except Exception:
        return {"data": "fallback_value"}
# When you don't want to decorate your function
result = await circuit.call(backend_service.do_something)
Circuit Breaker States
The circuit breaker has three states:
- CLOSED: Normal operation, requests pass through
- OPEN: Failure threshold exceeded, requests fail fast
- HALF_OPEN: Testing recovery, limited requests allowed
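The three states can be read as a small state machine. The sketch below is a minimal illustration of the transitions under stated assumptions, not aioresilience's implementation: `TinyBreaker`, `allow`, `record_success`, and `record_failure` are hypothetical names, and the thresholds only mirror the `failure_threshold`, `recovery_timeout`, and `success_threshold` parameters shown earlier.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class TinyBreaker:
    """Illustrative state machine only; not aioresilience's implementation."""

    def __init__(self, failure_threshold=5, recovery_timeout=60.0,
                 success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.clock = clock
        self.state = State.CLOSED
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow(self):
        """Return True if a call may proceed right now."""
        if self.state is State.OPEN:
            # After the recovery timeout, let trial calls through (HALF_OPEN).
            if self.clock() - self.opened_at >= self.recovery_timeout:
                self.state = State.HALF_OPEN
                self.successes = 0
                return True
            return False
        return True

    def record_success(self):
        if self.state is State.HALF_OPEN:
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = State.CLOSED
                self.failures = 0
        else:
            self.failures = 0  # a success resets the consecutive-failure count

    def record_failure(self):
        if self.state is State.HALF_OPEN:
            self._open()  # any failure during the trial re-opens the circuit
            return
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self._open()

    def _open(self):
        self.state = State.OPEN
        self.opened_at = self.clock()
        self.failures = 0
```

Passing the clock in as a parameter is a design choice for the sketch: it lets the transitions be exercised without waiting for a real timeout.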
Monitoring Circuit Breaker
You can monitor circuit breaker state and metrics:
# Get current state
state = circuit.get_state()
print(f"Circuit state: {state}")
# Get detailed metrics
metrics = circuit.get_metrics()
print(f"Total requests: {metrics['total_requests']}")
print(f"Failed requests: {metrics['failed_requests']}")
print(f"Failure rate: {metrics['failure_rate']:.2%}")
# Access global circuit breaker manager
from aioresilience import get_circuit_breaker, get_all_circuit_metrics
# Get or create a circuit breaker
backend_circuit = get_circuit_breaker("backend", failure_threshold=3)
# Get metrics for all circuit breakers
all_metrics = get_all_circuit_metrics()
for name, metrics in all_metrics.items():
    print(f"{name}: {metrics['state']}")
Rate Limiter
The following example shows how to limit each key (here, a user ID) to at most 10 requests per second.
import asyncio
from aioresilience import RateLimiter
# Create a RateLimiter (local/in-memory)
rate_limiter = RateLimiter(name="backendName")
# Check rate limit for a specific key (e.g., user ID)
async def handle_request(user_id: str):
    if await rate_limiter.check_rate_limit(user_id, "10/second"):
        # Request is within rate limit
        return {"status": "success", "data": "..."}
    else:
        # Rate limit exceeded
        raise Exception("Rate limit exceeded")
# Example: Testing rate limits
async def test_rate_limit():
    # First call succeeds
    try:
        result = await handle_request("user_123")
        print("Request successful")
    except Exception as e:
        print(f"Request failed: {e}")
    # One request was already made above, so if the loop finishes within the
    # same second, the last two of these 11 calls should be rejected
    for i in range(11):
        try:
            result = await handle_request("user_123")
            print(f"Call {i+1} successful")
        except Exception as e:
            print(f"Call {i+1} failed: {e}")
# Run the test
asyncio.run(test_rate_limit())
Rate Limit Formats
aioresilience supports multiple time periods:
- "10/second" - 10 requests per second
- "100/minute" - 100 requests per minute
- "1000/hour" - 1000 requests per hour
- "10000/day" - 10000 requests per day
Distributed Rate Limiting with Redis
For multi-instance applications, use Redis-based distributed rate limiting:
from aioresilience.rate_limiting import RedisRateLimiter
# Create a Redis-backed rate limiter
rate_limiter = RedisRateLimiter(name="backendName")
await rate_limiter.init_redis("redis://localhost:6379")
# Use the same API - now shared across all instances
if await rate_limiter.check_rate_limit("user_123", "1000/hour"):
    result = await backend_service.do_something()
else:
    raise Exception("Rate limit exceeded")
# Don't forget to close the connection when done
await rate_limiter.close()
Note: Redis rate limiter uses a sliding window algorithm with sorted sets for accurate distributed rate limiting.
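To make the sliding window idea concrete, here is a minimal in-memory sketch. It is not the library's Redis code: a Redis version would typically keep the timestamps in a sorted set, whereas this sketch uses a `deque` per key. `parse_rate` and `SlidingWindowLog` are hypothetical names, and the rate-string parsing only assumes the "N/period" formats listed above.

```python
import time
from collections import defaultdict, deque

PERIOD_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def parse_rate(rate):
    """Split a string such as "100/minute" into (limit, window_seconds)."""
    count, period = rate.split("/")
    return int(count), PERIOD_SECONDS[period]


class SlidingWindowLog:
    """Keeps one timestamp per accepted request (illustration only)."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.hits = defaultdict(deque)  # (key, rate) -> accepted timestamps

    def allow(self, key, rate):
        limit, window = parse_rate(rate)
        now = self.clock()
        log = self.hits[(key, rate)]
        # Drop timestamps that have slid out of the window.
        while log and log[0] <= now - window:
            log.popleft()
        if len(log) >= limit:
            return False
        log.append(now)
        return True
```

Because every accepted request is timestamped, the window moves continuously with the clock instead of resetting at fixed boundaries, at the cost of storing up to `limit` timestamps per key.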
Load Shedding
There are two load shedding implementations.
BasicLoadShedder
The following example shows how to shed load based on request count:
from aioresilience import LoadShedder
# Create a LoadShedder with request count limits
load_shedder = LoadShedder(
    max_requests=1000,     # Maximum concurrent requests
    max_queue_depth=500    # Maximum queue depth
)
# Use in your request handler
async def handle_request():
    if await load_shedder.acquire():
        try:
            # Process the request
            result = await backend_service.do_something()
            return result
        finally:
            await load_shedder.release()
    else:
        # Load shedding - reject request
        raise Exception("Service overloaded")
# Or use the decorator
from aioresilience import with_load_shedding
@with_load_shedding(load_shedder, priority="normal")
async def process_request():
    return await backend_service.do_something()
SystemLoadShedder
The following example shows how to shed load based on system metrics (CPU and memory):
from aioresilience.load_shedding import SystemLoadShedder
# Create a system-aware load shedder
load_shedder = SystemLoadShedder(
    max_requests=1000,
    cpu_threshold=85.0,      # Shed load if CPU > 85%
    memory_threshold=85.0    # Shed load if memory > 85%
)
# Use the same API as BasicLoadShedder
async def handle_request():
    if await load_shedder.acquire(priority="normal"):
        try:
            result = await backend_service.do_something()
            return result
        finally:
            await load_shedder.release()
    else:
        raise Exception("Service overloaded - high system load")
# High priority requests can bypass some checks
if await load_shedder.acquire(priority="high"):
    # High priority request processing
    pass
Note: SystemLoadShedder requires the psutil package. Install with pip install aioresilience[system].
Backpressure Management
Control flow in async processing pipelines using water marks:
import asyncio
import logging
from aioresilience import BackpressureManager
logger = logging.getLogger(__name__)
# Create a backpressure manager
backpressure = BackpressureManager(
    max_pending=1000,       # Hard limit on pending items
    high_water_mark=800,    # Start applying backpressure
    low_water_mark=200      # Stop applying backpressure
)
# Use in async pipeline
async def process_stream(items):
    for item in items:
        # Try to acquire slot (with timeout)
        if await backpressure.acquire(timeout=5.0):
            try:
                await process_item(item)
            finally:
                await backpressure.release()
        else:
            # Backpressure timeout - item rejected
            logger.warning("Item rejected due to backpressure")
# Or use the decorator
from aioresilience import with_backpressure
@with_backpressure(backpressure, timeout=5.0)
async def process_item(item):
    # Your processing logic
    await asyncio.sleep(0.1)
    return item
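The two water marks form a hysteresis band: backpressure switches on at the high mark and only switches off again once the backlog has drained to the low mark. The sketch below illustrates that behaviour in isolation; `WaterMarkGate` and its methods are hypothetical and are not part of aioresilience, whose BackpressureManager may differ in detail.

```python
class WaterMarkGate:
    """Hysteresis between a high and a low water mark (illustration only)."""

    def __init__(self, high_water_mark, low_water_mark):
        if low_water_mark >= high_water_mark:
            raise ValueError("low_water_mark must be below high_water_mark")
        self.high = high_water_mark
        self.low = low_water_mark
        self.pending = 0
        self.backpressure = False

    def _update(self):
        if self.pending >= self.high:
            self.backpressure = True
        elif self.pending <= self.low:
            self.backpressure = False
        # Between the marks the previous state is kept on purpose.

    def add(self, n=1):
        """Record n newly pending items."""
        self.pending += n
        self._update()

    def done(self, n=1):
        """Record n completed items."""
        self.pending = max(0, self.pending - n)
        self._update()
```

Keeping the previous state between the marks avoids rapid on/off flapping when the backlog hovers around a single threshold.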
Adaptive Concurrency Limiting
Automatically adjust concurrency limits based on success rate:
from aioresilience import AdaptiveConcurrencyLimiter
# Create an adaptive limiter with AIMD algorithm
limiter = AdaptiveConcurrencyLimiter(
    initial_limit=100,     # Starting concurrency
    min_limit=10,          # Minimum concurrency
    max_limit=1000,        # Maximum concurrency
    increase_rate=1.0,     # Additive increase
    decrease_factor=0.9    # Multiplicative decrease
)
# Use in your request handler
async def handle_request():
    if await limiter.acquire():
        try:
            result = await backend_service.do_something()
            # Report success
            await limiter.release(success=True)
            return result
        except Exception as e:
            # Report failure
            await limiter.release(success=False)
            raise
    else:
        raise Exception("Concurrency limit reached")
# Check current statistics
stats = limiter.get_stats()
print(f"Current limit: {stats['current_limit']}")
print(f"Active requests: {stats['active_count']}")
Note: The AIMD algorithm increases the limit additively (by a fixed step) on success and decreases it multiplicatively (by a constant factor) on failure, similar to TCP congestion control.
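As a rough illustration of the update rule, the function below applies one AIMD step to a limit. It is a sketch under assumptions: `aimd_step` is a hypothetical helper, its defaults simply reuse the parameter values from the example above, and when the library actually applies each adjustment (per request or per measurement window) is not specified here.

```python
def aimd_step(limit, success, increase_rate=1.0, decrease_factor=0.9,
              min_limit=10, max_limit=1000):
    """Return the next concurrency limit after one observed outcome."""
    if success:
        limit = limit + increase_rate      # additive increase
    else:
        limit = limit * decrease_factor    # multiplicative decrease
    # Keep the result inside the configured bounds.
    return max(min_limit, min(max_limit, limit))
```

With these values a success moves a limit of 100 up by one step, while a failure cuts it by 10%, so the limit climbs slowly and backs off quickly.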
Retry Pattern
Automatically retry failed operations with exponential backoff and jitter:
import httpx
from aioresilience import RetryPolicy, retry, RetryStrategy
# Using RetryPolicy class
policy = RetryPolicy(
    max_attempts=5,
    initial_delay=1.0,
    max_delay=60.0,
    backoff_multiplier=2.0,
    strategy=RetryStrategy.EXPONENTIAL,
    jitter=0.1,
)
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()
# Execute with retry
result = await policy.execute(fetch_data)
# Or use the decorator
@retry(
    max_attempts=3,
    initial_delay=0.5,
    strategy=RetryStrategy.EXPONENTIAL
)
async def fetch_user(user_id: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        response.raise_for_status()
        return response.json()
# Will automatically retry on exceptions
user = await fetch_user("123")
Retry Strategies
Three backoff strategies are available:
- Exponential: Delays increase exponentially (1s, 2s, 4s, 8s...)
- Linear: Delays increase linearly (1s, 2s, 3s, 4s...)
- Constant: Same delay every time (1s, 1s, 1s, 1s...)
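The delay sequences above can be reproduced with a few lines of arithmetic. This is a sketch, not the library's code: `backoff_delays` is a hypothetical helper, the strategy names are plain strings rather than RetryStrategy members, jitter is left out, and the parameter names are borrowed from the RetryPolicy example.

```python
def backoff_delays(strategy, attempts, initial_delay=1.0,
                   backoff_multiplier=2.0, max_delay=60.0):
    """Delay before each retry, without jitter, capped at max_delay."""
    delays = []
    for n in range(attempts):
        if strategy == "exponential":
            delay = initial_delay * backoff_multiplier ** n
        elif strategy == "linear":
            delay = initial_delay * (n + 1)
        elif strategy == "constant":
            delay = initial_delay
        else:
            raise ValueError(f"unknown strategy: {strategy}")
        delays.append(min(delay, max_delay))
    return delays


print(backoff_delays("exponential", 4))  # [1.0, 2.0, 4.0, 8.0]
print(backoff_delays("linear", 4))       # [1.0, 2.0, 3.0, 4.0]
print(backoff_delays("constant", 4))     # [1.0, 1.0, 1.0, 1.0]
```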
Predefined Policies
from aioresilience import RetryPolicies
# Default: 3 attempts, exponential backoff
policy = RetryPolicies.default()
# Aggressive: 5 attempts, fast exponential backoff
policy = RetryPolicies.aggressive()
# Conservative: 3 attempts, linear backoff with high jitter
policy = RetryPolicies.conservative()
# Network-oriented: handles connection errors
policy = RetryPolicies.network()
Timeout Pattern
Set maximum execution time for async operations:
import asyncio
import httpx
from aioresilience import TimeoutManager, timeout, with_timeout
# Using TimeoutManager class
manager = TimeoutManager(timeout=5.0)
async def slow_operation():
    await asyncio.sleep(10.0)
    return "result"
# Will raise OperationTimeoutError after 5 seconds
try:
    result = await manager.execute(slow_operation)
except OperationTimeoutError:
    print("Operation timed out")
# Or use the decorator
@timeout(3.0)
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()
# Convenience function for one-off timeouts
result = await with_timeout(fetch_data, 5.0)
Deadline Management
For absolute time constraints:
from aioresilience import DeadlineManager, with_deadline
import time
# Set an absolute deadline
deadline = time.time() + 10.0 # 10 seconds from now
manager = DeadlineManager(deadline=deadline)
async def process_request():
    # Multiple operations sharing the same deadline
    data1 = await manager.execute(fetch_data)
    data2 = await manager.execute(process_data, data1)
    return data2
# Or use convenience function
result = await with_deadline(fetch_data, deadline)
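The difference between a timeout and a deadline is that a deadline is shared: each operation only gets whatever time is left. A minimal sketch of that idea using only the standard library follows; `run_before` is a hypothetical helper, not an aioresilience function, and it raises asyncio.TimeoutError rather than the library's own exception type.

```python
import asyncio
import time


async def run_before(deadline, func, *args):
    """Await func(*args), giving it only the time left until `deadline`."""
    remaining = deadline - time.time()
    if remaining <= 0:
        # Nothing left in the budget: fail without starting the operation.
        raise asyncio.TimeoutError("deadline already passed")
    return await asyncio.wait_for(func(*args), timeout=remaining)
```

Calling `run_before` several times with the same `deadline` value shrinks the budget for each later call, which is the behaviour the shared-deadline example above relies on.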
Bulkhead Pattern
Isolate resources and limit concurrent access:
from aioresilience import Bulkhead, bulkhead
# Create a bulkhead for database connections
db_bulkhead = Bulkhead(
    max_concurrent=10,   # Max 10 concurrent database operations
    max_waiting=20,      # Max 20 requests waiting in queue
    timeout=5.0,         # Max 5 seconds wait time
    name="database"
)
async def query_database(query: str):
    async with db_bulkhead:
        # Only 10 of these can run concurrently
        # Your database query here
        result = {"query": query, "status": "success"}
        return result
# Or use as a function executor with a callable
async def execute_query(query: str):
    # Your database logic here
    return {"query": query, "status": "success"}
result = await db_bulkhead.execute(execute_query, "SELECT * FROM users")
# Or use the decorator
@bulkhead(max_concurrent=5, max_waiting=10)
async def call_external_api(endpoint: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/{endpoint}")
        return response.json()
# Get metrics
metrics = db_bulkhead.get_metrics()
print(f"Current active: {metrics['current_active']}")
print(f"Peak active: {metrics['peak_active']}")
print(f"Rejected: {metrics['rejected_requests']}")
Bulkhead Registry
Manage multiple resource pools:
from aioresilience import get_bulkhead
# Define your operations
async def call_api():
    # Your API call logic
    return {"status": "success"}
async def query_db():
    # Your database query logic
    return {"rows": []}
async def get_cache():
    # Your cache operation logic
    return {"cached": True}
# Get or create named bulkheads
api_bulkhead = await get_bulkhead("external_api", max_concurrent=10)
db_bulkhead = await get_bulkhead("database", max_concurrent=20)
cache_bulkhead = await get_bulkhead("cache", max_concurrent=50)
# Use them independently
await api_bulkhead.execute(call_api)
await db_bulkhead.execute(query_db)
await cache_bulkhead.execute(get_cache)
Fallback Pattern
Provide alternative responses when operations fail:
import httpx
from aioresilience import fallback, chained_fallback, with_fallback
# Simple static fallback
@fallback([])
async def fetch_items():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/items")
        return response.json()
# If fetch_items fails, returns empty list []
# Fallback with callable
@fallback(lambda: {"status": "unavailable"})
async def get_service_status():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/status")
        return response.json()
# Async fallback function
async def get_cached_data(*args, **kwargs):
    # Simulated cache lookup
    return {"cached": True, "data": "cached_user_data"}
@fallback(get_cached_data)
async def fetch_user_data(user_id: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()
# If API fails, tries cache; if cache fails, raises exception
Chained Fallbacks
Multiple fallback strategies in sequence:
import httpx
from aioresilience import chained_fallback
async def get_from_cache(user_id):
    # Simulated cache lookup
    return {"cached": True, "user_id": user_id}
async def get_from_backup_api(user_id):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://backup-api.example.com/users/{user_id}")
        return response.json()
DEFAULT_USER = {"id": None, "name": "Guest", "email": None}
@chained_fallback(
    get_from_cache,        # Try cache first
    get_from_backup_api,   # Then backup API
    DEFAULT_USER           # Finally use default
)
async def get_user(user_id: str):
    # Try primary API
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        response.raise_for_status()
        return response.json()
# Tries: primary API → cache → backup API → default value
user = await get_user("123")
Combining Patterns
Retry with fallback for robust error handling:
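# Decorators apply bottom-up: retry wraps the call first, then fallback wraps the retries.
# With the order reversed, fallback would swallow the first error and retry would never run.
@fallback({"data": [], "status": "degraded"})
@retry(max_attempts=3, initial_delay=1.0)
async def fetch_critical_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/critical-data")
        response.raise_for_status()
        return response.json()
# Will make up to 3 attempts, then use the fallback if all fail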
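When stacking patterns, decorator order decides which wrapper sees a failure first: Python applies stacked decorators bottom-up, so the one closest to the function wraps it first. The sketch below demonstrates this with two hand-written stand-ins, `simple_retry` and `simple_fallback`. Both are hypothetical and much simpler than aioresilience's decorators; the point is only the ordering, which follows from ordinary decorator semantics.

```python
import asyncio
import functools


def simple_retry(max_attempts):
    """Re-invoke the wrapped coroutine until it succeeds or attempts run out."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
        return wrapper
    return decorator


def simple_fallback(value):
    """Return `value` instead of propagating any exception."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception:
                return value
        return wrapper
    return decorator


calls = {"retry_inside": 0, "retry_outside": 0}


@simple_fallback("degraded")    # outermost: used only after retries are exhausted
@simple_retry(max_attempts=3)   # innermost: sees every failure
async def retry_inside():
    calls["retry_inside"] += 1
    raise RuntimeError("boom")


@simple_retry(max_attempts=3)   # outermost: never sees a failure
@simple_fallback("degraded")    # innermost: swallows the first failure
async def retry_outside():
    calls["retry_outside"] += 1
    raise RuntimeError("boom")
```

Both functions end up returning the fallback value, but `retry_inside` is attempted three times while `retry_outside` runs only once, because its fallback hides the error from the retry wrapper.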
Framework Integrations
aioresilience provides fully configurable middleware and decorators for FastAPI, Sanic, and aiohttp with zero hardcoded values.
Key Features:
- All error messages configurable
- All HTTP status codes configurable
- All Retry-After headers configurable
- Custom response factories for complete control
See INTEGRATIONS.md for comprehensive guides.
FastAPI Integration
The FastAPI integration provides modular middleware and decorators with full configurability:
from fastapi import FastAPI
from aioresilience import CircuitBreaker, LoadShedder, RetryPolicy
from aioresilience.integrations.fastapi import (
    CircuitBreakerMiddleware,
    LoadSheddingMiddleware,
    TimeoutMiddleware,
    BulkheadMiddleware,
    FallbackMiddleware,
    ResilienceMiddleware,  # Composite - combines multiple patterns
    retry_route,           # Route decorator (recommended for retry logic)
)
app = FastAPI()
# Circuit Breaker - Fully customizable
app.add_middleware(
    CircuitBreakerMiddleware,
    circuit_breaker=CircuitBreaker("api", failure_threshold=5),
    error_message="Service temporarily down",  # Custom message
    status_code=503,                           # Custom status
    retry_after=30,                            # Custom retry delay
    include_circuit_info=False,                # Hide internal details
    exclude_paths={"/health", "/metrics"},     # O(1) set lookup
)
# Load Shedding - Configurable
app.add_middleware(
    LoadSheddingMiddleware,
    load_shedder=LoadShedder(max_requests=1000),
    error_message="Too busy - please retry",
    retry_after=10,
    priority_header="X-Request-Priority",  # Custom header name
    default_priority="normal",
)
# Timeout - Configurable
app.add_middleware(
    TimeoutMiddleware,
    timeout=30.0,
    error_message="Request took too long",
    status_code=408,
)
# Fallback
app.add_middleware(
    FallbackMiddleware,
    fallback_response={"status": "degraded", "data": []},
    log_errors=True,
)
# Retry - Use route-level decorator (recommended over middleware)
@app.get("/api/data")
@retry_route(RetryPolicy(max_attempts=3, initial_delay=1.0))
async def get_data():
    # Automatic retry on failure with exponential backoff
    return {"data": "..."}
Rate Limiting Dependency
from fastapi import FastAPI, Depends
from aioresilience import RateLimiter
from aioresilience.integrations.fastapi import rate_limit_dependency
app = FastAPI()
rate_limiter = RateLimiter(name="api")
# Basic usage
@app.get("/api/data", dependencies=[
    Depends(rate_limit_dependency(rate_limiter, "100/minute"))
])
async def get_data():
    return {"data": "..."}
# Fully customized
@app.get("/api/premium", dependencies=[
    Depends(rate_limit_dependency(
        rate_limiter,
        "1000/minute",
        error_message="Premium tier limit exceeded",
        status_code=429,
        retry_after=30,
        key_func=lambda req: req.headers.get("X-User-ID"),  # Custom key
    ))
])
async def premium_data():
    return {"data": "premium"}
Custom Client IP Extraction
from fastapi import Request
from aioresilience.integrations.fastapi import get_client_ip
@app.middleware("http")
async def custom_middleware(request: Request, call_next):
    client_ip = get_client_ip(request)
    # Supports X-Forwarded-For and X-Real-IP headers
    logger.info(f"Request from {client_ip}")
    response = await call_next(request)
    return response
Sanic Integration
The Sanic integration provides fully configurable middleware and route decorators for Sanic's async-native request handling.
from sanic import Sanic, json
from aioresilience import CircuitBreaker, RateLimiter, LoadShedder
from aioresilience.integrations.sanic import (
    setup_resilience,
    circuit_breaker_route,
    rate_limit_route,
    timeout_route,
    bulkhead_route,
)
app = Sanic("MyApp")
# Global resilience setup - Fully configurable
setup_resilience(
    app,
    circuit_breaker=CircuitBreaker(name="api"),
    rate_limiter=RateLimiter(name="api"),
    rate="1000/minute",
    load_shedder=LoadShedder(max_requests=500),
    timeout=30.0,
    # All customizable
    exclude_paths={"/health", "/metrics", "/admin"},
    circuit_error_message="API temporarily unavailable",
    circuit_status_code=503,
    circuit_retry_after=60,
    rate_error_message="Too many requests",
    rate_retry_after=120,
    load_error_message="Server overloaded",
    priority_header="X-Priority",
)
# Or use route decorators - Also configurable
@app.get("/api/data")
@circuit_breaker_route(
    CircuitBreaker(name="api"),
    error_message="Service down",
    status_code=503,
    retry_after=30,
    include_info=False,  # Hide circuit details
)
async def get_data(request):
    return json({"data": "..."})
@app.get("/api/limited")
@rate_limit_route(
    RateLimiter(name="api"),
    "100/minute",
    error_message="Rate limit hit",
    retry_after=60,
)
async def limited_endpoint(request):
    return json({"data": "limited"})
aiohttp Integration
The aiohttp integration provides middleware and handler decorators with full configurability.
from aiohttp import web
from aioresilience import CircuitBreaker, RateLimiter, LoadShedder
from aioresilience.integrations.aiohttp import (
    create_resilience_middleware,
    circuit_breaker_handler,
    rate_limit_handler,
    timeout_handler,
)
app = web.Application()
# Add resilience middleware - Fully configurable
app.middlewares.append(create_resilience_middleware(
    circuit_breaker=CircuitBreaker(name="api"),
    rate_limiter=RateLimiter(name="api"),
    rate="1000/minute",
    load_shedder=LoadShedder(max_requests=500),
    timeout=30.0,
    # All customizable
    exclude_paths={"/health", "/metrics"},
    circuit_error_message="API down",
    circuit_status_code=503,
    circuit_retry_after=45,
    rate_error_message="Limit reached",
    rate_retry_after=90,
    load_error_message="Too busy",
    priority_header="X-Priority",
))
# Or use handler decorators - Also configurable
@circuit_breaker_handler(
    CircuitBreaker(name="api"),
    error_message="Service unavailable",
    status_code=503,
    retry_after=30,
    include_info=False,
)
async def get_data(request):
    return web.json_response({"data": "..."})
@rate_limit_handler(
    RateLimiter(name="api"),
    "100/minute",
    error_message="Rate limit exceeded",
    retry_after=60,
)
async def limited_data(request):
    return web.json_response({"data": "limited"})
app.router.add_get("/api/data", get_data)
app.router.add_get("/api/limited", limited_data)
For more details, see INTEGRATIONS.md.
Event System
All resilience patterns emit events for logging, monitoring, and metrics. The event system supports both local event handlers (per pattern instance) and a global event bus for centralized monitoring.
Local Event Handlers
Each pattern has its own EventEmitter accessible via the .events attribute:
from aioresilience import CircuitBreaker
import logging
logger = logging.getLogger(__name__)
circuit = CircuitBreaker(name="backend", failure_threshold=5)
# Register event handlers using decorator syntax
@circuit.events.on("state_change")
async def on_state_change(event):
    logger.warning(f"Circuit {event.name} changed state: "
                   f"{event.metadata['from_state']} → {event.metadata['to_state']}")
@circuit.events.on("call_success")
async def on_success(event):
    logger.debug(f"Circuit {event.name}: successful call")
@circuit.events.on("call_failure")
async def on_failure(event):
    logger.error(f"Circuit {event.name}: call failed - {event.metadata.get('error')}")
# Or register handlers directly
async def on_circuit_opened(event):
    logger.critical(f"⚠️ Circuit {event.name} OPENED! System degraded.")
circuit.events.on("circuit_opened", on_circuit_opened)
# Wildcard handler to capture all events
@circuit.events.on("*")
async def log_all_events(event):
    logger.info(f"Event: {event.event_type} from {event.name}")
Global Event Bus
Monitor events across all patterns using the global event bus:
from aioresilience import CircuitBreaker, RateLimiter, Bulkhead
from aioresilience.events import event_bus
import logging
logger = logging.getLogger(__name__)
# Register global event handlers
@event_bus.on("state_change")
async def monitor_all_state_changes(event):
    logger.warning(f"[{event.pattern_type}] {event.name}: "
                   f"{event.metadata['from_state']} → {event.metadata['to_state']}")
@event_bus.on("rate_limit_exceeded")
async def alert_on_rate_limit(event):
    logger.warning(f"Rate limit exceeded for key: {event.metadata.get('key')}")
@event_bus.on("*")
async def collect_metrics(event):
    # Send to monitoring system (Prometheus, DataDog, etc.)
    metrics_collector.record(
        event_type=event.event_type,
        pattern=event.pattern_type,
        timestamp=event.timestamp
    )
# All patterns emit to both local handlers AND the global bus
circuit = CircuitBreaker(name="api")
rate_limiter = RateLimiter(name="api")
bulkhead = Bulkhead(max_concurrent=100)
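The relationship between local handlers and the global bus can be pictured as an emitter that runs its own handlers and then forwards the event to a shared parent. The sketch below is a simplified model under assumptions: `TinyEmitter` is a hypothetical class, its handlers receive an event type and a payload dict rather than the event objects used above, and it says nothing about how aioresilience dispatches events internally.

```python
import asyncio
from collections import defaultdict


class TinyEmitter:
    """Local handlers plus forwarding to an optional shared bus (illustration)."""

    def __init__(self, parent=None):
        self.handlers = defaultdict(list)
        self.parent = parent  # optional shared bus

    def on(self, event_type, handler=None):
        # Supports both @emitter.on("x") and emitter.on("x", handler).
        def register(func):
            self.handlers[event_type].append(func)
            return func
        return register(handler) if handler else register

    async def emit(self, event_type, payload):
        # Exact-match handlers run first, then wildcard ("*") handlers.
        for handler in self.handlers[event_type] + self.handlers["*"]:
            await handler(event_type, payload)
        # Forward the same event to the shared bus, if there is one.
        if self.parent is not None:
            await self.parent.emit(event_type, payload)
```

In this model a pattern instance would own one `TinyEmitter` whose `parent` is the shared bus, so a single `emit` call reaches both the per-instance handlers and any global ones.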
Event Types by Pattern
Circuit Breaker:
- state_change - State transitions (CLOSED ↔ OPEN ↔ HALF_OPEN)
- circuit_opened - Circuit opened due to failures
- circuit_closed - Circuit recovered
- call_success - Successful call
- call_failure - Failed call
Rate Limiter:
- rate_limit_exceeded - Request rejected
- rate_limit_passed - Request allowed
Bulkhead:
- bulkhead_rejected - Request rejected (full)
- bulkhead_accepted - Request accepted
Load Shedder:
- request_shed - Request shed due to overload
- request_accepted - Request accepted
Timeout:
- timeout_exceeded - Operation timed out
- timeout_success - Completed within timeout
Fallback:
- fallback_triggered - Fallback value returned
Retry:
- retry_attempt - Retry attempt started
- retry_success - Retry succeeded
- retry_exhausted - All retries failed
Getting Metrics
You can still poll metrics synchronously when needed:
# Circuit Breaker metrics
metrics = circuit.get_metrics()
print(f"State: {metrics['state']}, Failures: {metrics['consecutive_failures']}")
# Load Shedder statistics
stats = load_shedder.get_stats()
print(f"Active: {stats['active_requests']}, Shed: {stats['total_shed']}")
# Rate Limiter statistics
stats = rate_limiter.get_stats()
print(f"Active limiters: {stats['active_limiters']}")
For detailed examples, see examples/events_example.py.
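If you prefer polling over event handlers, a small background task can sample a metrics getter at a fixed interval. The helper below is a sketch, not part of aioresilience: `poll_metrics` is a hypothetical name, and it only assumes that the getter is a synchronous callable returning a dict, as `circuit.get_metrics()` does in the snippet above.

```python
import asyncio
from typing import Any, Callable, Dict, List


async def poll_metrics(
    get_metrics: Callable[[], Dict[str, Any]],
    interval: float,
    samples: int,
) -> List[Dict[str, Any]]:
    """Call get_metrics() `samples` times, sleeping `interval` seconds between calls."""
    collected: List[Dict[str, Any]] = []
    for _ in range(samples):
        # Copy the dict so later mutations by the source do not alter the sample.
        collected.append(dict(get_metrics()))
        await asyncio.sleep(interval)
    return collected
```

For example, `await poll_metrics(circuit.get_metrics, interval=5.0, samples=12)` would gather one minute of circuit breaker snapshots.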
Architecture
aioresilience follows a modular architecture with minimal required dependencies:
aioresilience/
├── __init__.py # Main exports
├── logging.py # Logging configuration utilities (no dependencies)
├── events/ # Event system (no dependencies)
│ ├── __init__.py
│ ├── emitter.py # Local event handlers per pattern
│ ├── bus.py # Global event bus
│ └── types.py # Event types and dataclasses
├── circuit_breaker.py # Circuit breaker pattern (no dependencies)
├── retry.py # Retry with backoff strategies (no dependencies)
├── timeout.py # Timeout and deadline management (no dependencies)
├── bulkhead.py # Resource isolation (no dependencies)
├── fallback.py # Graceful degradation (no dependencies)
├── backpressure.py # Backpressure management (no dependencies)
├── adaptive_concurrency.py # Adaptive concurrency limiting (no dependencies)
├── rate_limiting/
│ ├── __init__.py
│ ├── local.py # In-memory rate limiting (requires: aiolimiter)
│ └── redis.py # Distributed rate limiting (requires: redis)
├── load_shedding/
│ ├── __init__.py
│ ├── basic.py # Basic load shedding (no dependencies)
│ └── system.py # System-aware load shedding (requires: psutil)
└── integrations/
    ├── __init__.py
    ├── fastapi/ # FastAPI integration (requires: fastapi)
    │ ├── __init__.py
    │ ├── circuit_breaker.py
    │ ├── load_shedding.py
    │ ├── timeout.py
    │ ├── bulkhead.py
    │ ├── retry.py
    │ ├── fallback.py
    │ ├── backpressure.py
    │ ├── adaptive_concurrency.py
    │ ├── composite.py # Composite resilience middleware
    │ ├── decorators.py # Route-level decorators (retry_route, etc.)
    │ ├── dependencies.py # Dependency injection utilities
    │ ├── utils.py
    │ └── README.md
    ├── sanic/ # Sanic integration (requires: sanic)
    │ ├── __init__.py
    │ ├── decorators.py
    │ ├── middleware.py
    │ └── utils.py
    └── aiohttp/ # aiohttp integration (requires: aiohttp)
      ├── __init__.py
      ├── decorators.py
      ├── middleware.py
      └── utils.py
Core Dependencies
- Required:
  - aiolimiter>=1.0.0 (for rate limiting)
- Optional:
  - redis>=4.5.0 (for distributed rate limiting)
  - psutil>=5.9.0 (for system-aware load shedding)
  - fastapi>=0.100.0 (for FastAPI integration)
  - sanic>=23.0.0 (for Sanic integration)
  - aiohttp>=3.8.0 (for aiohttp integration)
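Because most dependencies are optional, it can be useful to check at startup which features are usable in the current environment. The sketch below uses the standard library's `importlib.util.find_spec`; the `OPTIONAL_FEATURES` mapping and the `available_features` helper are hypothetical and only restate the optional packages listed above.

```python
import importlib.util
from typing import Dict

# Feature name -> top-level module it needs (taken from the list above).
OPTIONAL_FEATURES: Dict[str, str] = {
    "distributed rate limiting": "redis",
    "system-aware load shedding": "psutil",
    "FastAPI integration": "fastapi",
    "Sanic integration": "sanic",
    "aiohttp integration": "aiohttp",
}


def available_features(features: Dict[str, str] = OPTIONAL_FEATURES) -> Dict[str, bool]:
    """Report which features have their module installed, without importing it."""
    return {
        feature: importlib.util.find_spec(module) is not None
        for feature, module in features.items()
    }
```

Calling `available_features()` returns a dict of booleans, so an application can log or disable features whose packages are missing instead of failing on import.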
Design Philosophy
- Async-First: Built from the ground up for Python's asyncio
- Fail-Safe Defaults: Components fail open to preserve availability
- Modular: Use only what you need, no unnecessary dependencies
- Type-Safe: Full type hints (PEP 484) for better IDE support
- Production-Ready: Safe under concurrent coroutines, with shared state guarded by async locks
- Observable: Rich metrics and statistics for monitoring
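The "fail open" default can be illustrated with a small wrapper: if the guard itself breaks (for example, a rate-limit backend is unreachable), the request is allowed rather than rejected. This is a sketch of the general idea under that assumption, not the library's code; `allow_request` is a hypothetical helper.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


async def allow_request(check: Callable[[], Awaitable[bool]]) -> bool:
    """Return the guard's decision, or True if the guard itself fails."""
    try:
        return await check()
    except Exception:
        # The guard is broken, not the request: fail open and keep serving.
        logger.exception("guard check failed; failing open")
        return True
```

The trade-off is deliberate: a broken guard temporarily stops protecting the service, but it never becomes an outage on its own.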
Comparison with Other Libraries
| Feature | aioresilience | pybreaker | circuitbreaker |
|---|---|---|---|
| Async-native | Yes | No | No |
| Type hints | Yes | Partial | No |
| Circuit breaker | Yes | Yes | Yes |
| Retry with backoff | Yes | No | No |
| Timeout/Deadline | Yes | No | No |
| Bulkhead | Yes | No | No |
| Fallback | Yes | No | No |
| Rate limiting | Yes | No | No |
| Load shedding | Yes | No | No |
| Backpressure | Yes | No | No |
| Modular design | Yes | No | No |
| Metrics & monitoring | Yes | Basic | Basic |
Performance
aioresilience is designed for minimal overhead in production environments. All patterns use efficient async primitives and lock-free algorithms where possible.
Recent Optimizations:
- Lazy event emission (only when listeners registered)
- Conditional logging (format strings only when enabled)
- O(1) path exclusions using set lookups
- Cached listener checks (reduces per-request overhead)
- Concurrency-safe state management with async locks
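Two of these optimizations are easy to show in isolation. The sketch below is illustrative only and does not reproduce aioresilience internals: `LazyEmitter` is a hypothetical class that builds an event only when a listener exists, and `log_failure` relies on the standard `logging` behaviour of deferring %-style formatting until a record is actually emitted.

```python
import logging

logger = logging.getLogger("resilience.sketch")


class LazyEmitter:
    """Hypothetical emitter that skips event construction when unused."""

    def __init__(self):
        self._listeners = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def emit(self, build_event):
        # build_event is a zero-argument callable, so the event payload
        # is only constructed when at least one listener is registered.
        if not self._listeners:
            return False
        event = build_event()
        for listener in self._listeners:
            listener(event)
        return True


def log_failure(name, error):
    # %-style arguments are formatted by logging only if the record is
    # emitted, unlike an f-string, which is always evaluated.
    logger.debug("Circuit %s: call failed - %s", name, error)
```

With no listeners registered, `emit` returns immediately and the event-building callable is never invoked.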
Benchmark Your Own System:
# Sequential overhead (baseline)
python benchmarks/benchmark_sequential.py
# Concurrent overhead (realistic load)
python benchmarks/benchmark_concurrent.py
# With contention and failures
python benchmarks/benchmark_concurrent.py --with-failures
See benchmarks/ directory for detailed benchmarking tools and methodology.
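For a quick sanity check before running the full benchmark scripts, per-call overhead can be estimated by timing a bare coroutine against a wrapped one. The sketch below is not one of the scripts in benchmarks/; `passthrough` is a hypothetical no-op wrapper standing in for whichever pattern you want to measure.

```python
import asyncio
import time


async def noop():
    return None


def passthrough(func):
    """Hypothetical no-op wrapper; replace with the decorator under test."""
    async def wrapper(*args, **kwargs):
        return await func(*args, **kwargs)
    return wrapper


async def time_calls(func, n):
    # Average wall-clock seconds per awaited call.
    start = time.perf_counter()
    for _ in range(n):
        await func()
    return (time.perf_counter() - start) / n


async def compare(n=10_000):
    baseline = await time_calls(noop, n)
    wrapped = await time_calls(passthrough(noop), n)
    return baseline, wrapped
```

Running `asyncio.run(compare())` returns the two averages; their difference approximates the wrapper's overhead, though single runs are noisy and should be repeated.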
Design Goals:
- Microsecond-level overhead per operation
- Minimal allocations and GC pressure
- Lock-free designs where possible
- Efficient async/await integration
- Support for 20,000+ requests/second in production APIs
Roadmap
Completed in v0.1.0
- Circuit Breaker pattern
- Retry policies with exponential backoff and jitter
- Bulkhead pattern for resource isolation
- Time limiters with timeout and deadline support
- Fallback mechanisms with chained fallbacks
- Rate limiting (local and distributed)
- Load shedding (basic and system-aware)
- Backpressure management
- Adaptive concurrency limiting
- Event system with local and global handlers
- FastAPI integration with modular middleware
- Sanic integration
- aiohttp integration
Planned for Future Releases
- Cache pattern with TTL and invalidation
- Request deduplication
- Prometheus metrics exporter
- OpenTelemetry integration
- Grafana dashboard templates
- Enhanced event streaming
- WebSocket support
- HTTP client wrapper with built-in resilience
- gRPC interceptors
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
For major changes, please open an issue first to discuss what you would like to change.
Development Setup
# Clone the repository
git clone https://github.com/xonming/aioresilience.git
cd aioresilience
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install with all dependencies
pip install -e ".[dev]"
# or
pip install -r requirements-dev.txt
# Run tests
pytest
# Run tests with coverage
pytest --cov=aioresilience --cov-report=html
# Code formatting
black aioresilience tests
isort aioresilience tests
# Type checking
mypy aioresilience
# Linting
flake8 aioresilience
Running Tests
# Run all tests
pytest
# Run specific test file
pytest tests/unit/test_circuit_breaker.py
# Run with verbose output
pytest -v
# Run with coverage
pytest --cov=aioresilience
License
Copyright 2025 aioresilience contributors
Licensed under the MIT License. You may obtain a copy of the License at
https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Acknowledgments
Special thanks to:
- aiolimiter for async rate limiting primitives
Support
- Documentation: This README and Python docstrings
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- PyPI: aioresilience on PyPI
Built for the Python asyncio community
File details
Details for the file aioresilience-0.1.0.tar.gz.
File metadata
- Download URL: aioresilience-0.1.0.tar.gz
- Upload date:
- Size: 87.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d487c96624e1e668484b3cacea273683e053cb29201ecd0c00ea101df42bb33b |
| MD5 | d9eefa3ffe9ab2376228270409573b86 |
| BLAKE2b-256 | 78a5d4d7e8db17bdb5ebc2557a3765490cf281a6e53f6a81ae26237f75904bf0 |
File details
Details for the file aioresilience-0.1.0-py3-none-any.whl.
File metadata
- Download URL: aioresilience-0.1.0-py3-none-any.whl
- Upload date:
- Size: 81.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 06f20dac5871f6dce8d2df4670dd1344a4b32e3a66b4442753f330db9b22040f |
| MD5 | 1b2b6e2ae14d4d6dda6ebc055aef18b1 |
| BLAKE2b-256 | 89b4c13f4f6c8ec6418fe3e402a1c5f8be8a71799337e3951ff1b1fea7a51e8e |