Skip to main content

RBAC and Auth core utilities including JWT token management.

Project description

ABS Auth RBAC Core

A comprehensive authentication and Role-Based Access Control (RBAC) package for FastAPI applications. This package provides robust JWT-based authentication and flexible role-based permission management with an in-memory per-user permission cache and Azure Service Bus for cross-server cache invalidation.

What's New in v0.6.0

Casbin has been removed. The RBAC system now uses a per-user in-memory permission cache with targeted Azure Service Bus invalidation. This eliminates the 3-4 second full policy reload that caused 403 errors during permission updates.

Key Changes

Before (Casbin) After (v0.6.0)
On permission change Reloads ALL policies (full table scan) Invalidates ONLY affected user's cache entry
Reload time 3-4 seconds ~200-500ms
Memory model Monolithic store of every policy Per-user granular cache (loaded on demand)
Read path In-memory lookup (stale for 3-4s during reload) In-memory lookup (stale for max ~500ms)
Dependencies removed - casbin, casbin-sqlalchemy-adapter, casbin-redis-watcher

Migration Guide

Constructor — The RBACService constructor now accepts **kwargs for backward compatibility. Old redis_config or servicebus_config (for the Casbin watcher) will be silently accepted. To enable cross-server cache invalidation, pass the new CacheInvalidationNotifierConfig:

from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig

# Single-server mode (cache-only, no cross-server sync)
rbac_service = RBACService(session=db_session)

# Multi-server mode (with Azure Service Bus invalidation)
rbac_service = RBACService(
    session=db_session,
    servicebus_config=CacheInvalidationNotifierConfig(
        connection_string="Endpoint=sb://your-ns.servicebus.windows.net/;...",
        topic_name="rbac-cache-invalidation",
    ),
    cache_ttl=300,       # 5 min default
    cache_max_size=10000, # LRU eviction cap
)

Removed methods — The following Casbin-specific methods no longer exist:

  • add_policy(), add_policies(), remove_policy(), remove_policies()
  • enforce_policy(), remove_filter_policy()
  • reload_policies(), get_policy_count(), clear_all_policies()

Removed exportsAzureServiceBusWatcherSchema is replaced by CacheInvalidationNotifierConfig.

Removed fileswatcher.py, policy.conf, gov_casbin_rule.py are deleted.

Unchanged — All decorators (rbac_require_permission, check_permissions), auth middleware, JWT functions, models, schemas, and permission constants work exactly as before.

Features

  • JWT-based Authentication: Secure token-based authentication with customizable expiration
  • Password Security: Secure password storage using bcrypt with passlib
  • Role-Based Access Control (RBAC): Flexible permission management with per-user caching
  • Real-time Cache Invalidation: Azure Service Bus for targeted cross-server sync
  • User-Role Management: Dynamic role assignment and revocation
  • Permission Enforcement: Decorator-based permission checking with nested AND/OR logic
  • Middleware Integration: Seamless FastAPI middleware integration
  • Comprehensive Error Handling: Built-in exception handling for security scenarios
  • Dependency Injection Ready: Compatible with dependency-injector
  • Permission Constants: Predefined permission constants and enums

Installation

pip install abs-auth-rbac-core

Architecture Overview

┌─────────────────────────────────────────────────────┐
│                    Server A                          │
│  ┌──────────────┐    ┌──────────────────────────┐   │
│  │ Permission   │<───│ RBACService              │   │
│  │ Cache        │    │  - check_permission()    │   │
│  │ (per-user)   │    │  - create_role()         │   │
│  └──────────────┘    │  - assign_permissions()  │   │
│         ^            └─────────┬────────────────┘   │
│         │ invalidate           │ publish             │
│         │                      v                     │
│  ┌──────┴───────────────────────────────────────┐   │
│  │ CacheInvalidationNotifier (Service Bus)      │   │
│  └──────────────────────────────────────────────┘   │
└─────────────────────┬───────────────────────────────┘
                      │ Azure Service Bus Topic
                      │ (targeted invalidation messages)
┌─────────────────────v───────────────────────────────┐
│                    Server B                          │
│  ┌──────────────┐    ┌──────────────────────────┐   │
│  │ Permission   │<───│ CacheInvalidationNotifier│   │
│  │ Cache        │    │ (receives & invalidates) │   │
│  │ (per-user)   │    └──────────────────────────┘   │
│  └──────────────┘                                    │
└─────────────────────────────────────────────────────┘

How It Works

  1. Read path: check_permission() looks up the user's cache entry. On hit, it's a sub-millisecond set lookup. On miss, it runs a single SQL query to load all role + direct permissions for that user, caches the result, then checks.

  2. Write path: After any permission/role change (e.g., update_role_permissions()), the local cache is invalidated for affected users. If a notifier is configured, a targeted invalidation message is published via Azure Service Bus.

  3. Cross-server sync: Other servers receive the invalidation message and delete only the affected users' cache entries. The next permission check for those users will reload from DB.

  4. Safety TTL: Cache entries expire after a configurable TTL (default 5 minutes) as a fallback, even without explicit invalidation.

Quick Start

1. Basic Setup

from abs_auth_rbac_core.auth.jwt_functions import JWTFunctions
from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig
import os

# Initialize JWT functions
jwt_functions = JWTFunctions(
    secret_key=os.getenv("JWT_SECRET_KEY"),
    algorithm=os.getenv("JWT_ALGORITHM", "HS256"),
    expire_minutes=int(os.getenv("JWT_EXPIRE_MINUTES", "60"))
)

# Initialize RBAC service
rbac_service = RBACService(
    session=your_db_session,
    servicebus_config=CacheInvalidationNotifierConfig(
        connection_string=os.getenv("AZURE_SERVICEBUS_CONNECTION_STRING"),
        topic_name="rbac-cache-invalidation",
    )
)

2. Authentication Setup

from abs_auth_rbac_core.auth.middleware import auth_middleware
from fastapi import FastAPI, Depends

app = FastAPI()

# Create authentication middleware
auth_middleware = auth_middleware(
    db_session=your_db_session,
    jwt_secret_key=os.getenv("JWT_SECRET_KEY"),
    jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256")
)

# Apply to specific routers
app.include_router(
    protected_router,
    dependencies=[Depends(auth_middleware)]
)

# Public routes (no middleware)
app.include_router(public_router)

How it works:

  1. Validates JWT token from Authorization header
  2. Extracts user UUID from token payload
  3. Fetches user from database using UUID
  4. Sets user object in request.state.user
  5. Returns user object for route handlers

Accessing the user in routes:

@router.get("/profile")
async def get_profile(request: Request):
    user = request.state.user
    return {"user_id": user.uuid, "email": user.email}

3. RBAC Operations

# Create a role with permissions
role = rbac_service.create_role(
    name="admin",
    description="Administrator role with full access",
    permission_ids=["permission_uuid1", "permission_uuid2"]
)

# Assign roles to user
rbac_service.bulk_assign_roles_to_user(
    user_uuid="user_uuid",
    role_uuids=["role_uuid1", "role_uuid2"]
)

# Check user permissions (cache-first, sub-ms on hit)
has_permission = rbac_service.check_permission(
    user_uuid="user_uuid",
    resource="USER_MANAGEMENT",
    action="VIEW",
    module="USER_MANAGEMENT"
)

# Get user permissions and roles
user_permissions = rbac_service.get_user_permissions(user_uuid="user_uuid")
user_roles = rbac_service.get_user_roles(user_uuid="user_uuid")

Core Components

Authentication (auth/)

  • jwt_functions.py: JWT token management and password hashing
  • middleware.py: Authentication middleware for FastAPI
  • auth_functions.py: Core authentication functions

RBAC (rbac/)

  • service.py: Main RBAC service with role and permission management
  • decorator.py: Decorators for permission checking (rbac_require_permission, check_permissions)
  • cache.py: Thread-safe per-user LRU permission cache with TTL
  • notifier.py: Azure Service Bus cache invalidation notifier
  • types.py: Type definitions for permission system

Models (models/)

  • user.py: User model
  • roles.py: Role model
  • permissions.py: Permission model
  • user_role.py: User-Role association model
  • role_permission.py: Role-Permission association model
  • user_permission.py: User-Permission association model
  • rbac_model.py: Base RBAC model
  • base_model.py: Base model with common fields

Schema (schema/)

  • permission.py: Permission-related schemas

Utilities (util/)

  • permission_constants.py: Predefined permission constants and enums
  • get_permissions_resources.py: Resource ID extraction from permissions

Dependency Injection Setup

from dependency_injector import containers, providers
from abs_auth_rbac_core.auth.middleware import auth_middleware
from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig
from abs_auth_rbac_core.util.permission_constants import (
    PermissionAction,
    PermissionModule,
    PermissionResource
)

