Skip to main content

Role-Based Access Control for AI Agents with ML-Powered Anomaly Detection

Project description

AgentRBAC

Role-Based Access Control for AI Agents

Python 3.10+ License: MIT Tests Version

A drop-in Python library that gates every agent action — tool calls, memory access, sub-agent spawning — through a layered RBAC policy engine with behavioral ML scoring.

InstallationQuick StartUser GuideAPI ReferenceExamples


What is AgentRBAC?

AgentRBAC provides fine-grained access control for AI agent systems. It sits between your agents and their tools/resources, enforcing permissions at every step.

Agent request → Behavior tracking → ML risk scoring → Policy evaluation → Capability check → ALLOW / DENY

Key features:

  • Role hierarchy with wildcard capabilities and explicit deny rules
  • Registration middleware (MW1–MW5) for secure agent onboarding
  • Delegation tokens — scoped, time-bound, HMAC-signed capability delegation
  • ML anomaly detection — Isolation Forest behavioral scoring with heuristic fallback
  • Multi-agent orchestration with cumulative risk tracking
  • Append-only audit log with pluggable backends
  • Framework integrations — LangChain, CrewAI, AutoGen, or any custom framework
  • Zero required dependencies — ML and framework extras are opt-in

Installation

From PyPI (when published)

pip install agent-rbac

From GitHub

pip install git+https://github.com/Parikshit-jadhav-ai/agent-rbac.git

From source

git clone https://github.com/Parikshit-jadhav-ai/agent-rbac.git
cd agent-rbac
pip install -e .

Optional extras

Extra Install command What it adds
ml pip install "agent-rbac[ml]" scikit-learn, sentence-transformers — full ML risk scoring
langchain pip install "agent-rbac[langchain]" LangChain tool wrapper
crewai pip install "agent-rbac[crewai]" CrewAI task interceptor
autogen pip install "agent-rbac[autogen]" AutoGen agent wrapper
observability pip install "agent-rbac[observability]" Prometheus + OpenTelemetry
data pip install "agent-rbac[data]" Dask for distributed data pipelines
all pip install "agent-rbac[all]" Everything above
dev pip install "agent-rbac[dev]" pytest + coverage

Quick Start

import agent_rbac as rbac_lib
from agent_rbac import AgentRBAC, AgentIdentity

# 1. Create the RBAC engine
rbac = AgentRBAC()

# 2. Define roles
rbac.define_role("viewer", capabilities=["tool:*:read"])
rbac.define_role("agent", parent="viewer",
                 capabilities=["memory:*:read", "memory:*:write"])

# 3. Create an agent identity
agent = AgentIdentity(agent_id="my_bot", roles=["agent"])

# 4. Check access
result = rbac.check(agent, "tool:web_search", "read")
print(result.decision)   # Decision.ALLOW
print(result.allowed)    # True
print(result.risk_score) # 0.0

Using the decorator

@rbac.enforce(resource="tool:web_search", action="read")
def web_search(query: str) -> str:
    return f"results for {query}"

result = web_search("AI news", _principal=agent)  # passes RBAC check

Registering agents (v2 middleware)

from agent_rbac import AgentRBAC, AgentType

rbac = AgentRBAC()

agent = rbac.register(
    name="DataProcessor",
    description="Cleans tabular datasets",
    role="data_engineer",
    intent="Process datasets: clean, aggregate, summarise",
    type=AgentType.DATA_AGENT,
    version="1.0.0",
    owner_id="user_123",
    endpoint="http://agents.internal/data-processor",
)

rbac.check(agent, "data", "write").allowed  # True
rbac.check(agent, "model", "train").allowed # False

User Guide

Core Concepts

Concept Description
Principal Anything that makes a request — a user, an AI agent, or a sub-agent
Role Named permission set in a DAG hierarchy. Children inherit parent capabilities
Capability Permission string: resource_type:resource_name:action with wildcard support
Context Runtime state passed to policies: environment, task, timestamps
Decision Outcome: ALLOW, DENY, DELEGATE, or CONDITIONAL

Capability Format

tool:web_search:read    # specific tool, specific action
tool:*:read             # all tools, read only
memory:*:*              # all memory, all actions
agent:spawn:*           # spawn any sub-agent type

Defining Roles

