Skip to main content

OAuth 2.0 Authorization Server framework for MCP (Model Context Protocol) servers

Project description

mcp-authflow

OAuth 2.0 Authorization Server framework for MCP servers. Issue and manage tokens that protect MCP tool access.

Pair with mcp-authflow-resource on the resource server side.

Features

  • Token storage with PostgreSQL and in-memory backends
  • RFC 6749 standardized OAuth error responses
  • Sliding-window rate limiting for token endpoints
  • Input validation for client IDs, scopes, and PKCE parameters
  • CORS helpers with origin allowlisting
  • Async-first design, built on Starlette

Installation

pip install mcp-authflow

# With PostgreSQL token storage (production)
pip install mcp-authflow[postgres]

Quick Start

Build an OAuth authorization server that issues tokens for MCP clients:

import secrets
import time
from contextlib import asynccontextmanager

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from mcp_authflow.rate_limiting import SlidingWindowRateLimiter
from mcp_authflow.responses import invalid_request, rate_limit_exceeded
from mcp_authflow.storage import MemoryTokenStorage
from mcp_authflow.validation import parse_scope_field, validate_client_id

# --- Setup ---

storage = MemoryTokenStorage()  # Use PostgresTokenStorage for production
limiter = SlidingWindowRateLimiter(requests_per_window=60, window_seconds=3600)


# --- Token endpoint ---

async def token_endpoint(request: Request) -> JSONResponse:
    form = await request.form()
    client_id = str(form.get("client_id", ""))

    # Rate limit per client
    if not limiter.is_allowed(client_id):
        return rate_limit_exceeded(
            "Too many requests",
            retry_after=limiter.get_retry_after(client_id),
        )

    # Validate client
    if not validate_client_id(client_id):
        return invalid_request("Invalid client_id format")

    # Issue token
    token = secrets.token_urlsafe(32)
    scopes = parse_scope_field(form.get("scope"))
    expires_at = int(time.time()) + 3600

    await storage.store_token(
        token=token,
        client_id=client_id,
        scopes=scopes.split(),
        expires_at=expires_at,
        resource=str(form.get("resource", "")),
    )

    return JSONResponse({
        "access_token": token,
        "token_type": "bearer",
        "expires_in": 3600,
        "scope": scopes,
    })


# --- Introspection endpoint (called by resource servers) ---

async def introspect_endpoint(request: Request) -> JSONResponse:
    form = await request.form()
    token = str(form.get("token", ""))

    token_data = await storage.load_token(token)
    if not token_data or token_data["expires_at"] < time.time():
        return JSONResponse({"active": False})

    return JSONResponse({
        "active": True,
        "client_id": token_data["client_id"],
        "scope": " ".join(token_data["scopes"]),
        "exp": token_data["expires_at"],
        "aud": token_data.get("resource", ""),
    })


@asynccontextmanager
async def lifespan(app):
    await storage.initialize()
    yield
    await storage.close()


app = Starlette(
    routes=[
        Route("/token", token_endpoint, methods=["POST"]),
        Route("/introspect", introspect_endpoint, methods=["POST"]),
    ],
    lifespan=lifespan,
)

Run with: uvicorn myapp:app --port 8000

Architecture

                         MCP Client (Claude, etc.)
                                |
                  1. Authorization request
                                |
                                v
                    +---------------------+
                    |   Auth Server        |   <-- this package
                    |   (mcp-authflow)  |
                    |                     |
                    |  /token             |   2. Issues access token
                    |  /introspect        |   4. Validates token
                    +---------------------+
                                ^
                                |
                     4. Token introspection (RFC 7662)
                                |
                    +---------------------+
                    |   Resource Server    |   <-- mcp-authflow-resource
                    |   (MCP tools)       |
                    |                     |
                    |  3. Client calls    |
                    |     MCP tools with  |
                    |     Bearer token    |
                    +---------------------+
  1. MCP client authenticates with the auth server
  2. Auth server issues an access token (stored in PostgreSQL or memory)
  3. Client calls MCP tools on the resource server with the Bearer token
  4. Resource server validates the token by calling the auth server's /introspect endpoint

API Reference

Token Storage

Abstract base class with two implementations:

from mcp_authflow.storage import MemoryTokenStorage, PostgresTokenStorage

# In-memory (development/testing)
storage = MemoryTokenStorage()