class Container(containers.DeclarativeContainer):
    wiring_config = containers.WiringConfiguration(
        modules=[
            "src.api.auth_route",
            "src.api.endpoints.rbac.permission_route",
            "src.api.endpoints.rbac.role_route",
            "src.api.endpoints.rbac.users_route",
        ]
    )

    db_session = providers.Factory(your_db_session_factory)

    rbac_service = providers.Singleton(
        RBACService,
        session=db_session,
        servicebus_config=CacheInvalidationNotifierConfig(
            connection_string=os.getenv("AZURE_SERVICEBUS_CONNECTION_STRING"),
            topic_name="rbac-cache-invalidation",
        )
    )

    get_auth_middleware = providers.Factory(
        auth_middleware,
        db_session=db_session,
        jwt_secret_key=os.getenv("JWT_SECRET_KEY"),
        jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256")
    )

Route Implementation with Permissions

from fastapi import APIRouter, Depends, Request
from dependency_injector.wiring import Provide, inject
from abs_auth_rbac_core.rbac import rbac_require_permission, check_permissions
from abs_auth_rbac_core.util.permission_constants import (
    PermissionAction,
    PermissionModule,
    PermissionResource
)

router = APIRouter(prefix="/users")

# Simple permission check (all listed permissions required)
@router.get("/{user_id}", response_model=UserProfile)
@inject
@rbac_require_permission(
    f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}"
)
async def get_user(
    user_id: int,
    request: Request,
    service: UserService = Depends(Provide[Container.user_service]),
    rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
    return service.get_user_profile("id", user_id, rbac_service)

# Advanced permission check with AND/OR logic, placeholders, pre/post callbacks
@router.put("/{entity_id}")
@inject
@check_permissions(
    permissions=[
        ("MODULE", "resource", "edit"),
        ("MODULE", "resource", "admin"),
    ],
    operator="OR",
)
async def update_entity(
    entity_id: str,
    request: Request,
    rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
    ...

Permission System

Permission Format

Permissions follow the format: module:resource:action

  • Module: The system module (e.g., USER_MANAGEMENT, EMAIL_PROCESS)
  • Resource: The specific resource within the module (e.g., USER_MANAGEMENT, ROLE_MANAGEMENT)
  • Action: The action being performed (e.g., VIEW, CREATE, EDIT, DELETE)

Using Permission Constants

from abs_auth_rbac_core.util.permission_constants import (
    PermissionAction,
    PermissionModule,
    PermissionResource,
    PermissionConstants
)

# Using enums
permission_string = f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}"

# Using predefined constants
user_view_permission = PermissionConstants.RBAC_USER_MANAGEMENT_VIEW
permission_string = f"{user_view_permission.module}:{user_view_permission.resource}:{user_view_permission.action}"

Configuration

Environment Variables

# JWT Configuration
JWT_SECRET_KEY=your-secret-key-here
JWT_ALGORITHM=HS256
JWT_ACCESS_TOKEN_EXPIRE_MINUTES=60
JWT_REFRESH_TOKEN_EXPIRE_MINUTES=1440

# Azure Service Bus (for cross-server cache invalidation)
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://your-ns.servicebus.windows.net/;...
# Or use managed identity:
# AZURE_SERVICEBUS_NAMESPACE=your-ns.servicebus.windows.net

# Instance ID (auto-detected from HOSTNAME if not set)
RBAC_INSTANCE_ID=server-1

# Database Configuration
DATABASE_URI=postgresql://user:password@localhost/dbname

Cache Configuration

rbac_service = RBACService(
    session=db_session,
    cache_ttl=300,        # Cache entry TTL in seconds (default: 300 = 5 min)
    cache_max_size=10000, # Max cached users before LRU eviction (default: 10,000)
)

Error Handling

from abs_exception_core.exceptions import (
    UnauthorizedError,
    PermissionDeniedError,
    ValidationError,
    DuplicatedError,
    NotFoundError
)

Health Checks

@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "rbac_notifier": rbac_service.is_watcher_active(),
        "rbac_notifier_details": rbac_service.get_watcher_status(),
    }

Troubleshooting

Common Issues

Issue Solution
Authentication Fails Check JWT secret key, token expiration, user existence
Permission Denied Verify user roles, role-permission assignments, permission format
Stale permissions after update Check Service Bus connection, or wait for cache TTL expiry
Cache invalidation not working Verify AZURE_SERVICEBUS_CONNECTION_STRING, topic exists, subscription auto-created
High memory usage Reduce cache_max_size or lower cache_ttl

Debug Mode

import logging
logging.basicConfig(level=logging.DEBUG)

# Check notifier status
print(f"Notifier active: {rbac_service.is_watcher_active()}")
print(f"Status: {rbac_service.get_watcher_status()}")

API Reference

RBACService Methods

Method Description
check_permission(user_uuid, resource, action, module) Check role-based permission (cache-first)
check_permission_by_user(user_uuid, resource, action, module) Check direct user permission (cache-first)
check_permission_by_role(role_uuid, resource, action, module) Check role permission (direct SQL)
create_role(name, description, permission_ids) Create a new role
update_role_permissions(role_uuid, permissions, name, description) Update role permissions
delete_role(role_uuid, exception_roles) Delete a role
bulk_assign_roles_to_user(user_uuid, role_uuids) Assign multiple roles to user
bulk_revoke_roles_from_user(user_uuid, role_uuids) Revoke roles from user
bulk_attach_roles_to_user(user_uuid, role_uuids) Attach additional roles to user
attach_permissions_to_user(user_uuid, permission_uuids) Attach direct permissions
revoke_user_permissions(user_uuid, permission_uuids) Revoke direct permissions
revoke_all_user_access(user_uuid) Revoke all roles and permissions
get_user_permissions(user_uuid) Get role-based permissions
get_user_only_permissions(user_uuid) Get direct user permissions
get_user_roles(user_uuid) Get user roles
list_roles() List all roles
list_permissions() List all permissions
is_watcher_active() Check notifier status
get_watcher_status() Get notifier details

CacheInvalidationNotifierConfig

Field Type Default Description
connection_string str | None None Service Bus connection string
fully_qualified_namespace str | None None For managed identity auth
topic_name str "rbac-cache-invalidation" Service Bus topic name
subscription_name str | None Auto from RBAC_INSTANCE_ID Subscription name
max_wait_time_seconds int 5 Receiver poll interval
message_ttl_seconds int 300 Message time-to-live
auto_create_subscription bool True Auto-create subscription

JWT Functions

Method Description
create_access_token(data) Create JWT access token
decode_jwt(token) Decode and validate JWT
get_password_hash(password) Hash password with bcrypt
verify_password(password, hashed) Verify password hash

Dependencies

Core Dependencies

  • pyjwt (>=2.10.1,<3.0.0): JWT token handling
  • fastapi[standard] (>=0.115.2): Web framework
  • passlib (>=1.7.4,<2.0.0): Password hashing
  • sqlalchemy (>=2.0.40,<3.0.0): Database ORM
  • azure-servicebus (>=7.14.3,<8.0.0): Cross-server cache invalidation
  • azure-identity (>=1.25.2,<2.0.0): Azure authentication

Internal Dependencies

  • abs-exception-core (>=0.2.2,<0.3.0): Exception handling
  • abs-utils (>=0.4.3,<0.5.0): Logger and utilities
  • abs-repository-core (>=0.3.0,<0.4.0): Base repository
  • abs-nosql-repository-core (>=0.11.8,<0.12.0): NoSQL repository
  • psycopg2-binary (>=2.9.10,<3.0.0): PostgreSQL adapter

License

This project is licensed under the MIT License - see the LICENSE file for details.


Version: 0.6.0 Last Updated: 2026 Python Version: >=3.11,<4.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

abs_auth_rbac_core-0.6.1.tar.gz (46.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

abs_auth_rbac_core-0.6.1-py3-none-any.whl (52.1 kB view details)

Uploaded Python 3

File details

Details for the file abs_auth_rbac_core-0.6.1.tar.gz.

File metadata

  • Download URL: abs_auth_rbac_core-0.6.1.tar.gz
  • Upload date:
  • Size: 46.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.2 CPython/3.13.12 Darwin/23.6.0

File hashes

Hashes for abs_auth_rbac_core-0.6.1.tar.gz
Algorithm Hash digest
SHA256 9e7c30bd777133787d923801ef8144be4a44ef6ef903b98e5b8e89f1a69d5c00
MD5 5d7e37b58279271e54a185d4ccf0765c
BLAKE2b-256 4742db4c7cf63be21fca0b37d87999feaa0b30dcac9acd7a482a65c6cc7a5043

See more details on using hashes here.

File details

Details for the file abs_auth_rbac_core-0.6.1-py3-none-any.whl.

File metadata

  • Download URL: abs_auth_rbac_core-0.6.1-py3-none-any.whl
  • Upload date:
  • Size: 52.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.2 CPython/3.13.12 Darwin/23.6.0

File hashes

Hashes for abs_auth_rbac_core-0.6.1-py3-none-any.whl
Algorithm Hash digest
SHA256 ba1985e6faf3f17cf2fea04f081dfac4edee1d48ebc98aa330bfc84964b09d96
MD5 6feb2a029b67fe8e6605d36333cbabe7
BLAKE2b-256 d39729a55f00a2c40df6f7f62fa0ff77fb162990ca7257631566e055d5b19da1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page