Role-Based Access Control for AI Agents with ML-Powered Anomaly Detection
AgentRBAC
Role-Based Access Control for AI Agents
A drop-in Python library that gates every agent action — tool calls, memory access, sub-agent spawning — through a layered RBAC policy engine with behavioral ML scoring.
Installation • Quick Start • User Guide • API Reference • Examples
What is AgentRBAC?
AgentRBAC provides fine-grained access control for AI agent systems. It sits between your agents and their tools/resources, enforcing permissions at every step.
Agent request → Behavior tracking → ML risk scoring → Policy evaluation → Capability check → ALLOW / DENY
Key features:
- Role hierarchy with wildcard capabilities and explicit deny rules
- Registration middleware (MW1–MW5) for secure agent onboarding
- Delegation tokens — scoped, time-bound, HMAC-signed capability delegation
- ML anomaly detection — Isolation Forest behavioral scoring with heuristic fallback
- Multi-agent orchestration with cumulative risk tracking
- Append-only audit log with pluggable backends
- Framework integrations — LangChain, CrewAI, AutoGen, or any custom framework
- Zero required dependencies — ML and framework extras are opt-in
Installation
From PyPI (when published)
pip install agent-rbac
From GitHub
pip install git+https://github.com/Parikshit-jadhav-ai/agent-rbac.git
From source
git clone https://github.com/Parikshit-jadhav-ai/agent-rbac.git
cd agent-rbac
pip install -e .
Optional extras
| Extra | Install command | What it adds |
|---|---|---|
| ml | pip install "agent-rbac[ml]" | scikit-learn, sentence-transformers (full ML risk scoring) |
| langchain | pip install "agent-rbac[langchain]" | LangChain tool wrapper |
| crewai | pip install "agent-rbac[crewai]" | CrewAI task interceptor |
| autogen | pip install "agent-rbac[autogen]" | AutoGen agent wrapper |
| observability | pip install "agent-rbac[observability]" | Prometheus + OpenTelemetry |
| data | pip install "agent-rbac[data]" | Dask for distributed data pipelines |
| all | pip install "agent-rbac[all]" | Everything above |
| dev | pip install "agent-rbac[dev]" | pytest + coverage |
Quick Start
from agent_rbac import AgentRBAC, AgentIdentity
# 1. Create the RBAC engine
rbac = AgentRBAC()
# 2. Define roles
rbac.define_role("viewer", capabilities=["tool:*:read"])
rbac.define_role("agent", parent="viewer",
capabilities=["memory:*:read", "memory:*:write"])
# 3. Create an agent identity
agent = AgentIdentity(agent_id="my_bot", roles=["agent"])
# 4. Check access
result = rbac.check(agent, "tool:web_search", "read")
print(result.decision) # Decision.ALLOW
print(result.allowed) # True
print(result.risk_score) # 0.0
Using the decorator
@rbac.enforce(resource="tool:web_search", action="read")
def web_search(query: str) -> str:
    return f"results for {query}"

result = web_search("AI news", _principal=agent)  # passes RBAC check
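To make the decorator pattern concrete, here is a minimal standalone sketch of how such a gate could work. It is not the library's implementation: the `enforce` factory, `simple_check`, and the set-based principal are all hypothetical stand-ins chosen so the example runs without AgentRBAC installed.

```python
import functools


class PermissionDenied(Exception):
    """Stand-in for the library's exception of the same name."""


def enforce(check, resource: str, action: str):
    """Build a decorator that calls check(principal, resource, action) first."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, _principal=None, **kwargs):
            # Refuse the call when no principal is given or the check fails.
            if _principal is None or not check(_principal, resource, action):
                raise PermissionDenied(f"{action} on {resource} denied")
            return func(*args, **kwargs)
        return wrapper
    return decorator


# Hypothetical check: a principal is just a set of "resource:action" strings.
def simple_check(principal, resource, action):
    return f"{resource}:{action}" in principal


@enforce(simple_check, resource="tool:web_search", action="read")
def web_search(query: str) -> str:
    return f"results for {query}"


print(web_search("AI news", _principal={"tool:web_search:read"}))  # results for AI news
```

The `_principal` keyword is consumed by the wrapper and never reaches the wrapped function, which matches how the call above passes it alongside the normal arguments.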
Registering agents (v2 middleware)
from agent_rbac import AgentRBAC, AgentType
rbac = AgentRBAC()
agent = rbac.register(
name="DataProcessor",
description="Cleans tabular datasets",
role="data_engineer",
intent="Process datasets: clean, aggregate, summarise",
type=AgentType.DATA_AGENT,
version="1.0.0",
owner_id="user_123",
endpoint="http://agents.internal/data-processor",
)
rbac.check(agent, "data", "write").allowed # True
rbac.check(agent, "model", "train").allowed # False
User Guide
Core Concepts
| Concept | Description |
|---|---|
| Principal | Anything that makes a request — a user, an AI agent, or a sub-agent |
| Role | Named permission set in a DAG hierarchy. Children inherit parent capabilities |
| Capability | Permission string: resource_type:resource_name:action with wildcard support |
| Context | Runtime state passed to policies: environment, task, timestamps |
| Decision | Outcome: ALLOW, DENY, DELEGATE, or CONDITIONAL |
Capability Format
tool:web_search:read # specific tool, specific action
tool:*:read # all tools, read only
memory:*:* # all memory, all actions
agent:spawn:* # spawn any sub-agent type
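The wildcard rule can be sketched as a segment-by-segment comparison. This is an illustration under the assumption that `*` matches exactly one segment; the function name `capability_matches` is hypothetical, and the matcher in `core/capabilities.py` may behave differently.

```python
def capability_matches(pattern: str, requested: str) -> bool:
    """Return True if a granted pattern covers a requested capability.

    Both strings use the resource_type:resource_name:action layout;
    a "*" segment in the pattern matches any value in that position.
    """
    pattern_parts = pattern.split(":")
    requested_parts = requested.split(":")
    if len(pattern_parts) != len(requested_parts):
        return False
    return all(p == "*" or p == r for p, r in zip(pattern_parts, requested_parts))


granted = ["tool:*:read", "memory:*:*"]
print(any(capability_matches(g, "tool:web_search:read") for g in granted))   # True
print(any(capability_matches(g, "tool:web_search:write") for g in granted))  # False
```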
Defining Roles
Roles form a hierarchy. Child roles inherit all parent capabilities. Explicit deny rules always override allows.
rbac.define_role("viewer", capabilities=[
"tool:web_search:read",
"tool:calculator:read",
])
rbac.define_role("research_agent", parent="viewer",
capabilities=["tool:arxiv:read", "memory:notes:write"])
rbac.define_role("restricted",
capabilities=["tool:web_search:read"],
deny=["tool:payment:*", "memory:credentials:*"])
rbac.define_role("admin",
capabilities=["tool:*:*", "memory:*:*", "agent:spawn:*"])
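A rough sketch of how inheritance and deny-overrides-allow could be resolved follows. It assumes a single `parent` per role (as in the `define_role` calls above) and segment-wise wildcard matching; `Role`, `matches`, and `is_permitted` are hypothetical names, not the library's internals.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Role:
    name: str
    capabilities: List[str] = field(default_factory=list)
    deny: List[str] = field(default_factory=list)
    parent: Optional[str] = None


def matches(pattern: str, requested: str) -> bool:
    p, r = pattern.split(":"), requested.split(":")
    return len(p) == len(r) and all(a == "*" or a == b for a, b in zip(p, r))


def is_permitted(roles: Dict[str, Role], role_name: str, requested: str) -> bool:
    """Collect allows and denies up the parent chain; any deny wins."""
    allows, denies, seen = [], [], set()
    current = role_name
    while current is not None and current not in seen:
        seen.add(current)  # guards against accidental cycles
        role = roles[current]
        allows += role.capabilities
        denies += role.deny
        current = role.parent
    if any(matches(d, requested) for d in denies):
        return False
    return any(matches(a, requested) for a in allows)


roles = {
    "viewer": Role("viewer", ["tool:web_search:read"]),
    "restricted": Role("restricted", ["tool:*:*"],
                       deny=["tool:payment:*"], parent="viewer"),
}
print(is_permitted(roles, "restricted", "tool:arxiv:read"))      # True
print(is_permitted(roles, "restricted", "tool:payment:charge"))  # False
```

The second call shows the override: `tool:*:*` would allow the payment tool, but the explicit deny is checked first.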
Built-in Role Hierarchy
viewer
├── developer (+ agent:create, agent:update, agent:deploy, logs:read)
├── data_engineer (+ data:read, data:write, model:infer)
└── agent (+ agent:invoke, data:read, model:infer)
    └── senior_agent (+ data:write, agent:spawn, memory:read)
        └── orchestrator (+ agent:spawn:any, graph:route, pipeline:manage)
            └── admin (+ agent:delete, rbac:manage, audit:read, model:train)
Checking Access
# Full result object
result = rbac.check(agent, "tool:web_search", "read")
result.decision # Decision.ALLOW
result.allowed # True
result.reason # ""
result.risk_score # 0.0
# Boolean shorthand
rbac.is_allowed(agent, "tool:web_search", "read") # True
# Exception-raising variant
from agent_rbac import PermissionDenied
try:
    rbac.check_or_raise(agent, "model", "train")
except PermissionDenied as e:
    print(e.reason)  # "no_matching_capability"
Context-Aware Policies
Policies are callables that fire after role resolution. Return a Decision to override, or None to abstain.
from agent_rbac import Decision, Context
def sandbox_guard(principal, resource, action, ctx):
    if ctx.environment == "sandbox" and action == "write":
        return Decision.DENY
    return None

rbac.add_policy("sandbox_guard", sandbox_guard, priority=10)
result = rbac.check(agent, "memory:data", "write",
                    context=Context(environment="sandbox"))
# result.decision == Decision.DENY
| Context Field | Type | Description |
|---|---|---|
| environment | str | "production" / "staging" / "sandbox" |
| task_id | str | Current task identifier |
| task_type | str | Category of the current task |
| agent_depth | int | Depth in the agent spawn chain |
| timestamp | float | When the request was made |
| conversation_hash | str | Hash of conversation history |
| extra | dict | Additional key-value pairs |
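The override-or-abstain contract can be sketched as a small evaluation loop. This is a simplified model, not the library's evaluator: `evaluate_policies` is a hypothetical helper, the `Decision` and `Context` classes are cut-down stand-ins, and the ordering (higher `priority` runs first) is an assumption the source does not confirm.

```python
from dataclasses import dataclass
from enum import Enum


class Decision(Enum):
    ALLOW = "ALLOW"
    DENY = "DENY"


@dataclass
class Context:
    environment: str = "production"


def evaluate_policies(policies, base_decision, principal, resource, action, ctx):
    """Run policies in priority order; the first non-None verdict wins.

    policies: list of (priority, name, callable) tuples.
    Assumption: a higher priority number runs earlier.
    """
    for _priority, _name, fn in sorted(policies, key=lambda p: p[0], reverse=True):
        verdict = fn(principal, resource, action, ctx)
        if verdict is not None:
            return verdict
    return base_decision  # every policy abstained


def sandbox_guard(principal, resource, action, ctx):
    if ctx.environment == "sandbox" and action == "write":
        return Decision.DENY
    return None


policies = [(10, "sandbox_guard", sandbox_guard)]
sandbox = evaluate_policies(policies, Decision.ALLOW, "my_bot", "memory:data",
                            "write", Context(environment="sandbox"))
prod = evaluate_policies(policies, Decision.ALLOW, "my_bot", "memory:data",
                         "write", Context())
print(sandbox, prod)  # Decision.DENY Decision.ALLOW
```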
Delegation — Sub-Agent Tokens
A parent agent can delegate a subset of its capabilities with time-bound, HMAC-signed tokens.
token = parent.delegate(
to="summarizer_001",
capabilities=["tool:web_search:read"],
roles=["viewer"],
expires_in=300, # seconds
)
assert parent.verify_token(token)
child = token.to_identity()
rbac.check(child, "tool:web_search", "read").allowed # True
rbac.check(child, "memory:data", "write").allowed # False — exceeds ceiling
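For readers unfamiliar with HMAC-signed tokens, the sketch below shows the general technique with the standard library only. The token layout, `SECRET_KEY`, and the `mint_token` / `verify_token` helpers are hypothetical, and the ceiling check compares exact capability strings rather than wildcards; the library's `DelegationToken` may be structured differently.

```python
import hashlib
import hmac
import json
import time

SECRET_KEY = b"demo-only-secret"  # hypothetical; load real keys from secure storage


def _sign(payload: dict) -> str:
    body = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hmac.new(SECRET_KEY, body, hashlib.sha256).hexdigest()


def mint_token(parent_caps, to, capabilities, expires_in, now=None):
    """Mint a token for `to`, refusing capabilities the parent does not hold."""
    if not set(capabilities) <= set(parent_caps):
        raise ValueError("delegated capabilities exceed the parent's ceiling")
    issued = time.time() if now is None else now
    payload = {"to": to, "capabilities": sorted(capabilities),
               "exp": issued + expires_in}
    return {"payload": payload, "signature": _sign(payload)}


def verify_token(token, now=None):
    """Check the signature in constant time, then the expiry."""
    current = time.time() if now is None else now
    valid_sig = hmac.compare_digest(_sign(token["payload"]), token["signature"])
    return valid_sig and current < token["payload"]["exp"]


token = mint_token(["tool:web_search:read", "memory:notes:write"],
                   to="summarizer_001", capabilities=["tool:web_search:read"],
                   expires_in=300, now=1_000.0)
print(verify_token(token, now=1_100.0))  # True: signature valid, not expired
print(verify_token(token, now=1_400.0))  # False: past the 300-second window
```

Signing the sorted JSON body means any change to the recipient, capabilities, or expiry invalidates the signature.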
ML Risk Scoring
AgentRBAC tracks each agent's behavior in a sliding window and scores anomalies from 0 (normal) to 1 (anomalous). Scoring activates after 10 events per agent.
| Risk Level | Score | Action |
|---|---|---|
| Low | < 0.2 | Pass through |
| Medium | 0.2 – 0.7 | Flagged + logged |
| High | > 0.7 | Overridden to CONDITIONAL |
# Adjust thresholds
rbac.set_risk_thresholds(low=0.15, high=0.6)
# Train on custom data
rbac.train_ml_model(samples, contamination=0.05)
rbac.save_ml_model("models/risk_model.pkl")
rbac.load_ml_model("models/risk_model.pkl")
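The gating table can be read as a small function from score to outcome. The sketch below uses the default thresholds from the table; the `gate` helper is hypothetical, and it assumes only an ALLOW is downgraded to CONDITIONAL while a DENY stays a DENY, which the source does not spell out.

```python
from typing import Tuple


def gate(decision: str, risk_score: float,
         low: float = 0.2, high: float = 0.7) -> Tuple[str, bool]:
    """Return (final_decision, flagged) for a policy decision and a risk score."""
    if risk_score > high:
        # High risk: an ALLOW is overridden; other decisions are kept (assumption).
        final = "CONDITIONAL" if decision == "ALLOW" else decision
        return final, True
    if risk_score >= low:
        # Medium risk: decision stands but the request is flagged and logged.
        return decision, True
    return decision, False  # low risk passes through untouched


print(gate("ALLOW", 0.1))  # ('ALLOW', False)
print(gate("ALLOW", 0.5))  # ('ALLOW', True)
print(gate("ALLOW", 0.9))  # ('CONDITIONAL', True)
```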
Feature vector (6 dimensions):
| Feature | Description |
|---|---|
| action_rate | Actions per minute in sliding window |
| unique_resource_ratio | Distinct resources / total actions |
| boundary_probe_rate | Ratio of resource types probed |
| repeat_ratio | How often current resource is re-hit |
| time_since_last | Seconds since previous action |
| resource_entropy | Shannon entropy of resource access distribution |
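As an illustration of what these six features measure, the sketch below derives them from a list of `(timestamp, resource)` events. The exact definitions are assumptions: in particular, `boundary_probe_rate` is computed here as distinct resource types seen divided by a hypothetical `known_types` count, and the library's `behavior.py` may define each feature differently.

```python
import math
from collections import Counter


def feature_vector(events, known_types=3):
    """Compute the six features from (timestamp_seconds, resource) tuples.

    events must be non-empty and ordered oldest first.
    """
    times = [t for t, _ in events]
    resources = [r for _, r in events]
    total = len(events)
    # Clamp the window to at least one second to avoid division by zero.
    span_minutes = max(times[-1] - times[0], 1.0) / 60.0
    counts = Counter(resources)
    entropy = -sum((c / total) * math.log2(c / total) for c in counts.values())
    types_seen = {r.split(":")[0] for r in resources}
    return {
        "action_rate": total / span_minutes,
        "unique_resource_ratio": len(counts) / total,
        "boundary_probe_rate": len(types_seen) / known_types,
        "repeat_ratio": counts[resources[-1]] / total,
        "time_since_last": times[-1] - times[-2] if total > 1 else 0.0,
        "resource_entropy": entropy,
    }


events = [(0.0, "tool:web_search"), (30.0, "tool:web_search"),
          (60.0, "memory:notes"), (120.0, "tool:web_search")]
features = feature_vector(events)
print(features["action_rate"], features["repeat_ratio"])  # 2.0 0.75
```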
Multi-Agent Orchestration
orch = rbac.orchestrator(cumulative_risk_ceiling=0.85)
orch.pipeline([gateway.agent_id, processor.agent_id, model.agent_id])
result = orch.run(
input={"dataset_path": "/data/raw/sales.csv"},
caller=user_identity,
)
result["hops"] # per-hop RBAC results
result["cumulative_risk"] # accumulated risk score
result["short_circuited"] # True if risk ceiling was hit
result["output"] # final agent output
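One way to picture cumulative risk tracking is the sketch below. The accumulation rule is an assumption: it treats each hop's risk as independent and combines them as 1 − Π(1 − r), which the source does not specify. `run_pipeline` is a hypothetical helper that takes precomputed per-hop risk scores instead of running real agents.

```python
def run_pipeline(hops, ceiling=0.85):
    """Accumulate risk across (agent_id, risk_score) hops; stop at the ceiling."""
    survive = 1.0  # probability-style "no incident so far" term
    results = []
    for agent_id, risk in hops:
        survive *= (1.0 - risk)
        cumulative = 1.0 - survive
        results.append({"agent_id": agent_id, "risk": risk,
                        "cumulative": cumulative})
        if cumulative >= ceiling:
            # Short-circuit: remaining hops are never executed.
            return {"hops": results, "cumulative_risk": cumulative,
                    "short_circuited": True}
    return {"hops": results, "cumulative_risk": 1.0 - survive,
            "short_circuited": False}


outcome = run_pipeline([("gateway", 0.5), ("processor", 0.8), ("model", 0.1)])
print(len(outcome["hops"]), outcome["short_circuited"])  # 2 True
```

With these scores the second hop pushes the cumulative value to about 0.9, so the third agent is skipped.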
DataPipeline
from agent_rbac import DataPipeline
pipeline = (
DataPipeline(source="s3://bucket/raw/sales.csv", partition_size="256MB")
.clean(drop_nulls=True, deduplicate=True)
.aggregate(group_by="user_id", agg={"spend": "sum"})
.write(sink="s3://bucket/processed/", format="parquet")
)
pipeline.plan() # inspect steps without running
pipeline.execute(mode="local") # run locally
pipeline.execute(mode="distributed") # run with Dask
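The chained calls work because each step records itself and returns the pipeline object. A minimal sketch of that fluent-builder pattern, with a hypothetical `PipelineSketch` class that only builds a plan and never touches data:

```python
class PipelineSketch:
    """Records steps in order; plan() returns them without executing anything."""

    def __init__(self, source):
        self._steps = [{"op": "read", "source": source}]

    def clean(self, drop_nulls=False, deduplicate=False):
        self._steps.append({"op": "clean", "drop_nulls": drop_nulls,
                            "deduplicate": deduplicate})
        return self  # returning self is what enables chaining

    def write(self, sink, format="parquet"):
        self._steps.append({"op": "write", "sink": sink, "format": format})
        return self

    def plan(self):
        # Copy each step so callers cannot mutate the recorded plan.
        return [dict(step) for step in self._steps]


sketch = (
    PipelineSketch(source="s3://bucket/raw/sales.csv")
    .clean(drop_nulls=True, deduplicate=True)
    .write(sink="s3://bucket/processed/")
)
print([step["op"] for step in sketch.plan()])  # ['read', 'clean', 'write']
```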
Audit Logging
Every decision is recorded in an append-only audit log with full context.
from agent_rbac import AgentRBAC, FileBackend, StdoutBackend, MemoryBackend
# File backend
rbac = AgentRBAC(audit_file="rbac_audit.jsonl")
# In-memory (testing)
mem = MemoryBackend()
rbac.audit.add_backend(mem)
rbac.check(agent, "tool:web_search", "read")
print(mem.records[-1])
# {'timestamp': ..., 'event': 'access_check', 'agent_id': 'bot_001',
# 'decision': 'ALLOW', 'risk_score': 0.0, ...}
# Custom events
rbac.audit.log_event("agent_spawned", {"child_id": "sub_001"})
Custom Backend
class MyBackend:
    def write(self, record: dict):
        my_db.insert(record)

    def flush(self):
        pass

rbac.audit.add_backend(MyBackend())
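As a concrete variant of the `write` / `flush` interface, the sketch below appends records to a JSON Lines file. `JsonlBackend` and its `close` method are hypothetical additions for this example; only `write` and `flush` are taken from the interface shown above.

```python
import json
import os
import tempfile


class JsonlBackend:
    """Append each audit record to a file as one JSON object per line."""

    def __init__(self, path: str):
        # Mode "a" only ever appends, so earlier records are never rewritten.
        self._handle = open(path, "a", encoding="utf-8")

    def write(self, record: dict):
        self._handle.write(json.dumps(record, sort_keys=True) + "\n")

    def flush(self):
        self._handle.flush()

    def close(self):
        self._handle.close()


log_path = os.path.join(tempfile.mkdtemp(), "rbac_audit.jsonl")
backend = JsonlBackend(log_path)
backend.write({"event": "access_check", "agent_id": "bot_001", "decision": "ALLOW"})
backend.write({"event": "access_check", "agent_id": "bot_001", "decision": "DENY"})
backend.flush()
backend.close()

with open(log_path, encoding="utf-8") as handle:
    records = [json.loads(line) for line in handle]
print(len(records), records[-1]["decision"])  # 2 DENY
```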
Framework Integrations
LangChain
from agent_rbac.integrations.langchain import LangChainRBACToolWrapper
wrapped = LangChainRBACToolWrapper(
rbac=rbac, tool=search_tool,
resource="tool:duckduckgo", action="read",
)
result = wrapped.run("query", principal=agent, context=ctx)
CrewAI
from agent_rbac.integrations.crewai import CrewAITaskInterceptor
interceptor = CrewAITaskInterceptor(rbac=rbac)
interceptor.before_task(
agent_id="researcher",
task_description="Find papers",
tools_needed=["tool:arxiv:read"],
principal=agent, context=ctx,
)
AutoGen
from agent_rbac.integrations.autogen import AutoGenRBACAgent
guarded = AutoGenRBACAgent(
rbac=rbac, agent=my_autogen_agent, principal=agent,
tool_resource_map={"code_exec": "tool:code", "web_search": "tool:web_search"},
)
reply = guarded.generate_reply(messages=[...])
Docker
# Build and run
docker build -t agent-rbac:latest .
docker run --rm agent-rbac:latest
# With persistent audit log
docker run --rm -v $(pwd)/logs:/app/logs \
-e RBAC_AUDIT_FILE=/app/logs/rbac_audit.jsonl agent-rbac:latest
# docker-compose
docker-compose up # start demo
docker-compose run --rm rbac pytest tests/ -v # run tests
docker-compose --profile ml up # with ML deps
Testing
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest tests/ -v
# With coverage
pytest tests/ -v --cov=agent_rbac --cov-report=term-missing
# Run examples
python examples/quickstart.py
python examples/test_agent_demo.py
python examples/delegation.py
python examples/audit_demo.py
API Reference
AgentRBAC — Main Engine
| Method | Returns | Description |
|---|---|---|
| AgentRBAC(audit_file=None) | AgentRBAC | Create the RBAC engine |
| .register(**kwargs) | AgentIdentity | Register agent through MW1–MW5 chain |
| .define_role(name, capabilities, deny, parent) | None | Add a custom role to the hierarchy |
| .check(principal, resource, action, context) | DecisionResult | Full RBAC + ML pipeline check |
| .is_allowed(principal, resource, action, ctx) | bool | Boolean shorthand for .check() |
| .check_or_raise(principal, resource, action) | DecisionResult | Raises PermissionDenied / RiskGated |
| .add_policy(name, fn, priority) | None | Register a context-aware policy |
| .bind_role(entity_id, roles, expires_in) | None | Bind roles at runtime with optional TTL |
| .get_bound_roles(entity_id) | list[str] | Get active (non-expired) bound roles |
| .orchestrator(cumulative_risk_ceiling) | Orchestrator | Create a pipeline orchestrator |
| .set_risk_thresholds(low, high) | None | Configure ML gating levels |
| .train_ml_model(samples, contamination) | None | Train Isolation Forest model |
| .save_ml_model(path) | None | Persist trained model |
| .load_ml_model(path) | None | Load trained model |
| .audit_log(limit) | list[dict] | Retrieve recent audit records |
| @rbac.enforce(resource, action) | decorator | Gate function calls with RBAC |
AgentIdentity
| Method | Returns | Description |
|---|---|---|
| AgentIdentity(agent_id, roles, capability_ceiling) | AgentIdentity | Create a principal |
| .delegate(to, capabilities, roles, expires_in) | DelegationToken | Mint a scoped child token |
| .verify_token(token) | bool | Verify HMAC signature |
DelegationToken
| Method | Returns | Description |
|---|---|---|
| .to_identity() | AgentIdentity | Convert token to usable identity |
DecisionResult
| Attribute | Type | Description |
|---|---|---|
| decision | Decision | ALLOW, DENY, CONDITIONAL, or DELEGATE |
| allowed | bool | True only when decision is ALLOW |
| reason | str | Human-readable explanation |
| risk_score | float | ML/heuristic risk score (0–1) |
Orchestrator
| Method | Returns | Description |
|---|---|---|
| .pipeline(agent_ids) | None | Define a linear agent pipeline |
| .run(input, caller, context) | dict | Execute with RBAC enforcement at each hop |
DataPipeline
| Method | Returns | Description |
|---|---|---|
| DataPipeline(source, partition_size) | DataPipeline | Create a pipeline |
| .clean(drop_nulls, deduplicate) | DataPipeline | Add cleaning step |
| .aggregate(group_by, agg) | DataPipeline | Add aggregation step |
| .filter(expr) | DataPipeline | Add filter step |
| .write(sink, format) | DataPipeline | Add write step |
| .plan() | list[dict] | Inspect pipeline steps |
| .execute(mode) | dict | Run "local" or "distributed" |
Project Structure
agent_rbac/
├── __init__.py          # Public API & exports
├── config.py            # AgentRBAC entry point
├── decorators.py        # @rbac.enforce
├── exceptions.py        # AgentRBACError, PermissionDenied, RiskGated
├── core/
│   ├── identity.py      # AgentIdentity, DelegationToken
│   ├── roles.py         # Role hierarchy (DAG)
│   ├── capabilities.py  # Wildcard capability matching
│   ├── policy.py        # Policy evaluator + Context
│   └── decision.py      # Decision enum + DecisionResult
├── ml/
│   ├── behavior.py      # Sliding window behavior tracker
│   └── risk_scorer.py   # Isolation Forest + heuristic fallback
├── audit/
│   └── logger.py        # FileBackend, StdoutBackend, MemoryBackend
├── registration/
│   └── ...              # MW1–MW5 middleware chain
├── orchestration/
│   └── orchestrator.py  # Multi-agent pipeline orchestrator
├── data/
│   └── pipeline.py      # DataPipeline fluent API
└── integrations/
    ├── langchain.py     # LangChain tool wrapper
    ├── crewai.py        # CrewAI task interceptor
    └── autogen.py       # AutoGen agent wrapper
tests/
├── test_agent_rbac.py   # Core RBAC tests
└── test_v2_features.py  # v2 feature tests
examples/
├── quickstart.py        # Basic usage
├── test_agent_demo.py   # Full pipeline demo
├── delegation.py        # Delegation tokens
└── audit_demo.py        # Audit logging
License