Agent-to-Agent (A2A) task routing system with audit trails, policy engine, reputation tracking, and gateway
Project description
Empire A2A (Agent-to-Agent) Task Routing System
A production-grade Agent-to-Agent (A2A) task routing system with persistent storage, audit trails, policy enforcement, reputation tracking, cross-node transport, task composition, scheduling, caching, multi-tenant isolation, and external SDK/observability.
Features
- Capability-Based Routing: Route tasks to agents based on capabilities, load, cost, latency, affinity, or round-robin
- Persistent Storage: SQLite-backed task storage with WAL mode for concurrent access
- Tamper-Evident Audit Trails: SHA-256 chained audit logs with integrity verification
- Full Lifecycle Tracking: 10-state task machine with persistence hooks and timeout recovery
- Policy Engine: Hot-reloadable YAML policies with ALLOW/DENY/REQUIRE rules and budget caps
- Reputation System: Exponential decay scoring with sliding window metrics and p50/p95 latency tracking
- Mesh Transport: Cross-node HTTP transport via Tailscale with retry/backoff and circuit breaking
- Task Composition: DAG decomposition with topological sort and 6 aggregation strategies
- Scheduler: Cron expression parser with shortcuts (@hourly, @daily, etc.) and jitter support
- Result Cache: Hash-based cache with per-capability TTLs and automatic invalidation
- Multi-Tenant Isolation: Workspace-based isolation with explicit cross-workspace allowlists
- Client SDK: Sync and async clients with HMAC request signing
- Observability: Prometheus metrics endpoint and deep health checks
- Gateway Service: FastAPI REST/WebSocket service with HMAC authentication and rate limiting
Installation
pip install imperial-a2a
For development dependencies:
pip install imperial-a2a[dev]
For async client:
pip install imperial-a2a[async]
For full gateway:
pip install imperial-a2a[gateway]
Quick Start
from imperial_a2a import (
get_a2a_router,
get_lifecycle,
get_audit_trail,
get_policy_engine,
get_reputation_engine,
get_task_store,
A2AClient,
sign_request
)
# Initialize components
router = get_a2a_router()
lifecycle = get_lifecycle()
audit = get_audit_trail()
policy = get_policy_engine()
reputation = get_reputation_engine()
store = get_task_store()
# Register an agent capability
router.register_agent_capability(
agent_id="agent-001",
capability=AgentCapability(
agent_id="agent-001",
capability_id="text-summarization",
description="Summarizes text documents",
input_schema={"type": "object", "properties": {"text": {"type": "string"}}},
output_schema={"type": "object", "properties": {"summary": {"type": "string"}}},
cost_per_task=0.001,
avg_latency_ms=500,
max_concurrent=5
)
)
# Submit a task via client
client = A2AClient(gateway_url="http://localhost:8000")
result = client.submit_task(
capability_id="text-summarization",
input_data={"text": "Long document to summarize..."},
priority=TaskPriority.NORMAL
)
# Check task status
status = client.get_task_status(result.task_id)
Running the Gateway
uvicorn imperial_a2a.gateway:create_gateway --host 0.0.0.0 --port 8000
Configuration
The policy engine loads YAML configuration from empire/config/a2a_policy.yaml by default:
version: 1
rules:
- effect: DENY
scope: GLOBAL
when:
agent_id: "blocked-agent-001"
description: "Block specific agent"
- effect: REQUIRE
scope: CAPABILITY
when:
capability_id: "high-risk-operation"
then:
- effect: DENY
when:
reputation_score: {"<": 0.5}
description: "Require good reputation for high-risk ops"
budgets:
- agent_id: "agent-001"
capability_id: "premium-service"
daily_limit: 100.0
description: "Daily budget for premium service"
API Endpoints
Gateway (/)
POST /tasks- Submit a new taskGET /tasks/{task_id}- Get task statusGET /tasks- List tasks with filtersPOST /tasks/{task_id}/cancel- Cancel a taskGET /agents/{agent_id}- Get agent infoGET /capabilities- List capabilitiesGET /health- Basic health checkGET /health/deep- Deep health check (503 if unhealthy)GET /metrics- Prometheus metricsWS /ws/{client_id}- WebSocket for real-time updates
Observability
- Prometheus metrics available at
/metricsendpoint - Deep health checks at
/health/deep - Structured JSON logging available via
setup_structured_logging()
Testing
Run the test suite:
pytest empire/tests/ -v
License
MIT License - see LICENSE file for details.
Empire Integration
This A2A system is designed to integrate with the Nano Empire AI operating system. It exposes metrics and health endpoints that can be scraped by the empire observability system.
For integration with the empire dashboard, see empire/viz/empire_dashboard.py which includes A2A-specific health checks and metrics.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file imperial_a2a-1.0.1.tar.gz.
File metadata
- Download URL: imperial_a2a-1.0.1.tar.gz
- Upload date:
- Size: 109.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
61171818b19361887456221c5067472400f4b42badc53cc0564b2e48f92b7a36
|
|
| MD5 |
9382d50447372efbef6120c2e4702ecc
|
|
| BLAKE2b-256 |
1219421a4e6049a2ac7e0fa9938e3bd11b99d79ce904c6d68b36c75b346f56e5
|
File details
Details for the file imperial_a2a-1.0.1-py3-none-any.whl.
File metadata
- Download URL: imperial_a2a-1.0.1-py3-none-any.whl
- Upload date:
- Size: 119.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c37e5c697200932d2a0e2f6d02c45b4dd3b0f48fd4c2dcd845ff5bafd0da196a
|
|
| MD5 |
d121dd649601759d7e1b838b892936cf
|
|
| BLAKE2b-256 |
a4a01dcdef85a318acc6cfb4d1fc103194c71ec6ddec85e3c86b4d560acfe8da
|