Skip to main content

Async connection pool for Model Context Protocol (MCP) client sessions — keep sessions warm, reuse across requests, auto-reconnect on failure.

Project description

mcpool

PyPI version PyPI Downloads Python 3.10+ License: MIT Tests

Async connection pool for Model Context Protocol (MCP) client sessions. mcpool keeps sessions warm, reuses them across requests, collapses concurrent tools/list refreshes, and hardens pool behavior with retries, a circuit breaker, proactive recycling, custom transports, dynamic auth, and optional OpenTelemetry hooks.

Why mcpool

Without pooling, every MCP call pays connection setup cost:

Request -> TCP connect -> TLS handshake -> MCP initialize -> tools/list -> tools/call -> close

With mcpool, warm sessions are reused:

App startup -> pool.start() -> N sessions ready
Request -> pool.session() -> tools/call -> return session
Shutdown -> pool.shutdown() -> drain + close

That removes repeated setup work, reduces tools/list churn, and gives you a safer operational model around endpoint failures.

Installation

pip install mcp-pool

Optional extras:

pip install "mcp-pool[otel]"       # OpenTelemetry spans/metrics
pip install "mcp-pool[aws]"        # CloudWatch metrics publishing
pip install "mcp-pool[langchain]"  # LangChain Tool integration
pip install "mcp-pool[oauth]"      # OAuth 2.1 with PKCE support
pip install "mcp-pool[docs]"       # mkdocs documentation

Quickstart

import asyncio
from mcpool import MCPPool, PoolConfig


async def main() -> None:
    config = PoolConfig(
        endpoint="http://gateway.example.com/mcp",
        transport="streamable_http",
        min_sessions=2,
        max_sessions=10,
        tool_cache_ttl_s=300,
        health_check_interval_s=30,
        max_session_lifetime_s=3600,
        recycle_window_s=60,
        borrow_timeout_s=2,
        retry_count=2,
        auth="your-api-key",
        enable_opentelemetry=True,
    )

    async with MCPPool(config=config) as pool:
        # Wait for pool readiness
        await pool.wait_ready(timeout_s=10)

        # Borrow a session
        async with pool.session(headers={"X-Request-ID": "req-123"}) as session:
            result = await session.call_tool("my_tool", arguments={"key": "value"})
            print(result)

        # One-liner convenience
        result = await pool.call_tool("my_tool", arguments={"key": "value"})

        # Non-blocking borrow
        async with pool.try_session() as session:
            if session is not None:
                await session.call_tool("best_effort_tool")

        # Cached tool list
        tools = await pool.list_tools()
        print(f"Tools: {[tool.name for tool in tools.tools]}")
        print(pool.metrics.snapshot())


asyncio.run(main())

What v1.0.0 Adds

Area Change
Rate Limiting Token-bucket rate limiter with burst support and backoff awareness
Adaptive Sizing Background auto-scaler that dynamically adjusts pool capacity based on EWMA wait time and utilization
Tenant Caps Provide QoS and prevent noisy neighbors with per-tenant concurrency limits
Multi-Endpoint MCPPoolManager to manage multiple named pools with failover and load balancing

What v0.4.0 Adds

Area Change
OAuth 2.1 OAuthConfig + OAuthProvider with PKCE, OIDC discovery, background refresh
Health Probes health_probe callback replaces default list_tools() ping
Warmup Hooks warmup_hook runs custom setup after each session initialize()
Schema Detection Hash-based tool schema change detection with on_schema_changed hook

What v0.3.0 Adds

Area Change
AWS Auth auth_provider for dynamic SigV4/IAM credentials without library forks
Custom Transports transport_factory to plug in any transport (e.g. aws_iam_streamablehttp_client)
Per-Request Headers Headers passed to pool.session(headers=...) propagate to HTTP transport
LangChain pool.langchain_tools() returns LangChain Tool objects from cached definitions
Readiness pool.wait_ready() / pool.is_ready for health-check gating
Convenience API pool.call_tool("name", args) one-liner for borrow→call→return
CloudWatch pool.metrics.publish(namespace="ECC/MCPPool") pushes to CloudWatch
Runtime Resize pool.resize(min=5, max=20) adjusts capacity without restart
Session Affinity pool.session(affinity_key="site-1234") for sticky routing
Graceful Degradation Ephemeral fallback + background recovery on Gateway failure
Prometheus pool.metrics.to_prometheus() for Prometheus-compatible export
Introspection pool.debug_snapshot() for per-session diagnostics