# PostgreSQL (production) -- requires `postgres` extra
storage = PostgresTokenStorage(database_url="postgresql://user:pass@host/db")
# Or reads DATABASE_URL env var if no argument provided
storage = PostgresTokenStorage()

await storage.initialize()  # Create tables / prepare connections

Storage interface:

Method Description
store_token(token, client_id, scopes, expires_at, resource?, user_id?) Store an access token
load_token(token) -> dict | None Look up a token
delete_token(token) Revoke a token
cleanup_expired_tokens() -> int Purge expired tokens, returns count
get_token_count() -> int Count active tokens
store_refresh_token(...) Store a refresh token (same interface)
load_refresh_token(token) -> dict | None Look up a refresh token
delete_refresh_token(token) Revoke a refresh token
cleanup_expired_refresh_tokens() -> int Purge expired refresh tokens, returns count

Token data returned by load_token():

{
    "token": str,
    "client_id": str,
    "scopes": list[str],
    "resource": str | None,       # RFC 8707 resource binding
    "expires_at": int,            # Unix timestamp
    "created_at": int,            # Unix timestamp
    "user_id": int | None,
}

OAuth Error Responses

Standardized error helpers following RFC 6749:

from mcp_authflow.responses import (
    invalid_request,       # 400 - Missing/invalid parameters
    invalid_client,        # 401 - Authentication failure
    invalid_grant,         # 400 - Expired/invalid code or token
    invalid_scope,         # 400 - Scope violation
    slow_down,             # 400 - Device flow rate limiting
    rate_limit_exceeded,   # 429 - Too many requests
    server_error,          # 500 - Internal error
    backend_timeout,       # 504 - Upstream timeout
)

Each returns a Starlette JSONResponse with the appropriate status code and Cache-Control: no-store header.

Rate Limiting

from mcp_authflow.rate_limiting import SlidingWindowRateLimiter

limiter = SlidingWindowRateLimiter(
    requests_per_window=60,   # Max requests per window
    window_seconds=3600,      # Window duration (1 hour)
)

if not limiter.is_allowed(client_id):
    retry_after = limiter.get_retry_after(client_id)  # Seconds until next allowed request

Input Validation

from mcp_authflow.validation import validate_client_id, parse_scope_field

validate_client_id("my-client-123")  # True (alphanumeric + hyphens/underscores)
validate_client_id("")               # False

parse_scope_field("read write")      # "read write"
parse_scope_field(["read", "write"]) # "read write"
parse_scope_field(None)              # "read" (default)

CORS

from mcp_authflow.cors import parse_allowed_origins, build_cors_headers

# Reads ALLOWED_MCP_ORIGINS env var (comma-separated)
origins = parse_allowed_origins()

# Returns CORS headers if request origin is in allowlist
headers = build_cors_headers(request, origins)

Configuration

Env Variable Description Default
DATABASE_URL PostgreSQL connection string (for PostgresTokenStorage) Required for postgres
ALLOWED_MCP_ORIGINS Comma-separated allowed CORS origins Empty (no CORS)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mcp_authflow-0.3.0.tar.gz (134.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mcp_authflow-0.3.0-py3-none-any.whl (16.7 kB view details)

Uploaded Python 3

File details

Details for the file mcp_authflow-0.3.0.tar.gz.

File metadata

  • Download URL: mcp_authflow-0.3.0.tar.gz
  • Upload date:
  • Size: 134.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for mcp_authflow-0.3.0.tar.gz
Algorithm Hash digest
SHA256 3b12432de10b4e2e7c8e78d0abba34ca09edc96035678bbcb655d014f129978b
MD5 98442afe76a6542b1b1e2e230b83da6b
BLAKE2b-256 276e68316abf39ca8a91e009fd311e34772fe0759beb3bd5029f533523d27762

See more details on using hashes here.

Provenance

The following attestation bundles were made for mcp_authflow-0.3.0.tar.gz:

Publisher: publish.yml on brooksmcmillin/mcp-authflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file mcp_authflow-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: mcp_authflow-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 16.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for mcp_authflow-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a6b2c0821e2f52cd23db192e57965e66078b0c37521d4bec39fe41e279ed9b6f
MD5 6865218877ea83eb5eea572f794e96e1
BLAKE2b-256 3c5010f50fa9a98d2acc994ffb2108364572604b8469713ad8e9f20106819272

See more details on using hashes here.

Provenance

The following attestation bundles were made for mcp_authflow-0.3.0-py3-none-any.whl:

Publisher: publish.yml on brooksmcmillin/mcp-authflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page