Skip to main content

OAuth 2.0 Authorization Server framework for MCP (Model Context Protocol) servers

Project description

mcp-authflow

OAuth 2.0 Authorization Server framework for MCP servers. Issue and manage tokens that protect MCP tool access.

Pair with mcp-authflow-resource on the resource server side.

Features

  • Token storage with PostgreSQL and in-memory backends
  • RFC 6749 standardized OAuth error responses
  • RFC 7523 private_key_jwt client authentication with algorithm allowlist and JTI replay protection (Redis or in-memory)
  • Sliding-window rate limiting for token endpoints
  • Input validation for client IDs, scopes, and PKCE parameters
  • CORS helpers with origin allowlisting
  • Async-first design, built on Starlette

Installation

pip install mcp-authflow

# With PostgreSQL token storage (production)
pip install mcp-authflow[postgres]

Quick Start

Build an OAuth authorization server that issues tokens for MCP clients:

import secrets
import time
from contextlib import asynccontextmanager

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from mcp_authflow.rate_limiting import SlidingWindowRateLimiter
from mcp_authflow.responses import invalid_request, rate_limit_exceeded
from mcp_authflow.storage import MemoryTokenStorage
from mcp_authflow.validation import parse_scope_field, validate_client_id

# --- Setup ---

storage = MemoryTokenStorage()  # Use PostgresTokenStorage for production
limiter = SlidingWindowRateLimiter(requests_per_window=60, window_seconds=3600)


# --- Token endpoint ---

async def token_endpoint(request: Request) -> JSONResponse:
    form = await request.form()
    client_id = str(form.get("client_id", ""))

    # Rate limit per client
    if not limiter.is_allowed(client_id):
        return rate_limit_exceeded(
            "Too many requests",
            retry_after=limiter.get_retry_after(client_id),
        )

    # Validate client
    if not validate_client_id(client_id):
        return invalid_request("Invalid client_id format")

    # Issue token
    token = secrets.token_urlsafe(32)
    scopes = parse_scope_field(form.get("scope"))
    expires_at = int(time.time()) + 3600

    await storage.store_token(
        token=token,
        client_id=client_id,
        scopes=scopes.split(),
        expires_at=expires_at,
        resource=str(form.get("resource", "")),
    )

    return JSONResponse({
        "access_token": token,
        "token_type": "bearer",
        "expires_in": 3600,
        "scope": scopes,
    })


# --- Introspection endpoint (called by resource servers) ---

async def introspect_endpoint(request: Request) -> JSONResponse:
    form = await request.form()
    token = str(form.get("token", ""))

    token_data = await storage.load_token(token)
    if not token_data or token_data["expires_at"] < time.time():
        return JSONResponse({"active": False})

    return JSONResponse({
        "active": True,
        "client_id": token_data["client_id"],
        "scope": " ".join(token_data["scopes"]),
        "exp": token_data["expires_at"],
        "aud": token_data.get("resource", ""),
    })


@asynccontextmanager
async def lifespan(app):
    await storage.initialize()
    yield
    await storage.close()


app = Starlette(
    routes=[
        Route("/token", token_endpoint, methods=["POST"]),
        Route("/introspect", introspect_endpoint, methods=["POST"]),
    ],
    lifespan=lifespan,
)

Run with: uvicorn myapp:app --port 8000

Architecture

                         MCP Client (Claude, etc.)
                                |
                  1. Authorization request
                                |
                                v
                    +---------------------+
                    |   Auth Server        |   <-- this package
                    |   (mcp-authflow)  |
                    |                     |
                    |  /token             |   2. Issues access token
                    |  /introspect        |   4. Validates token
                    +---------------------+
                                ^
                                |
                     4. Token introspection (RFC 7662)
                                |
                    +---------------------+
                    |   Resource Server    |   <-- mcp-authflow-resource
                    |   (MCP tools)       |
                    |                     |
                    |  3. Client calls    |
                    |     MCP tools with  |
                    |     Bearer token    |
                    +---------------------+
  1. MCP client authenticates with the auth server
  2. Auth server issues an access token (stored in PostgreSQL or memory)
  3. Client calls MCP tools on the resource server with the Bearer token
  4. Resource server validates the token by calling the auth server's /introspect endpoint

API Reference

Token Storage

Abstract base class with two implementations:

from mcp_authflow.storage import MemoryTokenStorage, PostgresTokenStorage

# In-memory (development/testing)
storage = MemoryTokenStorage()