Roles form a hierarchy. Child roles inherit all parent capabilities. Explicit deny rules always override allows.

rbac.define_role("viewer", capabilities=[
    "tool:web_search:read",
    "tool:calculator:read",
])

rbac.define_role("research_agent", parent="viewer",
    capabilities=["tool:arxiv:read", "memory:notes:write"])

rbac.define_role("restricted",
    capabilities=["tool:web_search:read"],
    deny=["tool:payment:*", "memory:credentials:*"])

rbac.define_role("admin",
    capabilities=["tool:*:*", "memory:*:*", "agent:spawn:*"])

Built-in Role Hierarchy

viewer
  ├── developer        (+ agent:create, agent:update, agent:deploy, logs:read)
  ├── data_engineer    (+ data:read, data:write, model:infer)
  └── agent            (+ agent:invoke, data:read, model:infer)
       └── senior_agent (+ data:write, agent:spawn, memory:read)
            └── orchestrator (+ agent:spawn:any, graph:route, pipeline:manage)
                 └── admin   (+ agent:delete, rbac:manage, audit:read, model:train)

Checking Access

# Full result object
result = rbac.check(agent, "tool:web_search", "read")
result.decision    # Decision.ALLOW
result.allowed     # True
result.reason      # ""
result.risk_score  # 0.0

# Boolean shorthand
rbac.is_allowed(agent, "tool:web_search", "read")  # True

# Exception-raising variant
from agent_rbac import PermissionDenied
try:
    rbac.check_or_raise(agent, "model", "train")
except PermissionDenied as e:
    print(e.reason)  # "no_matching_capability"

Context-Aware Policies

Policies are callables that fire after role resolution. Return a Decision to override, or None to abstain.

from agent_rbac import Decision, Context

def sandbox_guard(principal, resource, action, ctx):
    if ctx.environment == "sandbox" and action == "write":
        return Decision.DENY
    return None

rbac.add_policy("sandbox_guard", sandbox_guard, priority=10)

result = rbac.check(agent, "memory:data", "write",
                    context=Context(environment="sandbox"))
# result.decision == Decision.DENY
Context Field Type Description
environment str "production" / "staging" / "sandbox"
task_id str Current task identifier
task_type str Category of the current task
agent_depth int Depth in the agent spawn chain
timestamp float When the request was made
conversation_hash str Hash of conversation history
extra dict Additional key-value pairs

Delegation — Sub-Agent Tokens

A parent agent can delegate a subset of its capabilities with time-bound, HMAC-signed tokens.

token = parent.delegate(
    to="summarizer_001",
    capabilities=["tool:web_search:read"],
    roles=["viewer"],
    expires_in=300,  # seconds
)

assert parent.verify_token(token)

child = token.to_identity()
rbac.check(child, "tool:web_search", "read").allowed   # True
rbac.check(child, "memory:data", "write").allowed       # False — exceeds ceiling

ML Risk Scoring

AgentRBAC tracks each agent's behavior in a sliding window and scores anomalies from 0 (normal) to 1 (anomalous). Scoring activates after 10 events per agent.

Risk Level Score Action
Low < 0.2 Pass through
Medium 0.2 – 0.7 Flagged + logged
High > 0.7 Overridden to CONDITIONAL
# Adjust thresholds
rbac.set_risk_thresholds(low=0.15, high=0.6)

# Train on custom data
rbac.train_ml_model(samples, contamination=0.05)
rbac.save_ml_model("models/risk_model.pkl")
rbac.load_ml_model("models/risk_model.pkl")

Feature vector (6 dimensions):

Feature Description
action_rate Actions per minute in sliding window
unique_resource_ratio Distinct resources / total actions
boundary_probe_rate Ratio of resource types probed
repeat_ratio How often current resource is re-hit
time_since_last Seconds since previous action
resource_entropy Shannon entropy of resource access distribution

Multi-Agent Orchestration

orch = rbac.orchestrator(cumulative_risk_ceiling=0.85)
orch.pipeline([gateway.agent_id, processor.agent_id, model.agent_id])

result = orch.run(
    input={"dataset_path": "/data/raw/sales.csv"},
    caller=user_identity,
)

result["hops"]              # per-hop RBAC results
result["cumulative_risk"]   # accumulated risk score
result["short_circuited"]   # True if risk ceiling was hit
result["output"]            # final agent output

