
mcp-backpressure

Backpressure and concurrency control middleware for FastMCP MCP servers.

Problem: LLMs can generate hundreds of parallel tool calls, exhausting resources and crashing servers while giving clients no structured feedback for retrying.

Solution: Middleware that limits concurrent executions, queues excess requests with a timeout, and returns structured JSON-RPC overload errors.

Quickstart

from fastmcp import FastMCP
from mcp_backpressure import BackpressureMiddleware

mcp = FastMCP("MyServer")
mcp.add_middleware(BackpressureMiddleware(
    max_concurrent=5,      # Max parallel executions
    queue_size=10,         # Bounded queue for waiting requests
    queue_timeout=30.0,    # Queue wait timeout (seconds)
))

Installation

pip install mcp-backpressure

Features

  • Concurrency limiting: Semaphore-based control of parallel executions
  • Bounded queue: Optional FIFO queue with configurable size
  • Queue timeout: Automatic timeout for queued requests with cleanup
  • Structured errors: JSON-RPC compliant overload errors with detailed metrics
  • Metrics: Real-time counters for active, queued, and rejected requests
  • Callback hook: Optional notification on each overload event
  • Minimal dependencies: Requires only FastMCP and Python 3.10+

Usage

Basic Configuration

from mcp_backpressure import BackpressureMiddleware

mcp.add_middleware(BackpressureMiddleware(
    max_concurrent=5,           # Required: max parallel tool executions
    queue_size=10,              # Optional: bounded queue (0 = no queue)
    queue_timeout=30.0,         # Optional: seconds to wait in queue
    overload_error_code=-32001, # Optional: JSON-RPC error code
    on_overload=callback,       # Optional: called on each overload
))

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `max_concurrent` | `int` | required | Maximum number of concurrent tool executions. Must be >= 1. |
| `queue_size` | `int` | `0` | Maximum queue size for waiting requests. Set to 0 to reject immediately when the limit is reached. |
| `queue_timeout` | `float` | `30.0` | Maximum time (seconds) a request can wait in the queue before timing out. Must be > 0. |
| `overload_error_code` | `int` | `-32001` | JSON-RPC error code returned when the server is overloaded. |
| `on_overload` | `Callable` | `None` | Optional callback `(error: OverloadError) -> None` invoked on each overload. |

Error Handling

When the server is overloaded, requests are rejected with a structured JSON-RPC error:

{
  "code": -32001,
  "message": "SERVER_OVERLOADED",
  "data": {
    "reason": "queue_full",
    "active": 5,
    "queued": 10,
    "max_concurrent": 5,
    "queue_size": 10,
    "queue_timeout_ms": 30000,
    "retry_after_ms": 1000
  }
}
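Clients can act on these errors with a simple backoff policy. Below is a minimal sketch (the `retry_delay` helper is illustrative, not part of this library) that prefers the server's `retry_after_ms` hint and falls back to exponential backoff:

```python
def retry_delay(error: dict, attempt: int, base_ms: int = 1000) -> float:
    """Seconds to wait before retrying an overloaded call.

    Prefers the server's retry_after_ms hint from the error's `data`
    payload; falls back to exponential backoff when the hint is absent.
    """
    hint_ms = error.get("data", {}).get("retry_after_ms")
    delay_ms = hint_ms if hint_ms is not None else base_ms * (2 ** attempt)
    return delay_ms / 1000.0

overload = {"code": -32001, "message": "SERVER_OVERLOADED",
            "data": {"reason": "queue_full", "retry_after_ms": 1000}}
print(retry_delay(overload, attempt=0))       # → 1.0 (uses the server hint)
print(retry_delay({"code": -32001}, attempt=2))  # → 4.0 (backoff fallback)
```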

Overload Reasons

| Reason | Description |
| --- | --- |
| `concurrency_limit` | All execution slots are full and no queue is configured (`queue_size=0`) |
| `queue_full` | All execution slots and queue slots are full |
| `queue_timeout` | The request waited in the queue longer than `queue_timeout` |

Metrics

Get real-time metrics from the middleware:

metrics = middleware.get_metrics()  # Synchronous

print(f"Active: {metrics.active}")
print(f"Queued: {metrics.queued}")
print(f"Total rejected: {metrics.total_rejected}")
print(f"Rejected (concurrency): {metrics.rejected_concurrency_limit}")
print(f"Rejected (queue full): {metrics.rejected_queue_full}")
print(f"Rejected (timeout): {metrics.rejected_queue_timeout}")

For async contexts, use await middleware.get_metrics_async().

Callback Hook

Register a callback to be notified of each overload event:

from mcp_backpressure import BackpressureMiddleware, OverloadError

def on_overload(error: OverloadError):
    print(f"OVERLOAD: {error.reason} (active={error.active})")
    # Log to monitoring system, update metrics, etc.

middleware = BackpressureMiddleware(
    max_concurrent=5,
    queue_size=10,
    on_overload=on_overload,
)

Examples

Simple Server

See examples/simple_server.py for a minimal FastMCP server with backpressure.

Load Simulation

Run examples/load_simulation.py to see backpressure behavior under heavy concurrent load:

python examples/load_simulation.py

This simulates 30 concurrent requests against a server limited to 5 concurrent executions with a queue of 10, demonstrating how the middleware handles overload.
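The same kind of experiment fits in a few lines. A sketch (independent of the bundled script; `hammer` is a hypothetical helper) that fires concurrent calls and tallies successes against rejections:

```python
import asyncio

async def hammer(call, n: int = 30) -> tuple[int, int]:
    """Fire n concurrent calls and return (succeeded, rejected).

    `call` is any coroutine function; rejections surface as exceptions,
    which gather(return_exceptions=True) collects instead of raising.
    """
    results = await asyncio.gather(*(call() for _ in range(n)),
                                   return_exceptions=True)
    ok = sum(1 for r in results if not isinstance(r, Exception))
    return ok, n - ok
```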

How It Works

The middleware provides two-level limiting:

  1. Semaphore (max_concurrent): Controls active executions
  2. Bounded queue (queue_size): Holds waiting requests with timeout

Request flow:

  • If execution slot available → execute immediately
  • If execution slots full and queue not full → wait in queue with timeout
  • If queue full → reject with queue_full
  • If timeout in queue → reject with queue_timeout
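The flow above can be sketched with asyncio primitives. This is a toy illustration of the two-level scheme, not the library's actual implementation:

```python
import asyncio

class TwoLevelLimiter:
    """Toy sketch: a semaphore caps active executions, and a counter
    bounds how many callers may wait for a slot."""

    def __init__(self, max_concurrent: int, queue_size: int,
                 queue_timeout: float):
        self._sem = asyncio.Semaphore(max_concurrent)
        self._queue_size = queue_size
        self._queue_timeout = queue_timeout
        self._queued = 0

    async def run(self, coro_fn):
        if self._sem.locked():  # all execution slots busy
            if self._queued >= self._queue_size:
                raise RuntimeError("queue_full")
            self._queued += 1
            try:
                # Wait in the "queue" for a slot, bounded by the timeout
                await asyncio.wait_for(self._sem.acquire(),
                                       self._queue_timeout)
            except asyncio.TimeoutError:
                raise RuntimeError("queue_timeout") from None
            finally:
                self._queued -= 1
        else:
            await self._sem.acquire()  # slot free: execute immediately
        try:
            return await coro_fn()
        finally:
            self._sem.release()
```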

Invariants (guaranteed under all conditions):

  • active <= max_concurrent at all times
  • queued <= queue_size at all times
  • Cancellation frees execution slots and decrements counters
  • A queue timeout removes the waiting request from the queue

Development

Running Tests

python -m pytest tests/ -v

Linting

ruff check src/ tests/

Design Rationale

This library emerged from python-sdk #1698 (closed as "not planned"). Key design decisions:

  • Global limits only (v0.1): Per-client and per-tool limits deferred to v0.2+
  • Simple counters: No Prometheus/OTEL dependencies by default
  • JSON-RPC errors: Follows MCP protocol conventions
  • Monotonic time: Queue timeouts use time.monotonic() for reliability

License

MIT

Contributing

Contributions welcome! Please open an issue before submitting PRs.

Changelog

See CHANGELOG.md