# PostgreSQL (production) -- requires `postgres` extra
storage = PostgresTokenStorage(database_url="postgresql://user:pass@host/db")
# Or reads DATABASE_URL env var if no argument provided
storage = PostgresTokenStorage()

await storage.initialize()  # Create tables / prepare connections

Storage interface:

Method Description
store_token(token, client_id, scopes, expires_at, resource?, user_id?) Store an access token
load_token(token) -> dict | None Look up a token
delete_token(token) Revoke a token
cleanup_expired_tokens() -> int Purge expired tokens, returns count
get_token_count() -> int Count active tokens
store_refresh_token(...) Store a refresh token (same interface)
load_refresh_token(token) -> dict | None Look up a refresh token
delete_refresh_token(token) Revoke a refresh token
cleanup_expired_refresh_tokens() -> int Purge expired refresh tokens, returns count

Token data returned by load_token():

{
    "token": str,
    "client_id": str,
    "scopes": list[str],
    "resource": str | None,       # RFC 8707 resource binding
    "expires_at": int,            # Unix timestamp
    "created_at": int,            # Unix timestamp
    "user_id": int | None,
}

OAuth Error Responses

Standardized error helpers following RFC 6749:

from mcp_authflow.responses import (
    invalid_request,       # 400 - Missing/invalid parameters
    invalid_client,        # 401 - Authentication failure
    invalid_grant,         # 400 - Expired/invalid code or token
    invalid_scope,         # 400 - Scope violation
    slow_down,             # 400 - Device flow rate limiting
    rate_limit_exceeded,   # 429 - Too many requests
    server_error,          # 500 - Internal error
    backend_timeout,       # 504 - Upstream timeout
)

Each returns a Starlette JSONResponse with the appropriate status code and Cache-Control: no-store header.

Rate Limiting

from mcp_authflow.rate_limiting import SlidingWindowRateLimiter

limiter = SlidingWindowRateLimiter(
    requests_per_window=60,   # Max requests per window
    window_seconds=3600,      # Window duration (1 hour)
)

if not limiter.is_allowed(client_id):
    retry_after = limiter.get_retry_after(client_id)  # Seconds until next allowed request

Input Validation

from mcp_authflow.validation import validate_client_id, parse_scope_field

validate_client_id("my-client-123")  # True (alphanumeric + hyphens/underscores)
validate_client_id("")               # False

parse_scope_field("read write")      # "read write"
parse_scope_field(["read", "write"]) # "read write"
parse_scope_field(None)              # "read" (default)

CORS

from mcp_authflow.cors import parse_allowed_origins, build_cors_headers

# Reads ALLOWED_MCP_ORIGINS env var (comma-separated)
origins = parse_allowed_origins()

# Returns CORS headers if request origin is in allowlist
headers = build_cors_headers(request, origins)

Configuration

Env Variable Description Default
DATABASE_URL PostgreSQL connection string (for PostgresTokenStorage) Required for postgres
ALLOWED_MCP_ORIGINS Comma-separated allowed CORS origins Empty (no CORS)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mcp_authflow-0.5.0.tar.gz (183.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mcp_authflow-0.5.0-py3-none-any.whl (26.2 kB view details)

Uploaded Python 3

File details

Details for the file mcp_authflow-0.5.0.tar.gz.

File metadata

  • Download URL: mcp_authflow-0.5.0.tar.gz
  • Upload date:
  • Size: 183.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for mcp_authflow-0.5.0.tar.gz
Algorithm Hash digest
SHA256 b9a6002872932d97b4e5d7673b27ee93ebacedfafc0731facba1dfa12219762b
MD5 ffd224d59141192df20d67fff3e9d89f
BLAKE2b-256 4b8d10d0133fee53118c729c21e0a70f6f375172ea387105754902039bb0b4fb

See more details on using hashes here.

Provenance

The following attestation bundles were made for mcp_authflow-0.5.0.tar.gz:

Publisher: publish.yml on brooksmcmillin/mcp-authflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file mcp_authflow-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: mcp_authflow-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 26.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for mcp_authflow-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 22ac4d40101459e374619f8504b03c48ae3017199091546f66ed36e4c71b914a
MD5 dc9d5ec95dec7cb9a1dd0128e0927e28
BLAKE2b-256 f199532da7de5d1d316b5baf8e96e0ad1563f5cf1b785da86ae3a48cc611b3f2

See more details on using hashes here.

Provenance

The following attestation bundles were made for mcp_authflow-0.5.0-py3-none-any.whl:

Publisher: publish.yml on brooksmcmillin/mcp-authflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page