DataPipeline

from agent_rbac import DataPipeline

pipeline = (
    DataPipeline(source="s3://bucket/raw/sales.csv", partition_size="256MB")
    .clean(drop_nulls=True, deduplicate=True)
    .aggregate(group_by="user_id", agg={"spend": "sum"})
    .write(sink="s3://bucket/processed/", format="parquet")
)

pipeline.plan()                        # inspect steps without running
pipeline.execute(mode="local")         # run locally
pipeline.execute(mode="distributed")   # run with Dask

Audit Logging

Every decision is recorded in an append-only audit log with full context.

from agent_rbac import AgentRBAC, FileBackend, StdoutBackend, MemoryBackend

# File backend
rbac = AgentRBAC(audit_file="rbac_audit.jsonl")

# In-memory (testing)
mem = MemoryBackend()
rbac.audit.add_backend(mem)

rbac.check(agent, "tool:web_search", "read")
print(mem.records[-1])
# {'timestamp': ..., 'event': 'access_check', 'agent_id': 'bot_001',
#  'decision': 'ALLOW', 'risk_score': 0.0, ...}

# Custom events
rbac.audit.log_event("agent_spawned", {"child_id": "sub_001"})

Custom Backend

class MyBackend:
    def write(self, record: dict):
        my_db.insert(record)
    def flush(self):
        pass

rbac.audit.add_backend(MyBackend())

Framework Integrations

LangChain

from agent_rbac.integrations.langchain import LangChainRBACToolWrapper

wrapped = LangChainRBACToolWrapper(
    rbac=rbac, tool=search_tool,
    resource="tool:duckduckgo", action="read",
)
result = wrapped.run("query", principal=agent, context=ctx)

CrewAI

from agent_rbac.integrations.crewai import CrewAITaskInterceptor

interceptor = CrewAITaskInterceptor(rbac=rbac)
interceptor.before_task(
    agent_id="researcher",
    task_description="Find papers",
    tools_needed=["tool:arxiv:read"],
    principal=agent, context=ctx,
)

AutoGen

from agent_rbac.integrations.autogen import AutoGenRBACAgent

guarded = AutoGenRBACAgent(
    rbac=rbac, agent=my_autogen_agent, principal=agent,
    tool_resource_map={"code_exec": "tool:code", "web_search": "tool:web_search"},
)
reply = guarded.generate_reply(messages=[...])

Docker

# Build and run
docker build -t agent-rbac:latest .
docker run --rm agent-rbac:latest

# With persistent audit log
docker run --rm -v $(pwd)/logs:/app/logs \
  -e RBAC_AUDIT_FILE=/app/logs/rbac_audit.jsonl agent-rbac:latest

# docker-compose
docker-compose up                              # start demo
docker-compose run --rm rbac pytest tests/ -v  # run tests
docker-compose --profile ml up                 # with ML deps

Testing

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/ -v

# With coverage
pytest tests/ -v --cov=agent_rbac --cov-report=term-missing

# Run examples
python examples/quickstart.py
python examples/test_agent_demo.py
python examples/delegation.py
python examples/audit_demo.py

API Reference

AgentRBAC — Main Engine

Method Returns Description
AgentRBAC(audit_file=None) AgentRBAC Create the RBAC engine
.register(**kwargs) AgentIdentity Register agent through MW1–MW5 chain
.define_role(name, capabilities, deny, parent) None Add a custom role to the hierarchy
.check(principal, resource, action, context) DecisionResult Full RBAC + ML pipeline check
.is_allowed(principal, resource, action, ctx) bool Boolean shorthand for .check()
.check_or_raise(principal, resource, action) DecisionResult Raises PermissionDenied / RiskGated
.add_policy(name, fn, priority) None Register a context-aware policy
.bind_role(entity_id, roles, expires_in) None Bind roles at runtime with optional TTL
.get_bound_roles(entity_id) list[str] Get active (non-expired) bound roles
.orchestrator(cumulative_risk_ceiling) Orchestrator Create a pipeline orchestrator
.set_risk_thresholds(low, high) None Configure ML gating levels
.train_ml_model(samples, contamination) None Train Isolation Forest model
.save_ml_model(path) None Persist trained model
.load_ml_model(path) None Load trained model
.audit_log(limit) list[dict] Retrieve recent audit records
@rbac.enforce(resource, action) decorator Gate function calls with RBAC

AgentIdentity

Method Returns Description
AgentIdentity(agent_id, roles, capability_ceiling) AgentIdentity Create a principal
.delegate(to, capabilities, roles, expires_in) DelegationToken Mint a scoped child token
.verify_token(token) bool Verify HMAC signature

DelegationToken

Method Returns Description
.to_identity() AgentIdentity Convert token to usable identity

DecisionResult

Attribute Type Description
decision Decision ALLOW, DENY, CONDITIONAL, or DELEGATE
allowed bool True only when decision is ALLOW
reason str Human-readable explanation
risk_score float ML/heuristic risk score (0–1)

Orchestrator

Method Returns Description
.pipeline(agent_ids) None Define a linear agent pipeline
.run(input, caller, context) dict Execute with RBAC enforcement at each hop

DataPipeline

Method Returns Description
DataPipeline(source, partition_size) DataPipeline Create a pipeline
.clean(drop_nulls, deduplicate) DataPipeline Add cleaning step
.aggregate(group_by, agg) DataPipeline Add aggregation step
.filter(expr) DataPipeline Add filter step
.write(sink, format) DataPipeline Add write step
.plan() list[dict] Inspect pipeline steps
.execute(mode) dict Run "local" or "distributed"

Project Structure

agent_rbac/
├── __init__.py              # Public API & exports
├── config.py                # AgentRBAC entry point
├── decorators.py            # @rbac.enforce
├── exceptions.py            # AgentRBACError, PermissionDenied, RiskGated
├── core/
│   ├── identity.py          # AgentIdentity, DelegationToken
│   ├── roles.py             # Role hierarchy (DAG)
│   ├── capabilities.py      # Wildcard capability matching
│   ├── policy.py            # Policy evaluator + Context
│   └── decision.py          # Decision enum + DecisionResult
├── ml/
│   ├── behavior.py          # Sliding window behavior tracker
│   └── risk_scorer.py       # Isolation Forest + heuristic fallback
├── audit/
│   └── logger.py            # FileBackend, StdoutBackend, MemoryBackend
├── registration/
│   └── ...                  # MW1–MW5 middleware chain
├── orchestration/
│   └── orchestrator.py      # Multi-agent pipeline orchestrator
├── data/
│   └── pipeline.py          # DataPipeline fluent API
└── integrations/
    ├── langchain.py         # LangChain tool wrapper
    ├── crewai.py            # CrewAI task interceptor
    └── autogen.py           # AutoGen agent wrapper
tests/
├── test_agent_rbac.py       # Core RBAC tests
└── test_v2_features.py      # v2 feature tests
examples/
├── quickstart.py            # Basic usage
├── test_agent_demo.py       # Full pipeline demo
├── delegation.py            # Delegation tokens
└── audit_demo.py            # Audit logging

License

MIT


Built for the age of AI agents. Secure every action.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agent_rbac-0.2.0.tar.gz (51.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agent_rbac-0.2.0-py3-none-any.whl (51.6 kB view details)

Uploaded Python 3

File details

Details for the file agent_rbac-0.2.0.tar.gz.

File metadata

  • Download URL: agent_rbac-0.2.0.tar.gz
  • Upload date:
  • Size: 51.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.3

File hashes

Hashes for agent_rbac-0.2.0.tar.gz
Algorithm Hash digest
SHA256 251a8c26efcb94a0a9e57246867d9476b886fc66ef1d94a652963e1822addcaa
MD5 88013f8ec9cbccb93cda9cdaffcf985e
BLAKE2b-256 146a5f5b77a60f67a93af2b0a29afd4c60a42b82da5c75825036f3c30717c237

See more details on using hashes here.

File details

Details for the file agent_rbac-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: agent_rbac-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 51.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.3

File hashes

Hashes for agent_rbac-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 92858937bb87d775ae6d7ab4dc08f2ad5b02b509b299a4d0bc01c1bcc7cddce3
MD5 b181dea40061a226431aff90caade29a
BLAKE2b-256 afe055512f04976d12508dc0767a1d6c8c0003d755d0f1b4c234732235f0a8e8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page