Coordination infrastructure for AI agents — registry, leases, leader election, and entitlements over gRPC

AgentKeeper Python SDK

Coordination infrastructure for AI agent systems. AgentKeeper gives your agents a shared nervous system: they can register themselves, discover each other, claim exclusive tasks, elect a leader, and enforce who is allowed to do what — all over a single gRPC connection.

What it does

Feature         | What it solves
Agent Registry  | Agents announce themselves with a description and tool list. Dead agents are detected automatically via heartbeat timeouts.
Semantic Search | Find the right agent for a task using a natural language query.
Lease Manager   | Guarantees exactly one agent works on a given task at a time. Leases expire automatically if an agent crashes.
Leader Election | Multiple orchestrator instances stand by; only one leads. Failover is automatic when the leader dies.
Entitlements    | ABAC policy engine. Controls which agents can call which tools. DENY rules always override ALLOW.

Installation

pip install agentkeeper

The SDK connects to a running AgentKeeper daemon. Download the binary for your platform from the releases page and run it:

./agentkeeper   # listens on :7070 by default

Quick start

The typical flow has two sides: an orchestrator that creates tasks and assigns them, and agents that register and do the work.

import uuid
from agentkeeper import AgentKeeperClient, AgentInfo, WatchEvent

client = AgentKeeperClient(host="localhost", port=7070)

# ── Orchestrator side ──────────────────────────────────────────────────────

# React to events in real time — no polling needed
def on_event(event: WatchEvent):
    if event.topic == "agent.died":
        agent: AgentInfo = event.payload
        print(f"Agent {agent.card.name} died — reassigning its tasks")
        reassign_tasks_for(agent.agent_id)

sub = client.watch(["agent."], on_event)

# Create a task — task_id is whatever ID your system uses (database row,
# queue message ID, UUID). AgentKeeper doesn't create tasks; it coordinates
# who works on them.
task_id = f"task:{uuid.uuid4()}"

# Find the best available agent for the work
agents = client.find(query="fetch and parse web pages", tools=["browser"])
for agent in agents:
    lease = client.acquire_lease(task_id, agent.agent_id, ttl=120)
    if lease:
        print(f"Assigned {task_id} to {agent.card.name}")
        send_task_to_agent(agent.agent_id, task_id)
        break  # one winner, everyone else is locked out

# ── Agent side ─────────────────────────────────────────────────────────────

with client.register(
    name="scraper-1",
    description="Fetches and parses web pages",
    version="1.0.0",
    tools=["browser", "http_client"],
) as agent:
    # Heartbeat runs automatically in the background.
    # The agent stays ALIVE as long as this block is running.

    task_id = receive_task_assignment()  # from orchestrator

    with client.acquire_lease(task_id, agent.agent_id, ttl=120) as lease:
        do_work(task_id)
    # lease released automatically on exit

# Agent deregistered automatically on exit — triggers agent.died on the bus

Connecting

Sidecar mode — connect to a daemon that is already running:

# Default: localhost:7070
client = AgentKeeperClient()

# Custom host/port
client = AgentKeeperClient(host="agentkeeper.internal", port=7070)

# Use as a context manager to close the channel on exit
with AgentKeeperClient() as client:
    ...

Embedded mode — the SDK finds and starts the daemon for you. Useful for local development and tests. The daemon process is shut down automatically when the client closes:

# SDK locates the binary and starts it on a free port
with AgentKeeperClient.embedded() as client:
    with client.register("my-agent", ...) as agent:
        ...
# daemon is stopped here

# Explicit binary path or port
client = AgentKeeperClient.embedded(
    binary="/usr/local/bin/agentkeeper",
    port=9090,
)

# With persistence — policies and agent history survive restarts
client = AgentKeeperClient.embedded(
    data_dir="/var/lib/myapp/agentkeeper",
)

Binary resolution order for embedded mode:

  1. binary= argument
  2. AGENTKEEPER_BIN environment variable
  3. agentkeeper on PATH
  4. ./daemon/agentkeeper relative to the project root (dev layout)
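
For example, you can pin the binary through the environment variable (step 2) before creating the client; the path below is only an illustration, substitute wherever your daemon binary actually lives:

import os
from agentkeeper import AgentKeeperClient

# Example path, not a real default: point embedded mode at a specific binary
os.environ["AGENTKEEPER_BIN"] = "/opt/agentkeeper/bin/agentkeeper"

with AgentKeeperClient.embedded() as client:
    ...  # resolution stops at step 2; PATH and the dev layout are never consulted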

Dashboard and metrics

The daemon exposes an HTTP server on port 9090 (configurable with --metrics-addr):

Endpoint                      | What it returns
http://localhost:9090/        | Live dashboard — agents alive, leases active. Auto-refreshes every 5 seconds.
http://localhost:9090/healthz | ok with HTTP 200 — for K8s liveness probes and load balancers.
http://localhost:9090/metrics | Prometheus text format.

Open http://localhost:9090 in your browser while the daemon is running to see the dashboard.

Prometheus metrics exposed:

Metric                              | Type    | Description
agentkeeper_agents_alive            | Gauge   | Agents currently ALIVE or UNREACHABLE
agentkeeper_leases_active           | Gauge   | Leases currently held
agentkeeper_rpc_total{method="..."} | Counter | gRPC calls served per method
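
If you want to gate your own startup on the daemon being reachable, here is a minimal sketch using only the standard library; the daemon_is_healthy helper is ours, not part of the SDK:

import urllib.request

def daemon_is_healthy(base="http://localhost:9090"):
    # /healthz answers "ok" with HTTP 200 while the daemon is up
    try:
        with urllib.request.urlopen(f"{base}/healthz", timeout=2) as resp:
            return resp.status == 200
    except OSError:
        return False

if not daemon_is_healthy():
    raise RuntimeError("AgentKeeper daemon is not reachable on :9090")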

Agent Registry

Register an agent

with client.register(
    name="analyst-1",          # unique name within your deployment
    description="Reads data and produces summaries using GPT-4",
    version="2.1.0",
    tools=["sql_runner", "pdf_reader"],
    metadata={"team": "data", "region": "us-east"},
    ttl=30,                    # seconds before heartbeat is required
) as agent:
    print(agent.agent_id)
    # background thread sends Heartbeat every ttl/2 seconds automatically

The ttl controls how quickly a crashed agent is detected. An agent that stops heartbeating transitions ALIVE → UNREACHABLE → DEAD. Once DEAD, all its leases are released and it is removed from elections.
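
For example, a supervisor can read the status back with get_agent() (see "Inspect an agent" below) and react to each stage of that transition; the handling here is illustrative:

from agentkeeper import AgentStatus

info = client.get_agent(agent_id)
if info.status == AgentStatus.UNREACHABLE:
    # missed a heartbeat but not yet declared dead; its leases are still held
    print(f"{info.card.name} is unreachable, waiting before reassigning")
elif info.status == AgentStatus.DEAD:
    # leases released and agent removed from elections
    print(f"{info.card.name} is dead, its work can be reassigned")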

Find agents

# Semantic search — returns agents ranked by relevance to the query
agents = client.find(query="fetch and parse web pages", limit=5)

# Hard filter by tools — only returns agents that have ALL listed tools
agents = client.find(tools=["browser", "http_client"])

# Combine: semantic ranking within the tool-filtered set
agents = client.find(
    query="scrape structured data from HTML",
    tools=["browser"],
    limit=3,
)

for agent in agents:
    print(agent.agent_id, agent.card.name, agent.status)

Inspect an agent

from agentkeeper import AgentStatus

info = client.get_agent(agent_id)
print(info.card.name)          # "scraper-1"
print(info.status)             # AgentStatus.ALIVE
print(info.registered_at)      # datetime
print(info.last_heartbeat_at)  # datetime

Lease Manager

Leases solve the double-execution problem: when two agents race to pick up the same task, only one wins. The winning agent holds an exclusive lock until the work is done, the TTL expires, or the agent dies.

Acquire a lease

lease = client.acquire_lease(
    task_id="task:process-invoice-42",  # any string key
    agent_id=agent.agent_id,
    ttl=120,                            # seconds
)

if lease is None:
    print("Another agent already owns this task")
else:
    print(f"I own it. Token: {lease.lease.token}")
    lease.release()

Use as a context manager (recommended)

lease = client.acquire_lease("task:process-invoice-42", agent.agent_id)
if lease:
    with lease:
        do_work()
    # lease.release() called automatically on exit, even on exception

Renew a lease for long-running tasks

with client.acquire_lease("task:long-job", agent.agent_id, ttl=60) as lease:
    for chunk in process_large_file():
        lease.renew(ttl=60)  # reset the TTL after each chunk
        process(chunk)

Typical orchestrator pattern

agents = client.find(query="process invoices", tools=["pdf_reader"])

for agent in agents:
    lease = client.acquire_lease("task:invoice-42", agent.agent_id)
    if lease:
        # This agent won the race — assign the work
        assign_task(agent.agent_id, "task:invoice-42")
        break

Leader Election

Use leader election when you run multiple orchestrator instances for high availability. Only one is active at a time; the others are hot standbys.

with client.register("orchestrator", "Pipeline orchestrator", "1.0.0") as orch:

    state = client.campaign(group="orchestrator:pipeline-A", agent_id=orch.agent_id)

    if state.leader_agent_id == orch.agent_id:
        print("I am the leader — running the pipeline")
        run_pipeline()
    else:
        print(f"Standby — leader is {state.leader_agent_id}")
        wait_for_leadership_change()

When the leader's agent dies (crash, deregister(), or heartbeat timeout), the daemon immediately re-elects from the remaining candidates and increments the term. A new candidate joining never displaces the current leader — only death or resign() does.

# Graceful handoff: resign so a standby takes over immediately
client.resign(group="orchestrator:pipeline-A", agent_id=orch.agent_id)
# Read the current leader state (re-campaigning never displaces the sitting leader)
state = client.campaign("orchestrator:pipeline-A", orch.agent_id)
print(state.leader_agent_id)
print(state.term)        # logical clock — increments on every election event
print(state.elected_at)  # datetime
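
Putting campaign() and watch_election() (documented under Event Bus below) together, a standby can block until it is elected instead of polling; run_pipeline() is the same placeholder as above, and the threading.Event is just one way to wait:

import threading

became_leader = threading.Event()

def on_change(state):
    if state.leader_agent_id == orch.agent_id:
        became_leader.set()

state = client.campaign("orchestrator:pipeline-A", orch.agent_id)
if state.leader_agent_id == orch.agent_id:
    run_pipeline()
else:
    # wait for a leader.changed event instead of re-calling campaign() on a timer
    with client.watch_election("orchestrator:pipeline-A", on_change):
        became_leader.wait()
    run_pipeline()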

Entitlements

The entitlements engine controls which agents can perform which actions on which resources. All three fields (subject, action, resource) support * as a wildcard.

Set policies

from agentkeeper import PolicyEffect

# Allow the orchestrator to call any tool
client.set_policy(
    id="allow-orch-all",
    subject="agent:orchestrator",
    action="call",
    resource="tool:*",
    effect=PolicyEffect.ALLOW,
)

# Deny the scraper from writing files (even if a broader allow exists)
client.set_policy(
    id="deny-scraper-write",
    subject="agent:scraper",
    action="call",
    resource="tool:write_file",
    effect=PolicyEffect.DENY,
)

DENY always wins. If both an ALLOW and a DENY match the same request, the request is rejected.

Check a policy

result = client.check_policy(
    subject="agent:scraper",
    action="call",
    resource="tool:write_file",
)
print(result.allowed)  # False
print(result.reason)   # 'denied by policy "deny-scraper-write"'

Use check_policy in your tool handlers before executing any action:

def call_tool(agent_name: str, tool: str):
    result = client.check_policy(
        subject=f"agent:{agent_name}",
        action="call",
        resource=f"tool:{tool}",
    )
    if not result.allowed:
        raise PermissionError(result.reason)
    run_tool(tool)

List and delete policies

# All policies
policies = client.list_policies()

# Filtered by subject prefix
scraper_policies = client.list_policies(subject_prefix="agent:scraper")

# Delete by id
client.delete_policy("deny-scraper-write")

Event Bus

The event bus is the reactive layer. Instead of polling get_agent() in a loop to detect deaths or checking election state on a timer, you subscribe once and receive events as they happen.

watch()

from agentkeeper import WatchEvent, AgentInfo, Lease

def on_event(event: WatchEvent):
    print(event.topic)      # e.g. "agent.died"
    print(event.sequence)   # globally monotonic — detect gaps for replay
    print(event.timestamp)  # datetime (UTC)
    print(event.payload)    # AgentInfo | Lease | ElectionState | None

# Subscribe to a topic prefix — "agent." matches agent.registered,
# agent.died, agent.unreachable
sub = client.watch(["agent."], on_event)

# Subscribe to multiple prefixes
sub = client.watch(["agent.", "lease."], on_event)

# Empty list = all topics
sub = client.watch([], on_event)

# Stop receiving events
sub.cancel()

Use as a context manager to automatically cancel on exit:

with client.watch(["agent.died"], on_event):
    run_pipeline()
# subscription cancelled here
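
Because sequence is globally monotonic, a subscriber can detect missed events by remembering the last value it saw; resync() and handle() below are placeholders for your own recovery and dispatch logic:

last_seq = 0

def on_event(event: WatchEvent):
    global last_seq
    if last_seq and event.sequence != last_seq + 1:
        # a gap means events were missed (for example after a reconnect);
        # re-sync from your own state before continuing
        resync()
    last_seq = event.sequence
    handle(event)

sub = client.watch([], on_event)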

Built-in topics and their payload types:

Topic             | Payload type  | Fired when
agent.registered  | AgentInfo     | An agent calls register()
agent.unreachable | AgentInfo     | First missed heartbeat deadline
agent.died        | AgentInfo     | Agent dead — deregistered or heartbeat timeout
lease.acquired    | Lease         | An agent wins a lease
lease.renewed     | Lease         | An agent renews its lease
lease.expired     | Lease         | A lease TTL elapsed with no renewal
lease.released    | Lease         | An agent explicitly released its lease
leader.changed    | ElectionState | Leadership changed in any election group

Practical orchestrator pattern

import uuid
from agentkeeper import WatchEvent, AgentInfo, Lease

# task registry — in production this would be a database
active_tasks: dict[str, str] = {}  # task_id → agent_id

def on_agent_died(event: WatchEvent):
    dead_agent: AgentInfo = event.payload
    # find all tasks held by this agent and reassign them
    for task_id, holder in list(active_tasks.items()):
        if holder == dead_agent.agent_id:
            reassign(task_id)

def on_lease_expired(event: WatchEvent):
    lease: Lease = event.payload
    # a task went unfinished — put it back in the queue
    requeue(lease.task_id)

def on_event(event: WatchEvent):
    if event.topic == "agent.died":
        on_agent_died(event)
    elif event.topic == "lease.expired":
        on_lease_expired(event)

with client.watch(["agent.died", "lease.expired"], on_event):
    run_forever()

watch_election()

Standby orchestrators use watch_election() to know the moment they become leader — no polling, no delay:

my_agent_id = agent.agent_id

def on_leader_change(state):
    if state.leader_agent_id == my_agent_id:
        print(f"I am now the leader (term {state.term})")
        become_active()
    else:
        print(f"Standby — leader is {state.leader_agent_id}")
        become_standby()

with client.watch_election("orchestrator:pipeline-A", on_leader_change):
    # blocks or does standby work — on_leader_change fires whenever
    # leadership changes for this group
    wait_indefinitely()

Data types

All methods return plain Python dataclasses, not proto objects.

from agentkeeper import (
    AgentCard,      # name, description, version, tools, metadata
    AgentInfo,      # agent_id, card, status, registered_at, last_heartbeat_at
    AgentStatus,    # ALIVE | UNREACHABLE | DEAD
    Lease,          # task_id, holder_agent_id, token, acquired_at, expires_at
    ElectionState,  # group, leader_agent_id, term, elected_at
    Policy,         # id, subject, action, resource, effect
    PolicyEffect,   # ALLOW | DENY
    CheckResult,    # allowed, reason
)

Error handling

The SDK surfaces gRPC errors as grpc.RpcError. The most common cases:

import grpc

try:
    lease.renew()
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.PERMISSION_DENIED:
        print("Token mismatch — lease was taken by another agent")
    elif e.code() == grpc.StatusCode.NOT_FOUND:
        print("Agent or lease no longer exists")
    else:
        raise

Running the tests

The test suite requires a running daemon:

# Terminal 1
./agentkeeper

# Terminal 2
cd sdk/python
uv run pytest tests/ -v
