# AgentKeeper Python SDK

Coordination infrastructure for AI agents — registry, leases, leader election, and entitlements over gRPC.
Coordination infrastructure for AI agent systems. AgentKeeper gives your agents a shared nervous system: they can register themselves, discover each other, claim exclusive tasks, elect a leader, and enforce who is allowed to do what — all over a single gRPC connection.
## What it does
| Feature | What it solves |
|---|---|
| Agent Registry | Agents announce themselves with a description and tool list. Dead agents are detected automatically via heartbeat timeouts. |
| Semantic Search | Find the right agent for a task using a natural language query. |
| Lease Manager | Guarantees exactly one agent works on a given task at a time. Leases expire automatically if an agent crashes. |
| Leader Election | Multiple orchestrator instances stand by; only one leads. Failover is automatic when the leader dies. |
| Entitlements | ABAC policy engine. Controls which agents can call which tools. DENY rules always override ALLOW. |
## Installation

```bash
pip install agentkeeper
```

The SDK connects to a running AgentKeeper daemon. Download the binary for your platform from the releases page and run it:

```bash
./agentkeeper   # listens on :7070 by default
```
## Quick start
The typical flow has two sides: an orchestrator that creates tasks and assigns them, and agents that register and do the work.
```python
import uuid
from agentkeeper import AgentKeeperClient, AgentInfo, WatchEvent

client = AgentKeeperClient(host="localhost", port=7070)

# ── Orchestrator side ──────────────────────────────────────────────────────

# React to events in real time — no polling needed
def on_event(event: WatchEvent):
    if event.topic == "agent.died":
        agent: AgentInfo = event.payload
        print(f"Agent {agent.card.name} died — reassigning its tasks")
        reassign_tasks_for(agent.agent_id)

sub = client.watch(["agent."], on_event)

# Create a task — task_id is whatever ID your system uses (database row,
# queue message ID, UUID). AgentKeeper doesn't create tasks; it coordinates
# who works on them.
task_id = f"task:{uuid.uuid4()}"

# Find the best available agent for the work
agents = client.find(query="fetch and parse web pages", tools=["browser"])
for agent in agents:
    lease = client.acquire_lease(task_id, agent.agent_id, ttl=120)
    if lease:
        print(f"Assigned {task_id} to {agent.card.name}")
        send_task_to_agent(agent.agent_id, task_id)
        break  # one winner, everyone else is locked out

# ── Agent side ─────────────────────────────────────────────────────────────

with client.register(
    name="scraper-1",
    description="Fetches and parses web pages",
    version="1.0.0",
    tools=["browser", "http_client"],
) as agent:
    # Heartbeat runs automatically in the background.
    # The agent stays ALIVE as long as this block is running.
    task_id = receive_task_assignment()  # from orchestrator
    with client.acquire_lease(task_id, agent.agent_id, ttl=120) as lease:
        do_work(task_id)
        # lease released automatically on exit
# Agent deregistered automatically on exit — triggers agent.died on the bus
```
## Connecting

**Sidecar mode** — connect to a daemon that is already running:

```python
# Default: localhost:7070
client = AgentKeeperClient()

# Custom host/port
client = AgentKeeperClient(host="agentkeeper.internal", port=7070)

# Use as a context manager to close the channel on exit
with AgentKeeperClient() as client:
    ...
```
**Embedded mode** — the SDK finds and starts the daemon for you. Useful for local development and tests. The daemon process is shut down automatically when the client closes:

```python
# SDK locates the binary and starts it on a free port
with AgentKeeperClient.embedded() as client:
    with client.register("my-agent", ...) as agent:
        ...
# daemon is stopped here

# Explicit binary path or port
client = AgentKeeperClient.embedded(
    binary="/usr/local/bin/agentkeeper",
    port=9090,
)

# With persistence — policies and agent history survive restarts
client = AgentKeeperClient.embedded(
    data_dir="/var/lib/myapp/agentkeeper",
)
```
Binary resolution order for embedded mode:
1. the `binary=` argument
2. the `AGENTKEEPER_BIN` environment variable
3. `agentkeeper` on `PATH`
4. `./daemon/agentkeeper` relative to the project root (dev layout)
## Dashboard and metrics

The daemon exposes an HTTP server on port 9090 (configurable with `--metrics-addr`):

| Endpoint | What it returns |
|---|---|
| `http://localhost:9090/` | Live dashboard — agents alive, leases active. Auto-refreshes every 5 seconds. |
| `http://localhost:9090/healthz` | `ok` with HTTP 200 — for K8s liveness probes and load balancers. |
| `http://localhost:9090/metrics` | Prometheus text format. |
Open http://localhost:9090 in your browser while the daemon is running to see the dashboard.
Prometheus metrics exposed:
| Metric | Type | Description |
|---|---|---|
| `agentkeeper_agents_alive` | Gauge | Agents currently ALIVE or UNREACHABLE |
| `agentkeeper_leases_active` | Gauge | Leases currently held |
| `agentkeeper_rpc_total{method="..."}` | Counter | gRPC calls served per method |
## Agent Registry

### Register an agent

```python
with client.register(
    name="analyst-1",                  # unique name within your deployment
    description="Reads data and produces summaries using GPT-4",
    version="2.1.0",
    tools=["sql_runner", "pdf_reader"],
    metadata={"team": "data", "region": "us-east"},
    ttl=30,                            # seconds before heartbeat is required
) as agent:
    print(agent.agent_id)
    # background thread sends Heartbeat every ttl/2 seconds automatically
```
The `ttl` controls how quickly a crashed agent is detected. An agent that stops heartbeating transitions `ALIVE` → `UNREACHABLE` → `DEAD`. Once `DEAD`, all its leases are released and it is removed from elections.
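That lifecycle can be modeled as a small time-based state machine. The sketch below is purely illustrative of the semantics (it is not the daemon's implementation, and the exact DEAD threshold here is an assumption):

```python
from enum import Enum

class Status(Enum):
    ALIVE = "ALIVE"
    UNREACHABLE = "UNREACHABLE"
    DEAD = "DEAD"

def status_at(now: float, last_heartbeat: float,
              ttl: float, dead_after: float) -> Status:
    """Derive agent status from time since the last heartbeat.

    Assumption: UNREACHABLE after one missed ttl window, DEAD once
    `dead_after` seconds pass with no heartbeat at all.
    """
    elapsed = now - last_heartbeat
    if elapsed <= ttl:
        return Status.ALIVE
    if elapsed <= dead_after:
        return Status.UNREACHABLE
    return Status.DEAD
```

The point is that status is a pure function of heartbeat recency, which is why a shorter `ttl` means faster failure detection at the cost of more heartbeat traffic.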
### Find agents

```python
# Semantic search — returns agents ranked by relevance to the query
agents = client.find(query="fetch and parse web pages", limit=5)

# Hard filter by tools — only returns agents that have ALL listed tools
agents = client.find(tools=["browser", "http_client"])

# Combine: semantic ranking within the tool-filtered set
agents = client.find(
    query="scrape structured data from HTML",
    tools=["browser"],
    limit=3,
)

for agent in agents:
    print(agent.agent_id, agent.card.name, agent.status)
```
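The `tools=` filter is a superset check: an agent matches only if its tool list contains every requested tool. A toy illustration of that rule (not the SDK's code):

```python
def has_all_tools(agent_tools: list[str], required: list[str]) -> bool:
    # An agent matches only if it offers every requested tool.
    return set(required) <= set(agent_tools)
```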
### Inspect an agent

```python
from agentkeeper import AgentStatus

info = client.get_agent(agent_id)
print(info.card.name)          # "scraper-1"
print(info.status)             # AgentStatus.ALIVE
print(info.registered_at)      # datetime
print(info.last_heartbeat_at)  # datetime
```
## Lease Manager
Leases solve the double-execution problem: when two agents race to pick up the same task, only one wins. The winning agent holds an exclusive lock until the work is done, the TTL expires, or the agent dies.
### Acquire a lease

```python
lease = client.acquire_lease(
    task_id="task:process-invoice-42",  # any string key
    agent_id=agent.agent_id,
    ttl=120,                            # seconds
)

if lease is None:
    print("Another agent already owns this task")
else:
    print(f"I own it. Token: {lease.lease.token}")
    lease.release()
```
### Use as a context manager (recommended)

```python
lease = client.acquire_lease("task:process-invoice-42", agent.agent_id)
if lease:
    with lease:
        do_work()
    # lease.release() called automatically on exit, even on exception
```
### Renew a lease for long-running tasks

```python
with client.acquire_lease("task:long-job", agent.agent_id, ttl=60) as lease:
    for chunk in process_large_file():
        lease.renew(ttl=60)  # reset the TTL after each chunk
        process(chunk)
```
### Typical orchestrator pattern

```python
agents = client.find(query="process invoices", tools=["pdf_reader"])
for agent in agents:
    lease = client.acquire_lease("task:invoice-42", agent.agent_id)
    if lease:
        # This agent won the race — assign the work
        assign_task(agent.agent_id, "task:invoice-42")
        break
```
## Leader Election
Use leader election when you run multiple orchestrator instances for high availability. Only one is active at a time; the others are hot standbys.
```python
with client.register("orchestrator", "Pipeline orchestrator", "1.0.0") as orch:
    state = client.campaign(group="orchestrator:pipeline-A", agent_id=orch.agent_id)

    if state.leader_agent_id == orch.agent_id:
        print("I am the leader — running the pipeline")
        run_pipeline()
    else:
        print(f"Standby — leader is {state.leader_agent_id}")
        wait_for_leadership_change()
```
When the leader's agent dies (crash, `deregister()`, or heartbeat timeout), the daemon immediately re-elects from the remaining candidates and increments `term`. A new candidate joining never displaces the current leader — only death or `resign()` does.
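Those rules (first candidate wins, joiners never displace, death triggers re-election and bumps the term) can be sketched as a toy in-memory group, illustrative only:

```python
class ToyElectionGroup:
    """Minimal sketch of the election rules described above."""

    def __init__(self):
        self.candidates: list[str] = []   # insertion order = campaign order
        self.leader: str | None = None
        self.term = 0

    def campaign(self, agent_id: str) -> str:
        if agent_id not in self.candidates:
            self.candidates.append(agent_id)
        if self.leader is None:           # first candidate wins
            self.leader = agent_id
            self.term += 1
        return self.leader                # joiners never displace the leader

    def die(self, agent_id: str) -> None:
        self.candidates = [c for c in self.candidates if c != agent_id]
        if self.leader == agent_id:       # re-elect from remaining candidates
            self.leader = self.candidates[0] if self.candidates else None
            if self.leader is not None:
                self.term += 1
```

Picking the longest-waiting candidate on re-election is an assumption here; the daemon's actual selection policy may differ.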
```python
# Graceful handoff: resign so a standby takes over immediately
client.resign(group="orchestrator:pipeline-A", agent_id=orch.agent_id)

# Check the current leader without campaigning
state = client.campaign("orchestrator:pipeline-A", orch.agent_id)
print(state.leader_agent_id)
print(state.term)        # logical clock — increments on every election event
print(state.elected_at)  # datetime
```
## Entitlements

The entitlements engine controls which agents can perform which actions on which resources. All three fields (`subject`, `action`, `resource`) support `*` as a wildcard.
### Set policies

```python
from agentkeeper import PolicyEffect

# Allow the orchestrator to call any tool
client.set_policy(
    id="allow-orch-all",
    subject="agent:orchestrator",
    action="call",
    resource="tool:*",
    effect=PolicyEffect.ALLOW,
)

# Deny the scraper from writing files (even if a broader allow exists)
client.set_policy(
    id="deny-scraper-write",
    subject="agent:scraper",
    action="call",
    resource="tool:write_file",
    effect=PolicyEffect.DENY,
)
```
**DENY always wins.** If both an ALLOW and a DENY match the same request, the request is rejected.
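The decision rule can be sketched as follows. This is a simplified model, assuming `*` matches anything and a trailing `*` matches any suffix (as in `tool:*`); the engine's exact pattern grammar may differ:

```python
from dataclasses import dataclass

@dataclass
class ToyPolicy:
    subject: str
    action: str
    resource: str
    effect: str  # "ALLOW" or "DENY"

def _matches(pattern: str, value: str) -> bool:
    # "*" matches anything; a trailing "*" matches any prefix ("tool:*").
    if pattern == "*":
        return True
    if pattern.endswith("*"):
        return value.startswith(pattern[:-1])
    return pattern == value

def toy_check(policies: list[ToyPolicy],
              subject: str, action: str, resource: str) -> bool:
    matching = [p for p in policies
                if _matches(p.subject, subject)
                and _matches(p.action, action)
                and _matches(p.resource, resource)]
    if any(p.effect == "DENY" for p in matching):
        return False                      # DENY always overrides ALLOW
    return any(p.effect == "ALLOW" for p in matching)
```

Note the default: with no matching policy at all, the request is denied in this sketch (deny-by-default is an assumption about the engine).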
### Check a policy

```python
result = client.check_policy(
    subject="agent:scraper",
    action="call",
    resource="tool:write_file",
)
print(result.allowed)  # False
print(result.reason)   # 'denied by policy "deny-scraper-write"'
```
Use `check_policy` in your tool handlers before executing any action:

```python
def call_tool(agent_name: str, tool: str):
    result = client.check_policy(
        subject=f"agent:{agent_name}",
        action="call",
        resource=f"tool:{tool}",
    )
    if not result.allowed:
        raise PermissionError(result.reason)
    run_tool(tool)
```
### List and delete policies

```python
# All policies
policies = client.list_policies()

# Filtered by subject prefix
scraper_policies = client.list_policies(subject_prefix="agent:scraper")

# Delete by id
client.delete_policy("deny-scraper-write")
```
## Event Bus

The event bus is the reactive layer. Instead of polling `get_agent()` in a loop to detect deaths or checking election state on a timer, you subscribe once and receive events as they happen.
### watch()

```python
from agentkeeper import WatchEvent, AgentInfo, Lease

def on_event(event: WatchEvent):
    print(event.topic)      # e.g. "agent.died"
    print(event.sequence)   # globally monotonic — detect gaps for replay
    print(event.timestamp)  # datetime (UTC)
    print(event.payload)    # AgentInfo | Lease | ElectionState | None

# Subscribe to a topic prefix — "agent." matches agent.registered,
# agent.died, agent.unreachable
sub = client.watch(["agent."], on_event)

# Subscribe to multiple prefixes
sub = client.watch(["agent.", "lease."], on_event)

# Empty list = all topics
sub = client.watch([], on_event)

# Stop receiving events
sub.cancel()
```
Use as a context manager to automatically cancel on exit:
```python
with client.watch(["agent.died"], on_event):
    run_pipeline()
# subscription cancelled here
```
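Because `sequence` is globally monotonic, a subscriber can detect dropped events by checking for gaps. A minimal sketch (`make_gap_checker` is a hypothetical helper; what to do on a gap, such as replay, is up to your application):

```python
def make_gap_checker(on_gap):
    """Wrap an event callback so sequence gaps are reported."""
    last_seq = None

    def check(event):
        nonlocal last_seq
        if last_seq is not None and event.sequence != last_seq + 1:
            on_gap(last_seq, event.sequence)  # events were missed in between
        last_seq = event.sequence

    return check
```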
Built-in topics and their payload types:
| Topic | Payload type | Fired when |
|---|---|---|
| `agent.registered` | `AgentInfo` | An agent calls `register()` |
| `agent.unreachable` | `AgentInfo` | First missed heartbeat deadline |
| `agent.died` | `AgentInfo` | Agent dead — deregistered or heartbeat timeout |
| `lease.acquired` | `Lease` | An agent wins a lease |
| `lease.renewed` | `Lease` | An agent renews its lease |
| `lease.expired` | `Lease` | A lease TTL elapsed with no renewal |
| `lease.released` | `Lease` | An agent explicitly released its lease |
| `leader.changed` | `ElectionState` | Leadership changed in any election group |
### Practical orchestrator pattern

```python
from agentkeeper import WatchEvent, AgentInfo, Lease

# task registry — in production this would be a database
active_tasks: dict[str, str] = {}  # task_id → agent_id

def on_agent_died(event: WatchEvent):
    dead_agent: AgentInfo = event.payload
    # find all tasks held by this agent and reassign them
    for task_id, holder in list(active_tasks.items()):
        if holder == dead_agent.agent_id:
            reassign(task_id)

def on_lease_expired(event: WatchEvent):
    lease: Lease = event.payload
    # a task went unfinished — put it back in the queue
    requeue(lease.task_id)

def on_event(event: WatchEvent):
    if event.topic == "agent.died":
        on_agent_died(event)
    elif event.topic == "lease.expired":
        on_lease_expired(event)

with client.watch(["agent.died", "lease.expired"], on_event):
    run_forever()
```
### watch_election()

Standby orchestrators use `watch_election()` to know the moment they become leader — no polling, no delay:

```python
my_agent_id = agent.agent_id

def on_leader_change(state):
    if state.leader_agent_id == my_agent_id:
        print(f"I am now the leader (term {state.term})")
        become_active()
    else:
        print(f"Standby — leader is {state.leader_agent_id}")
        become_standby()

with client.watch_election("orchestrator:pipeline-A", on_leader_change):
    # blocks or does standby work — on_leader_change fires whenever
    # leadership changes for this group
    wait_indefinitely()
```
## Data types
All methods return plain Python dataclasses, not proto objects.
```python
from agentkeeper import (
    AgentCard,      # name, description, version, tools, metadata
    AgentInfo,      # agent_id, card, status, registered_at, last_heartbeat_at
    AgentStatus,    # ALIVE | UNREACHABLE | DEAD
    Lease,          # task_id, holder_agent_id, token, acquired_at, expires_at
    ElectionState,  # group, leader_agent_id, term, elected_at
    Policy,         # id, subject, action, resource, effect
    PolicyEffect,   # ALLOW | DENY
    CheckResult,    # allowed, reason
)
```
## Error handling

The SDK surfaces gRPC errors as `grpc.RpcError`. The most common cases:
```python
import grpc

try:
    lease.renew()
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.PERMISSION_DENIED:
        print("Token mismatch — lease was taken by another agent")
    elif e.code() == grpc.StatusCode.NOT_FOUND:
        print("Agent or lease no longer exists")
    else:
        raise
```
## Running the tests

The test suite requires a running daemon:

```bash
# Terminal 1
./agentkeeper

# Terminal 2
cd sdk/python
uv run pytest tests/ -v
```