Official Python SDK for AuthPI identity provider
Project description
authpi-idp
Official Python SDK for AuthPI identity provider.
Requires Python 3.11+
Installation
pip install authpi-idp
# or
poetry add authpi-idp
# or
uv add authpi-idp
Quick Start
from authpi_idp import IdpClient
async def main():
idp = IdpClient(
issuer_url="https://idp.authpi.com/iss_xxx",
client_id="cli_xxx",
client_secret="secret", # omit for public clients (SPAs)
redirect_uri="https://app.example.com/callback",
)
# 1. Create authorization URL (sync - no await needed)
auth = idp.create_authorization_url(
scopes=["openid", "profile", "email"]
)
# 2. Store code_verifier, state, and nonce in session, redirect user to auth.url
session["oauth"] = {"code_verifier": auth.code_verifier, "state": auth.state, "nonce": auth.nonce}
# redirect(auth.url)
# 3. Handle callback - exchange code for authenticated agent
agent = await idp.exchange_code(code, auth.code_verifier)
# 4. Store tokens for future requests
session["tokens"] = agent.tokens.model_dump()
# 5. Check authorization
if agent.has_access_in("org_xxx", "write", "projects"):
# User can write to projects in org_xxx
pass
Session Management
The SDK automatically refreshes expired tokens when creating an agent from stored tokens:
from authpi_idp import IdpClient
idp = IdpClient(
issuer_url="https://idp.authpi.com/iss_xxx",
client_id="cli_xxx",
redirect_uri="https://app.example.com/callback",
)
# Load tokens from your session store (dict or TokenSet)
tokens = session.get("tokens")
# create_agent() automatically refreshes if tokens are expired
agent = await idp.create_agent(
tokens,
# Called when tokens are refreshed - persist the new tokens (receives a dict)
on_refresh=lambda new_tokens: session.update({"tokens": new_tokens}),
# Called when refresh fails - handle the error
on_refresh_error=lambda error: print(f"Session expired: {error}"),
)
Configuring Auto-Refresh
idp = IdpClient(
issuer_url="https://idp.authpi.com/iss_xxx",
client_id="cli_xxx",
redirect_uri="https://app.example.com/callback",
auto_refresh=True, # Default: True
refresh_buffer_seconds=60, # Refresh 60s before expiry (default)
)
# Or disable per-call
agent = await idp.create_agent(tokens, auto_refresh=False)
Token Expiration
# Check expiration
agent.expires_at # Unix timestamp
agent.expires_in # Seconds until expiry (negative if expired)
agent.is_expired() # True if expires within 30 seconds (default clock skew buffer)
agent.is_expired(60) # True if expires within 60 seconds
agent.is_expired(0) # True only if actually expired (no buffer)
Machine-to-Machine Authentication
For server-to-server or background service authentication using the client credentials flow:
from authpi_idp import IdpClient, PrincipalType
idp = IdpClient(
issuer_url="https://idp.authpi.com/iss_xxx",
client_id="agt_machine1",
client_secret="secret",
# No redirect_uri needed for client_credentials
)
agent = await idp.client_credentials(scopes=["users:read", "users:write"])
# Agent uses token-level scopes (no organizations)
agent.has_access("read", "users") # True
agent.type # PrincipalType.AGENT
Sync Support
For non-async contexts, use the sync client:
from authpi_idp import IdpClientSync
idp = IdpClientSync(
issuer_url="https://idp.authpi.com/iss_xxx",
client_id="cli_xxx",
redirect_uri="https://app.example.com/callback",
)
auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
agent = idp.exchange_code(code, auth.code_verifier)
Authorization
The SDK includes an optional authorization framework based on scopes. You can use it to make local authorization decisions without additional API calls, or you can ignore it entirely and implement your own authorization logic.
The authorization data comes from the organizations claim in the ID token, which AuthPI populates based on your organization and membership configuration.
Using the Built-in Authorization
# Check access across all organizations
agent.has_access("read", "users")
# Check access in a specific organization
agent.has_access_in("org_xxx", "write", "projects")
agent.has_access_in("org_xxx", "delete", "projects.tasks")
# Role checks
agent.is_owner_of("org_xxx") # Has "owner" scope
agent.is_admin_of("org_xxx") # Has "admin" scope
agent.is_member_of("org_xxx") # Has any membership
# Get scopes for an organization
scopes = agent.get_scopes_for("org_xxx")
# ["users:read", "projects:**"]
Rolling Your Own Authorization
If the built-in scope system doesn't fit your needs, you can access the raw data directly:
# Access organizations directly
for org in agent.organizations:
print(org.id) # "org_xxx"
print(org.title) # "Admin" or None
print(org.scopes) # ["users:read", "projects:**"]
print(org.joined_at) # Unix timestamp
# Use agent.id for your own authorization lookups
permissions = my_permission_service.get_permissions(agent.id)
How Scopes Work
AuthPI uses a hierarchical scope format: resource:action
Basic format:
resource:action
resource.subresource:action
Examples:
users:read— Can read usersusers:write— Can create/update usersprojects.tasks:delete— Can delete tasks within projects
Wildcards:
| Pattern | Description |
|---|---|
users:* |
All actions on users (but not sub-resources) |
users:** |
All actions on users AND all sub-resources |
*:read |
Read access to all top-level resources |
*:** |
Full access to everything (super-admin) |
The difference between * and **:
projects:*grantsprojects:read,projects:write,projects:deleteprojects:*does NOT grantprojects.tasks:read(sub-resource)projects:**grants all of the above PLUSprojects.tasks:read,projects.tasks.comments:write, etc.
Scope evaluation example:
# User has scopes: ["projects:**", "users:read"]
# These all return True:
agent.has_access_in("org_xxx", "read", "projects")
agent.has_access_in("org_xxx", "write", "projects")
agent.has_access_in("org_xxx", "delete", "projects.tasks")
agent.has_access_in("org_xxx", "read", "projects.tasks.comments")
agent.has_access_in("org_xxx", "read", "users")
# These return False:
agent.has_access_in("org_xxx", "write", "users") # Only has users:read
agent.has_access_in("org_xxx", "read", "billing") # No billing scope
Special Role Scopes
Three scopes have special meaning and dedicated helper methods:
| Scope | Method | Typical Use |
|---|---|---|
owner |
is_owner_of(org_id) |
Organization billing, deletion, transfer |
admin |
is_admin_of(org_id) |
Member management, settings |
member |
is_member_of(org_id) |
Basic membership check |
These are checked directly (not via wildcard expansion):
# User has scopes: ["owner", "admin", "projects:**"]
agent.is_owner_of("org_xxx") # True - has "owner" scope
agent.is_admin_of("org_xxx") # True - has "admin" scope
agent.is_member_of("org_xxx") # True - has any membership
# Note: "*:**" does NOT grant owner/admin status
# These are explicit role assignments, not permissions
Scope Utilities
For advanced use cases, you can use the scope utilities directly:
from authpi_idp import has_access, parse_scope
# Check if a list of scopes grants access
scopes = ["users:read", "projects:**"]
has_access(scopes, "read", "users") # True
has_access(scopes, "write", "projects.tasks") # True (** is recursive)
has_access(scopes, "delete", "users") # False
# Parse a scope string into its components
parsed = parse_scope("users.verifiers:write")
# ParsedScope(resource='users.verifiers', action='write')
parsed = parse_scope("projects:**")
# ParsedScope(resource='projects', action='**')
Error Handling
The SDK provides specific error types for different failure modes:
from authpi_idp import (
OAuthError,
TokenExpiredError,
RefreshError,
TokenParseError,
SubjectMismatchError,
ConfigurationError,
InsufficientScopeError,
SessionExpiredError,
UserBlockedError,
AccountLinkingRequiredError,
InteractionRequiredError,
LoginRequiredError,
ConsentRequiredError,
)
try:
agent = await idp.create_agent(tokens)
except TokenExpiredError:
# Token expired and no refresh token available
redirect_to_login()
except RefreshError as error:
# Refresh request failed (e.g., refresh token revoked)
print(f"Refresh failed: {error.error_description}")
print(f"Status: {error.status_code}")
redirect_to_login()
except TokenParseError:
# ID token missing or malformed
print("Invalid token data")
except SubjectMismatchError as error:
# Security: refreshed token belongs to different user
print(f"Expected {error.expected_sub}, got {error.actual_sub}")
Error Hierarchy
All OAuth errors extend OAuthError:
class OAuthError(Exception):
error: str # OAuth error code
error_description: str | None
class TokenExpiredError(OAuthError): ...
class RefreshError(OAuthError):
status_code: int | None # HTTP status from token endpoint
class TokenParseError(OAuthError): ...
class SubjectMismatchError(OAuthError):
expected_sub: str
actual_sub: str
# AuthPI-specific errors
class InsufficientScopeError(OAuthError): ... # Token lacks required scope
class SessionExpiredError(OAuthError): ... # Server-side session timed out
class UserBlockedError(OAuthError): ... # Blocked user attempted auth
class AccountLinkingRequiredError(OAuthError): ... # OAuth identity needs linking
# OIDC authorization endpoint errors
class InteractionRequiredError(OAuthError): ... # Silent auth failed
class LoginRequiredError(OAuthError): ... # No active session
class ConsentRequiredError(OAuthError): ... # User hasn't consented
User Info
For full profile data beyond the ID token claims:
userinfo = await idp.get_user_info(agent)
userinfo.sub # "usr_xxx"
userinfo.email # "user@example.com"
userinfo.name # "John Doe"
userinfo.picture # "https://..."
userinfo.organizations # [Organization(...), ...]
Logout
from authpi_idp import LogoutOptions
logout_url = idp.create_logout_url(
LogoutOptions(
id_token_hint=agent.tokens.id_token,
post_logout_redirect_uri="https://app.example.com",
state="logout_state",
)
)
# redirect(logout_url)
Token Revocation
# Revoke refresh token (recommended on logout)
await idp.revoke_token(agent.tokens.refresh_token, "refresh_token")
# Revoke access token
await idp.revoke_token(agent.tokens.access_token, "access_token")
API Reference
IdpClient / IdpClientSync
IdpClient(
issuer_url: str, # OIDC issuer URL
client_id: str, # OAuth client ID
redirect_uri: str | None = None, # Required for auth code flow, optional for client_credentials
client_secret: str | None = None, # Optional for public clients
auto_refresh: bool = True, # Auto-refresh expired tokens
refresh_buffer_seconds: int = 60, # Refresh buffer
)
Methods:
| Method | Description |
|---|---|
create_authorization_url(scopes, state?, nonce?) |
Create OAuth authorization URL with PKCE |
exchange_code(code, code_verifier) |
Exchange authorization code for agent |
create_agent(tokens, *, on_refresh?, on_refresh_error?, auto_refresh?) |
Create agent from stored tokens (auto-refreshes) |
refresh(agent) |
Manually refresh tokens |
get_user_info(agent) |
Fetch full user profile |
create_logout_url(options?) |
Create OIDC logout URL |
client_credentials(scopes?) |
Authenticate via client credentials (M2M) |
revoke_token(token, hint?) |
Revoke a token |
AuthenticatedAgent
class AuthenticatedAgent:
# Identity
id: str # Subject ID (usr_xxx, tok_xxx, key_xxx, agt_xxx)
type: PrincipalType # "user" | "personal_token" | "api_key" | "agent"
email: str | None
email_verified: bool | None
# Tokens (for storage)
tokens: TokenSet
# Organizations (from ID token)
organizations: list[Organization]
# Expiration
@property
def expires_at(self) -> int: ... # Unix timestamp
@property
def expires_in(self) -> int: ... # Seconds until expiry
def is_expired(self, buffer: int = 30) -> bool: ...
# Authorization
def has_access(self, action: str, resource: str) -> bool: ...
def has_access_in(self, org_id: str, action: str, resource: str) -> bool: ...
def get_scopes_for(self, org_id: str) -> list[str]: ...
def is_owner_of(self, org_id: str) -> bool: ...
def is_admin_of(self, org_id: str) -> bool: ...
def is_member_of(self, org_id: str) -> bool: ...
Framework Examples
FastAPI
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import RedirectResponse
from authpi_idp import IdpClient, TokenExpiredError
app = FastAPI()
idp = IdpClient(
issuer_url="https://idp.authpi.com/iss_xxx",
client_id="cli_xxx",
redirect_uri="http://localhost:8000/callback",
)
@app.get("/login")
async def login(request: Request):
auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
request.session["code_verifier"] = auth.code_verifier
request.session["state"] = auth.state
return RedirectResponse(auth.url)
@app.get("/callback")
async def callback(request: Request, code: str, state: str):
if state != request.session.get("state"):
return {"error": "Invalid state"}
agent = await idp.exchange_code(code, request.session["code_verifier"])
request.session["tokens"] = agent.tokens.model_dump()
return RedirectResponse("/dashboard")
async def get_agent(request: Request):
tokens_data = request.session.get("tokens")
if not tokens_data:
raise HTTPException(status_code=401)
try:
return await idp.create_agent(
tokens_data,
on_refresh=lambda t: request.session.update({"tokens": t}),
)
except TokenExpiredError:
raise HTTPException(status_code=401)
@app.get("/dashboard")
async def dashboard(agent=Depends(get_agent)):
if not agent.has_access_in("org_xxx", "read", "dashboard"):
raise HTTPException(status_code=403)
return {"user": agent.id, "email": agent.email}
Django
from django.shortcuts import redirect
from django.http import HttpResponse
from authpi_idp import IdpClientSync
idp = IdpClientSync(
issuer_url="https://idp.authpi.com/iss_xxx",
client_id="cli_xxx",
redirect_uri="http://localhost:8000/callback",
)
def login(request):
auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
request.session["code_verifier"] = auth.code_verifier
request.session["state"] = auth.state
return redirect(auth.url)
def callback(request):
code = request.GET.get("code")
state = request.GET.get("state")
if state != request.session.get("state"):
return HttpResponse("Invalid state", status=400)
agent = idp.exchange_code(code, request.session["code_verifier"])
request.session["tokens"] = agent.tokens.model_dump()
return redirect("/dashboard")
def dashboard(request):
tokens_data = request.session.get("tokens")
if not tokens_data:
return redirect("/login")
agent = idp.create_agent(tokens_data)
if not agent.has_access_in("org_xxx", "read", "dashboard"):
return HttpResponse("Forbidden", status=403)
return HttpResponse(f"Welcome, {agent.email}")
Flask
from flask import Flask, redirect, session, request
from authpi_idp import IdpClientSync
app = Flask(__name__)
app.secret_key = "your-secret-key"
idp = IdpClientSync(
issuer_url="https://idp.authpi.com/iss_xxx",
client_id="cli_xxx",
redirect_uri="http://localhost:5000/callback",
)
@app.route("/login")
def login():
auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
session["code_verifier"] = auth.code_verifier
session["state"] = auth.state
return redirect(auth.url)
@app.route("/callback")
def callback():
code = request.args.get("code")
state = request.args.get("state")
if state != session.get("state"):
return "Invalid state", 400
agent = idp.exchange_code(code, session["code_verifier"])
session["tokens"] = agent.tokens.model_dump()
return redirect("/dashboard")
@app.route("/dashboard")
def dashboard():
tokens_data = session.get("tokens")
if not tokens_data:
return redirect("/login")
agent = idp.create_agent(tokens_data)
return f"Welcome, {agent.email}"
Context Manager Support
Both clients support context managers for proper resource cleanup:
# Async
async with IdpClient(...) as idp:
auth = idp.create_authorization_url(scopes=["openid"])
# Sync
with IdpClientSync(...) as idp:
auth = idp.create_authorization_url(scopes=["openid"])
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file authpi_idp-0.3.0.tar.gz.
File metadata
- Download URL: authpi_idp-0.3.0.tar.gz
- Upload date:
- Size: 54.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
152fada879d91fb9a1e1ecfb4cf877a160f109ba1d0aaf91b7ae738f7111cf0a
|
|
| MD5 |
71ca8ed5d47642c7f376243f0553daac
|
|
| BLAKE2b-256 |
54cc9fade0322ab9a9c614769301040b1ac57678ab89937fd7b9740f6f7fed42
|
Provenance
The following attestation bundles were made for authpi_idp-0.3.0.tar.gz:
Publisher:
release-sdks.yml on arbfay/authpi
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
authpi_idp-0.3.0.tar.gz -
Subject digest:
152fada879d91fb9a1e1ecfb4cf877a160f109ba1d0aaf91b7ae738f7111cf0a - Sigstore transparency entry: 1328149806
- Sigstore integration time:
-
Permalink:
arbfay/authpi@5c5264ebc10e50bfe53881373c1be805f0041e71 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/arbfay
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release-sdks.yml@5c5264ebc10e50bfe53881373c1be805f0041e71 -
Trigger Event:
push
-
Statement type:
File details
Details for the file authpi_idp-0.3.0-py3-none-any.whl.
File metadata
- Download URL: authpi_idp-0.3.0-py3-none-any.whl
- Upload date:
- Size: 19.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7deb1e603d5f4f2ad12d8cb057978fff34eca83f3c259ddf29d7870f871bf0cf
|
|
| MD5 |
0dce7a1209e199de7749d2dbd41d4513
|
|
| BLAKE2b-256 |
7458433b5723388c7958fed9c0f7ed059dd87c1216ece8d4b48e2e677b5557e4
|
Provenance
The following attestation bundles were made for authpi_idp-0.3.0-py3-none-any.whl:
Publisher:
release-sdks.yml on arbfay/authpi
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
authpi_idp-0.3.0-py3-none-any.whl -
Subject digest:
7deb1e603d5f4f2ad12d8cb057978fff34eca83f3c259ddf29d7870f871bf0cf - Sigstore transparency entry: 1328149818
- Sigstore integration time:
-
Permalink:
arbfay/authpi@5c5264ebc10e50bfe53881373c1be805f0041e71 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/arbfay
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release-sdks.yml@5c5264ebc10e50bfe53881373c1be805f0041e71 -
Trigger Event:
push
-
Statement type: