# shared-call-py 🚀
Eliminate redundant work and protect your systems from thundering herds with intelligent request coalescing.
A Python implementation of request deduplication inspired by Go's singleflight pattern, improved with Python-first ergonomics. When multiple concurrent requests ask for the same resource, only one does the actual work—everyone else gets the same result instantly.
No major code changes are required: use the `@shared.group()` decorator, or call `call()` (execute once) and `forget()` (invalidate keys) directly, as implemented in `src/shared_call_py/_sync.py` and `src/shared_call_py/_async.py`.
## ✨ Key Features
- 🚀 92.6x faster - Eliminate redundant database queries (see benchmarks)
- 🔒 Thread-safe & async-safe - Works with asyncio, threads, and multiprocessing
- 🎯 Zero dependencies - Built on Python standard library only
- 🧠 Smart auto-keying - Automatic deduplication based on function + arguments
- 📊 Built-in monitoring - Track hit rates, errors, and performance metrics
- 🏛️ Production-ready - Battle-tested patterns from high-scale systems
## 📋 Table of Contents
- The Problem
- Benchmarks
- Installation
- Quick Start
- Use Cases
- Advanced Features
- Documentation
- How It Works
- When NOT to Use
- Development
- FAQ
## 🎯 The Problem
Modern applications face three critical challenges:
- Thundering Herd: When cache expires, hundreds of requests simultaneously hammer your database
- Rate Limit Hell: Concurrent identical API calls burn through your rate limits
- Database Overload: High concurrency creates connection pool exhaustion and query slowdowns
Traditional approach: Every request executes independently—wasting resources and destabilizing systems.
shared-call-py approach: Coalesce duplicate in-flight requests into a single execution. The first caller becomes the "leader" and does the work. All others wait and receive the same result.
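The thundering herd is easy to reproduce with plain `asyncio` (a standalone sketch, independent of this library): without coalescing, every concurrent caller runs the expensive work itself.

```python
import asyncio

query_count = 0  # counts how many times the "query" actually runs

async def fetch_config():
    # No coalescing: every caller independently runs the slow query.
    global query_count
    query_count += 1
    await asyncio.sleep(0.01)  # simulate database I/O
    return {"feature_flags": ["a", "b"]}

async def main():
    # 100 concurrent callers -> 100 independent executions
    await asyncio.gather(*(fetch_config() for _ in range(100)))
    print(query_count)

asyncio.run(main())
```

A coalescing layer turns those 100 executions into a single one, which is what the benchmarks below measure.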
## 📊 Benchmarks
Real-world performance improvements across different scenarios:
| Scenario | Without Coalescing | With Coalescing | Improvement |
|---|---|---|---|
| Database Load | 6.01s, 100 queries | 0.07s, 1 query | 92.6x faster |
| Cache Stampede | 100 DB hits | 1 DB hit | 99% reduction |
| Rate Limits | 90% failed | 0% failed | 100% success |
| FastAPI Load Test | 104s, 52s avg latency | 3.8s, 688ms avg latency | 27x faster |
### FastAPI Example Load Test
The examples/fastapi/ project includes load-testing scripts that highlight the impact of request coalescing in a real FastAPI application with a PostgreSQL database hosted on Neon.
Test Configuration: 1000 requests, 50 concurrent workers
❌ WITHOUT Request Coalescing (`load_test_normal.sh`)

```
Total Duration: 104.04s
Throughput: 9.61 req/s
Success Rate: 100% (1000/1000)
Response Times (ms):
├─ Min: 1,057
├─ Average: 52,036
├─ Median (p50): 52,118
├─ p95: 97,211
├─ p99: 101,256
└─ Max: 102,263
```

✅ WITH Request Coalescing (`load_test_coalesced.sh`)

```
Total Duration: 3.84s
Throughput: 260.48 req/s
Success Rate: 100% (1000/1000)
Response Times (ms):
├─ Min: 53
├─ Average: 688
├─ Median (p50): 731
├─ p95: 1,149
├─ p99: 1,432
└─ Max: 1,463
```

📊 PERFORMANCE IMPROVEMENT

```
Duration: 27.1x faster (104s → 3.8s)
Throughput: 27.1x higher (9.6 → 260.5 req/s)
Avg Response Time: 75.6x faster (52s → 688ms)
p99 Response Time: 70.7x faster (101s → 1.4s)
```
A few more benchmark results and their methodologies:
- Database Load Benchmark - Connection pool exhaustion prevention
- Cache Stampede Benchmark - Thundering herd protection
- Rate Limit Benchmark - API quota preservation
### Database Load Benchmark

Scenario: 100 concurrent requests hit a database with a 10-connection pool limit
❌ WITHOUT Request Coalescing

```
Concurrent Requests: 100
Actual DB Queries: 100
Total Duration: 6.012s
Avg Latency: 2232.42ms
p99 Latency: 6010.56ms
```

✅ WITH Request Coalescing

```
Concurrent Requests: 100
Actual DB Queries: 1
Total Duration: 0.065s
Avg Latency: 60.19ms
p99 Latency: 62.05ms
```

📊 PERFORMANCE IMPROVEMENT

```
Total Speedup: 92.6x faster
Avg Latency: 37.1x faster
p99 Latency: 96.9x faster
DB Queries Eliminated: 99
Load Reduction: 99.0%
```
### Cache Stampede Protection
Scenario: 100 users hit endpoint simultaneously when cache expires
❌ WITHOUT Protection:

```
Duration: 2.004s
DB Queries: 100 (all 100 hit the database!)
Wasted Queries: 99
```

✅ WITH Protection (AsyncSharedCall):

```
Duration: 2.005s
DB Queries: 1 (only the leader executes)
Coalescing Rate: 99.0%
Queries Prevented: 99
```

💡 System stays stable under load!
### Rate Limit Prevention
Scenario: API with 10 requests/second limit, 50 concurrent requests
❌ WITHOUT Coalescing:

```
Successful: 10
Failed: 90 (rate limited!)
Error handling: Required
```

✅ WITH Coalescing:

```
Successful: 100
Failed: 0
API Calls Made: 1
API Calls Saved: 99
Rate Limit Status: ✅ No violations
```
Run them yourself:
```shell
python examples/mock_db_query.py
python examples/thundering_herd.py
python examples/ratelimit.py
```
## 📦 Installation

```shell
pip install shared-call-py
```

Or with Poetry:

```shell
poetry add shared-call-py
```
## 🎨 Quick Start
Get up and running in under 2 minutes. Choose async (recommended for modern apps) or sync (for legacy/threaded code).
### Async Usage (Recommended)

```python
import asyncio

from shared_call_py import AsyncSharedCall

# Create a shared call instance
shared = AsyncSharedCall()

@shared.group()
async def fetch_user(user_id: int) -> dict:
    """Expensive database query - only executes once per unique user_id"""
    print(f"🔍 Fetching user {user_id} from database...")
    await asyncio.sleep(1)  # Simulate slow query
    return {"id": user_id, "name": f"User {user_id}"}

# Simulate 100 concurrent requests for the same user
async def main():
    tasks = [fetch_user(42) for _ in range(100)]
    results = await asyncio.gather(*tasks)
    print(f"✅ Got {len(results)} results, but only 1 database query!")

asyncio.run(main())
```
Output:

```
🔍 Fetching user 42 from database...
✅ Got 100 results, but only 1 database query!
```
### Sync Usage

```python
import time

from shared_call_py import SharedCall

shared = SharedCall()

@shared.group()
def expensive_operation(x: int) -> int:
    print(f"Computing {x}...")
    time.sleep(1)
    return x * 2

# Multiple threads calling simultaneously - only one executes
result = expensive_operation(5)
```
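To make the leader/waiter idea concrete for threads, here is a minimal standalone sketch of sync coalescing built only on `threading.Lock` and `threading.Event`; it illustrates the pattern, not the library's actual implementation:

```python
import threading
import time

class MiniSyncCoalescer:
    """Minimal sketch of sync request coalescing (illustrative only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._calls = {}  # key -> in-flight call state

    def call(self, key, fn, *args):
        with self._lock:
            entry = self._calls.get(key)
            leader = entry is None
            if leader:
                entry = {"event": threading.Event(), "result": None, "error": None}
                self._calls[key] = entry
        if leader:
            try:
                entry["result"] = fn(*args)
            except Exception as exc:
                entry["error"] = exc
            finally:
                with self._lock:
                    del self._calls[key]
                entry["event"].set()  # wake all waiters
        else:
            entry["event"].wait()  # wait for the leader's result
        if entry["error"] is not None:
            raise entry["error"]
        return entry["result"]

executions = 0
barrier = threading.Barrier(10)  # release all threads at once

def slow_op():
    global executions
    executions += 1
    time.sleep(0.5)  # simulate slow work so the calls overlap
    return 42

results = [None] * 10
coalescer = MiniSyncCoalescer()

def worker(i):
    barrier.wait()  # ensure all 10 threads request concurrently
    results[i] = coalescer.call("slow_op", slow_op)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(executions, results[0])  # one execution, shared by all 10 callers
```

One thread wins the race under the lock and becomes the leader; the other nine block on the event and read the leader's result (or exception) once it is set.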
## 🏗️ Use Cases
See Quick Start for basic usage examples.
### 1. Protect Your Database

```python
from shared_call_py import AsyncSharedCall

shared = AsyncSharedCall()

@shared.group()
async def get_user_profile(user_id: int):
    # Only one query executes, even with thousands of concurrent requests
    return await db.query("SELECT * FROM users WHERE id = ?", user_id)
```
### 2. Respect Rate Limits

```python
from shared_call_py import AsyncSharedCall

shared = AsyncSharedCall()

class APIClient:
    @shared.group()
    async def fetch_data(self, endpoint: str):
        # Multiple requests coalesce into one API call
        return await self.http_client.get(endpoint)

# 1000 concurrent requests = 1 API call (for the same endpoint)
```
### 3. Prevent Cache Stampede

```python
from shared_call_py import AsyncSharedCall

shared = AsyncSharedCall()

@shared.group()
async def get_popular_item():
    # When the cache expires, only the first request refills it
    result = await expensive_computation()
    cache.set("popular_item", result, ttl=300)
    return result
```
### 4. Deduplicate Background Jobs

```python
from shared_call_py import AsyncSharedCall

shared = AsyncSharedCall()

@shared.group()
async def process_webhook(webhook_id: str):
    # If duplicate webhooks arrive while one is in flight, only process once
    return await process_payment(webhook_id)
```
## 🎛️ Advanced Features
For basic usage, see Quick Start. These advanced features give you fine-grained control.
### Custom Key Functions

Control coalescing granularity with custom key functions:

```python
from shared_call_py import AsyncSharedCall

shared = AsyncSharedCall()

# Coalesce by user_id only; ignore other parameters
@shared.group(key_fn=lambda user_id, include_details: f"user:{user_id}")
async def fetch_user(user_id: int, include_details: bool = False):
    return await db.get_user(user_id, include_details)
```
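Without a `key_fn`, deduplication keys are generated automatically from the function plus its arguments. The exact scheme is internal to the library; the following is a hypothetical sketch of how such a key could be derived:

```python
def make_key(fn, args, kwargs):
    # Hypothetical key scheme: qualified function name plus a repr of
    # each positional argument and sorted keyword arguments.
    parts = [fn.__qualname__]
    parts += [repr(a) for a in args]
    parts += [f"{k}={v!r}" for k, v in sorted(kwargs.items())]
    return ":".join(parts)

def fetch_user(user_id, include_details=False):
    ...

key = make_key(fetch_user, (42,), {"include_details": True})
print(key)  # fetch_user:42:include_details=True
```

The point is that two calls produce the same key exactly when their function and arguments match, which is why a custom `key_fn` is needed whenever you want coarser grouping (for example, ignoring `include_details` above).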
### Statistics and Monitoring

```python
stats = await shared.get_stats()
print(f"Hit Rate: {stats.hit_rate:.1%}")
print(f"Hits: {stats.hits}")
print(f"Misses: {stats.misses}")
print(f"Errors: {stats.errors}")
print(f"Active Calls: {stats.active}")
```
### Cache Invalidation

```python
# Forget a specific key
await shared.forget("user:42")

# Clear all tracked calls
await shared.forget_all()

# Reset statistics
await shared.reset_stats()
```
## 📚 Documentation
- Quick Start Guide - Get started in 5 minutes
- API Reference - Complete API documentation
- Benchmarks - Detailed performance comparisons (also see above)
- Examples - Real-world usage patterns including FastAPI, Django, and Flask
## 🔧 How It Works
- First Request: Becomes the "leader" and executes the function
- Concurrent Requests: Wait for the leader's result via `asyncio.Event` or `threading.Event`
- Result Sharing: All waiters receive the same result (or error)
- Cleanup: Call completes, resources released
Key features:
- Thread-safe and async-safe
- Automatic key generation from function name and arguments
- Error propagation - all waiters receive the same exception
- Zero dependencies - uses only Python standard library
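The lifecycle above can be sketched in a few lines, here using an `asyncio.Future` in place of the event machinery (a simplified standalone illustration, not the library's actual code):

```python
import asyncio

class MiniAsyncCoalescer:
    """Simplified sketch of the leader/waiter lifecycle (illustrative only)."""

    def __init__(self):
        self._inflight = {}  # key -> Future holding the shared result

    async def call(self, key, fn, *args):
        if key in self._inflight:
            # A leader is already running this key: just await its result.
            return await self._inflight[key]
        # First caller becomes the leader.
        fut = asyncio.get_running_loop().create_future()
        self._inflight[key] = fut
        try:
            result = await fn(*args)
            fut.set_result(result)  # share the result with all waiters
            return result
        except Exception as exc:
            fut.set_exception(exc)  # all waiters receive the same exception
            raise
        finally:
            del self._inflight[key]  # cleanup: next call elects a new leader

calls = 0

async def slow_double(x):
    global calls
    calls += 1
    await asyncio.sleep(0.05)
    return x * 2

async def main():
    c = MiniAsyncCoalescer()
    results = await asyncio.gather(
        *(c.call("double:21", slow_double, 21) for _ in range(50))
    )
    print(calls, results[0])  # 1 84

asyncio.run(main())
```

Fifty concurrent callers share one execution: the first becomes the leader, the rest await the same future, and the `finally` block releases the key so a later call starts fresh.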
## 🤝 When NOT to Use
❌ Avoid coalescing in these scenarios:
- Mutations: Don't coalesce write operations (POST, PUT, DELETE) - each must execute independently
- User-specific data: Each user needs their own result (unless you customize the key function)
- Time-sensitive data: When staleness matters (though you can use `forget()` to invalidate keys)
- Side effects: Functions with important side effects beyond the return value
✅ Perfect for:
- Database queries (example)
- External API calls (rate limit protection)
- Cache warming (stampede prevention)
- Expensive computations with identical inputs
## 🛠️ Development

```shell
# Clone the repository
git clone https://github.com/yourusername/shared-call-py.git
cd shared-call-py

# Install dependencies
poetry install

# Run tests
poetry run pytest

# Run benchmarks
python examples/mock_db_query.py
```
## 📝 License
MIT License - see LICENSE file for details.
## 🌟 Credits
Inspired by Go's singleflight pattern and adapted for Python's async/await paradigm.
## 🤔 FAQ
Q: What happens if the leader fails?
A: All waiting callers receive the same exception. They can retry, which will elect a new leader.
Q: How is this different from caching?
A: Caching stores past results. Coalescing deduplicates in-flight requests. They complement each other—use both for maximum efficiency!
Q: Does this work with FastAPI/Django/Flask?
A: Yes! It's framework-agnostic. Check out our examples for FastAPI, Django, and Flask integrations.
Q: What about memory leaks?
A: Completed calls are automatically cleaned up. Use forget() or forget_all() for manual control.
Q: Can I use this with sync and async code?
A: Yes! Use SharedCall for sync/threaded code and AsyncSharedCall for async/await. See Quick Start.
Q: How do I monitor performance?
A: Use get_stats() to track hit rates, misses, and errors. Perfect for observability dashboards!
Built with ❤️ to make Python applications faster and more resilient.