RBAC and Auth core utilities including JWT token management.
Project description
ABS Auth RBAC Core
A comprehensive authentication and Role-Based Access Control (RBAC) package for FastAPI applications. This package provides robust JWT-based authentication and flexible role-based permission management with an in-memory per-user permission cache and Azure Service Bus for cross-server cache invalidation.
What's New in v0.6.0
Casbin has been removed. The RBAC system now uses a per-user in-memory permission cache with targeted Azure Service Bus invalidation. This eliminates the 3-4 second full policy reload that caused 403 errors during permission updates.
Key Changes
| Before (Casbin) | After (v0.6.0) | |
|---|---|---|
| On permission change | Reloads ALL policies (full table scan) | Invalidates ONLY affected user's cache entry |
| Reload time | 3-4 seconds | ~200-500ms |
| Memory model | Monolithic store of every policy | Per-user granular cache (loaded on demand) |
| Read path | In-memory lookup (stale for 3-4s during reload) | In-memory lookup (stale for max ~500ms) |
| Dependencies removed | - | casbin, casbin-sqlalchemy-adapter, casbin-redis-watcher |
Migration Guide
Constructor — The RBACService constructor now accepts **kwargs for backward compatibility. Old redis_config or servicebus_config (for the Casbin watcher) will be silently accepted. To enable cross-server cache invalidation, pass the new CacheInvalidationNotifierConfig:
from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig
# Single-server mode (cache-only, no cross-server sync)
rbac_service = RBACService(session=db_session)
# Multi-server mode (with Azure Service Bus invalidation)
rbac_service = RBACService(
session=db_session,
servicebus_config=CacheInvalidationNotifierConfig(
connection_string="Endpoint=sb://your-ns.servicebus.windows.net/;...",
topic_name="rbac-cache-invalidation",
),
cache_ttl=300, # 5 min default
cache_max_size=10000, # LRU eviction cap
)
Removed methods — The following Casbin-specific methods no longer exist:
add_policy(),add_policies(),remove_policy(),remove_policies()enforce_policy(),remove_filter_policy()reload_policies(),get_policy_count(),clear_all_policies()
Removed exports — AzureServiceBusWatcherSchema is replaced by CacheInvalidationNotifierConfig.
Removed files — watcher.py, policy.conf, gov_casbin_rule.py are deleted.
Unchanged — All decorators (rbac_require_permission, check_permissions), auth middleware, JWT functions, models, schemas, and permission constants work exactly as before.
Features
- JWT-based Authentication: Secure token-based authentication with customizable expiration
- Password Security: Secure password storage using bcrypt with passlib
- Role-Based Access Control (RBAC): Flexible permission management with per-user caching
- Real-time Cache Invalidation: Azure Service Bus for targeted cross-server sync
- User-Role Management: Dynamic role assignment and revocation
- Permission Enforcement: Decorator-based permission checking with nested AND/OR logic
- Middleware Integration: Seamless FastAPI middleware integration
- Comprehensive Error Handling: Built-in exception handling for security scenarios
- Dependency Injection Ready: Compatible with dependency-injector
- Permission Constants: Predefined permission constants and enums
Installation
pip install abs-auth-rbac-core
Architecture Overview
┌─────────────────────────────────────────────────────┐
│ Server A │
│ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ Permission │<───│ RBACService │ │
│ │ Cache │ │ - check_permission() │ │
│ │ (per-user) │ │ - create_role() │ │
│ └──────────────┘ │ - assign_permissions() │ │
│ ^ └─────────┬────────────────┘ │
│ │ invalidate │ publish │
│ │ v │
│ ┌──────┴───────────────────────────────────────┐ │
│ │ CacheInvalidationNotifier (Service Bus) │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────┬───────────────────────────────┘
│ Azure Service Bus Topic
│ (targeted invalidation messages)
┌─────────────────────v───────────────────────────────┐
│ Server B │
│ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ Permission │<───│ CacheInvalidationNotifier│ │
│ │ Cache │ │ (receives & invalidates) │ │
│ │ (per-user) │ └──────────────────────────┘ │
│ └──────────────┘ │
└─────────────────────────────────────────────────────┘
How It Works
-
Read path:
check_permission()looks up the user's cache entry. On hit, it's a sub-millisecond set lookup. On miss, it runs a single SQL query to load all role + direct permissions for that user, caches the result, then checks. -
Write path: After any permission/role change (e.g.,
update_role_permissions()), the local cache is invalidated for affected users. If a notifier is configured, a targeted invalidation message is published via Azure Service Bus. -
Cross-server sync: Other servers receive the invalidation message and delete only the affected users' cache entries. The next permission check for those users will reload from DB.
-
Safety TTL: Cache entries expire after a configurable TTL (default 5 minutes) as a fallback, even without explicit invalidation.
Quick Start
1. Basic Setup
from abs_auth_rbac_core.auth.jwt_functions import JWTFunctions
from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig
import os
# Initialize JWT functions
jwt_functions = JWTFunctions(
secret_key=os.getenv("JWT_SECRET_KEY"),
algorithm=os.getenv("JWT_ALGORITHM", "HS256"),
expire_minutes=int(os.getenv("JWT_EXPIRE_MINUTES", "60"))
)
# Initialize RBAC service
rbac_service = RBACService(
session=your_db_session,
servicebus_config=CacheInvalidationNotifierConfig(
connection_string=os.getenv("AZURE_SERVICEBUS_CONNECTION_STRING"),
topic_name="rbac-cache-invalidation",
)
)
2. Authentication Setup
from abs_auth_rbac_core.auth.middleware import auth_middleware
from fastapi import FastAPI, Depends
app = FastAPI()
# Create authentication middleware
auth_middleware = auth_middleware(
db_session=your_db_session,
jwt_secret_key=os.getenv("JWT_SECRET_KEY"),
jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256")
)
# Apply to specific routers
app.include_router(
protected_router,
dependencies=[Depends(auth_middleware)]
)
# Public routes (no middleware)
app.include_router(public_router)
How it works:
- Validates JWT token from Authorization header
- Extracts user UUID from token payload
- Fetches user from database using UUID
- Sets user object in
request.state.user - Returns user object for route handlers
Accessing the user in routes:
@router.get("/profile")
async def get_profile(request: Request):
user = request.state.user
return {"user_id": user.uuid, "email": user.email}
3. RBAC Operations
# Create a role with permissions
role = rbac_service.create_role(
name="admin",
description="Administrator role with full access",
permission_ids=["permission_uuid1", "permission_uuid2"]
)
# Assign roles to user
rbac_service.bulk_assign_roles_to_user(
user_uuid="user_uuid",
role_uuids=["role_uuid1", "role_uuid2"]
)
# Check user permissions (cache-first, sub-ms on hit)
has_permission = rbac_service.check_permission(
user_uuid="user_uuid",
resource="USER_MANAGEMENT",
action="VIEW",
module="USER_MANAGEMENT"
)
# Get user permissions and roles
user_permissions = rbac_service.get_user_permissions(user_uuid="user_uuid")
user_roles = rbac_service.get_user_roles(user_uuid="user_uuid")
Core Components
Authentication (auth/)
jwt_functions.py: JWT token management and password hashingmiddleware.py: Authentication middleware for FastAPIauth_functions.py: Core authentication functions
RBAC (rbac/)
service.py: Main RBAC service with role and permission managementdecorator.py: Decorators for permission checking (rbac_require_permission,check_permissions)cache.py: Thread-safe per-user LRU permission cache with TTLnotifier.py: Azure Service Bus cache invalidation notifiertypes.py: Type definitions for permission system
Models (models/)
user.py: User modelroles.py: Role modelpermissions.py: Permission modeluser_role.py: User-Role association modelrole_permission.py: Role-Permission association modeluser_permission.py: User-Permission association modelrbac_model.py: Base RBAC modelbase_model.py: Base model with common fields
Schema (schema/)
permission.py: Permission-related schemas
Utilities (util/)
permission_constants.py: Predefined permission constants and enumsget_permissions_resources.py: Resource ID extraction from permissions
Dependency Injection Setup
from dependency_injector import containers, providers
from abs_auth_rbac_core.auth.middleware import auth_middleware
from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig
from abs_auth_rbac_core.util.permission_constants import (
PermissionAction,
PermissionModule,
PermissionResource
)
class Container(containers.DeclarativeContainer):
wiring_config = containers.WiringConfiguration(
modules=[
"src.api.auth_route",
"src.api.endpoints.rbac.permission_route",
"src.api.endpoints.rbac.role_route",
"src.api.endpoints.rbac.users_route",
]
)
db_session = providers.Factory(your_db_session_factory)
rbac_service = providers.Singleton(
RBACService,
session=db_session,
servicebus_config=CacheInvalidationNotifierConfig(
connection_string=os.getenv("AZURE_SERVICEBUS_CONNECTION_STRING"),
topic_name="rbac-cache-invalidation",
)
)
get_auth_middleware = providers.Factory(
auth_middleware,
db_session=db_session,
jwt_secret_key=os.getenv("JWT_SECRET_KEY"),
jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256")
)
Route Implementation with Permissions
from fastapi import APIRouter, Depends, Request
from dependency_injector.wiring import Provide, inject
from abs_auth_rbac_core.rbac import rbac_require_permission, check_permissions
from abs_auth_rbac_core.util.permission_constants import (
PermissionAction,
PermissionModule,
PermissionResource
)
router = APIRouter(prefix="/users")
# Simple permission check (all listed permissions required)
@router.get("/{user_id}", response_model=UserProfile)
@inject
@rbac_require_permission(
f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}"
)
async def get_user(
user_id: int,
request: Request,
service: UserService = Depends(Provide[Container.user_service]),
rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
return service.get_user_profile("id", user_id, rbac_service)
# Advanced permission check with AND/OR logic, placeholders, pre/post callbacks
@router.put("/{entity_id}")
@inject
@check_permissions(
permissions=[
("MODULE", "resource", "edit"),
("MODULE", "resource", "admin"),
],
operator="OR",
)
async def update_entity(
entity_id: str,
request: Request,
rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
...
Permission System
Permission Format
Permissions follow the format: module:resource:action
- Module: The system module (e.g.,
USER_MANAGEMENT,EMAIL_PROCESS) - Resource: The specific resource within the module (e.g.,
USER_MANAGEMENT,ROLE_MANAGEMENT) - Action: The action being performed (e.g.,
VIEW,CREATE,EDIT,DELETE)
Using Permission Constants
from abs_auth_rbac_core.util.permission_constants import (
PermissionAction,
PermissionModule,
PermissionResource,
PermissionConstants
)
# Using enums
permission_string = f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}"
# Using predefined constants
user_view_permission = PermissionConstants.RBAC_USER_MANAGEMENT_VIEW
permission_string = f"{user_view_permission.module}:{user_view_permission.resource}:{user_view_permission.action}"
Configuration
Environment Variables
# JWT Configuration
JWT_SECRET_KEY=your-secret-key-here
JWT_ALGORITHM=HS256
JWT_ACCESS_TOKEN_EXPIRE_MINUTES=60
JWT_REFRESH_TOKEN_EXPIRE_MINUTES=1440
# Azure Service Bus (for cross-server cache invalidation)
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://your-ns.servicebus.windows.net/;...
# Or use managed identity:
# AZURE_SERVICEBUS_NAMESPACE=your-ns.servicebus.windows.net
# Instance ID (auto-detected from HOSTNAME if not set)
RBAC_INSTANCE_ID=server-1
# Database Configuration
DATABASE_URI=postgresql://user:password@localhost/dbname
Cache Configuration
rbac_service = RBACService(
session=db_session,
cache_ttl=300, # Cache entry TTL in seconds (default: 300 = 5 min)
cache_max_size=10000, # Max cached users before LRU eviction (default: 10,000)
)
Error Handling
from abs_exception_core.exceptions import (
UnauthorizedError,
PermissionDeniedError,
ValidationError,
DuplicatedError,
NotFoundError
)
Health Checks
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"rbac_notifier": rbac_service.is_watcher_active(),
"rbac_notifier_details": rbac_service.get_watcher_status(),
}
Troubleshooting
Common Issues
| Issue | Solution |
|---|---|
| Authentication Fails | Check JWT secret key, token expiration, user existence |
| Permission Denied | Verify user roles, role-permission assignments, permission format |
| Stale permissions after update | Check Service Bus connection, or wait for cache TTL expiry |
| Cache invalidation not working | Verify AZURE_SERVICEBUS_CONNECTION_STRING, topic exists, subscription auto-created |
| High memory usage | Reduce cache_max_size or lower cache_ttl |
Debug Mode
import logging
logging.basicConfig(level=logging.DEBUG)
# Check notifier status
print(f"Notifier active: {rbac_service.is_watcher_active()}")
print(f"Status: {rbac_service.get_watcher_status()}")
API Reference
RBACService Methods
| Method | Description |
|---|---|
check_permission(user_uuid, resource, action, module) |
Check role-based permission (cache-first) |
check_permission_by_user(user_uuid, resource, action, module) |
Check direct user permission (cache-first) |
check_permission_by_role(role_uuid, resource, action, module) |
Check role permission (direct SQL) |
create_role(name, description, permission_ids) |
Create a new role |
update_role_permissions(role_uuid, permissions, name, description) |
Update role permissions |
delete_role(role_uuid, exception_roles) |
Delete a role |
bulk_assign_roles_to_user(user_uuid, role_uuids) |
Assign multiple roles to user |
bulk_revoke_roles_from_user(user_uuid, role_uuids) |
Revoke roles from user |
bulk_attach_roles_to_user(user_uuid, role_uuids) |
Attach additional roles to user |
attach_permissions_to_user(user_uuid, permission_uuids) |
Attach direct permissions |
revoke_user_permissions(user_uuid, permission_uuids) |
Revoke direct permissions |
revoke_all_user_access(user_uuid) |
Revoke all roles and permissions |
get_user_permissions(user_uuid) |
Get role-based permissions |
get_user_only_permissions(user_uuid) |
Get direct user permissions |
get_user_roles(user_uuid) |
Get user roles |
list_roles() |
List all roles |
list_permissions() |
List all permissions |
is_watcher_active() |
Check notifier status |
get_watcher_status() |
Get notifier details |
CacheInvalidationNotifierConfig
| Field | Type | Default | Description |
|---|---|---|---|
connection_string |
str | None |
None |
Service Bus connection string |
fully_qualified_namespace |
str | None |
None |
For managed identity auth |
topic_name |
str |
"rbac-cache-invalidation" |
Service Bus topic name |
subscription_name |
str | None |
Auto from RBAC_INSTANCE_ID |
Subscription name |
max_wait_time_seconds |
int |
5 |
Receiver poll interval |
message_ttl_seconds |
int |
300 |
Message time-to-live |
auto_create_subscription |
bool |
True |
Auto-create subscription |
JWT Functions
| Method | Description |
|---|---|
create_access_token(data) |
Create JWT access token |
decode_jwt(token) |
Decode and validate JWT |
get_password_hash(password) |
Hash password with bcrypt |
verify_password(password, hashed) |
Verify password hash |
Dependencies
Core Dependencies
- pyjwt (>=2.10.1,<3.0.0): JWT token handling
- fastapi[standard] (>=0.115.2): Web framework
- passlib (>=1.7.4,<2.0.0): Password hashing
- sqlalchemy (>=2.0.40,<3.0.0): Database ORM
- azure-servicebus (>=7.14.3,<8.0.0): Cross-server cache invalidation
- azure-identity (>=1.25.2,<2.0.0): Azure authentication
Internal Dependencies
- abs-exception-core (>=0.2.2,<0.3.0): Exception handling
- abs-utils (>=0.4.3,<0.5.0): Logger and utilities
- abs-repository-core (>=0.3.0,<0.4.0): Base repository
- abs-nosql-repository-core (>=0.11.8,<0.12.0): NoSQL repository
- psycopg2-binary (>=2.9.10,<3.0.0): PostgreSQL adapter
License
This project is licensed under the MIT License - see the LICENSE file for details.
Version: 0.6.0 Last Updated: 2026 Python Version: >=3.11,<4.0
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file abs_auth_rbac_core-0.7.0.tar.gz.
File metadata
- Download URL: abs_auth_rbac_core-0.7.0.tar.gz
- Upload date:
- Size: 46.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.2 CPython/3.13.12 Darwin/23.6.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9000b2aa1e34281683d3d1eeb09bf3fd7a1e0962197797eb9eb3067a24c4ddbd
|
|
| MD5 |
8858f6b90d4c6a171be116692e02b3f3
|
|
| BLAKE2b-256 |
8f51e441046ded3d1a5c6779e7fe82e23ac6dae20e421fe2d578632824ab5d0f
|
File details
Details for the file abs_auth_rbac_core-0.7.0-py3-none-any.whl.
File metadata
- Download URL: abs_auth_rbac_core-0.7.0-py3-none-any.whl
- Upload date:
- Size: 52.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.2 CPython/3.13.12 Darwin/23.6.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
06558dfbebe3614ccb02be60afff9ba30dcac182b7f9309de50a01d04dffa233
|
|
| MD5 |
dc4fe365eae9db93757520401353d680
|
|
| BLAKE2b-256 |
ba5827510ed98c74cde118aadb768ced9eb15eb7685eb87803ae694c18eecc96
|