Governance SDK for AI agents — audit trails, policy engine, approval gates.
agentlattice — Python SDK
Governance-aware SDK for AI agents. Wraps agent actions with audit trails, approval gates, and circuit breaker integration.
Install
pip install agentlattice
For realtime event subscriptions: pip install agentlattice[realtime]
Method Inventory
Core
| Capability | Async | Sync | Details |
|---|---|---|---|
| Execute action | await al.execute("pr.open") | al.execute_sync("pr.open") | Reference |
| Gate (block until approved) | await al.gate("pr.merge") | al.gate_sync("pr.merge") | Reference |
Delegation
| Capability | Async | Sync | Details |
|---|---|---|---|
| Delegate sub-agent | async with al.delegate("reader", capabilities=[...], ttl=300) as sub: | with al.delegate_sync("reader", ...) as sub: | Reference |
| Parallel fan-out | async with AgentLattice.parallel(...) as [r1, r2]: | n/a | Reference |
| Execute as child | await sub.execute("read_data") | sub.execute_sync("read_data") | Reference |
| Gate as child | await sub.gate("read_data") | sub.gate_sync("read_data") | Reference |
| Chain delegation | async with sub.delegate("sub-reader", ...) as s: | with sub.delegate_sync(...) as s: | Reference |
Introspection & Governance
| Capability | Async | Sync | Details |
|---|---|---|---|
| Agent identity & state | await al.whoami() | al.whoami_sync() | Reference |
| List policies | await al.policies(scope="org") | al.policies_sync(scope="org") | Reference |
| Dry-run policy check | await al.can_i("pr.merge") | al.can_i_sync("pr.merge") | Reference |
| Query audit trail | await al.audit(limit=50) | al.audit_sync(limit=50) | Reference |
| Verify audit integrity | await al.verify() | al.verify_sync() | Reference |
| List delegations | await al.delegations(role="parent") | al.delegations_sync(role="parent") | Reference |
| Revoke delegation | await al.revoke("del-abc123") | al.revoke_sync("del-abc123") | Reference |
| Report action outcome | await al.report("evt-abc123", outcome) | al.report_sync("evt-abc123", outcome) | Reference |
| Governance posture score | await al.posture() | al.posture_sync() | Reference |
Realtime
| Capability | Usage | Details |
|---|---|---|
| Subscribe to events | await al.subscribe("org-id", callback) | Reference |
| Disconnect | await al.unsubscribe() | Reference |
Types & Errors
| Export | Description | Details |
|---|---|---|
| ActionResult | Return type of execute(): status, audit ID, conditions | Reference |
| ActionOptions | Options for execute() and gate(): data_accessed, metadata, event_id | Reference |
| ConditionResult | Individual policy rule evaluation: field, operator, expected, result | Reference |
| ReportOutcome | Outcome for report(): status, message | Reference |
| GovernanceEvent | Realtime event payload: event_type, agent_id, data | Reference |
| AgentLatticeError | Base error: code, message, details | Reference |
| AgentLatticeDeniedError | Policy denial or reviewer rejection: reason, policy, conditions | Reference |
| AgentLatticeTimeoutError | Approval window expired: approval_id | Reference |
Quickstart
import asyncio
import os
from agentlattice import AgentLattice
al = AgentLattice(api_key=os.environ["AL_API_KEY"])
async def main():
    # gate() blocks until approved; the code below runs only if it passes
    await al.gate("pr.merge")
    await merge_pr(42)  # merge_pr is your own function
asyncio.run(main())
Handle denials and timeouts:
import asyncio
import os
from agentlattice import AgentLattice, AgentLatticeDeniedError, AgentLatticeTimeoutError
al = AgentLattice(api_key=os.environ["AL_API_KEY"])
async def main():
    try:
        await al.gate("code.commit")
    except AgentLatticeDeniedError as e:
        print(f"Denied: {e.reason}, policy: {e.policy}")
        print(f"Failed conditions: {e.conditions}")
    except AgentLatticeTimeoutError as e:
        print(f"Approval timed out: {e.approval_id}")
asyncio.run(main())
Sync usage
If you're not in an async context:
import os
from agentlattice import AgentLattice
al = AgentLattice(api_key=os.environ["AL_API_KEY"])
al.gate_sync("code.commit")
Core API
AgentLattice(api_key, *, base_url?, gate_poll_timeout_ms?, gate_poll_interval_ms?)
| Parameter | Default | Description |
|---|---|---|
| api_key | required | Bearer token; pass via os.environ["AL_API_KEY"], never hardcoded |
| base_url | https://www.agentlattice.io | Override for self-hosted or staging |
| gate_poll_timeout_ms | 8 * 60 * 60 * 1000 (8 hours) | How long gate() polls for approval |
| gate_poll_interval_ms | 5000 | Poll interval in ms |
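To make the two polling parameters concrete, here is a minimal sketch of a poll loop bounded by a timeout and an interval. It is not the SDK's implementation: `poll_until_decided` and `check_status` are hypothetical names, and the only status string taken from this README is "requested".

```python
import asyncio
import time
from typing import Awaitable, Callable


async def poll_until_decided(
    check_status: Callable[[], Awaitable[str]],
    timeout_ms: int = 8 * 60 * 60 * 1000,
    interval_ms: int = 5000,
) -> str:
    """Poll check_status() until it returns something other than "requested".

    Hypothetical helper: raises TimeoutError once timeout_ms has elapsed.
    """
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        status = await check_status()
        if status != "requested":
            return status  # a decision was made
        if time.monotonic() >= deadline:
            raise TimeoutError("approval window expired")
        await asyncio.sleep(interval_ms / 1000)
```

With the documented defaults, a loop like this would make roughly 5,760 status checks (28,800,000 ms / 5,000 ms) before giving up.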
await al.execute(action_type, options?)
Submit an action and return an ActionResult immediately, without waiting for approval. Check result.status to see whether the action ran, is awaiting approval, or was denied.
import os
from agentlattice import AgentLattice, ActionOptions
al = AgentLattice(api_key=os.environ["AL_API_KEY"])
# Inside an async function:
result = await al.execute("pr.open", ActionOptions(
    data_accessed=[{"type": "source_code", "count": 5, "sensitivity": "low"}],
    metadata={"pr_number": 42},
    event_id="my-idempotency-key",  # optional: dedup by your own key
))
if result.status == "executed":
    print(f"Auto-executed under policy: {result.policy_name}")
elif result.status == "requested":
    print(f"Awaiting approval: {result.approval_id}")
elif result.status == "denied":
    print(f"Denied ({result.denial_reason}): {result.conditions_evaluated}")
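The event_id option only deduplicates if the same logical action always produces the same key. One way to get that, sketched here under the assumption that any stable string is accepted as an event_id, is to hash the action type together with its metadata. `make_event_id` is a hypothetical helper, not part of the SDK.

```python
import hashlib
import json


def make_event_id(action_type: str, metadata: dict) -> str:
    """Derive a stable key: the same action and metadata always give the same ID."""
    # sort_keys makes the JSON canonical, so dict ordering does not matter
    payload = json.dumps({"action": action_type, "metadata": metadata}, sort_keys=True)
    # Truncated SHA-256 hex digest; the 32-character length is an arbitrary choice
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:32]
```

The result could then be passed as ActionOptions(event_id=make_event_id("pr.open", {"pr_number": 42})).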
await al.gate(action_type, options?)
Submit an action and block until it is approved or denied.
try:
    await al.gate("pr.merge")
except AgentLatticeDeniedError as e:
    # e.reason: "CONDITIONS_DENIED" | "POLICY_TAMPERED" | "DENIED_BY_REVIEWER"
    # e.policy: name of the governing policy (if conditions-based)
    # e.conditions: list of ConditionResult (which rule failed and why)
    # e.approval_id: set if a human reviewer denied it
    ...
except AgentLatticeTimeoutError as e:
    # e.approval_id: the approval that timed out
    ...
Sync: al.gate_sync(action_type, options?) / al.execute_sync(action_type, options?)
ActionResult
@dataclass
class ActionResult:
    status: str                                  # "executed" | "requested" | "denied" | ...
    audit_event_id: str
    approval_id: str | None                      # set when status="requested"
    message: str | None
    timeout_at: str | None
    denial_reason: str | None                    # "CONDITIONS_DENIED" | "POLICY_TAMPERED" | ...
    policy_name: str | None                      # governing policy name
    conditions_evaluated: list[ConditionResult]  # which rules passed/failed
ConditionResult
@dataclass
class ConditionResult:
    field: str     # e.g. "pr_size"
    operator: str  # e.g. "lt"
    expected: Any  # e.g. 100
    result: bool   # True = passed, False = failed
Agents can read conditions_evaluated to understand exactly which governance
rule blocked them and adapt their behavior accordingly.
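As an illustration of reading conditions_evaluated, the sketch below turns failed conditions into readable lines an agent could log or feed back into its planning. The ConditionResult class here is a local stand-in that mirrors the fields documented above so the example runs without the SDK; `describe_failures` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ConditionResult:  # local stand-in mirroring the documented fields
    field: str
    operator: str
    expected: Any
    result: bool


def describe_failures(conditions: list[ConditionResult]) -> list[str]:
    """Return one readable line per failed condition; passed ones are skipped."""
    return [
        f"{c.field} must be {c.operator} {c.expected!r}"
        for c in conditions
        if not c.result
    ]
```

For a denied PR with a failing pr_size rule (operator "lt", expected 100), this yields the single line "pr_size must be lt 100".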
Delegation — Scoped Sub-Agents
Create short-lived child agents with narrowed capabilities. Cleanup is automatic.
async with al.delegate("data-processor",
    capabilities=["read_data", "write_results"],
    ttl=300
) as sub:
    await sub.execute("read_data")
    result = await sub.execute("write_results")
Sync version:
with al.delegate_sync("data-processor",
    capabilities=["read_data"], ttl=300
) as sub:
    sub.execute_sync("read_data")
al.delegate(name, *, capabilities, ttl)
| Parameter | Description |
|---|---|
| name | Display name for the ephemeral child agent |
| capabilities | List of action types the child can perform (must be subset of parent's) |
| ttl | Time-to-live in seconds (max 86400 = 24 hours) |
Returns an async context manager yielding a DelegatedAgent. Cleanup runs
in __aexit__ regardless of exceptions.
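The cleanup guarantee follows the usual async context manager pattern. The sketch below shows that pattern with a hypothetical `scoped_child` stand-in (it is not how the SDK is written): the finally branch runs whether the body completes or raises.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def scoped_child(name: str):
    """Hypothetical stand-in for a delegation scope."""
    events.append(f"created {name}")
    try:
        yield name
    finally:
        # Runs on normal exit and when the body raises
        events.append(f"revoked {name}")


async def demo() -> None:
    try:
        async with scoped_child("reader"):
            raise RuntimeError("task failed")
    except RuntimeError:
        pass  # the scope was still cleaned up before we got here
```

After asyncio.run(demo()), events holds both the creation and the revocation entry even though the body raised.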
DelegatedAgent
| Method | Description |
|---|---|
| sub.execute(action_type, options?) | Execute an action as the child |
| sub.gate(action_type, options?) | Execute + block until approved |
| sub.delegate(name, *, capabilities, ttl) | Chain further with narrower scope |
All methods have _sync variants.
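The two documented constraints on a delegation, capabilities that are a subset of the parent's and a TTL of at most 86400 seconds, can be pre-checked locally before making a request. This is a sketch of such a check; `validate_delegation` is a hypothetical helper, and how the service itself reports violations is not covered in this README.

```python
MAX_TTL_SECONDS = 86400  # 24 hours, the documented maximum


def validate_delegation(parent_caps: set[str], child_caps: set[str], ttl: int) -> None:
    """Raise ValueError if the child would exceed the parent's scope or the TTL cap."""
    extra = child_caps - parent_caps  # capabilities the parent does not hold
    if extra:
        raise ValueError(f"capabilities not held by parent: {sorted(extra)}")
    if not 0 < ttl <= MAX_TTL_SECONDS:
        raise ValueError(f"ttl must be between 1 and {MAX_TTL_SECONDS} seconds")
```

The same check applies at each level of a chain: a grandchild's capabilities are compared against the child's set, not the original parent's.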
Parallel fan-out
async with AgentLattice.parallel(
    al.delegate("reader-1", capabilities=["read_data"], ttl=60),
    al.delegate("reader-2", capabilities=["read_data"], ttl=60),
) as [r1, r2]:
    result_1 = await r1.execute("read_data")
    result_2 = await r2.execute("read_data")
See docs/ephemeral-agents.md for architecture details, security model, and cleanup guarantees.
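For intuition about what a fan-out helper of this shape has to do, here is a minimal sketch built on contextlib.AsyncExitStack. It is an assumption about the general technique, not the SDK's code; `fake_delegate` and this `parallel` are hypothetical stand-ins. The children's work is run concurrently with asyncio.gather.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

closed: list[str] = []


@asynccontextmanager
async def fake_delegate(name: str):
    """Hypothetical stand-in for al.delegate(...)."""
    try:
        yield name
    finally:
        closed.append(name)  # record cleanup order


@asynccontextmanager
async def parallel(*scopes):
    """Enter every scope, yield the children as a list, exit all in reverse order."""
    async with AsyncExitStack() as stack:
        children = [await stack.enter_async_context(s) for s in scopes]
        yield children


async def demo() -> list[str]:
    async with parallel(fake_delegate("reader-1"), fake_delegate("reader-2")) as [r1, r2]:
        async def read(child: str) -> str:
            return f"{child}: read_data"
        # Run both children's work concurrently
        return list(await asyncio.gather(read(r1), read(r2)))
```

Because the exit stack unwinds in reverse, every scope that was entered is closed even if a later one fails to open.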
Introspection & Governance
These methods give agents visibility into their own identity, policies, audit trails, and governance posture. Every method has a *_sync() variant.
await al.whoami()
Returns this agent's identity, active policies, and delegation relationships.
info = await al.whoami()
print(f"Agent: {info.name} ({info.config_id})")
print(f"Circuit breaker: {info.cb_state}")
print(f"Policies: {[p.name for p in info.policies]}")
await al.policies(scope?)
List policies. scope="own" (default) returns policies for this agent. scope="org" returns all org policies.
result = await al.policies(scope="org")
for p in result.policies:
    print(f"{p.name}: {p.action_type} (approval={p.approval_required})")
await al.can_i(action_type)
Dry-run policy check. No audit event created, no approval requested.
check = await al.can_i("pr.merge")
if check.allowed:
    print("Good to go")
elif check.needs_approval:
    print(f"Will need approval under policy: {check.policy_name}")
await al.audit(cursor?, limit?, action_type?)
Query the audit trail with pagination.
page = await al.audit(limit=50, action_type="pr.merge")
for event in page.events:
    print(f"{event.timestamp}: {event.action_type} -> {event.status}")
if page.next_cursor:
    next_page = await al.audit(cursor=page.next_cursor)
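To walk the whole trail rather than a single page, the cursor can be followed in a loop. The sketch below wraps that in an async generator and exercises it against an in-memory fake so it runs without the SDK. `iter_audit`, `FakeClient`, and `Page` are hypothetical; passing cursor=None for the first page is an assumption about the real client.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class Page:  # stand-in with the two attributes used above
    events: list
    next_cursor: Optional[str]


class FakeClient:
    """Hypothetical in-memory client; audit() mimics cursor pagination."""

    def __init__(self, items: list, page_size: int = 2):
        self._items = items
        self._page_size = page_size

    async def audit(self, cursor: Optional[str] = None, limit: int = 50) -> Page:
        start = int(cursor) if cursor else 0
        end = start + min(limit, self._page_size)
        nxt = str(end) if end < len(self._items) else None
        return Page(events=self._items[start:end], next_cursor=nxt)


async def iter_audit(client, **filters) -> AsyncIterator:
    """Yield every event, following next_cursor until it is empty."""
    cursor = None
    while True:
        page = await client.audit(cursor=cursor, **filters)
        for event in page.events:
            yield event
        cursor = page.next_cursor
        if not cursor:
            break


async def collect(client) -> list:
    return [e async for e in iter_audit(client, limit=50)]
```

A caller would then write `async for event in iter_audit(al, action_type="pr.merge")` instead of managing cursors by hand.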
await al.verify()
Verify the org's audit chain integrity (tamper detection).
result = await al.verify()
if not result.chain_valid:
    print(f"Chain broken at row: {result.first_broken_at_row_id}")
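This README does not describe how the audit chain is constructed. As general background on tamper detection, the sketch below shows a simple hash chain in which each row's hash covers the previous row's hash, so editing any row invalidates it and everything after it. All names here are hypothetical and the scheme is an assumption for illustration only.

```python
import hashlib
from typing import Optional


def row_hash(prev_hash: str, payload: str) -> str:
    """Hash a row's payload together with the previous row's hash."""
    return hashlib.sha256((prev_hash + payload).encode("utf-8")).hexdigest()


def build_chain(payloads: list[str]) -> list[dict]:
    rows, prev = [], ""
    for i, payload in enumerate(payloads):
        h = row_hash(prev, payload)
        rows.append({"id": i, "payload": payload, "hash": h})
        prev = h
    return rows


def first_broken_row(rows: list[dict]) -> Optional[int]:
    """Return the id of the first row whose stored hash does not match, else None."""
    prev = ""
    for row in rows:
        if row_hash(prev, row["payload"]) != row["hash"]:
            return row["id"]
        prev = row["hash"]
    return None
```

The return value of `first_broken_row` plays the same role as first_broken_at_row_id in the result above: it points at where verification first fails.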
await al.delegations(role?, active_only?)
List delegations. Filter by role="parent" or role="child", and active_only=False to include expired.
result = await al.delegations(role="parent", active_only=False)
for d in result.delegations:
    print(f"{d.id}: active={d.active}, expires={d.expires_at}")
await al.revoke(delegation_id)
Revoke a delegation by ID.
result = await al.revoke("del-abc123")
assert result.revoked
await al.report(audit_event_id, outcome)
Report the outcome of a previously executed action. Closes the audit loop.
from agentlattice import ReportOutcome
result = await al.report(
    "evt-abc123",
    ReportOutcome(status="success", message="Deployed to prod"),
)
print(f"Reported at: {result.reported_at}")
Outcome statuses: "success", "failure", "partial".
await al.posture()
Get the org's governance posture score (0-100).
result = await al.posture()
print(f"Score: {result.score}/100")
for name, comp in result.components.items():
    print(f"  {name}: {comp.score}/{comp.max}")
Realtime Events
Subscribe to governance events via WebSocket. Requires pip install agentlattice[realtime].
import os
from agentlattice import AgentLattice, GovernanceEvent
al = AgentLattice(api_key=os.environ["AL_API_KEY"])
def on_event(event: GovernanceEvent):
    print(f"[{event.event_type}] agent={event.agent_id} data={event.data}")
# Inside an async function:
await al.subscribe("org-id", on_event)
# Filter to specific event types
await al.subscribe("org-id", on_event, events=["action.denied", "anomaly.detected"])
# Filter to a specific agent
await al.subscribe("org-id", on_event, agent_filter="agent-123")
# Cleanup
await al.unsubscribe()
Event types: action.denied, action.executed, delegation.expired, delegation.revoked, policy.changed, anomaly.detected, enforcement.triggered.
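Whether the events and agent_filter options are applied on the server or in the client is not stated here. If you need the same filtering in your own callback, for example to route different event types to different handlers, a wrapper along these lines would do it. `make_filter` is a hypothetical helper, and events are modeled as plain dicts so the sketch runs without the SDK.

```python
from typing import Callable, Iterable, Optional


def make_filter(
    callback: Callable[[dict], None],
    events: Optional[Iterable[str]] = None,
    agent_filter: Optional[str] = None,
) -> Callable[[dict], None]:
    """Wrap callback so it only fires for matching event types and agent."""
    allowed = set(events) if events is not None else None

    def handler(event: dict) -> None:
        if allowed is not None and event["event_type"] not in allowed:
            return  # event type not subscribed
        if agent_filter is not None and event["agent_id"] != agent_filter:
            return  # different agent
        callback(event)

    return handler
```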
Patterns
Self-correcting agent
conditions_evaluated is what makes this more than a compliance layer — it
closes the feedback loop so agents can adapt, not just fail.
import os
from agentlattice import AgentLattice, AgentLatticeDeniedError, ActionOptions
al = AgentLattice(api_key=os.environ["AL_API_KEY"])
async def commit_with_governance(files: list[str]) -> None:
    try:
        await al.gate("pr.open", ActionOptions(
            metadata={"pr_size": len(files), "repo": "acme/backend"},
        ))
        await open_pr(files)  # open_pr is your own function
    except AgentLatticeDeniedError as e:
        if e.reason == "CONDITIONS_DENIED":
            # Find which rules failed and why
            failed = [c for c in e.conditions if not c.result]
            # e.g. [ConditionResult(field="pr_size", operator="lt", expected=50, result=False)]
            for rule in failed:
                # Only split when there is more than one file, so recursion terminates
                if rule.field == "pr_size" and len(files) > 1:
                    # Policy says PRs must be < 50 files: split and retry
                    mid = len(files) // 2
                    await commit_with_governance(files[:mid])
                    await commit_with_governance(files[mid:])
                    return
        # Reviewer denial or policy tamper: no retry
        raise
Error Hierarchy
AgentLatticeError               base: code, message, details
  AgentLatticeDeniedError       approval_id?, reason?, policy?, conditions?
  AgentLatticeTimeoutError      approval_id
Reliability
execute() and gate() retry automatically on 5xx and network errors — up to 3 attempts with 1s/2s/4s exponential backoff. Each request has a 30-second timeout. Pass event_id to make retries idempotent.
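The retry behavior can be pictured with a small sketch. It reads "1s/2s/4s" as the waits before three retries that follow the initial attempt, which is one interpretation of the sentence above and not a statement about the SDK's internals. `with_retries` and `TransientError` are hypothetical names; the sleep function is injectable so the logic can be tested without waiting.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


class TransientError(Exception):
    """Stand-in for a 5xx response or a network failure."""


async def with_retries(
    send: Callable[[], Awaitable[str]],
    delays: Sequence[float] = (1.0, 2.0, 4.0),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> str:
    """Call send(); after each transient failure wait the next delay and retry."""
    for delay in delays:
        try:
            return await send()
        except TransientError:
            await sleep(delay)  # back off before the next attempt
    return await send()  # final attempt; errors propagate to the caller
```

A retried request may reach the server more than once, which is why a stable event_id matters: it lets the server recognize the repeat.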
execute_sync() and gate_sync() are safe in async runtimes (LangGraph, FastAPI, CrewAI) — they detect a running event loop and dispatch to a thread pool automatically.
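The loop-detection approach described above can be sketched as follows. This is an illustration of the general technique using only the standard library, not the SDK's source; `run_sync` is a hypothetical name.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Coroutine


def run_sync(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run a coroutine to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run is safe
        return asyncio.run(coro)
    # A loop is already running here; run the coroutine on a fresh loop
    # in a worker thread and block until it finishes.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()
```

Note that a helper like this still blocks the calling thread while it waits, so inside an async framework the awaitable variants remain the better choice where they are available.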
License
MIT