authpi-idp

Official Python SDK for AuthPI identity provider.

Requires Python 3.11+

Installation

pip install authpi-idp
# or
poetry add authpi-idp
# or
uv add authpi-idp

Quick Start

from authpi_idp import IdpClient

async def main():
    idp = IdpClient(
        issuer_url="https://idp.authpi.com/iss_xxx",
        client_id="cli_xxx",
        client_secret="secret",  # omit for public clients (SPAs)
        redirect_uri="https://app.example.com/callback",
    )

    # 1. Create authorization URL (sync - no await needed)
    auth = idp.create_authorization_url(
        scopes=["openid", "profile", "email"],
        # org="org_xxx",  # optional selected-org token restriction
    )

    # 2. Store code_verifier, state, and nonce in session, redirect user to auth.url
    session["oauth"] = {"code_verifier": auth.code_verifier, "state": auth.state, "nonce": auth.nonce}
    # redirect(auth.url)

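    # 3. Handle callback - check that the returned state matches the stored one,
    #    then exchange the code (from the callback query string) for an authenticated agent
    agent = await idp.exchange_code(code, auth.code_verifier)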

    # 4. Store tokens for future requests
    session["tokens"] = agent.tokens.model_dump()

    # 5. Check authorization
    if agent.has_access_in("org_xxx", "write", "projects"):
        # User can write to projects in org_xxx
        pass

Session Management

The SDK automatically refreshes expired tokens when creating an agent from stored tokens:

from authpi_idp import IdpClient

idp = IdpClient(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="https://app.example.com/callback",
)

# Load tokens from your session store (dict or TokenSet)
tokens = session.get("tokens")

# create_agent() automatically refreshes if tokens are expired
agent = await idp.create_agent(
    tokens,
    # Called when tokens are refreshed - persist the new tokens (receives a dict)
    on_refresh=lambda new_tokens: session.update({"tokens": new_tokens}),
    # Called when refresh fails - handle the error
    on_refresh_error=lambda error: print(f"Session expired: {error}"),
)
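
The lambda above is enough for dict-like sessions. When persisting tokens takes more than one statement, a named callback may be easier to read. A sketch, assuming only what is stated above (on_refresh receives the refreshed tokens as a dict); make_on_refresh and the plain dict used as a session are illustrative stand-ins, not SDK names:

```python
from typing import Any, Callable


def make_on_refresh(session: dict[str, Any]) -> Callable[[dict[str, Any]], None]:
    """Build a callback that writes refreshed tokens back into `session`."""

    def persist(new_tokens: dict[str, Any]) -> None:
        session["tokens"] = new_tokens

    return persist


session: dict[str, Any] = {"tokens": {"access_token": "old"}}
on_refresh = make_on_refresh(session)
on_refresh({"access_token": "new"})  # simulates the SDK invoking the callback
print(session["tokens"]["access_token"])  # new
```

The result can then be passed as on_refresh=make_on_refresh(session) to create_agent.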

Configuring Auto-Refresh

idp = IdpClient(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="https://app.example.com/callback",
    auto_refresh=True,        # Default: True
    refresh_buffer_seconds=60, # Refresh 60s before expiry (default)
)

# Or disable per-call
agent = await idp.create_agent(tokens, auto_refresh=False)

Token Expiration

# Check expiration
agent.expires_at              # Unix timestamp
agent.expires_in              # Seconds until expiry (negative if expired)
agent.is_expired()            # True if the token expires within 30 seconds (default clock-skew buffer)
agent.is_expired(60)          # True if the token expires within 60 seconds
agent.is_expired(0)           # True only if the token has actually expired (no buffer)

Machine-to-Machine Authentication

For server-to-server or background service authentication using the client credentials flow:

from authpi_idp import IdpClient, PrincipalType

idp = IdpClient(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="agt_machine1",
    client_secret="secret",
    # No redirect_uri needed for client_credentials
)

agent = await idp.client_credentials(scopes=["users:read", "users:write"])

# Agent uses token-level scopes (no organizations)
agent.has_access("read", "users")  # True
agent.type  # PrincipalType.AGENT

Sync Support

For non-async contexts, use the sync client:

from authpi_idp import IdpClientSync

idp = IdpClientSync(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="https://app.example.com/callback",
)

auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
agent = idp.exchange_code(code, auth.code_verifier)

Authorization

The SDK includes an optional authorization framework based on scopes. You can use it to make local authorization decisions without additional API calls, or you can ignore it entirely and implement your own authorization logic.

The authorization data comes from the organizations claim in the ID token, which AuthPI populates from the user's organization memberships, any selected-org request, and the client's organization restrictions. When you request a selected-org token by passing org to create_authorization_url, agent.org_id identifies the selected organization.

Using the Built-in Authorization

# Check access across all organizations
agent.has_access("read", "users")

# Check access in a specific organization
agent.has_access_in("org_xxx", "write", "projects")
agent.has_access_in("org_xxx", "delete", "projects.tasks")

# Role checks
agent.is_owner_of("org_xxx")   # Has "owner" scope
agent.is_admin_of("org_xxx")   # Has "admin" scope
agent.is_member_of("org_xxx")  # Has any membership

# Get scopes for an organization
scopes = agent.get_scopes_for("org_xxx")
# ["users:read", "projects:**"]

Rolling Your Own Authorization

If the built-in scope system doesn't fit your needs, you can access the raw data directly:

# Access organizations directly
for org in agent.organizations:
    print(org.id)       # "org_xxx"
    print(org.title)    # "Admin" or None
    print(org.scopes)   # ["users:read", "projects:**"]
    print(org.joined_at) # Unix timestamp

# Use agent.id for your own authorization lookups
permissions = my_permission_service.get_permissions(agent.id)

How Scopes Work

AuthPI uses a hierarchical scope format: resource:action

Basic format:

resource:action
resource.subresource:action

Examples:

  • users:read — Can read users
  • users:write — Can create/update users
  • projects.tasks:delete — Can delete tasks within projects

Wildcards:

| Pattern  | Description                                  |
|----------|----------------------------------------------|
| users:*  | All actions on users (but not sub-resources) |
| users:** | All actions on users AND all sub-resources   |
| *:read   | Read access to all top-level resources       |
| *:**     | Full access to everything (super-admin)      |

The difference between * and **:

  • projects:* grants projects:read, projects:write, projects:delete
  • projects:* does NOT grant projects.tasks:read (sub-resource)
  • projects:** grants all of the above PLUS projects.tasks:read, projects.tasks.comments:write, etc.
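
The wildcard rules above can be written as a small matcher. This is a sketch of the semantics as described in this section, not the SDK's own code; scope_grants and sketch_has_access are hypothetical names, and treating colon-less scopes (such as role scopes) as granting no resource access is an assumption:

```python
def scope_grants(scope: str, action: str, resource: str) -> bool:
    """Check one granted scope against a requested action on a resource."""
    if ":" not in scope:
        return False  # colon-less scopes carry no resource access in this sketch
    granted_resource, granted_action = scope.rsplit(":", 1)

    if granted_resource == "*":
        exact = "." not in resource  # "*" names any top-level resource
        nested = "." in resource     # anything dotted sits under some top-level resource
    else:
        exact = resource == granted_resource
        nested = resource.startswith(granted_resource + ".")

    if granted_action == "**":
        return exact or nested       # every action, sub-resources included
    if granted_action == "*":
        return exact                 # every action, this resource only
    return exact and granted_action == action


def sketch_has_access(scopes: list[str], action: str, resource: str) -> bool:
    """True if any scope in the list grants the requested access."""
    return any(scope_grants(s, action, resource) for s in scopes)


scopes = ["projects:*", "users:read"]
print(sketch_has_access(scopes, "delete", "projects"))      # True
print(sketch_has_access(scopes, "read", "projects.tasks"))  # False
print(sketch_has_access(["projects:**"], "read", "projects.tasks.comments"))  # True
```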

Scope evaluation example:

# User has scopes: ["projects:**", "users:read"]

# These all return True:
agent.has_access_in("org_xxx", "read", "projects")
agent.has_access_in("org_xxx", "write", "projects")
agent.has_access_in("org_xxx", "delete", "projects.tasks")
agent.has_access_in("org_xxx", "read", "projects.tasks.comments")
agent.has_access_in("org_xxx", "read", "users")

# These return False:
agent.has_access_in("org_xxx", "write", "users")      # Only has users:read
agent.has_access_in("org_xxx", "read", "billing")     # No billing scope

Special Role Scopes

Three scopes have special meaning and dedicated helper methods:

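| Scope  | Method               | Typical Use                              |
|--------|----------------------|------------------------------------------|
| owner  | is_owner_of(org_id)  | Organization billing, deletion, transfer |
| admin  | is_admin_of(org_id)  | Member management, settings              |
| member | is_member_of(org_id) | Basic membership check                   |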

These are checked directly (not via wildcard expansion):

# User has scopes: ["owner", "admin", "projects:**"]
agent.is_owner_of("org_xxx")  # True - has "owner" scope
agent.is_admin_of("org_xxx")  # True - has "admin" scope
agent.is_member_of("org_xxx") # True - has any membership

# Note: "*:**" does NOT grant owner/admin status
# These are explicit role assignments, not permissions
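
To make the distinction concrete, here is a sketch of role checks as plain membership tests with no wildcard matching involved. The function names and the memberships mapping (org ID to scope list) are hypothetical; the SDK's internals may differ:

```python
def sketch_is_owner_of(memberships: dict[str, list[str]], org_id: str) -> bool:
    """Owner status requires the literal "owner" scope."""
    return "owner" in memberships.get(org_id, [])


def sketch_is_admin_of(memberships: dict[str, list[str]], org_id: str) -> bool:
    """Admin status requires the literal "admin" scope."""
    return "admin" in memberships.get(org_id, [])


def sketch_is_member_of(memberships: dict[str, list[str]], org_id: str) -> bool:
    """Any membership in the organization counts, whatever its scopes."""
    return org_id in memberships


memberships = {"org_xxx": ["*:**"], "org_yyy": ["owner", "projects:read"]}
print(sketch_is_owner_of(memberships, "org_xxx"))   # False: "*:**" is a permission, not a role
print(sketch_is_owner_of(memberships, "org_yyy"))   # True
print(sketch_is_member_of(memberships, "org_xxx"))  # True
```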

Scope Utilities

For advanced use cases, you can use the scope utilities directly:

from authpi_idp import has_access, parse_scope

# Check if a list of scopes grants access
scopes = ["users:read", "projects:**"]
has_access(scopes, "read", "users")           # True
has_access(scopes, "write", "projects.tasks") # True (** is recursive)
has_access(scopes, "delete", "users")         # False

# Parse a scope string into its components
parsed = parse_scope("users.verifiers:write")
# ParsedScope(resource='users.verifiers', action='write')

parsed = parse_scope("projects:**")
# ParsedScope(resource='projects', action='**')

Error Handling

The SDK provides specific error types for different failure modes:

from authpi_idp import (
    OAuthError,
    TokenExpiredError,
    RefreshError,
    TokenParseError,
    SubjectMismatchError,
    ConfigurationError,
    InsufficientScopeError,
    SessionExpiredError,
    UserBlockedError,
    AccountLinkingRequiredError,
    InteractionRequiredError,
    LoginRequiredError,
    ConsentRequiredError,
)

try:
    agent = await idp.create_agent(tokens)
except TokenExpiredError:
    # Token expired and no refresh token available
    redirect_to_login()
except RefreshError as error:
    # Refresh request failed (e.g., refresh token revoked)
    print(f"Refresh failed: {error.error_description}")
    print(f"Status: {error.status_code}")
    redirect_to_login()
except TokenParseError:
    # ID token missing or malformed
    print("Invalid token data")
except SubjectMismatchError as error:
    # Security: refreshed token belongs to different user
    print(f"Expected {error.expected_sub}, got {error.actual_sub}")

Error Hierarchy

All OAuth errors extend OAuthError:

class OAuthError(Exception):
    error: str            # OAuth error code
    error_description: str | None

class TokenExpiredError(OAuthError): ...
class RefreshError(OAuthError):
    status_code: int | None      # HTTP status from token endpoint

class TokenParseError(OAuthError): ...
class SubjectMismatchError(OAuthError):
    expected_sub: str
    actual_sub: str

# AuthPI-specific errors
class InsufficientScopeError(OAuthError): ...  # Token lacks required scope
class SessionExpiredError(OAuthError): ...      # Server-side session timed out
class UserBlockedError(OAuthError): ...         # Blocked user attempted auth
class AccountLinkingRequiredError(OAuthError): ...  # OAuth identity needs linking

# OIDC authorization endpoint errors
class InteractionRequiredError(OAuthError): ...  # Silent auth failed
class LoginRequiredError(OAuthError): ...        # No active session
class ConsentRequiredError(OAuthError): ...      # User hasn't consented

User Info

For full profile data beyond the ID token claims:

userinfo = await idp.get_user_info(agent)

userinfo.sub           # "usr_xxx"
userinfo.email         # "user@example.com"
userinfo.name          # "John Doe"
userinfo.picture       # "https://..."
userinfo.org_id        # "org_xxx" when the token is restricted to one org
userinfo.organizations # [Organization(...), ...]

Logout

from authpi_idp import LogoutOptions

logout_url = idp.create_logout_url(
    LogoutOptions(
        id_token_hint=agent.tokens.id_token,
        post_logout_redirect_uri="https://app.example.com",
        state="logout_state",
    )
)

# redirect(logout_url)

Token Revocation

# Revoke refresh token (recommended on logout)
await idp.revoke_token(agent.tokens.refresh_token, "refresh_token")

# Revoke access token
await idp.revoke_token(agent.tokens.access_token, "access_token")

API Reference

IdpClient / IdpClientSync

IdpClient(
    issuer_url: str,          # OIDC issuer URL
    client_id: str,           # OAuth client ID
    redirect_uri: str | None = None,  # Required for auth code flow, optional for client_credentials
    client_secret: str | None = None,  # Optional for public clients
    auto_refresh: bool = True,         # Auto-refresh expired tokens
    refresh_buffer_seconds: int = 60,  # Refresh buffer
)

Methods:

| Method | Description |
|--------|-------------|
| create_authorization_url(scopes, state?, nonce?, org?) | Create OAuth authorization URL with PKCE; accepts optional org for selected-org tokens |
| exchange_code(code, code_verifier) | Exchange authorization code for agent |
| create_agent(tokens, *, on_refresh?, on_refresh_error?, auto_refresh?) | Create agent from stored tokens (auto-refreshes) |
| refresh(agent) | Manually refresh tokens |
| get_user_info(agent) | Fetch full user profile |
| create_logout_url(options?) | Create OIDC logout URL |
| client_credentials(scopes?) | Authenticate via client credentials (M2M) |
| revoke_token(token, hint?) | Revoke a token |
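
create_authorization_url generates the PKCE values for you, which is why code_verifier must be kept until exchange_code. For readers unfamiliar with PKCE, the sketch below shows how a verifier and its challenge relate under the S256 method defined in RFC 7636. Whether the SDK uses S256 is not stated here, so treat that as an assumption; both function names are hypothetical:

```python
import base64
import hashlib
import secrets


def make_code_verifier() -> str:
    """Random URL-safe string; 32 random bytes encode to 43 characters."""
    return secrets.token_urlsafe(32)


def code_challenge_s256(code_verifier: str) -> str:
    """base64url(SHA-256(verifier)) with padding stripped, per RFC 7636."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


# Test vector from RFC 7636, Appendix B
verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"
print(code_challenge_s256(verifier))  # E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM
```

The challenge travels in the authorization URL, while the verifier stays in the session and is sent only during the code exchange.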

AuthenticatedAgent

class AuthenticatedAgent:
    # Identity
    id: str                 # Subject ID (usr_xxx, tok_xxx, key_xxx, agt_xxx)
    type: PrincipalType     # "user" | "personal_token" | "api_key" | "agent"
    email: str | None
    email_verified: bool | None
    org_id: str | None       # Selected org for org-restricted tokens

    # Tokens (for storage)
    tokens: TokenSet

    # Organizations (from ID token)
    organizations: list[Organization]

    # Expiration
    @property
    def expires_at(self) -> int: ...          # Unix timestamp
    @property
    def expires_in(self) -> int: ...          # Seconds until expiry
    def is_expired(self, buffer: int = 30) -> bool: ...

    # Authorization
    def has_access(self, action: str, resource: str) -> bool: ...
    def has_access_in(self, org_id: str, action: str, resource: str) -> bool: ...
    def get_scopes_for(self, org_id: str) -> list[str]: ...
    def is_owner_of(self, org_id: str) -> bool: ...
    def is_admin_of(self, org_id: str) -> bool: ...
    def is_member_of(self, org_id: str) -> bool: ...

Framework Examples

FastAPI

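from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware
from authpi_idp import IdpClient, TokenExpiredError

app = FastAPI()
# request.session is only available once SessionMiddleware is installed
app.add_middleware(SessionMiddleware, secret_key="your-secret-key")

idp = IdpClient(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="http://localhost:8000/callback",
)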


@app.get("/login")
async def login(request: Request):
    auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
    request.session["code_verifier"] = auth.code_verifier
    request.session["state"] = auth.state
    return RedirectResponse(auth.url)


@app.get("/callback")
async def callback(request: Request, code: str, state: str):
    if state != request.session.get("state"):
        # Reject with 400 rather than returning an error body with status 200
        raise HTTPException(status_code=400, detail="Invalid state")

    agent = await idp.exchange_code(code, request.session["code_verifier"])
    request.session["tokens"] = agent.tokens.model_dump()
    return RedirectResponse("/dashboard")


async def get_agent(request: Request):
    tokens_data = request.session.get("tokens")
    if not tokens_data:
        raise HTTPException(status_code=401)

    try:
        return await idp.create_agent(
            tokens_data,
            on_refresh=lambda t: request.session.update({"tokens": t}),
        )
    except TokenExpiredError:
        raise HTTPException(status_code=401)


@app.get("/dashboard")
async def dashboard(agent=Depends(get_agent)):
    if not agent.has_access_in("org_xxx", "read", "dashboard"):
        raise HTTPException(status_code=403)
    return {"user": agent.id, "email": agent.email}

Django

from django.shortcuts import redirect
from django.http import HttpResponse
from authpi_idp import IdpClientSync

idp = IdpClientSync(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="http://localhost:8000/callback",
)


def login(request):
    auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
    request.session["code_verifier"] = auth.code_verifier
    request.session["state"] = auth.state
    return redirect(auth.url)


def callback(request):
    code = request.GET.get("code")
    state = request.GET.get("state")

    if state != request.session.get("state"):
        return HttpResponse("Invalid state", status=400)

    agent = idp.exchange_code(code, request.session["code_verifier"])
    request.session["tokens"] = agent.tokens.model_dump()
    return redirect("/dashboard")


def dashboard(request):
    tokens_data = request.session.get("tokens")
    if not tokens_data:
        return redirect("/login")

    agent = idp.create_agent(tokens_data)

    if not agent.has_access_in("org_xxx", "read", "dashboard"):
        return HttpResponse("Forbidden", status=403)

    return HttpResponse(f"Welcome, {agent.email}")

Flask

from flask import Flask, redirect, session, request
from authpi_idp import IdpClientSync

app = Flask(__name__)
app.secret_key = "your-secret-key"

idp = IdpClientSync(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="http://localhost:5000/callback",
)


@app.route("/login")
def login():
    auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
    session["code_verifier"] = auth.code_verifier
    session["state"] = auth.state
    return redirect(auth.url)


@app.route("/callback")
def callback():
    code = request.args.get("code")
    state = request.args.get("state")

    if state != session.get("state"):
        return "Invalid state", 400

    agent = idp.exchange_code(code, session["code_verifier"])
    session["tokens"] = agent.tokens.model_dump()
    return redirect("/dashboard")


@app.route("/dashboard")
def dashboard():
    tokens_data = session.get("tokens")
    if not tokens_data:
        return redirect("/login")

    agent = idp.create_agent(tokens_data)
    return f"Welcome, {agent.email}"

Context Manager Support

Both clients support context managers for proper resource cleanup:

# Async
async with IdpClient(...) as idp:
    auth = idp.create_authorization_url(scopes=["openid"])

# Sync
with IdpClientSync(...) as idp:
    auth = idp.create_authorization_url(scopes=["openid"])

License

MIT
