RBAC and Auth core utilities including JWT token management.
Project description
ABS Auth RBAC Core
A comprehensive authentication and Role-Based Access Control (RBAC) package for FastAPI applications. This package provides robust JWT-based authentication and flexible role-based permission management using Casbin with Redis support for real-time policy updates.
๐ Features
- ๐ JWT-based Authentication: Secure token-based authentication with customizable expiration
- ๐ Password Security: Secure password storage using bcrypt with passlib
- ๐ฅ Role-Based Access Control (RBAC): Flexible permission management using Casbin
- โก Real-time Policy Updates: Redis integration for live policy synchronization
- ๐ User-Role Management: Dynamic role assignment and revocation
- ๐ก๏ธ Permission Enforcement: Decorator-based permission checking
- ๐ Middleware Integration: Seamless FastAPI middleware integration
- ๐ Comprehensive Error Handling: Built-in exception handling for security scenarios
- ๐๏ธ Dependency Injection Ready: Compatible with dependency-injector
- ๐ Permission Constants: Predefined permission constants and enums
๐ฆ Installation
pip install abs-auth-rbac-core
๐๏ธ Architecture Overview
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
โ FastAPI App โ โ Auth Middlewareโ โ RBAC Service โ
โ โ โ โ โ โ
โ โโโโโโโโโโโโโโโ โ โ โโโโโโโโโโโโโโโ โ โ โโโโโโโโโโโโโโโ โ
โ โ Routes โ โโโโโบโ โJWT Validationโ โโโโโบโ โPermission โ โ
โ โ โ โ โ โUser Fetch โ โ โ โChecking โ โ
โ โโโโโโโโโโโโโโโ โ โ โโโโโโโโโโโโโโโ โ โ โโโโโโโโโโโโโโโ โ
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
โ โ โ
โ โ โ
โผ โผ โผ
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
โ Database โ โ Redis โ โ Casbin โ
โ (Users, โ โ (Policy โ โ (Policy โ
โ Roles, โ โ Updates) โ โ Engine) โ
โ Permissions) โ โ โ โ โ
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
๐ Quick Start
1. Basic Setup
from abs_auth_rbac_core.auth.jwt_functions import JWTFunctions
from abs_auth_rbac_core.rbac import RBACService
from abs_auth_rbac_core.schema.permission import RedisWatcherSchema
import os
# Initialize JWT functions
jwt_functions = JWTFunctions(
secret_key=os.getenv("JWT_SECRET_KEY"),
algorithm=os.getenv("JWT_ALGORITHM", "HS256"),
expire_minutes=int(os.getenv("JWT_EXPIRE_MINUTES", "60"))
)
# Initialize RBAC service with database session
rbac_service = RBACService(
session=your_db_session,
redis_config=RedisWatcherSchema(
host=os.getenv("REDIS_HOST"),
port=int(os.getenv("REDIS_PORT", "6379")),
channel=os.getenv("REDIS_CHANNEL", "casbin_policy_updates"),
password=os.getenv("REDIS_PASSWORD"),
ssl=os.getenv("REDIS_SSL", "false").lower() == "true"
)
)
2. Authentication Setup
Option 1: Using Package Middleware (Recommended)
from abs_auth_rbac_core.auth.middleware import auth_middleware
from fastapi import FastAPI, Depends
app = FastAPI()
# Create authentication middleware
auth_middleware = auth_middleware(
db_session=your_db_session,
jwt_secret_key=os.getenv("JWT_SECRET_KEY"),
jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256")
)
# Apply to specific routers
app.include_router(
protected_router,
dependencies=[Depends(auth_middleware)]
)
# Public routes (no middleware)
app.include_router(public_router)
How it works:
- โ Validates JWT token from Authorization header
- โ Extracts user UUID from token payload
- โ Fetches user from database using UUID
- โ
Sets user object in
request.state.user - โ Returns user object for route handlers
Accessing the user in routes:
@router.get("/profile")
async def get_profile(request: Request):
user = request.state.user
return {"user_id": user.uuid, "email": user.email}
Option 2: Custom Authentication Function
from abs_auth_rbac_core.auth.jwt_functions import JWTFunctions
from fastapi import Security, HTTPAuthorizationCredentials
from fastapi.security import HTTPBearer
from abs_exception_core.exceptions import UnauthorizedError
security = HTTPBearer(auto_error=False)
jwt_functions = JWTFunctions(
secret_key=os.getenv("JWT_SECRET_KEY"),
algorithm=os.getenv("JWT_ALGORITHM", "HS256"),
expire_minutes=int(os.getenv("JWT_EXPIRE_MINUTES", "60"))
)
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security),
) -> Dict:
try:
if not credentials:
raise UnauthorizedError(detail="No authorization token provided")
token = credentials.credentials
if token.lower().startswith("bearer "):
token = token[7:]
decoded_token = jwt_functions.decode_jwt(token)
if not decoded_token:
raise UnauthorizedError(detail="Invalid or expired token")
return decoded_token
except Exception as e:
raise UnauthorizedError(detail=str(e))
@app.get("/protected")
async def protected_route(current_user: dict = Depends(get_current_user)):
return {"message": f"Hello {current_user.get('name')}"}
3. RBAC Operations
# Create a role with permissions
role = rbac_service.create_role(
name="admin",
description="Administrator role with full access",
permission_ids=["permission_uuid1", "permission_uuid2"]
)
# Assign roles to user
rbac_service.bulk_assign_roles_to_user(
user_uuid="user_uuid",
role_uuids=["role_uuid1", "role_uuid2"]
)
# Check user permissions
has_permission = rbac_service.check_permission(
user_uuid="user_uuid",
resource="USER_MANAGEMENT",
action="VIEW",
module="USER_MANAGEMENT"
)
# Get user permissions and roles
user_permissions = rbac_service.get_user_permissions(user_uuid="user_uuid")
user_roles = rbac_service.get_user_roles(user_uuid="user_uuid")
๐๏ธ Core Components
Authentication (auth/)
jwt_functions.py: JWT token management and password hashingmiddleware.py: Authentication middleware for FastAPIauth_functions.py: Core authentication functions
RBAC (rbac/)
service.py: Main RBAC service with role and permission managementdecorator.py: Decorators for permission checkingpolicy.conf: Casbin policy configuration
Models (models/)
user.py: User modelroles.py: Role modelpermissions.py: Permission modeluser_role.py: User-Role association modelrole_permission.py: Role-Permission association modeluser_permission.py: User-Permission association modelrbac_model.py: Base RBAC modelbase_model.py: Base model with common fieldsgov_casbin_rule.py: Casbin rule model
Schema (schema/)
permission.py: Permission-related schemas
Utilities (util/)
permission_constants.py: Predefined permission constants and enums
๐ง Complete Implementation Example
1. Dependency Injection Setup
from dependency_injector import containers, providers
from abs_auth_rbac_core.auth.middleware import auth_middleware
from abs_auth_rbac_core.rbac import RBACService
from abs_auth_rbac_core.schema.permission import RedisWatcherSchema
from abs_auth_rbac_core.util.permission_constants import (
PermissionAction,
PermissionModule,
PermissionResource
)
class Container(containers.DeclarativeContainer):
# Configure wiring for dependency injection
wiring_config = containers.WiringConfiguration(
modules=[
"src.api.auth_route",
"src.api.endpoints.rbac.permission_route",
"src.api.endpoints.rbac.role_route",
"src.api.endpoints.rbac.users_route",
]
)
# Database session provider
db_session = providers.Factory(your_db_session_factory)
# RBAC service provider
rbac_service = providers.Singleton(
RBACService,
session=db_session,
redis_config=RedisWatcherSchema(
host=os.getenv("REDIS_HOST"),
port=int(os.getenv("REDIS_PORT", "6379")),
channel=os.getenv("REDIS_CHANNEL", "casbin_policy_updates"),
password=os.getenv("REDIS_PASSWORD"),
ssl=os.getenv("REDIS_SSL", "false").lower() == "true"
)
)
# Auth middleware provider
get_auth_middleware = providers.Factory(
auth_middleware,
db_session=db_session,
jwt_secret_key=os.getenv("JWT_SECRET_KEY"),
jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256")
)
# Initialize container
container = Container()
app.container = container
2. Application Setup
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from dependency_injector.wiring import Provide, inject
from src.core.container import Container
class CreateApp:
def __init__(self):
self.container = Container()
self.db = self.container.db()
self.auth_middleware = self.container.get_auth_middleware()
self.app = FastAPI(
title="Your Service",
description="Service Description",
version="0.2.0"
)
# Apply CORS middleware
self.app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in configs.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Public routes (no authentication required)
self.app.include_router(auth_router, tags=["Auth"])
self.app.include_router(public_router_v1)
# Protected routes (authentication required)
self.app.include_router(
router_v1,
dependencies=[Depends(self.auth_middleware)]
)
# Register exception handlers
register_exception_handlers(self.app)
# Initialize application
application = CreateApp()
app = application.app
3. Route Implementation with Permissions
from fastapi import APIRouter, Depends, Request
from dependency_injector.wiring import Provide, inject
from abs_auth_rbac_core.rbac import rbac_require_permission
from abs_auth_rbac_core.util.permission_constants import (
PermissionAction,
PermissionModule,
PermissionResource
)
# Protected router (requires authentication)
router = APIRouter(prefix="/users")
# Public route example
@router.post("/all", response_model=FindUserResult)
@inject
async def get_user_list(
request: Request,
find_query: FindUser = Body(...),
rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
service: UserService = Depends(Provide[Container.user_service]),
):
"""Get the list of users with filtering, sorting and pagination"""
find_query.searchable_fields = find_query.searchable_fields or ["name"]
users = service.get_list(schema=find_query)
return users
# Protected route with permission check
@router.get("/{user_id}", response_model=UserProfile)
@inject
@rbac_require_permission(
f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}"
)
async def get_user(
user_id: int,
request: Request,
service: UserService = Depends(Provide[Container.user_service]),
rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
"""Get user profile with permissions and roles"""
return service.get_user_profile("id", user_id, rbac_service)
๐ Permission System
Permission Format
Permissions follow the format: module:resource:action
- Module: The system module (e.g.,
USER_MANAGEMENT,EMAIL_PROCESS) - Resource: The specific resource within the module (e.g.,
USER_MANAGEMENT,ROLE_MANAGEMENT) - Action: The action being performed (e.g.,
VIEW,CREATE,EDIT,DELETE)
Using Permission Constants
from abs_auth_rbac_core.util.permission_constants import (
PermissionAction,
PermissionModule,
PermissionResource,
PermissionConstants
)
# Using enums
permission_string = f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}"
# Using predefined constants
user_view_permission = PermissionConstants.RBAC_USER_MANAGEMENT_VIEW
permission_string = f"{user_view_permission.module}:{user_view_permission.resource}:{user_view_permission.action}"
Multiple Permissions
@rbac_require_permission([
f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}",
f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.ROLE_MANAGEMENT.value}:{PermissionAction.VIEW.value}"
])
async def get_user_with_roles():
# User needs both permissions to access this endpoint
pass
โ๏ธ Configuration
Environment Variables
# JWT Configuration
JWT_SECRET_KEY=your-secret-key-here
JWT_ALGORITHM=HS256
JWT_ACCESS_TOKEN_EXPIRE_MINUTES=60
JWT_REFRESH_TOKEN_EXPIRE_MINUTES=1440
# Redis Configuration (for real-time policy updates)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-redis-password
REDIS_CHANNEL=casbin_policy_updates
REDIS_SSL=false
# Database Configuration
DATABASE_URI=postgresql://user:password@localhost/dbname
Casbin Policy Configuration
The package uses a default policy configuration that supports:
- Role-based access control
- Resource-based permissions
- Module-based organization
- Super admin bypass
Policy format: [role] [resource] [action] [module]
๐ ๏ธ Advanced Usage
User Profile with Permissions
def get_user_profile(self, attr: str, value: any, rbac_service: RBACService) -> UserProfile:
"""Get user profile with permissions and roles"""
user = self.user_repository.read_by_attr(attr, value, eager=True)
# Get user permissions and roles
permissions = rbac_service.get_user_permissions(user_uuid=user.uuid)
user_permissions = rbac_service.get_user_only_permissions(user_uuid=user.uuid)
roles = rbac_service.get_user_roles(user_uuid=user.uuid)
# Convert roles to response models
role_models = [UserRoleResponse.model_validate(role) for role in roles]
return UserProfile(
id=user.id,
uuid=user.uuid,
email=user.email,
name=user.name,
is_active=user.is_active,
last_login_at=user.last_login_at,
permissions=permissions,
user_permissions=user_permissions,
roles=role_models,
)
Role and Permission Management
@router.get("/roles")
@inject
@rbac_require_permission(
f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.ROLE_MANAGEMENT.value}:{PermissionAction.VIEW.value}"
)
async def get_roles(
request: Request,
rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
"""Get all roles"""
return rbac_service.list_roles()
@router.post("/roles")
@inject
@rbac_require_permission(
f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.ROLE_MANAGEMENT.value}:{PermissionAction.CREATE.value}"
)
async def create_role(
role: CreateRoleSchema,
request: Request,
rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
"""Create a new role with permissions"""
return rbac_service.create_role(
name=role.name,
description=role.description,
permission_ids=role.permission_ids
)
๐ Authentication Flow
1. Client Request
โโโโโโโโโโโโโโโโโโโ
โ Authorization: โ
โ Bearer <token> โ
โโโโโโโโโโโโโโโโโโโ
โ
โผ
2. Auth Middleware
โโโโโโโโโโโโโโโโโโโโโโโโ
โ Validate JWT โ
โ Extract User ID โ
โ Fetch User โ
โ Set in Request state โ
โโโโโโโโโโโโโโโโโโโโโโโโ
โ
โผ
3. RBAC Decorator
โโโโโโโโโโโโโโโโโโโ
โ Get User UUID โ
โ Check Permissionsโ
โ Allow/Deny โ
โโโโโโโโโโโโโโโโโโโ
โ
โผ
4. Route Handler
โโโโโโโโโโโโโโโโโโโ
โ Execute Logic โ
โ Return Response โ
โโโโโโโโโโโโโโโโโโโ
๐จ Error Handling
The package includes comprehensive error handling:
from abs_exception_core.exceptions import (
UnauthorizedError,
PermissionDeniedError,
ValidationError,
DuplicatedError,
NotFoundError
)
# Handle authentication errors
try:
user = await auth_middleware(request)
except UnauthorizedError as e:
return {"error": "Authentication failed", "detail": str(e)}
# Handle permission errors
try:
# Protected operation
pass
except PermissionDeniedError as e:
return {"error": "Permission denied", "detail": str(e)}
๐ Monitoring and Logging
import logging
from abs_utils.logger import setup_logger
logger = setup_logger(__name__)
# Log authentication events
logger.info(f"User {user_uuid} authenticated successfully")
# Log permission checks
logger.info(f"Permission check: {user_uuid} -> {resource}:{action}:{module}")
# Log role assignments
logger.info(f"Roles assigned to user {user_uuid}: {role_uuids}")
๐ฅ Health Checks
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"rbac_watcher": rbac_service.is_watcher_active(),
"policy_count": rbac_service.get_policy_count()
}
๐ง Troubleshooting
Common Issues
| Issue | Solution |
|---|---|
| Authentication Fails | Check JWT secret key, token expiration, user existence |
| Permission Denied | Verify user roles, role-permission assignments, permission format |
| Redis Connection Issues | Check Redis server status, connection parameters, pub/sub support |
| Policy Not Updating | Verify Redis watcher configuration, policy format, Redis logs |
Debug Mode
# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)
# Check RBAC service status
print(f"Watcher active: {rbac_service.is_watcher_active()}")
print(f"Policy count: {rbac_service.get_policy_count()}")
๐ Dependencies
Core Dependencies
- pyjwt (>=2.10.1,<3.0.0): JWT token handling
- fastapi[standard] (>=0.115.12,<0.116.0): Web framework
- passlib (>=1.7.4,<2.0.0): Password hashing
- sqlalchemy (>=2.0.40,<3.0.0): Database ORM
- casbin (>=1.41.0,<2.0.0): RBAC policy engine
- casbin-sqlalchemy-adapter (>=1.4.0,<2.0.0): Database adapter
- casbin-redis-watcher (>=1.3.0,<2.0.0): Real-time policy updates
Internal Dependencies
- abs-exception-core (>=0.1.4,<0.2.0): Exception handling
- psycopg2-binary (>=2.9.10,<3.0.0): PostgreSQL adapter
๐ Best Practices
Security
- โ Use environment variables for sensitive data
- โ Implement proper password policies
- โ Regularly rotate JWT secret keys
- โ Use HTTPS in production
- โ Implement rate limiting for authentication endpoints
Permission Design
- โ Use descriptive permission names
- โ Group related permissions by module
- โ Implement least privilege principle
- โ Document permission requirements
Performance
- โ Use Redis for real-time policy updates
- โ Implement caching for frequently accessed permissions
- โ Optimize database queries with eager loading
- โ Monitor policy enforcement performance
Maintenance
- โ Regularly audit user permissions
- โ Implement permission cleanup for inactive users
- โ Monitor and log security events
- โ Keep dependencies updated
๐ API Reference
RBACService Methods
| Method | Description |
|---|---|
create_role(name, description, permission_ids) |
Create a new role |
assign_role_to_user(user_uuid, role_uuid) |
Assign role to user |
bulk_assign_roles_to_user(user_uuid, role_uuids) |
Assign multiple roles |
check_permission(user_uuid, resource, action, module) |
Check user permission |
get_user_permissions(user_uuid) |
Get all user permissions |
get_user_roles(user_uuid) |
Get user roles |
list_roles() |
List all roles |
is_watcher_active() |
Check Redis watcher status |
JWT Functions
| Method | Description |
|---|---|
create_access_token(data) |
Create JWT access token |
decode_jwt(token) |
Decode and validate JWT |
hash_password(password) |
Hash password with bcrypt |
verify_password(password, hashed) |
Verify password hash |
๐ License
This project is licensed under the MIT License - see the LICENSE file for details.
๐ค Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
๐ Support
For support and questions:
- Email: info@autobridgesystems.com
- Documentation: [Link to documentation]
- Issues: [GitHub Issues]
Version: 0.2.0
Last Updated: 2024
Python Version: >=3.12,<4.0
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file abs_auth_rbac_core-0.4.3.tar.gz.
File metadata
- Download URL: abs_auth_rbac_core-0.4.3.tar.gz
- Upload date:
- Size: 44.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.2 CPython/3.13.7 Darwin/23.6.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
954d29b305b46b6d20e0849cb5b99ea606c9979efd00b5ef04ae3657436c0c40
|
|
| MD5 |
a1fe3b2a0e52e50cd5986412556ea428
|
|
| BLAKE2b-256 |
f6d0bfd854828fe01637f5892d7faf0c7a513c96761e7c00bce3107d965cae2e
|
File details
Details for the file abs_auth_rbac_core-0.4.3-py3-none-any.whl.
File metadata
- Download URL: abs_auth_rbac_core-0.4.3-py3-none-any.whl
- Upload date:
- Size: 48.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.2 CPython/3.13.7 Darwin/23.6.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6bacddeda0674ebe50c781a4d6e93c4ef532abfb0b83eeedaa7940f33f1a5e10
|
|
| MD5 |
4a8301fb298c6c917b34a10eec22d00f
|
|
| BLAKE2b-256 |
a578370450b824de3a36bafde13c666089cc200160ebef0366f2176417f668a4
|