Core API

MCPPool

pool = MCPPool(config=PoolConfig(endpoint="http://..."))
# or
pool = MCPPool(endpoint="http://...", min_sessions=2, max_sessions=10)
Method Description
await pool.start() Pre-warm min_sessions and start the health checker
await pool.shutdown() Drain in-flight work and close sessions
async with pool.session(headers=..., affinity_key=...) as session: Borrow a session, waiting up to borrow_timeout_s
async with pool.try_session(headers=...) as session: Borrow immediately or yield None if no capacity is free
await pool.call_tool("name", args, headers=...) Borrow → call → return in one line
await pool.list_tools() Return cached tools, refreshing with single-flight behavior when stale
pool.invalidate_tools_cache() Force the next list_tools() call to refresh
pool.langchain_tools() Convert cached tools to LangChain Tool objects
await pool.wait_ready(timeout_s=10) Block until min_sessions are warmed — returns True/False
pool.is_ready True when the pool is fully warmed and usable
pool.is_degraded True if in degraded (ephemeral fallback) mode
await pool.resize(min_sessions=5, max_sessions=20) Adjust pool capacity at runtime
pool.debug_snapshot() Per-session age, idle time, borrow count, affinity key
pool.metrics.snapshot() Return a metrics dictionary
pool.metrics.to_prometheus() Return Prometheus-compatible metrics dict
pool.metrics.publish(namespace="ECC/MCPPool") Push metrics to CloudWatch

MCPPoolManager

from mcpool import MCPPoolManager
manager = MCPPoolManager()
Method Description
manager.add("name", PoolConfig(...)) Add a pool to the manager
await manager.remove("name") Remove and shutdown a pool
async with manager.session(strategy=...) as session: Borrow using round_robin, failover, or least_connections
await manager.call_tool("tool", args, strategy=...) Borrow, call, return
manager.metrics_snapshot() Aggregate metrics for all managed pools
await manager.start() / shutdown() Control lifecycle for all pools

PoolConfig Highlights

Parameter Default Description
min_sessions 2 Sessions to pre-warm on startup
max_sessions 10 Hard cap on concurrent sessions
tool_cache_ttl_s 300.0 TTL for cached tools/list responses
health_check_interval_s 30.0 Seconds between idle-session health sweeps
max_session_lifetime_s 3600.0 Hard lifetime for a session
recycle_window_s 30.0 Recycle idle sessions this long before lifetime expiry
connect_timeout_s 10.0 Timeout for a single session creation attempt
borrow_timeout_s connect_timeout_s Timeout for waiting on pool capacity
retry_count 2 Retries for session creation after the first failure
retry_base_delay_s 0.1 Base delay for exponential retry backoff
retry_max_delay_s 2.0 Maximum retry delay
failure_threshold 5 Consecutive failures before the circuit opens
recovery_timeout_s 30.0 Time before the circuit allows a half-open probe
enable_opentelemetry False Enable OTel spans/metrics if opentelemetry-api is installed
auth None Static auth token (Bearer)
auth_provider None Async callable returning token or header dict
transport_factory None Custom transport creation callback
graceful_degradation False Ephemeral fallback on warmup failure
oauth None OAuthConfig for OAuth 2.1 (mutually exclusive with auth_provider)
warmup_hook None Async callback run on each session after initialize()
health_probe None Custom health check — replaces default list_tools() ping
rate_limit None RateLimiterConfig for token-bucket rate limits
autoscaler None AutoScalerConfig for dynamic pool sizing
tenant_limiter None TenantLimiterConfig for per-tenant concurrency caps

Feature Deep-Dives

Multi-Endpoint Pool Manager

MCPPoolManager orchestrates multiple pools across backends, supporting failover, round-robin, and least-connections routing:

