A lightweight OAuth2/OpenID Connect authentication library for Python with JWT validation and role-based authorization
Project description
🔐 miraveja-authentication
A lightweight OAuth2/OpenID Connect authentication library for Python with JWT validation and role-based authorization
Etymology: Combining "authentication" with the Miraveja ecosystem naming convention
🚀 Overview
miraveja-authentication is a modern authentication library that implements OAuth2/OpenID Connect standards with JWT token validation and role-based authorization. Built with DDD/Hexagonal Architecture principles, it provides a clean, protocol-based interface that works with any compliant OAuth2/OIDC provider (Keycloak, Auth0, AWS Cognito, Azure AD, Google, Okta).
Part of the Miraveja ecosystem, miraveja-authentication provides authentication and authorization infrastructure for all ecosystem services.
✨ Key Features
- 🔑 OAuth2/OIDC Protocol - Standards-compliant implementation working with all OAuth2/OIDC providers
- 🎫 JWT Token Validation - Signature verification using JWKS (JSON Web Key Sets)
- 🔄 Automatic OIDC Discovery - Auto-configuration via
.well-known/openid-configuration - 👥 Role-Based Authorization - Keycloak-style realm and client roles with extensible mappers
- ⚡ FastAPI Integration - First-class support with dependency injection helpers
- 🧪 Testing Utilities - Built-in mock providers for unit and integration testing
- 🏗️ Clean Architecture - Organized following DDD/Hexagonal Architecture principles
🛠️ Technology Stack
🐍 Core Runtime
- Python 3.10+ - Type hints and modern Python features
- pydantic - Configuration validation and data modeling
- httpx - Async HTTP client for OIDC discovery and JWKS fetching
- PyJWT[crypto] - JWT token validation and signature verification
🌐 Optional Integrations
- FastAPI - Web framework integration
🧪 Development
- pytest - Testing framework with async support
- pytest-asyncio - Async testing utilities
- pytest-cov - Coverage reporting
- black - Code formatter
- pylint - Code quality checker
- isort - Import statement organizer
- mypy - Static type checker
- pre-commit - Git hook framework for automated checks
🏛️ Architecture
miraveja-authentication follows Domain-Driven Design and Hexagonal Architecture principles:
src/miraveja_auth/
├── 🧠 domain/ # Core business logic
│ # - Models: User, BaseClaims, Token, Role
│ # - Interfaces: IOAuth2Provider, IClaimsParser, IOIDCDiscoveryService, IAuthenticator
│ # - Exceptions: AuthenticationException, AuthorizationException, etc.
├── 🎬 application/ # Use cases and orchestration
│ # - OAuth2Configuration: Config validation and management
│ # - OAuth2Provider: Token validation use case
└── 🔌 infrastructure/ # External integrations
# - OIDCDiscoveryService: HTTP-based OIDC discovery and JWKS
# - KeycloakClaimsParser: Keycloak JWT claims parser
# - FastAPI authenticators: HTTP, WebSocket, unified
# - MockOAuth2Provider: Testing utilities
Dependency Rule: Domain ← Application ← Infrastructure
- Domain has no dependencies on other layers
- Application depends only on Domain
- Infrastructure depends on Application and Domain
🎯 Getting Started
📋 Prerequisites
- Python 3.10+
- Poetry 2.0+ (recommended) or pip
🚀 Installation
poetry add miraveja-authentication
Or with pip:
pip install miraveja-authentication
For FastAPI integration:
poetry add miraveja-authentication[fastapi]
📖 Quick Start
Basic Token Validation
from miraveja_auth import (
OAuth2Configuration,
OAuth2Provider,
OIDCDiscoveryService,
)
from miraveja_auth.infrastructure.providers.keycloak import KeycloakClaimsParser
# Configure OAuth2/OIDC provider
config = OAuth2Configuration(
issuer="https://your-keycloak.com/realms/myrealm",
client_id="my-client",
client_secret="your-secret",
)
# Create discovery service, claims parser, and provider
discovery = OIDCDiscoveryService(config)
parser = KeycloakClaimsParser()
provider = OAuth2Provider(config, discovery, parser)
# Validate token
async def authenticate_user(token: str):
user = await provider.validate_token(token)
print(f"User ID: {user.id}")
print(f"Username: {user.username}")
print(f"Email: {user.email}")
print(f"Realm Roles: {user.realm_roles}")
print(f"Client Roles: {user.client_roles}")
Environment-Based Configuration
import os
from miraveja_auth import OAuth2Configuration, OAuth2Provider
# Set environment variables
os.environ["OAUTH2_ISSUER"] = "https://your-keycloak.com/realms/myrealm"
os.environ["OAUTH2_CLIENT_ID"] = "my-client"
os.environ["OAUTH2_CLIENT_SECRET"] = "your-secret"
# Load configuration from environment
config = OAuth2Configuration.from_env()
discovery = OIDCDiscoveryService(config)
parser = KeycloakClaimsParser()
provider = OAuth2Provider(config, discovery, parser)
Role-Based Authorization
from miraveja_auth import (
OAuth2Configuration,
OAuth2Provider,
OIDCDiscoveryService,
AuthorizationException,
)
from miraveja_auth.infrastructure.providers.keycloak import KeycloakClaimsParser
async def process_admin_action(token: str):
discovery = OIDCDiscoveryService(config)
parser = KeycloakClaimsParser()
provider = OAuth2Provider(config, discovery, parser)
user = await provider.validate_token(token)
# Check role (returns bool)
if user.has_realm_role("admin"):
print("User is admin")
# Require role (raises AuthorizationException if missing)
try:
user.require_realm_role("admin")
# Proceed with admin action
except AuthorizationException as e:
print(f"Access denied: {e}")
Client-Specific Roles
from miraveja_auth import (
OAuth2Configuration,
OAuth2Provider,
OIDCDiscoveryService,
)
async def process_api_request(token: str):
discovery = OIDCDiscoveryService(config)
parser = KeycloakClaimsParser()
provider = OAuth2Provider(config, discovery, parser)
user = await provider.validate_token(token)
# Check client-specific role
if user.has_client_role("api-client", "read:documents"):
# User has read:documents role for api-client
pass
# Require client role
user.require_client_role("api-client", "write:documents")
⚡ FastAPI Integration
Basic Integration
from fastapi import FastAPI, Depends, HTTPException
from miraveja_auth import (
OAuth2Configuration,
OAuth2Provider,
OIDCDiscoveryService,
)
from miraveja_auth.infrastructure import FastAPIAuthenticator
app = FastAPI()
# Configure authentication
config = OAuth2Configuration.from_env()
discovery = OIDCDiscoveryService(config)
parser = KeycloakClaimsParser()
provider = OAuth2Provider(config, discovery, parser)
authenticator = FastAPIAuthenticator(provider)
# Use authentication dependency
@app.get("/users/me")
async def get_user_profile(user = Depends(authenticator.http.get_current_user)):
return {
"id": user.id,
"username": user.username,
"email": user.email,
"realm_roles": user.realm_roles,
"client_roles": user.client_roles,
}
Protected Endpoints with Role Requirements
from fastapi import FastAPI, Depends
from miraveja_auth.infrastructure import FastAPIAuthenticator
authenticator = FastAPIAuthenticator(provider)
@app.get("/admin/users")
async def list_all_users(user = Depends(authenticator.http.require_realm_role("admin"))):
# Only users with 'admin' realm role can access
return {"users": [...]}
@app.post("/api/documents")
async def create_document(user = Depends(authenticator.http.require_client_role("api-client", "write:documents"))):
# Only users with 'write:documents' role for 'api-client' can create
return {"document_id": 123}
Optional Authentication
from typing import Optional
from miraveja_auth.infrastructure import FastAPIAuthenticator
from miraveja_auth.domain import User
authenticator = FastAPIAuthenticator(provider)
@app.get("/public/content")
async def get_content(user: Optional[User] = Depends(authenticator.http.get_current_user_optional)):
if user:
# Return personalized content for authenticated users
return {"content": f"Welcome back, {user.username}!"}
else:
# Return public content for anonymous users
return {"content": "Welcome, guest!"}
Complete FastAPI Example
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from miraveja_auth import (
OAuth2Configuration,
OAuth2Provider,
OIDCDiscoveryService,
AuthenticationException,
AuthorizationException,
)
from miraveja_auth.infrastructure.providers.keycloak import KeycloakClaimsParser
from miraveja_auth.infrastructure import FastAPIAuthenticator
# Initialize FastAPI app
app = FastAPI(title="My Secured API")
# Configure OAuth2/OIDC authentication
config = OAuth2Configuration(
issuer="https://keycloak.example.com/realms/myrealm",
client_id="my-api",
client_secret="secret",
)
discovery = OIDCDiscoveryService(config)
parser = KeycloakClaimsParser()
provider = OAuth2Provider(config, discovery, parser)
authenticator = FastAPIAuthenticator(provider)
# Public endpoint (no authentication)
@app.get("/")
async def root():
return {"message": "Welcome to My Secured API"}
# Protected endpoint (authentication required)
@app.get("/profile")
async def get_profile(user = Depends(authenticator.http.get_current_user)):
return {
"id": user.id,
"username": user.username,
"email": user.email,
}
# Admin endpoint (requires 'admin' realm role)
@app.get("/admin/dashboard")
async def admin_dashboard(user = Depends(authenticator.http.require_realm_role("admin"))):
return {"message": f"Welcome to admin dashboard, {user.username}"}
# API endpoint (requires client-specific role)
@app.post("/api/documents")
async def create_document(
document: dict,
user = Depends(authenticator.http.require_client_role("my-api", "write:documents"))
):
return {
"id": 123,
"created_by": user.username,
"data": document,
}
# Mixed endpoint (optional authentication)
@app.get("/content")
async def get_content(user = Depends(authenticator.http.get_current_user_optional)):
if user:
return {"message": f"Hello, {user.username}!", "premium": True}
return {"message": "Hello, guest!", "premium": False}
# Global exception handlers
@app.exception_handler(AuthenticationException)
async def authentication_error_handler(request, exc):
return HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(exc),
headers={"WWW-Authenticate": "Bearer"},
)
@app.exception_handler(AuthorizationException)
async def authorization_error_handler(request, exc):
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=str(exc),
)
WebSocket Support
FastAPI authenticators support both HTTP and WebSocket connections through separate implementations:
from fastapi import FastAPI, Depends, WebSocket
from miraveja_auth import OAuth2Configuration, OAuth2Provider, OIDCDiscoveryService
from miraveja_auth.infrastructure import FastAPIAuthenticator
from miraveja_auth.domain import User
app = FastAPI()
# Setup
config = OAuth2Configuration.from_env()
discovery = OIDCDiscoveryService(config)
parser = KeycloakClaimsParser()
provider = OAuth2Provider(config, discovery, parser)
authenticator = FastAPIAuthenticator(provider)
# HTTP endpoint - uses Authorization header
@app.get("/api/data")
async def get_data(user: User = Depends(authenticator.http.get_current_user)):
return {"data": "...", "user": user.username}
# WebSocket endpoint - uses query parameter (?token=...)
@app.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
user: User = Depends(authenticator.ws.get_current_user)
):
await websocket.accept()
await websocket.send_json({
"message": f"Connected as {user.username}",
"roles": user.realm_roles
})
while True:
data = await websocket.receive_text()
await websocket.send_json({"echo": data})
Separate Authenticators (Advanced)
For more control, use HTTP and WebSocket authenticators separately:
from miraveja_auth.infrastructure import HTTPAuthenticator, WebSocketAuthenticator
# Create separate authenticators
http_auth = HTTPAuthenticator(provider)
ws_auth = WebSocketAuthenticator(provider)
# HTTP endpoints
@app.get("/api/profile")
async def profile(user: User = Depends(http_auth.get_current_user)):
return {"id": user.id, "username": user.username}
@app.get("/api/admin")
async def admin(user: User = Depends(http_auth.require_realm_role("admin"))):
return {"message": "Admin area"}
# WebSocket endpoints
@app.websocket("/ws/notifications")
async def notifications(
websocket: WebSocket,
user: User = Depends(ws_auth.get_current_user)
):
await websocket.accept()
# ... handle WebSocket communication
# Optional WebSocket authentication
@app.websocket("/ws/public")
async def public_ws(
websocket: WebSocket,
user: User = Depends(ws_auth.get_current_user_optional)
):
await websocket.accept()
if user:
await websocket.send_json({"user": user.username})
else:
await websocket.send_json({"user": "anonymous"})
🔧 Configuration
OAuth2Configuration Model
from miraveja_auth import OAuth2Configuration
config = OAuth2Configuration(
issuer="https://keycloak.example.com/realms/myrealm", # OIDC issuer URL
client_id="my-client", # OAuth2 client ID
client_secret="your-secret", # OAuth2 client secret (optional)
verify_ssl=True, # Verify SSL certificates (default: True)
public_key=None, # Static public key (optional, for offline validation)
token_verification_algorithm="RS256", # JWT algorithm (default: RS256)
token_minimum_ttl_seconds=60, # Minimum token TTL in seconds (default: 60)
)
Environment Variables
Configure your OAuth2/OIDC provider using environment variables:
# Required
OAUTH2_ISSUER=https://keycloak.example.com/realms/myrealm # OIDC issuer URL
OAUTH2_CLIENT_ID=my-client # OAuth2 client ID
# Optional
OAUTH2_CLIENT_SECRET=your-secret # Client secret (for confidential clients)
OAUTH2_VERIFY_SSL=true # Verify SSL certificates (default: true)
OAUTH2_PUBLIC_KEY=-----BEGIN PUBLIC KEY-----... # Static public key for offline validation
OAUTH2_TOKEN_ALGORITHM=RS256 # JWT verification algorithm (default: RS256)
OAUTH2_TOKEN_MIN_TTL=60 # Minimum token TTL in seconds (default: 60)
Then load configuration:
from miraveja_auth import OAuth2Configuration
config = OAuth2Configuration.from_env()
Supported OAuth2/OIDC Providers
miraveja-authentication works with any OAuth2/OpenID Connect compliant provider:
Keycloak
config = OAuth2Configuration(
issuer="https://keycloak.example.com/realms/myrealm",
client_id="my-client",
client_secret="your-secret",
)
Auth0
config = OAuth2Configuration(
issuer="https://your-tenant.auth0.com/",
client_id="your-client-id",
client_secret="your-client-secret",
)
AWS Cognito
config = OAuth2Configuration(
issuer="https://cognito-idp.{region}.amazonaws.com/{userPoolId}",
client_id="your-app-client-id",
client_secret="your-app-client-secret",
)
Azure AD (Microsoft Entra)
config = OAuth2Configuration(
issuer="https://login.microsoftonline.com/{tenant-id}/v2.0",
client_id="your-application-id",
client_secret="your-client-secret",
)
config = OAuth2Configuration(
issuer="https://accounts.google.com",
client_id="your-client-id.apps.googleusercontent.com",
client_secret="your-client-secret",
)
Okta
config = OAuth2Configuration(
issuer="https://your-domain.okta.com/oauth2/default",
client_id="your-client-id",
client_secret="your-client-secret",
)
📚 API Reference
OAuth2Configuration
Pydantic model for OAuth2/OIDC configuration with validation.
Fields:
issuer: str- OIDC issuer URL (required)client_id: str- OAuth2 client ID (required)client_secret: Optional[str]- Client secret for confidential clientsverify_ssl: bool- Verify SSL certificates (default: True)public_key: Optional[str]- Static public key for offline validationtoken_verification_algorithm: str- JWT algorithm (default: RS256)token_minimum_ttl_seconds: int- Minimum token TTL in seconds (default: 60)
Class Methods:
from_env() -> OAuth2Configuration- Creates configuration from environment variables
- Returns: OAuth2Configuration instance
- Validates required fields (OAUTH2_ISSUER, OAUTH2_CLIENT_ID)
Validators:
validate_issuer()- Ensures issuer is a valid HTTPS URL
OAuth2Provider
Token validation use case in the application layer. Orchestrates the validation flow.
Constructor:
OAuth2Provider(
config: OAuth2Configuration,
discovery_service: IOIDCDiscoveryService,
claims_parser: IClaimsParser
)
config: OAuth2Configuration instancediscovery_service: OIDC discovery service (e.g., OIDCDiscoveryService)claims_parser: Claims parser for converting JWT payload to BaseClaims (e.g., KeycloakClaimsParser)
Methods:
async validate_token(token: str) -> User- Validates JWT token and returns User instance
- Checks expiration and minimum TTL
- Verifies signature (offline with static key or online with JWKS)
- Parses claims into User model with roles
- Raises:
TokenExpiredException,TokenInvalidException,AuthenticationException
OIDCDiscoveryService
HTTP-based OIDC discovery and JWKS service in the infrastructure layer.
Constructor:
OIDCDiscoveryService(config: OAuth2Configuration)
Methods:
-
async get_signing_key(token: str) -> Any- Gets signing key for JWT validation from JWKS
- Caches keys for 1 hour
- Raises:
AuthenticationException
-
async discover_configuration() -> Dict[str, Any]- Fetches OIDC discovery configuration from
.well-known/openid-configuration - Returns: OIDC configuration dictionary
- Raises:
AuthenticationException
- Fetches OIDC discovery configuration from
Domain Models
User
User representation with authentication claims and roles.
Fields:
id: str- User ID (from 'sub' claim)username: str- Username (from 'preferred_username' claim)email: Optional[str]- User emailrealm_roles: List[str]- Realm-level rolesclient_roles: Dict[str, List[str]]- Client-specific roles
Class Methods:
from_claims(claims: Claims) -> User- Create User from JWT claims
Instance Methods:
has_realm_role(role: str) -> bool- Check if user has a specific realm rolerequire_realm_role(role: str) -> None- Require user to have a realm role (raises AuthorizationException if missing)has_client_role(client: str, role: str) -> bool- Check if user has a client-specific rolerequire_client_role(client: str, role: str) -> None- Require user to have a client role (raises AuthorizationException if missing)
BaseClaims
Abstract base class for JWT token claims representation.
Fields:
iss: str- Issuersub: str- Subject (user ID)aud: str- Audienceexp: int- Expiration timestampiat: int- Issued at timestamp- Additional OIDC standard claims (email, preferred_username, etc.)
Abstract Methods:
get_roles() -> List[str]- Extract realm/global roles from claimsget_client_roles(client_id: str) -> List[str]- Extract roles for specific clientget_all_client_roles() -> Dict[str, List[str]]- Extract all client roles
Token
OAuth2 token representation.
Fields:
access_token: str- JWT access tokenrefresh_token: Optional[str]- Refresh tokenexpires_in: int- Token lifetime in seconds
Role
Role value object.
Fields:
id: str- Role IDname: str- Role namedescription: Optional[str]- Role descriptioncomposite: bool- Whether role is compositeclient_role: bool- Whether role is client-specificcontainer_id: Optional[str]- Container (realm or client) ID
FastAPI Integration
FastAPIAuthenticator
Unified authenticator providing both HTTP and WebSocket authentication.
Constructor:
FastAPIAuthenticator(
provider: IOAuth2Provider
)
Properties:
http: HTTPAuthenticator- HTTP-specific authenticatorws: WebSocketAuthenticator- WebSocket-specific authenticator
Methods (HTTP delegation):
get_current_user(token: str) -> User- Validate token and return user (raises on failure)get_current_user_optional(token: Optional[str]) -> Optional[User]- Validate token if providedrequire_realm_role(*roles: str)- Dependency requiring any of the specified realm rolesrequire_client_role(client_id: str, *roles: str)- Dependency requiring client-specific roles
Usage:
from miraveja_auth.infrastructure.fastapi_integration import FastAPIAuthenticator
authenticator = FastAPIAuthenticator(provider)
# Use .http for HTTP endpoints
@app.get("/users/me")
async def read_current_user(user: User = Depends(authenticator.http.get_current_user)):
return user
# Use .ws for WebSocket endpoints
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
user = await authenticator.ws.get_current_user(websocket)
await websocket.accept()
HTTPAuthenticator
HTTP-specific authenticator extracting JWT from Authorization: Bearer header.
Constructor:
HTTPAuthenticator(
provider: IOAuth2Provider
)
Methods:
get_current_user(token: str = Depends(oauth2_scheme)) -> User- FastAPI dependency for HTTP endpointsget_current_user_optional(token: Optional[str] = Depends(optional_oauth2_scheme)) -> Optional[User]require_realm_role(*roles: str)- Dependency requiring realm rolesrequire_client_role(client_id: str, *roles: str)- Dependency requiring client roles
WebSocketAuthenticator
WebSocket-specific authenticator extracting JWT from query parameter ?token=....
Constructor:
WebSocketAuthenticator(
provider: IOAuth2Provider
)
Methods:
get_current_user(websocket: WebSocket) -> User- Extract and validate token from WebSocket query parametersget_current_user_optional(websocket: WebSocket) -> Optional[User]- Optional WebSocket authenticationrequire_realm_role(websocket: WebSocket, *roles: str) -> User- Validate token and require realm rolesrequire_client_role(websocket: WebSocket, client_id: str, *roles: str) -> User- Validate token and require client roles
Note: WebSocket authentication extracts the token from the query parameter (e.g., ws://localhost:8000/ws?token=eyJ...) since WebSockets don't support custom headers in browsers.
KeycloakClaimsParser
Claims parser for Keycloak-specific JWT tokens.
Constructor:
KeycloakClaimsParser()
Methods:
parse(payload: Dict[str, Any]) -> KeycloakClaims- Parse raw JWT payload into KeycloakClaims
KeycloakClaims Methods:
get_roles() -> List[str]- Extract realm roles fromrealm_access.rolesget_client_roles(client_id: str) -> List[str]- Extract roles for specific client fromresource_accessget_all_client_roles() -> Dict[str, List[str]]- Extract all client roles fromresource_access
Usage:
from miraveja_auth.infrastructure.providers.keycloak import KeycloakClaimsParser
parser = KeycloakClaimsParser()
provider = OAuth2Provider(config, discovery_service, parser)
MockOAuth2Provider
Testing utility for simulating OAuth2 authentication without a real provider.
Constructor:
MockOAuth2Provider()
Methods:
add_user(user_id: str, username: str, email: Optional[str] = None, realm_roles: List[str] = [], client_roles: Dict[str, List[str]] = {})- Add test userset_token_for_user(user_id: str, token: Optional[str] = None) -> str- Generate mock token for usersimulate_failure(mode: Optional[str])- Simulate authentication failures (modes: 'expired', 'invalid', or None to clear)validate_token(token: str) -> User- Validate mock tokenget_user_by_id(user_id: str) -> User- Get user directly by ID
Usage:
from miraveja_auth.infrastructure.testing import MockOAuth2Provider
mock_provider = MockOAuth2Provider()
mock_provider.add_user("123", "testuser", realm_roles=["admin"])
token = mock_provider.set_token_for_user("123")
user = await mock_provider.validate_token(token)
Exceptions
AuthenticationException- Base exception for authentication failuresTokenExpiredException- Token has expired (includes expiration time and TTL)TokenInvalidException- Token signature or structure is invalidAuthorizationException- User lacks required permissionsConfigurationException- Invalid configuration
🔥 Advanced Usage
Custom Claims Parser
Create custom claims parsers for different OAuth2/OIDC providers:
from miraveja_auth import BaseClaims, IClaimsParser, OAuth2Provider, TokenInvalidException
from typing import List, Dict, Any, Optional
class Auth0Claims(BaseClaims):
"""Claims for Auth0 tokens."""
permissions: Optional[List[str]] = None
def get_roles(self) -> List[str]:
# Auth0 stores roles/permissions in custom namespace or permissions field
if self.permissions:
return self.permissions
return getattr(self, "https://myapp.com/roles", [])
def get_client_roles(self, client_id: str) -> List[str]:
# Auth0 typically doesn't use client-specific roles
return []
def get_all_client_roles(self) -> Dict[str, List[str]]:
return {}
class Auth0ClaimsParser(IClaimsParser):
"""Parser for Auth0 JWT payloads."""
def parse(self, payload: Dict[str, Any]) -> Auth0Claims:
try:
return Auth0Claims.model_validate(payload)
except Exception:
raise TokenInvalidException()
# Use custom parser
config = OAuth2Configuration.from_env()
parser = Auth0ClaimsParser()
provider = OAuth2Provider(config, discovery_service, parser)
Offline Token Validation
For offline validation without JWKS fetching:
from miraveja_auth import OAuth2Configuration, OAuth2Provider
config = OAuth2Configuration(
issuer="https://keycloak.example.com/realms/myrealm",
client_id="my-client",
public_key="""-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...
-----END PUBLIC KEY-----""",
)
parser = KeycloakClaimsParser()
provider = OAuth2Provider(config, discovery_service, parser)
# Token validation uses static public key instead of JWKS
user = await provider.validate_token(token)
Multiple OAuth2 Providers
Support multiple authentication providers:
from miraveja_auth import OAuth2Configuration, OAuth2Provider
# Keycloak provider
keycloak_config = OAuth2Configuration(
issuer="https://keycloak.example.com/realms/internal",
client_id="internal-api",
)
keycloak_discovery = OIDCDiscoveryService(keycloak_config)
keycloak_parser = KeycloakClaimsParser()
keycloak_provider = OAuth2Provider(keycloak_config, keycloak_discovery, keycloak_parser)
# Auth0 provider
auth0_config = OAuth2Configuration(
issuer="https://your-tenant.auth0.com/",
client_id="external-api",
)
auth0_discovery = OIDCDiscoveryService(auth0_config)
auth0_parser = Auth0ClaimsParser() # Custom parser for Auth0
auth0_provider = OAuth2Provider(auth0_config, auth0_discovery, auth0_parser)
# Validate against appropriate provider
async def authenticate(token: str, provider_type: str):
if provider_type == "keycloak":
return await keycloak_provider.validate_token(token)
elif provider_type == "auth0":
return await auth0_provider.validate_token(token)
Custom Token TTL Validation
Ensure tokens have sufficient remaining lifetime:
config = OAuth2Configuration(
issuer="https://keycloak.example.com/realms/myrealm",
client_id="my-client",
token_minimum_ttl_seconds=300, # Require at least 5 minutes remaining
)
discovery = OIDCDiscoveryService(config)
parser = KeycloakClaimsParser()
provider = OAuth2Provider(config, discovery, parser)
# Raises TokenExpiredError if token expires in less than 5 minutes
user = await provider.validate_token(token)
🧪 Testing
Using MockOAuth2Provider
The MockOAuth2Provider allows you to create test environments with simulated authentication:
from miraveja_auth.infrastructure.testing import MockOAuth2Provider
import pytest
@pytest.fixture
def mock_provider():
return MockOAuth2Provider()
@pytest.mark.asyncio
async def test_user_authentication(mock_provider):
# Add test user with roles
mock_provider.add_user(
user_id="123",
username="testuser",
email="test@example.com",
realm_roles=["user", "admin"],
client_roles={"my-client": ["read", "write"]},
)
# Get token for user
token = mock_provider.set_token_for_user("123")
# Validate token
user = await mock_provider.validate_token(token)
assert user.id == "123"
assert user.username == "testuser"
assert "admin" in user.realm_roles
assert "read" in user.client_roles["my-client"]
Simulating Authentication Failures
@pytest.mark.asyncio
async def test_expired_token(mock_provider):
mock_provider.add_user("123", "testuser")
# Simulate token expiration
mock_provider.simulate_failure("expired")
with pytest.raises(TokenExpiredException):
await mock_provider.validate_token("any-token")
Testing FastAPI Dependencies
from fastapi.testclient import TestClient
from miraveja_auth.infrastructure.fastapi_integration import FastAPIAuthenticator
from miraveja_auth.infrastructure.testing import MockOAuth2Provider
def test_protected_endpoint():
# Setup mock provider
mock_provider = MockOAuth2Provider()
mock_provider.add_user(
"123",
"testuser",
realm_roles=["admin"]
)
token = mock_provider.set_token_for_user("123")
# Create authenticator with mock provider
authenticator = FastAPIAuthenticator(mock_provider)
# Test endpoint
client = TestClient(app)
response = client.get(
"/admin/dashboard",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
Running Tests
# Run all tests
poetry run pytest
# Run with coverage
poetry run pytest --cov=src/miraveja_auth --cov-report=html
# Run specific test file
poetry run pytest tests/unit/miraveja_auth/domain/test_models.py
# Run with verbose output
poetry run pytest -v
# Run integration tests only
poetry run pytest tests/integration
# Run unit tests only
poetry run pytest tests/unit
📂 Examples
Complete working examples are available in the examples/ directory:
basic_usage.py- Basic token validation and role checkingfastapi_app.py- Complete FastAPI application with authenticationcustom_claims_parser.py- Custom Auth0 claims parser exampleseparate_authenticators.py- Using separate HTTP and WebSocket authenticators
Run examples:
# Basic usage
poetry run python examples/basic_usage.py
# FastAPI app (requires uvicorn)
poetry run uvicorn examples.fastapi_app:app --reload
# Custom claims parser
poetry run python examples/custom_claims_parser.py
# Separate authenticators
poetry run uvicorn examples.separate_authenticators:app --reload
💡 Best Practices
- Use environment variables: Store OAuth2 configuration in environment variables, not in code
- Enable SSL verification: Always use
verify_ssl=Truein production - Validate token TTL: Set appropriate
token_minimum_ttlto ensure tokens have sufficient lifetime - Use role-based authorization: Leverage realm and client roles for fine-grained access control
- Handle exceptions properly: Catch
AuthenticationExceptionandAuthorizationExceptionin your application - Test with mocks: Use
MockOAuth2Providerfor unit and integration tests - Cache provider instances: Reuse
OAuth2Providerinstances to benefit from key caching
🚧 Development Status
Active Development - Core features implemented and tested
🚀 Planned Features:
- OAuth2/OIDC protocol implementation
- JWT token validation with JWKS
- Automatic OIDC discovery
- Role-based authorization (Keycloak-style)
- FastAPI integration with dependency injection
- Testing utilities (MockOAuth2Provider)
- Exception handling and error reporting
- Environment-based configuration
- Additional role mappers (Auth0, Cognito, Azure AD)
- Token refresh support
- WebSocket authentication
- GraphQL integration
- Performance optimizations (connection pooling, key caching improvements)
See the implementation plan for detailed design notes.
🤝 Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Development Setup
# Clone the repository
git clone https://github.com/JomarJunior/miraveja-authentication.git
cd miraveja-authentication
# Install dependencies
poetry install
# Install pre-commit hooks
poetry run pre-commit install
# Run tests
poetry run pytest --cov=src/miraveja_auth
Code Quality
# Format code
poetry run black src tests
# Sort imports
poetry run isort src tests
# Run linter
poetry run pylint src/miraveja_auth
# Run type checker
poetry run mypy src/miraveja_auth
# Run pre-commit hooks
poetry run pre-commit run --all-files
📄 License
This project is licensed under the MIT License.
🙏 Acknowledgments
- Built as part of the Miraveja ecosystem
- Follows OAuth2/OpenID Connect standards
- Inspired by Keycloak's role-based authorization model
- Follows DDD/Hexagonal Architecture principles
📞 Contact
- Author: Jomar Júnior de Souza Pereira
- Email: jomarjunior@poli.ufrj.br
- Repository: https://github.com/JomarJunior/miraveja-authentication
🔗 Related Projects
- miraveja-di - Dependency Injection container
- miraveja-log - Logging library
- miraveja - Main Miraveja project
Made with ❤️ for the Miraveja ecosystem
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file miraveja_authentication-0.1.1.tar.gz.
File metadata
- Download URL: miraveja_authentication-0.1.1.tar.gz
- Upload date:
- Size: 30.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ed14bb86e631128b22a1d215321bca7917b0d4efac8df7540ce0c77dc7d3c956
|
|
| MD5 |
e8c814457b68843472ebbb58e1ca892f
|
|
| BLAKE2b-256 |
7dd0796937a2e070fb93e94ca84fd886ae2e9b057688b885a3dd641166eee29c
|
Provenance
The following attestation bundles were made for miraveja_authentication-0.1.1.tar.gz:
Publisher:
publish.yml on JomarJunior/miraveja-authentication
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
miraveja_authentication-0.1.1.tar.gz -
Subject digest:
ed14bb86e631128b22a1d215321bca7917b0d4efac8df7540ce0c77dc7d3c956 - Sigstore transparency entry: 714730506
- Sigstore integration time:
-
Permalink:
JomarJunior/miraveja-authentication@19a4f2ffc3390d9c3e9d4830cfbd649ee8c451c9 -
Branch / Tag:
refs/tags/v0.1.1 - Owner: https://github.com/JomarJunior
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@19a4f2ffc3390d9c3e9d4830cfbd649ee8c451c9 -
Trigger Event:
release
-
Statement type:
File details
Details for the file miraveja_authentication-0.1.1-py3-none-any.whl.
File metadata
- Download URL: miraveja_authentication-0.1.1-py3-none-any.whl
- Upload date:
- Size: 29.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2a49aaf41db98194e2c2a63cedb890a8b17a9276d20ab92947a3fe0f8b357c0e
|
|
| MD5 |
41070bd2b17fd9941f1a3769af61f434
|
|
| BLAKE2b-256 |
da5143447f77d75fb86ea28716edd15ca693c2702cbb42de9db5d554e1a5d886
|
Provenance
The following attestation bundles were made for miraveja_authentication-0.1.1-py3-none-any.whl:
Publisher:
publish.yml on JomarJunior/miraveja-authentication
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
miraveja_authentication-0.1.1-py3-none-any.whl -
Subject digest:
2a49aaf41db98194e2c2a63cedb890a8b17a9276d20ab92947a3fe0f8b357c0e - Sigstore transparency entry: 714730550
- Sigstore integration time:
-
Permalink:
JomarJunior/miraveja-authentication@19a4f2ffc3390d9c3e9d4830cfbd649ee8c451c9 -
Branch / Tag:
refs/tags/v0.1.1 - Owner: https://github.com/JomarJunior
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@19a4f2ffc3390d9c3e9d4830cfbd649ee8c451c9 -
Trigger Event:
release
-
Statement type: