Skip to main content

RBAC and Auth core utilities including JWT token management.

Project description

ABS Auth RBAC Core

A comprehensive authentication and Role-Based Access Control (RBAC) package for FastAPI applications. This package provides robust JWT-based authentication and flexible role-based permission management with an in-memory per-user permission cache and Azure Service Bus for cross-server cache invalidation.

What's New in v0.6.0

Casbin has been removed. The RBAC system now uses a per-user in-memory permission cache with targeted Azure Service Bus invalidation. This eliminates the 3-4 second full policy reload that caused 403 errors during permission updates.

Key Changes

Before (Casbin) After (v0.6.0)
On permission change Reloads ALL policies (full table scan) Invalidates ONLY affected user's cache entry
Reload time 3-4 seconds ~200-500ms
Memory model Monolithic store of every policy Per-user granular cache (loaded on demand)
Read path In-memory lookup (stale for 3-4s during reload) In-memory lookup (stale for max ~500ms)
Dependencies removed - casbin, casbin-sqlalchemy-adapter, casbin-redis-watcher

Migration Guide

Constructor — The RBACService constructor now accepts **kwargs for backward compatibility. Old redis_config or servicebus_config (for the Casbin watcher) will be silently accepted. To enable cross-server cache invalidation, pass the new CacheInvalidationNotifierConfig:

from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig

# Single-server mode (cache-only, no cross-server sync)
rbac_service = RBACService(session=db_session)

# Multi-server mode (with Azure Service Bus invalidation)
rbac_service = RBACService(
    session=db_session,
    servicebus_config=CacheInvalidationNotifierConfig(
        connection_string="Endpoint=sb://your-ns.servicebus.windows.net/;...",
        topic_name="rbac-cache-invalidation",
    ),
    cache_ttl=300,       # 5 min default
    cache_max_size=10000, # LRU eviction cap
)

Removed methods — The following Casbin-specific methods no longer exist:

  • add_policy(), add_policies(), remove_policy(), remove_policies()
  • enforce_policy(), remove_filter_policy()
  • reload_policies(), get_policy_count(), clear_all_policies()

Removed exportsAzureServiceBusWatcherSchema is replaced by CacheInvalidationNotifierConfig.

Removed fileswatcher.py, policy.conf, gov_casbin_rule.py are deleted.

Unchanged — All decorators (rbac_require_permission, check_permissions), auth middleware, JWT functions, models, schemas, and permission constants work exactly as before.

Features

  • JWT-based Authentication: Secure token-based authentication with customizable expiration
  • Password Security: Secure password storage using bcrypt with passlib
  • Role-Based Access Control (RBAC): Flexible permission management with per-user caching
  • Real-time Cache Invalidation: Azure Service Bus for targeted cross-server sync
  • User-Role Management: Dynamic role assignment and revocation
  • Permission Enforcement: Decorator-based permission checking with nested AND/OR logic
  • Middleware Integration: Seamless FastAPI middleware integration
  • Comprehensive Error Handling: Built-in exception handling for security scenarios
  • Dependency Injection Ready: Compatible with dependency-injector
  • Permission Constants: Predefined permission constants and enums

Installation

pip install abs-auth-rbac-core

Architecture Overview

┌─────────────────────────────────────────────────────┐
│                    Server A                          │
│  ┌──────────────┐    ┌──────────────────────────┐   │
│  │ Permission   │<───│ RBACService              │   │
│  │ Cache        │    │  - check_permission()    │   │
│  │ (per-user)   │    │  - create_role()         │   │
│  └──────────────┘    │  - assign_permissions()  │   │
│         ^            └─────────┬────────────────┘   │
│         │ invalidate           │ publish             │
│         │                      v                     │
│  ┌──────┴───────────────────────────────────────┐   │
│  │ CacheInvalidationNotifier (Service Bus)      │   │
│  └──────────────────────────────────────────────┘   │
└─────────────────────┬───────────────────────────────┘
                      │ Azure Service Bus Topic
                      │ (targeted invalidation messages)
┌─────────────────────v───────────────────────────────┐
│                    Server B                          │
│  ┌──────────────┐    ┌──────────────────────────┐   │
│  │ Permission   │<───│ CacheInvalidationNotifier│   │
│  │ Cache        │    │ (receives & invalidates) │   │
│  │ (per-user)   │    └──────────────────────────┘   │
│  └──────────────┘                                    │
└─────────────────────────────────────────────────────┘

How It Works

  1. Read path: check_permission() looks up the user's cache entry. On hit, it's a sub-millisecond set lookup. On miss, it runs a single SQL query to load all role + direct permissions for that user, caches the result, then checks.

  2. Write path: After any permission/role change (e.g., update_role_permissions()), the local cache is invalidated for affected users. If a notifier is configured, a targeted invalidation message is published via Azure Service Bus.

  3. Cross-server sync: Other servers receive the invalidation message and delete only the affected users' cache entries. The next permission check for those users will reload from DB.

  4. Safety TTL: Cache entries expire after a configurable TTL (default 5 minutes) as a fallback, even without explicit invalidation.

Quick Start

1. Basic Setup

from abs_auth_rbac_core.auth.jwt_functions import JWTFunctions
from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig
import os

# Initialize JWT functions
jwt_functions = JWTFunctions(
    secret_key=os.getenv("JWT_SECRET_KEY"),
    algorithm=os.getenv("JWT_ALGORITHM", "HS256"),
    expire_minutes=int(os.getenv("JWT_EXPIRE_MINUTES", "60"))
)

# Initialize RBAC service
rbac_service = RBACService(
    session=your_db_session,
    servicebus_config=CacheInvalidationNotifierConfig(
        connection_string=os.getenv("AZURE_SERVICEBUS_CONNECTION_STRING"),
        topic_name="rbac-cache-invalidation",
    )
)

2. Authentication Setup

from abs_auth_rbac_core.auth.middleware import auth_middleware
from fastapi import FastAPI, Depends

app = FastAPI()

# Create authentication middleware
auth_middleware = auth_middleware(
    db_session=your_db_session,
    jwt_secret_key=os.getenv("JWT_SECRET_KEY"),
    jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256")
)

# Apply to specific routers
app.include_router(
    protected_router,
    dependencies=[Depends(auth_middleware)]
)

# Public routes (no middleware)
app.include_router(public_router)

How it works:

  1. Validates JWT token from Authorization header
  2. Extracts user UUID from token payload
  3. Fetches user from database using UUID
  4. Sets user object in request.state.user
  5. Returns user object for route handlers

Accessing the user in routes:

@router.get("/profile")
async def get_profile(request: Request):
    user = request.state.user
    return {"user_id": user.uuid, "email": user.email}

3. RBAC Operations

# Create a role with permissions
role = rbac_service.create_role(
    name="admin",
    description="Administrator role with full access",
    permission_ids=["permission_uuid1", "permission_uuid2"]
)

# Assign roles to user
rbac_service.bulk_assign_roles_to_user(
    user_uuid="user_uuid",
    role_uuids=["role_uuid1", "role_uuid2"]
)

# Check user permissions (cache-first, sub-ms on hit)
has_permission = rbac_service.check_permission(
    user_uuid="user_uuid",
    resource="USER_MANAGEMENT",
    action="VIEW",
    module="USER_MANAGEMENT"
)

# Get user permissions and roles
user_permissions = rbac_service.get_user_permissions(user_uuid="user_uuid")
user_roles = rbac_service.get_user_roles(user_uuid="user_uuid")

Core Components

Authentication (auth/)

  • jwt_functions.py: JWT token management and password hashing
  • middleware.py: Authentication middleware for FastAPI
  • auth_functions.py: Core authentication functions

RBAC (rbac/)

  • service.py: Main RBAC service with role and permission management
  • decorator.py: Decorators for permission checking (rbac_require_permission, check_permissions)
  • cache.py: Thread-safe per-user LRU permission cache with TTL
  • notifier.py: Azure Service Bus cache invalidation notifier
  • types.py: Type definitions for permission system

Models (models/)

  • user.py: User model
  • roles.py: Role model
  • permissions.py: Permission model
  • user_role.py: User-Role association model
  • role_permission.py: Role-Permission association model
  • user_permission.py: User-Permission association model
  • rbac_model.py: Base RBAC model
  • base_model.py: Base model with common fields

Schema (schema/)

  • permission.py: Permission-related schemas

Utilities (util/)

  • permission_constants.py: Predefined permission constants and enums
  • get_permissions_resources.py: Resource ID extraction from permissions

Dependency Injection Setup

from dependency_injector import containers, providers
from abs_auth_rbac_core.auth.middleware import auth_middleware
from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig
from abs_auth_rbac_core.util.permission_constants import (
    PermissionAction,
    PermissionModule,
    PermissionResource
)

class Container(containers.DeclarativeContainer):
    wiring_config = containers.WiringConfiguration(
        modules=[
            "src.api.auth_route",
            "src.api.endpoints.rbac.permission_route",
            "src.api.endpoints.rbac.role_route",
            "src.api.endpoints.rbac.users_route",
        ]
    )

    db_session = providers.Factory(your_db_session_factory)

    rbac_service = providers.Singleton(
        RBACService,
        session=db_session,
        servicebus_config=CacheInvalidationNotifierConfig(
            connection_string=os.getenv("AZURE_SERVICEBUS_CONNECTION_STRING"),
            topic_name="rbac-cache-invalidation",
        )
    )

    get_auth_middleware = providers.Factory(
        auth_middleware,
        db_session=db_session,
        jwt_secret_key=os.getenv("JWT_SECRET_KEY"),
        jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256")
    )

Route Implementation with Permissions

from fastapi import APIRouter, Depends, Request
from dependency_injector.wiring import Provide, inject
from abs_auth_rbac_core.rbac import rbac_require_permission, check_permissions
from abs_auth_rbac_core.util.permission_constants import (
    PermissionAction,
    PermissionModule,
    PermissionResource
)

router = APIRouter(prefix="/users")

# Simple permission check (all listed permissions required)
@router.get("/{user_id}", response_model=UserProfile)
@inject
@rbac_require_permission(
    f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}"
)
async def get_user(
    user_id: int,
    request: Request,
    service: UserService = Depends(Provide[Container.user_service]),
    rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
    return service.get_user_profile("id", user_id, rbac_service)

# Advanced permission check with AND/OR logic, placeholders, pre/post callbacks
@router.put("/{entity_id}")
@inject
@check_permissions(
    permissions=[
        ("MODULE", "resource", "edit"),
        ("MODULE", "resource", "admin"),
    ],
    operator="OR",
)
async def update_entity(
    entity_id: str,
    request: Request,
    rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
    ...

Permission System

Permission Format

Permissions follow the format: module:resource:action

  • Module: The system module (e.g., USER_MANAGEMENT, EMAIL_PROCESS)
  • Resource: The specific resource within the module (e.g., USER_MANAGEMENT, ROLE_MANAGEMENT)
  • Action: The action being performed (e.g., VIEW, CREATE, EDIT, DELETE)

Using Permission Constants

from abs_auth_rbac_core.util.permission_constants import (
    PermissionAction,
    PermissionModule,
    PermissionResource,
    PermissionConstants
)

# Using enums
permission_string = f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}"

# Using predefined constants
user_view_permission = PermissionConstants.RBAC_USER_MANAGEMENT_VIEW
permission_string = f"{user_view_permission.module}:{user_view_permission.resource}:{user_view_permission.action}"

Configuration

Environment Variables

# JWT Configuration
JWT_SECRET_KEY=your-secret-key-here
JWT_ALGORITHM=HS256
JWT_ACCESS_TOKEN_EXPIRE_MINUTES=60
JWT_REFRESH_TOKEN_EXPIRE_MINUTES=1440

# Azure Service Bus (for cross-server cache invalidation)
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://your-ns.servicebus.windows.net/;...
# Or use managed identity:
# AZURE_SERVICEBUS_NAMESPACE=your-ns.servicebus.windows.net

# Instance ID (auto-detected from HOSTNAME if not set)
RBAC_INSTANCE_ID=server-1

# Database Configuration
DATABASE_URI=postgresql://user:password@localhost/dbname

Cache Configuration

rbac_service = RBACService(
    session=db_session,
    cache_ttl=300,        # Cache entry TTL in seconds (default: 300 = 5 min)
    cache_max_size=10000, # Max cached users before LRU eviction (default: 10,000)
)

Error Handling

from abs_exception_core.exceptions import (
    UnauthorizedError,
    PermissionDeniedError,
    ValidationError,
    DuplicatedError,
    NotFoundError
)

Health Checks

@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "rbac_notifier": rbac_service.is_watcher_active(),
        "rbac_notifier_details": rbac_service.get_watcher_status(),
    }

Troubleshooting

Common Issues

Issue Solution
Authentication Fails Check JWT secret key, token expiration, user existence
Permission Denied Verify user roles, role-permission assignments, permission format
Stale permissions after update Check Service Bus connection, or wait for cache TTL expiry
Cache invalidation not working Verify AZURE_SERVICEBUS_CONNECTION_STRING, topic exists, subscription auto-created
High memory usage Reduce cache_max_size or lower cache_ttl

Debug Mode

import logging
logging.basicConfig(level=logging.DEBUG)

# Check notifier status
print(f"Notifier active: {rbac_service.is_watcher_active()}")
print(f"Status: {rbac_service.get_watcher_status()}")

API Reference

