
shared-call-py 🚀

Eliminate redundant work and protect your systems from thundering herds with intelligent request coalescing.

Python 3.12+ · MIT License

A Python implementation of request deduplication inspired by Go's singleflight pattern, improved with Python-first ergonomics. When multiple concurrent requests ask for the same resource, only one does the actual work—everyone else gets the same result instantly.

No major code changes are required: decorate functions with @shared.group(), or use call() (execute once per key) and forget() (invalidate a key) directly. Both variants live in src/shared_call_py/_sync.py and src/shared_call_py/_async.py.

✨ Key Features

  • 🚀 92.6x faster - Eliminate redundant database queries (see benchmarks)
  • 🔒 Thread-safe & async-safe - Works with asyncio, threads, and multiprocessing
  • 🎯 Zero dependencies - Built on Python standard library only
  • 🧠 Smart auto-keying - Automatic deduplication based on function + arguments
  • 📊 Built-in monitoring - Track hit rates, errors, and performance metrics
  • 🏛️ Production-ready - Battle-tested patterns from high-scale systems


🎯 The Problem

Modern applications face three critical challenges:

  1. Thundering Herd: When cache expires, hundreds of requests simultaneously hammer your database
  2. Rate Limit Hell: Concurrent identical API calls burn through your rate limits
  3. Database Overload: High concurrency creates connection pool exhaustion and query slowdowns

Traditional approach: Every request executes independently—wasting resources and destabilizing systems.

shared-call-py approach: Coalesce duplicate in-flight requests into a single execution. The first caller becomes the "leader" and does the work. All others wait and receive the same result.

📊 Benchmarks

Real-world performance improvements across different scenarios:

| Scenario | Without Coalescing | With Coalescing | Improvement |
| --- | --- | --- | --- |
| Database Load | 6.01s, 100 queries | 0.07s, 1 query | 92.6x faster |
| Cache Stampede | 100 DB hits | 1 DB hit | 99% reduction |
| Rate Limits | 90% failed | 0% failed | 100% success |
| FastAPI Load Test | 104s, 52s avg latency | 3.8s, 688ms avg latency | 27x faster |

FastAPI Example Load Test

The examples/fastapi/ project includes load-testing scripts that highlight the impact of request coalescing in a real FastAPI application with a PostgreSQL database hosted on Neon.

  • Normal endpoint (load_test_normal.sh)

    • Total requests: 1000
    • Successful: 1000
    • Failed: 0
    • Total duration: 104.04s
    • Throughput: 9.61 req/s
    • Response times (ms): min 1057.45 · avg 52036.10 · max 102263 · p50 52118.40 · p95 97211.30 · p99 101256
  • Coalesced endpoint (load_test_coalesced.sh)

    • Total requests: 1000
    • Successful: 1000
    • Failed: 0
    • Total duration: 3.84s
    • Throughput: 260.48 req/s
    • Response times (ms): min 52.71 · avg 688.04 · max 1462.63 · p50 731.44 · p95 1149.32 · p99 1432.18

Additional benchmark results and methodology:

Scenario: 100 concurrent requests hit a database with 10 connection pool limit

❌ WITHOUT Request Coalescing
   Concurrent Requests:   100
   Actual DB Queries:     100
   Total Duration:        6.012s
   Avg Latency:           2232.42ms
   p99 Latency:           6010.56ms

✅ WITH Request Coalescing
   Concurrent Requests:   100
   Actual DB Queries:     1
   Total Duration:        0.065s
   Avg Latency:           60.19ms
   p99 Latency:           62.05ms

📊 PERFORMANCE IMPROVEMENT
   Total Speedup:         92.6x faster
   Avg Latency:           37.1x faster
   p99 Latency:           96.9x faster
   DB Queries Eliminated: 99
   Load Reduction:        99.0%

Cache Stampede Protection

Scenario: 100 users hit endpoint simultaneously when cache expires

❌ WITHOUT Protection:
   Duration:       2.004s
   DB Queries:     100 (all 100 hit the database!)
   Wasted Queries: 99

✅ WITH Protection (AsyncSharedCall):
   Duration:       2.005s
   DB Queries:     1 (only the leader executes)
   Coalescing Rate: 99.0%
   Queries Prevented: 99

💡 System stays stable under load!

Rate Limit Prevention

Scenario: API with a 10 requests/second limit, 100 concurrent requests

❌ WITHOUT Coalescing:
   Successful:     10
   Failed:         90 (rate limited!)
   Error handling: Required

✅ WITH Coalescing:
   Successful:     100
   Failed:         0
   API Calls Made: 1
   API Calls Saved: 99
   Rate Limit Status: ✅ No violations

Run them yourself:

python examples/mock_db_query.py
python examples/thundering_herd.py
python examples/ratelimit.py

📦 Installation

pip install shared-call-py

Or with Poetry:

poetry add shared-call-py

🎨 Quick Start

Get up and running in under 2 minutes. Choose async (recommended for modern apps) or sync (for legacy/threaded code).

Async Usage (Recommended)

import asyncio
from shared_call_py import AsyncSharedCall

# Create a shared call instance
shared = AsyncSharedCall()

@shared.group()
async def fetch_user(user_id: int) -> dict:
    """Expensive database query - only executes once per unique user_id"""
    print(f"🔍 Fetching user {user_id} from database...")
    await asyncio.sleep(1)  # Simulate slow query
    return {"id": user_id, "name": f"User {user_id}"}

# Simulate 100 concurrent requests for the same user
async def main():
    tasks = [fetch_user(42) for _ in range(100)]
    results = await asyncio.gather(*tasks)
    print(f"✅ Got {len(results)} results, but only 1 database query!")

asyncio.run(main())

Output:

🔍 Fetching user 42 from database...
✅ Got 100 results, but only 1 database query!

Sync Usage

import time
from concurrent.futures import ThreadPoolExecutor

from shared_call_py import SharedCall

shared = SharedCall()

@shared.group()
def expensive_operation(x: int) -> int:
    print(f"Computing {x}...")
    time.sleep(1)
    return x * 2

# Multiple threads calling simultaneously - only one executes
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(expensive_operation, [5] * 10))
# "Computing 5..." prints once; every caller receives 10
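The sync path can be pictured with a stdlib-only sketch built on threading (an illustration of the mechanism, not the library's code; names like MiniSyncCoalescer are hypothetical):

```python
import threading
import time

class MiniSyncCoalescer:
    """Single-flight for threads: the first caller runs, the rest wait on an Event."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._inflight = {}  # key -> (Event, result box)

    def call(self, key, fn, *args):
        with self._lock:
            entry = self._inflight.get(key)
            leader = entry is None
            if leader:
                entry = (threading.Event(), {})
                self._inflight[key] = entry
        event, box = entry
        if not leader:
            event.wait()  # block until the leader finishes
            if "error" in box:
                raise box["error"]
            return box["result"]
        try:
            box["result"] = fn(*args)
            return box["result"]
        except Exception as exc:
            box["error"] = exc  # waiters re-raise the same exception
            raise
        finally:
            with self._lock:
                del self._inflight[key]  # later calls start a fresh flight
            event.set()  # wake all waiters

# Demo: ten threads, one execution.
calls = 0

def slow_query() -> int:
    global calls
    calls += 1
    time.sleep(0.2)  # simulate slow work
    return 7

coalescer = MiniSyncCoalescer()
results = []

def worker() -> None:
    results.append(coalescer.call("slow", slow_query))

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(calls, results)
```

All ten threads return 7, but slow_query runs once: the nine late arrivals block on the leader's Event instead of repeating the work.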

🏗️ Use Cases

See Quick Start for basic usage examples.

1. Protect Your Database

from shared_call_py import AsyncSharedCall

shared = AsyncSharedCall()

@shared.group()
async def get_user_profile(user_id: int):
    # Only one query executes, even with thousands of concurrent requests
    return await db.query("SELECT * FROM users WHERE id = ?", user_id)

2. Respect Rate Limits

from shared_call_py import AsyncSharedCall

shared = AsyncSharedCall()

class APIClient:
    @shared.group()
    async def fetch_data(self, endpoint: str):
        # Multiple requests coalesce into one API call
        return await self.http_client.get(endpoint)

# 1000 concurrent requests for the same endpoint = 1 API call

3. Prevent Cache Stampede

from shared_call_py import AsyncSharedCall

shared = AsyncSharedCall()

@shared.group()
async def get_popular_item():
    # When cache expires, only first request refills it
    result = await expensive_computation()
    cache.set("popular_item", result, ttl=300)
    return result

4. Deduplicate Background Jobs

from shared_call_py import AsyncSharedCall

shared = AsyncSharedCall()

@shared.group()
async def process_webhook(webhook_id: str):
    # If duplicate webhooks arrive, only process once
    return await process_payment(webhook_id)

🎛️ Advanced Features

For basic usage, see Quick Start. These advanced features give you fine-grained control.

Custom Key Functions

Control coalescing granularity with custom key functions:

from shared_call_py import AsyncSharedCall

shared = AsyncSharedCall()

# Coalesce by user_id only, ignore other parameters
@shared.group(key_fn=lambda user_id, include_details: f"user:{user_id}")
async def fetch_user(user_id: int, include_details: bool = False):
    return await db.get_user(user_id, include_details)
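Auto-keying of this kind can be pictured as deriving a string from the function's name plus its arguments (a hypothetical sketch; the library's actual key format may differ):

```python
def make_key(fn, args, kwargs) -> str:
    # Hypothetical default key: function name plus a repr of each argument.
    parts = [fn.__qualname__]
    parts += [repr(a) for a in args]
    parts += [f"{k}={v!r}" for k, v in sorted(kwargs.items())]
    return ":".join(parts)

def fetch_user(user_id: int, include_details: bool = False):
    ...

# Argument-sensitive keys keep these two calls separate; a custom key_fn
# that looks only at user_id would merge them into one flight.
print(make_key(fetch_user, (42,), {"include_details": True}))
print(make_key(fetch_user, (42,), {"include_details": False}))
```

This is why a custom key_fn changes coalescing granularity: whatever string it returns is the unit of deduplication.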

Statistics and Monitoring

stats = await shared.get_stats()
print(f"Hit Rate: {stats.hit_rate:.1%}")
print(f"Hits: {stats.hits}")
print(f"Misses: {stats.misses}")
print(f"Errors: {stats.errors}")
print(f"Active Calls: {stats.active}")

Cache Invalidation

# Forget a specific key
await shared.forget("user:42")

# Clear all tracked calls
await shared.forget_all()

# Reset statistics
await shared.reset_stats()

📚 Documentation

🔧 How It Works

  1. First Request: Becomes the "leader" and executes the function
  2. Concurrent Requests: Wait for the leader's result via asyncio.Event or threading.Event
  3. Result Sharing: All waiters receive the same result (or error)
  4. Cleanup: Call completes, resources released
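The four-step lifecycle above can be sketched with stdlib asyncio (an illustration only, not the library's implementation; here a Future plays the role of the Event):

```python
import asyncio

class MiniCoalescer:
    """Single-flight sketch: one leader per key; waiters share its result."""

    def __init__(self) -> None:
        self._inflight: dict[str, asyncio.Future] = {}

    async def call(self, key: str, fn, *args):
        fut = self._inflight.get(key)
        if fut is not None:
            return await fut  # 2. concurrent request: wait for the leader
        fut = asyncio.get_running_loop().create_future()
        self._inflight[key] = fut  # 1. first request: become the leader
        try:
            result = await fn(*args)
            fut.set_result(result)  # 3. share the result with all waiters
            return result
        except Exception as exc:
            fut.set_exception(exc)  # 3. errors propagate to waiters too
            raise
        finally:
            del self._inflight[key]  # 4. cleanup: later calls re-execute

# Demo: 25 concurrent callers, one execution.
calls = 0

async def slow_fetch() -> int:
    global calls
    calls += 1
    await asyncio.sleep(0.01)
    return 7

async def main() -> list[int]:
    coalescer = MiniCoalescer()
    return await asyncio.gather(*[coalescer.call("k", slow_fetch) for _ in range(25)])

results = asyncio.run(main())
print(calls, len(results))
```

Note the cleanup in `finally`: the key is removed as soon as the flight completes, so coalescing only ever spans in-flight calls, never past results.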

Key features:

  • Thread-safe and async-safe
  • Automatic key generation from function name and arguments
  • Error propagation - all waiters receive the same exception
  • Zero dependencies - uses only Python standard library

🤝 When NOT to Use

Avoid coalescing in these scenarios:

  • Mutations: Don't coalesce write operations (POST, PUT, DELETE) - each must execute independently
  • User-specific data: Each user needs their own result (unless you customize the key function)
  • Time-sensitive data: When staleness matters (though you can use forget() to invalidate keys)
  • Side effects: Functions with important side effects beyond the return value

Perfect for: idempotent reads (GET endpoints), expensive shared queries, and rate-limited external API calls.

🛠️ Development

# Clone the repository
git clone https://github.com/yourusername/shared-call-py.git
cd shared-call-py

# Install dependencies
poetry install

# Run tests
poetry run pytest

# Run benchmarks
python examples/mock_db_query.py

📝 License

MIT License - see LICENSE file for details.

🌟 Credits

Inspired by Go's singleflight pattern and adapted for Python's async/await paradigm.

🤔 FAQ

Q: What happens if the leader fails?
A: All waiting callers receive the same exception. They can retry, which will elect a new leader.

Q: How is this different from caching?
A: Caching stores past results. Coalescing deduplicates in-flight requests. They complement each other—use both for maximum efficiency!

Q: Does this work with FastAPI/Django/Flask?
A: Yes! It's framework-agnostic. Check out our examples for FastAPI, Django, and Flask integrations.

Q: What about memory leaks?
A: Completed calls are automatically cleaned up. Use forget() or forget_all() for manual control.

Q: Can I use this with sync and async code?
A: Yes! Use SharedCall for sync/threaded code and AsyncSharedCall for async/await. See Quick Start.

Q: How do I monitor performance?
A: Use get_stats() to track hit rates, misses, and errors. Perfect for observability dashboards!


Built with ❤️ to make Python applications faster and more resilient.