from mcpool import MCPPoolManager, PoolConfig

manager = MCPPoolManager()
manager.add("us-east-1", PoolConfig(endpoint="https://us-east-1.example.mcp"))
manager.add("eu-west-1", PoolConfig(endpoint="https://eu-west-1.example.mcp"))

await manager.start()

# Automatic proxying to the optimal endpoint:
# fallback, round_robin, or least_connections
result = await manager.call_tool(
    "query", 
    {"data": "xyz"},
    strategy="least_connections" 
)

Rate Limiting

Apply strict or burstable global rate limits to your pool via a Token Bucket algorithm. It's safe against concurrency and can optionally react to server Throttling indicators.

from mcpool import MCPPool, PoolConfig, RateLimiterConfig

pool = MCPPool(config=PoolConfig(
    endpoint="https://mcp.example.com",
    # Limit to 100 requests per second with a burst of up to 50
    rate_limit=RateLimiterConfig(
        requests_per_second=100.0,
        burst_size=50,
    )
))

Adaptive Pool Sizing (Autoscaler)

Save resources by having the pool automatically resize() when load fluctuates.

from mcpool import MCPPool, PoolConfig, AutoScalerConfig

pool = MCPPool(config=PoolConfig(
    endpoint="https://mcp.example.com",
    min_sessions=2,
    max_sessions=50,
    autoscaler=AutoScalerConfig(
        enabled=True,
        # Scale UP if average wait time drops below this threshold
        scale_up_threshold_ms=30.0,
        # Scale DOWN if sessions remain idle this long
        scale_down_idle_s=600.0,
        # Lockout period to prevent oscillation
        cooldown_s=30.0,
    )
))

Per-Tenant Concurrency Caps

In multi-tenant setups, prevent one noisy tenant from starving the entire pool while providing fair quality-of-service limits.

from mcpool import MCPPool, PoolConfig, TenantLimiterConfig

pool = MCPPool(config=PoolConfig(
    endpoint="https://mcp.example.com",
    tenant_limiter=TenantLimiterConfig(
        # Never let one tenant consume more than 5 parallel sessions
        max_concurrent_per_tenant=5,
        # Derive tenant key from an HTTP Header...
        tenant_key_header="X-Tenant-ID",
        # ... or it falls back to the `affinity_key`
    )
))

# Fast rejects if "TenantA" goes over their quota
async with pool.session(headers={"X-Tenant-ID": "TenantA"}) as session:
    await session.call_tool("heavy_query")

Custom Transport Factory

Plug in any MCP transport without forking the library:

from mcpool import MCPPool, PoolConfig

async def aws_iam_transport(endpoint: str, headers: dict[str, str]):
    """Use AWS IAM-signed Streamable HTTP client."""
    from aws_agents_mcp import aws_iam_streamablehttp_client

    transport_ctx = aws_iam_streamablehttp_client(endpoint)
    read, write, _ = await transport_ctx.__aenter__()

    from mcp import ClientSession
    session_ctx = ClientSession(read, write)
    session = await session_ctx.__aenter__()
    await session.initialize()

    return read, write, transport_ctx, session_ctx, session

pool = MCPPool(config=PoolConfig(
    endpoint="https://agentcore.us-east-1.amazonaws.com/mcp",
    transport_factory=aws_iam_transport,
))

Dynamic Auth Provider (SigV4 / IAM)

Replace static tokens with dynamic credential resolution:

async def sigv4_provider() -> dict[str, str]:
    """Sign requests with AWS SigV4."""
    import boto3
    from botocore.auth import SigV4Auth
    # ... signing logic ...
    return {
        "Authorization": "AWS4-HMAC-SHA256 ...",
        "X-Amz-Date": "20260323T000000Z",
        "X-Amz-Security-Token": "...",
    }

pool = MCPPool(config=PoolConfig(
    endpoint="https://gateway.example.com/mcp",
    auth_provider=sigv4_provider,
))

Per-Request Headers at Transport Level

Headers are propagated to the actual HTTP request, not just stored:

async with pool.session(headers={"X-Access-Token": "eyJ..."}) as session:
    # X-Access-Token is sent on the HTTP tools/call request
    result = await session.call_tool("invoke_backend_api")

LangChain Integration

from mcpool import MCPPool, PoolConfig
from langchain.agents import create_react_agent

async with MCPPool(config=PoolConfig(endpoint="http://...")) as pool:
    await pool.list_tools()  # Warm the cache
    tools = pool.langchain_tools()
    agent = create_react_agent(llm, tools)

Pool Readiness

async with MCPPool(config=config) as pool:
    # Use as a health check endpoint
    if not pool.is_ready:
        return {"status": "unhealthy"}

    # Or block until ready
    ready = await pool.wait_ready(timeout_s=30)
    if not ready:
        raise RuntimeError("Pool warmup timed out")

Call Tool Convenience

# Instead of:
async with pool.session() as session:
    result = await session.call_tool("my_tool", arguments={"key": "val"})

# Just:
result = await pool.call_tool("my_tool", {"key": "val"})

# With per-call headers:
result = await pool.call_tool("my_tool", {"key": "val"}, headers={"X-Trace": "abc"})

Runtime Resize

# Scale up during peak hours
await pool.resize(min_sessions=10, max_sessions=50)

# Scale down after peak
await pool.resize(min_sessions=2, max_sessions=10)

Session Affinity

# Route same tenant to same session for server-side state benefit
async with pool.session(affinity_key="tenant-42") as session:
    await session.call_tool("get_context")

# Next request with same key reuses the same session
async with pool.session(affinity_key="tenant-42") as session:
    await session.call_tool("invoke_tool")

Graceful Degradation

pool = MCPPool(config=PoolConfig(
    endpoint="https://gateway.example.com/mcp",
    graceful_degradation=True,
    min_sessions=5,
))
async with pool:
    # If Gateway is unreachable:
    # 1. Pool enters degraded mode
    # 2. Ephemeral per-request sessions are created
    # 3. Background recovery retries warmup
    # 4. When recovery succeeds, pool.is_degraded -> False

    async with pool.session() as session:
        await session.call_tool("my_tool")  # Works even in degraded mode

CloudWatch Metrics

# Push to CloudWatch
pool.metrics.publish(
    namespace="ECC/MCPPool",
    dimensions={"Environment": "production"},
    region_name="us-east-1",
)

Prometheus Export

prom = pool.metrics.to_prometheus()
# {"mcpool_active_sessions": 3, "mcpool_borrow_total": 42, ...}

Debug Snapshot

for info in pool.debug_snapshot():
    print(f"{info['session_id']}: {info['state']}, age={info['age_s']}s, "
          f"borrows={info['borrow_count']}, affinity={info['affinity_key']}")

Event Hooks

PoolConfig.event_hooks accepts async or sync callbacks for:

  • on_session_created
  • on_session_destroyed
  • on_borrow
  • on_return
  • on_health_check_failed
  • on_circuit_open
  • on_circuit_close
  • on_schema_changed
  • on_rate_limited
  • on_autoscale

OAuth 2.1 Authentication

from mcpool import MCPPool, PoolConfig, OAuthConfig

pool = MCPPool(config=PoolConfig(
    endpoint="https://mcp.example.com",
    oauth=OAuthConfig(
        client_id="my-client",
        client_secret="my-secret",
        token_endpoint="https://auth.example.com/token",
        scopes=["mcp:read", "mcp:write"],
    ),
))

Or with OIDC discovery:

pool = MCPPool(config=PoolConfig(
    endpoint="https://mcp.example.com",
    oauth=OAuthConfig(
        client_id="my-client",
        discovery_url="https://auth.example.com/.well-known/oauth-authorization-server",
    ),
))

Custom Health Probes

async def my_health_check(session) -> bool:
    """Custom readiness check."""
    try:
        result = await session.call_tool("health_ping")
        return result is not None
    except Exception:
        return False

pool = MCPPool(config=PoolConfig(
    endpoint="https://mcp.example.com",
    health_probe=my_health_check,
))

Warmup Hooks