RBACService Methods

Method Description
check_permission(user_uuid, resource, action, module) Check role-based permission (cache-first)
check_permission_by_user(user_uuid, resource, action, module) Check direct user permission (cache-first)
check_permission_by_role(role_uuid, resource, action, module) Check role permission (direct SQL)
create_role(name, description, permission_ids) Create a new role
update_role_permissions(role_uuid, permissions, name, description) Update role permissions
delete_role(role_uuid, exception_roles) Delete a role
bulk_assign_roles_to_user(user_uuid, role_uuids) Assign multiple roles to user
bulk_revoke_roles_from_user(user_uuid, role_uuids) Revoke roles from user
bulk_attach_roles_to_user(user_uuid, role_uuids) Attach additional roles to user
attach_permissions_to_user(user_uuid, permission_uuids) Attach direct permissions
revoke_user_permissions(user_uuid, permission_uuids) Revoke direct permissions
revoke_all_user_access(user_uuid) Revoke all roles and permissions
get_user_permissions(user_uuid) Get role-based permissions
get_user_only_permissions(user_uuid) Get direct user permissions
get_user_roles(user_uuid) Get user roles
list_roles() List all roles
list_permissions() List all permissions
is_watcher_active() Check notifier status
get_watcher_status() Get notifier details

CacheInvalidationNotifierConfig

Field Type Default Description
connection_string str | None None Service Bus connection string
fully_qualified_namespace str | None None For managed identity auth
topic_name str "rbac-cache-invalidation" Service Bus topic name
subscription_name str | None Auto from RBAC_INSTANCE_ID Subscription name
max_wait_time_seconds int 5 Receiver poll interval
message_ttl_seconds int 300 Message time-to-live
auto_create_subscription bool True Auto-create subscription

JWT Functions

Method Description
create_access_token(data) Create JWT access token
decode_jwt(token) Decode and validate JWT
get_password_hash(password) Hash password with bcrypt
verify_password(password, hashed) Verify password hash

Dependencies

Core Dependencies

  • pyjwt (>=2.10.1,<3.0.0): JWT token handling
  • fastapi[standard] (>=0.115.2): Web framework
  • passlib (>=1.7.4,<2.0.0): Password hashing
  • sqlalchemy (>=2.0.40,<3.0.0): Database ORM
  • azure-servicebus (>=7.14.3,<8.0.0): Cross-server cache invalidation
  • azure-identity (>=1.25.2,<2.0.0): Azure authentication

Internal Dependencies

  • abs-exception-core (>=0.2.2,<0.3.0): Exception handling
  • abs-utils (>=0.4.3,<0.5.0): Logger and utilities
  • abs-repository-core (>=0.3.0,<0.4.0): Base repository
  • abs-nosql-repository-core (>=0.11.8,<0.12.0): NoSQL repository
  • psycopg2-binary (>=2.9.10,<3.0.0): PostgreSQL adapter

License

This project is licensed under the MIT License - see the LICENSE file for details.


Version: 0.6.0 Last Updated: 2026 Python Version: >=3.11,<4.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

abs_auth_rbac_core-0.6.0.tar.gz (45.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

abs_auth_rbac_core-0.6.0-py3-none-any.whl (50.3 kB view details)

Uploaded Python 3

File details

Details for the file abs_auth_rbac_core-0.6.0.tar.gz.

File metadata

  • Download URL: abs_auth_rbac_core-0.6.0.tar.gz
  • Upload date:
  • Size: 45.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.2 CPython/3.13.12 Darwin/23.6.0

File hashes

Hashes for abs_auth_rbac_core-0.6.0.tar.gz
Algorithm Hash digest
SHA256 b12a02d9a8166d06f059c95f1d26e3ee7eeb54ca3212775cb2b0958f4f662b88
MD5 6f8a2860d54e5172293febceb0f503a8
BLAKE2b-256 9af83e4f94c295faa192712d089c42fde2a38682f39e1456d5b290bedddf4c1c

See more details on using hashes here.

File details

Details for the file abs_auth_rbac_core-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: abs_auth_rbac_core-0.6.0-py3-none-any.whl
  • Upload date:
  • Size: 50.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.2 CPython/3.13.12 Darwin/23.6.0

File hashes

Hashes for abs_auth_rbac_core-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 cf2a303426a21276a1d30425de467de99e370b73cf07804305ef2ec85dfb36ed
MD5 2bc34caded23d4fbc288b90210a3485a
BLAKE2b-256 1b010aec517bafa0bd0d753da9c9030352d7a7c073668972907eb4188a799856

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page