async def warmup(session):
    """Pre-load context after session init."""
    await session.call_tool("load_context", {"scope": "global"})

pool = MCPPool(config=PoolConfig(
    endpoint="https://mcp.example.com",
    warmup_hook=warmup,
))

Schema Change Detection

from mcpool import MCPPool, PoolConfig, EventHooks

async def on_schema_changed(payload):
    print(f"Tool schema changed! Hash: {payload['schema_hash']}")
    # Invalidate downstream caches, re-sync agents, etc.

pool = MCPPool(config=PoolConfig(
    endpoint="https://mcp.example.com",
    event_hooks=EventHooks(on_schema_changed=on_schema_changed),
))

## Metrics

`pool.metrics.snapshot()` includes resilience, cache, and degradation fields:

```python
{
    "active": 1,
    "idle": 4,
    "total": 5,
    "borrow_count": 42,
    "avg_borrow_wait_s": 0.003,
    "return_count": 42,
    "cache_hits": 38,
    "cache_misses": 4,
    "cache_refresh_count": 4,
    "cache_waiters": 7,
    "cache_hit_rate": 0.905,
    "reconnect_count": 1,
    "retry_attempts": 2,
    "health_check_count": 120,
    "health_check_failures": 1,
    "recycled_count": 3,
    "sessions_created": 8,
    "sessions_destroyed": 3,
    "errors": 0,
    "circuit_state": "closed",
    "degraded": False,
    "rate_limit_waits": 2,
    "rate_limit_rejects": 0,
    "tenant_rejects": 1,
    "autoscale_ups": 3,
    "autoscale_downs": 1,
    "uptime_s": 3600.5,
}

Observability

When OTel is enabled, mcpool emits spans for:

  • pool.borrow
  • pool.return
  • session.create
  • session.close
  • list_tools

And OTel metrics for:

  • pool.active_sessions
  • pool.idle_sessions
  • pool.borrow_wait_seconds
  • pool.errors_total

Documentation

The repository includes a mkdocs site in docs/ covering:

  • Quickstart
  • Configuration reference
  • Migration guide for 0.1.0 -> 0.2.0 -> 0.3.0 -> 0.4.0
  • Architecture
  • Deployment patterns
  • Performance tuning

Build it locally with:

mkdocs serve

Development

git clone https://github.com/adwantg/mcp-pool.git
cd mcp-pool
pip install -e ".[dev]"
pytest -v
pytest tests/benchmarks tests/fuzz -v --no-cov
pytest tests/performance -v --no-cov
ruff check src/ tests/
mypy src/
mkdocs build --strict

License

MIT — see LICENSE.

Citation

If you use mcpool in research, please cite:

@software{adwant_mcpool_2025,
  author = {Goutam Adwant},
  title = {mcpool: Async Connection Pool for MCP Sessions},
  year = {2025},
  url = {https://github.com/adwantg/mcp-pool}
}

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mcp_pool-1.0.0.tar.gz (61.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mcp_pool-1.0.0-py3-none-any.whl (43.9 kB view details)

Uploaded Python 3

File details

Details for the file mcp_pool-1.0.0.tar.gz.

File metadata

  • Download URL: mcp_pool-1.0.0.tar.gz
  • Upload date:
  • Size: 61.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for mcp_pool-1.0.0.tar.gz
Algorithm Hash digest
SHA256 2623eaa901d587deaeab959f055700245deda8f1e1c35d8f05120c308818484c
MD5 bb7d3ac3f2a0fc9aa2f1fe314c1b2dc3
BLAKE2b-256 50e45424779ae84f39dc14ab27c07ebd49e50c7e7e3f3e16ede56e670922a6a8

See more details on using hashes here.

File details

Details for the file mcp_pool-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: mcp_pool-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 43.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for mcp_pool-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e190fad5045c7a279bd724eb62c88a16ed726246fe657a4d2b9e0958f557291c
MD5 294a3280d10b6a0c514cff73cb51904a
BLAKE2b-256 ff0ffdb831d48db88c985bb775aeb7f74e0c67200829a377f5aa19c31b6caab4

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page