Reference Python implementation of the Agent Runtime Control Protocol (ARCP) v1.1

ARCP Python SDK

Python SDK for the Agent Runtime Control Protocol (ARCP) — submit, observe, and control long-running agent jobs from Python.

Specification · Concepts · Install · Quick start · Guides · API reference


arcp is the Python reference implementation of ARCP, the Agent Runtime Control Protocol. It covers both sides of the wire — arcp.client.ARCPClient for submitting and observing jobs, arcp.runtime.ARCPRuntime for hosting agents — so either side can talk to any conformant peer in any language without hand-rolling the envelope, sequencing, or lease enforcement.

ARCP itself is a transport-agnostic wire protocol for long-running AI agent jobs. It owns the parts of agent infrastructure that don't change between products — sessions, durable event streams, capability leases, budgets, resume — and stays out of the parts that do. ARCP wraps the agent function; it does not define how agents are built, how tools are exposed (that's MCP), or how telemetry is exported (that's OpenTelemetry).

Installation

Requires Python 3.11 or later. The package is published on PyPI as agentruntimecontrolprotocol; the import name stays arcp. The default install is the kitchen sink — client, runtime, transports, ASGI/aiohttp middleware, OpenTelemetry middleware, JWKS over HTTPS, and the arcp CLI — so everything works without thinking about extras.

# everything (default)
pip install agentruntimecontrolprotocol

# semantic intent markers (no-op extras — same deps as default,
# but useful for documenting which side of the wire a service uses
# in its lockfile or dependency manifest)
pip install "agentruntimecontrolprotocol[client]"
pip install "agentruntimecontrolprotocol[runtime]"

# pytest stack for running the test suite
pip install "agentruntimecontrolprotocol[test]"

Quick start

Connect to a runtime, submit a job, stream its events to completion:

import asyncio
import contextlib
import os

from arcp import ClientInfo, WebSocketTransport
from arcp.client import ARCPClient


async def main() -> None:
    client = ARCPClient(
        client=ClientInfo(name="quickstart", version="1.0.0"),
        token=os.environ["ARCP_TOKEN"],
    )
    async with contextlib.aclosing(client):
        transport = await WebSocketTransport.connect("wss://runtime.example.com/arcp")
        await client.connect(transport)

        handle = await client.submit(
            agent="data-analyzer",
            input={"dataset": "s3://example/sales.csv"},
            lease_request={"net.fetch": ["s3://example/**"]},
        )
        async for event in handle.events():
            print(f"[{event['kind']}]", event["body"])

        result = await handle.done
        print("final:", result.final_status, result.result)


asyncio.run(main())

This is the whole shape of the SDK: open a session, submit work, consume an ordered event stream, get a terminal result or error. Everything below is detail on those four moves.

Concepts

ARCP organizes everything around four concerns — identity, durability, authority, and observability — expressed through five core objects:

  • Session — a connection between a client and a runtime. A session carries identity (a bearer token), negotiates a feature set in a hello/welcome handshake, and is resumable: if the transport drops, you reconnect with a resume token and the runtime replays buffered events. Jobs outlive the session that started them. See §6.
  • Job — one unit of agent work submitted into a session. A job has an identity, an optional idempotency key, a resolved agent version, and a lifecycle that ends in exactly one terminal state: success, error, cancelled, or timed_out. See §7.
  • Event — the ordered, session-scoped stream a job emits: logs, thoughts, tool calls and results, status, metrics, artifact references, progress, and streamed result chunks. Events carry strictly monotonic sequence numbers so the stream survives reconnects gap-free. See §8.
  • Lease — the authority a job runs under, expressed as capability grants (fs.read, fs.write, net.fetch, tool.call, agent.delegate, cost.budget, model.use). The runtime enforces the lease at every operation boundary; a job can never act outside it. Leases may carry a budget and an expiry, and may be subset and handed to sub-agents via delegation. See §9.
  • Subscription — read-only attachment to a job started elsewhere (e.g. a dashboard watching a job a CLI submitted). A subscriber observes the live event stream but cannot cancel or mutate the job. Distinct from resume, which continues the original session and carries cancel authority. See §7.6.

The SDK models each of these as first-class objects; the rest of this README shows how.
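
To make the lease model concrete, here is a minimal sketch of what a capability check could look like. It is not the SDK's enforcement code: the `is_allowed` helper and the use of `fnmatch`-style globbing are assumptions made for illustration, and the authoritative pattern grammar is whatever the spec defines in §9.

```python
from fnmatch import fnmatchcase

# Hypothetical lease shape, mirroring the `lease_request` dict in the Quick start:
# capability name -> list of glob patterns the job may touch.
Lease = dict[str, list[str]]


def is_allowed(lease: Lease, capability: str, target: str) -> bool:
    """Return True if `target` matches any pattern granted for `capability`."""
    patterns = lease.get(capability, [])
    return any(fnmatchcase(target, pattern) for pattern in patterns)


lease: Lease = {
    "net.fetch": ["s3://example/**"],
    "tool.call": ["search.*"],
}

print(is_allowed(lease, "net.fetch", "s3://example/sales.csv"))  # inside the grant
print(is_allowed(lease, "net.fetch", "s3://other/secret.csv"))   # outside the grant
print(is_allowed(lease, "fs.write", "/tmp/out.txt"))             # capability never granted
```

A capability that is absent from the lease grants nothing, which matches the idea that a job can never act outside its lease.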

Guides

Sessions and resume

Open a session, negotiate features, and reconnect transparently after a transport drop using the resume token — jobs keep running server-side while you're gone.

import asyncio
import contextlib
import os

from arcp import ClientInfo, SessionResume, WebSocketTransport
from arcp.client import ARCPClient

URL = "wss://runtime.example.com/arcp"
TOKEN = os.environ["ARCP_TOKEN"]


def new_client() -> ARCPClient:
    return ARCPClient(
        client=ClientInfo(name="resumable", version="1.0.0"),
        token=TOKEN,
    )


async def main() -> None:
    first = new_client()
    transport1 = await WebSocketTransport.connect(URL)
    welcome = await first.connect(transport1)
    session_id = welcome.session_id
    resume_token = welcome.resume_token

    handle = await first.submit(agent="long-running", input={})
    async for _ in handle.events():
        if first.latest_event_seq >= 2:
            break
    last_seq = first.latest_event_seq

    # Drop the transport without sending session.bye; the job keeps running.
    await transport1.close()

    second = new_client()
    async with contextlib.aclosing(second):
        transport2 = await WebSocketTransport.connect(URL)
        await second.resume(
            transport2,
            resume=SessionResume(
                session_id=session_id,
                resume_token=resume_token,
                last_event_seq=last_seq,
            ),
        )
        # The runtime replays every event with seq > last_seq, then resumes live streaming.


asyncio.run(main())
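
The replay step in the final comment can be pictured with a small runtime-side buffer. This is only a sketch of the semantics, not code from the arcp package: `EventBuffer`, its method names, the `seq` key on each event, and numbering from 1 are all assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class EventBuffer:
    """Hypothetical runtime-side buffer; not part of the arcp package."""

    events: list[dict] = field(default_factory=list)
    next_seq: int = 1  # assumption: sequence numbers start at 1

    def append(self, kind: str, body: dict) -> dict:
        # Sequence numbers are strictly monotonic within the session.
        event = {"seq": self.next_seq, "kind": kind, "body": body}
        self.next_seq += 1
        self.events.append(event)
        return event

    def replay_after(self, last_event_seq: int) -> list[dict]:
        # Everything the client has not seen yet, in original order.
        return [e for e in self.events if e["seq"] > last_event_seq]

    def release_through(self, acked_seq: int) -> None:
        # An ack lets the buffer drop events the client has confirmed.
        self.events = [e for e in self.events if e["seq"] > acked_seq]


buffer = EventBuffer()
for i in range(5):
    buffer.append("log", {"message": f"step {i}"})

# The client disconnected after seeing seq 2, so seq 3 to 5 are replayed.
replayed = [e["seq"] for e in buffer.replay_after(2)]
print(replayed)
```

Because the client reports the last sequence number it saw, the replay neither duplicates nor skips events.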

Submitting jobs

Submit a job with an agent (optionally version-pinned as name@version), an input, and an optional lease request, idempotency key, and runtime limit.

from datetime import UTC, datetime, timedelta

from arcp import LeaseConstraints

expires_at = (datetime.now(UTC) + timedelta(minutes=1)).isoformat().replace("+00:00", "Z")

handle = await client.submit(
    agent="weekly-report@2.1.0",
    input={"week": "2026-W19"},
    lease_request={"net.fetch": ["s3://reports/**"]},
    lease_constraints=LeaseConstraints(expires_at=expires_at),
    idempotency_key="weekly-report-2026-W19",
    max_runtime_sec=300,
)

print("job_id =", handle.job_id)
print("effective lease =", handle.lease)
print("resolved agent =", handle.agent_ref)
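
Two small conventions in the example above are the `name@version` agent reference and the `Z`-suffixed UTC expiry. The helpers below are a sketch of both. `split_agent_ref` and `expires_in` are hypothetical names, not SDK functions, and the validation rules are an assumption.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def split_agent_ref(ref: str) -> tuple[str, str | None]:
    """Split "name@version" into (name, version); version is None when unpinned."""
    name, sep, version = ref.partition("@")
    if not name or (sep and not version):
        raise ValueError(f"malformed agent reference: {ref!r}")
    return name, (version if sep else None)


def expires_in(delta: timedelta, now: datetime | None = None) -> str:
    """Format an expiry as a UTC timestamp ending in "Z", as in the example above."""
    now = now or datetime.now(timezone.utc)
    return (now + delta).isoformat().replace("+00:00", "Z")


print(split_agent_ref("weekly-report@2.1.0"))
print(split_agent_ref("weekly-report"))
fixed_now = datetime(2026, 5, 4, 12, 0, tzinfo=timezone.utc)
print(expires_in(timedelta(minutes=1), now=fixed_now))
```

Passing `now` explicitly keeps the helper deterministic in tests.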

Consuming events

Iterate the ordered event stream — log, thought, tool_call, tool_result, status, metric, artifact_ref, progress, result_chunk — and optionally acknowledge progress so the runtime can release buffered events early.

import os

from arcp import ClientInfo
from arcp.client import ARCPClient, AutoAckOptions

TOKEN = os.environ["ARCP_TOKEN"]

client = ARCPClient(
    client=ClientInfo(name="ack-demo", version="1.0.0"),
    token=TOKEN,
    # Coalesced session.ack: send when 32+ events have accrued, at most every 250 ms.
    auto_ack=AutoAckOptions(every_n=32, interval_sec=0.25),
)

handle = await client.submit(agent="chatty", input={})

async for event in handle.events():
    kind = event["kind"]
    body = event["body"]
    if kind == "log":
        print(body.get("message"))
    elif kind == "tool_call":
        print("->", body.get("name"), body.get("arguments"))
    elif kind == "metric":
        print("metric", body)
    elif kind == "progress":
        print("progress", body)
    # Or ack manually: await client.ack(client.latest_event_seq)

Leases and budgets

Request capabilities, a budget, and an expiry; read budget-remaining metrics as they arrive; handle the runtime's enforcement decisions.

from datetime import UTC, datetime, timedelta

from arcp import BudgetExhaustedError, LeaseConstraints, LeaseExpiredError

expires_at = (datetime.now(UTC) + timedelta(minutes=10)).isoformat().replace("+00:00", "Z")

handle = await client.submit(
    agent="web-research",
    input={"iterations": 8, "per_call_usd": 0.3},
    lease_request={
        "tool.call": ["search.*", "fetch.*"],
        "cost.budget": ["USD:1.00"],
    },
    lease_constraints=LeaseConstraints(expires_at=expires_at),
)

print("initial budget =", handle.budget)

try:
    async for event in handle.events():
        if event["kind"] == "metric" and event["body"].get("name") == "cost.budget.remaining":
            body = event["body"]
            print(f"budget remaining: {body['value']:.2f} {body.get('unit', '')}")
    await handle.done
except (BudgetExhaustedError, LeaseExpiredError) as err:
    # Never retryable: resubmit with a fresh lease or budget instead.
    print("job ended:", err.code, err.message)
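
The numbers in this example are chosen so the budget runs out: eight calls at 0.30 USD each cannot fit in 1.00 USD. The sketch below works through that arithmetic with a stand-in budget tracker. The `Budget` class, the `BudgetExhausted` exception, and the rule that a charge is refused before it overdraws are assumptions for illustration. The real enforcement lives in the runtime.

```python
from decimal import Decimal


class BudgetExhausted(Exception):
    """Stand-in for the SDK's BudgetExhaustedError, so the sketch runs without arcp."""


def parse_budget(grant: str) -> tuple[str, Decimal]:
    """Parse a "CURRENCY:amount" grant such as "USD:1.00"."""
    currency, _, amount = grant.partition(":")
    return currency, Decimal(amount)


class Budget:
    def __init__(self, grant: str) -> None:
        self.currency, self.remaining = parse_budget(grant)

    def charge(self, cost: str) -> Decimal:
        """Deduct `cost`, refusing any charge that would overdraw the budget."""
        amount = Decimal(cost)
        if amount > self.remaining:
            raise BudgetExhausted(
                f"{amount} {self.currency} exceeds remaining {self.remaining}"
            )
        self.remaining -= amount
        return self.remaining


budget = Budget("USD:1.00")
completed = 0
try:
    for _ in range(8):         # iterations=8 in the example above
        budget.charge("0.30")  # per_call_usd=0.3
        completed += 1
except BudgetExhausted as err:
    print(f"stopped after {completed} calls: {err}")
```

Under these assumptions three calls succeed, leaving 0.10 USD, and the fourth is refused. `Decimal` avoids the rounding drift that binary floats would introduce into money arithmetic.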

Subscribing to jobs

Attach read-only to a job submitted elsewhere and observe its live stream (with optional history replay) without cancel authority.

import os

from arcp import ClientInfo, ListJobsFilter, WebSocketTransport
from arcp.client import ARCPClient

URL = "wss://runtime.example.com/arcp"
TOKEN = os.environ["ARCP_TOKEN"]

observer = ARCPClient(
    client=ClientInfo(name="dashboard", version="1.0.0"),
    token=TOKEN,
    features=("list_jobs", "subscribe"),
)
await observer.connect(await WebSocketTransport.connect(URL))

listing = await observer.list_jobs(filter=ListJobsFilter(status=("running",)))
sub = await observer.subscribe(listing.jobs[0].job_id, history=True)
print(f"subscribed from seq={sub.subscribed_from} replayed={sub.replayed}")

async for event in sub.handle.events():
    print(f"[seq>{sub.subscribed_from}] {event['kind']}")

# ... later ...
await observer.unsubscribe(sub.job_id)

Error handling

Catch the typed errors and respect the retryable flag. LEASE_EXPIRED and BUDGET_EXHAUSTED are never retryable: a naive retry fails identically.

from arcp import ARCPError, BudgetExhaustedError, LeaseExpiredError

try:
    handle = await client.submit(agent="flaky", input={})
    await handle.done
except (LeaseExpiredError, BudgetExhaustedError):
    # Resubmit with a fresh lease / budget instead of retrying.
    raise
except ARCPError as err:
    if err.retryable:
        # Safe to retry with backoff (e.g. INTERNAL_ERROR, TIMEOUT).
        ...
    else:
        raise
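
The `...` in the retryable branch is left open above. One way to fill it is a small retry helper with exponential backoff and jitter, sketched here against a stand-in exception so it runs without the SDK. `with_retries`, `RetryableError`, and the default attempt count and delay are all assumptions. In real code you would catch `ARCPError` instead.

```python
import asyncio
import random


class RetryableError(Exception):
    """Stand-in for ARCPError; only the `retryable` attribute matters here."""

    def __init__(self, message: str, retryable: bool) -> None:
        super().__init__(message)
        self.retryable = retryable


async def with_retries(operation, *, attempts: int = 4, base_delay: float = 0.5):
    """Call `operation()` until it succeeds, hits a non-retryable error, or runs out of attempts."""
    for attempt in range(attempts):
        try:
            return await operation()
        except RetryableError as err:
            if not err.retryable or attempt == attempts - 1:
                raise
            # Exponential backoff with full jitter.
            await asyncio.sleep(random.uniform(0, base_delay * 2**attempt))


state = {"calls": 0}


async def flaky() -> str:
    # Fails twice with a retryable error, then succeeds.
    state["calls"] += 1
    if state["calls"] < 3:
        raise RetryableError("INTERNAL_ERROR", retryable=True)
    return "ok"


result = asyncio.run(with_retries(flaky, base_delay=0.0))
print(result, "after", state["calls"], "calls")
```

Non-retryable errors propagate on the first failure, so lease and budget errors are never retried by accident.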

Feature support

ARCP features this SDK negotiates during the hello/welcome handshake:

| Feature flag | Status |
| --- | --- |
| heartbeat | Supported |
| ack | Supported |
| list_jobs | Supported |
| subscribe | Supported |
| lease_expires_at | Supported |
| cost.budget | Supported |
| model.use | Supported |
| provisioned_credentials | Supported |
| progress | Supported |
| result_chunk | Supported |
| agent_versions | Supported |

Transport

ARCP is transport-agnostic. This SDK ships three transports: WebSocket (the default, for networked runtimes), stdio (for in-process child runtimes), and in-memory (for tests). Select one by constructing the corresponding object (WebSocketTransport.connect(url), StdioTransport(...), or pair_memory_transports()) and passing it to client.connect(transport). The arcp.middleware.asgi and arcp.middleware.aiohttp packages attach the WebSocket upgrade to Starlette, FastAPI, Litestar, Quart, or aiohttp.web servers.
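
For intuition about what an in-memory transport pair does, the sketch below cross-wires two `asyncio.Queue` objects so that whatever one side sends, the other receives. It is not the implementation behind `pair_memory_transports()`: `MemoryEndpoint`, `make_pair`, the `send`/`receive` method names, and the frame shape with a `type` key are all assumptions.

```python
import asyncio


class MemoryEndpoint:
    """Hypothetical stand-in for one side of an in-memory transport pair."""

    def __init__(self, inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
        self._inbox = inbox
        self._outbox = outbox

    async def send(self, frame: dict) -> None:
        await self._outbox.put(frame)

    async def receive(self) -> dict:
        return await self._inbox.get()


def make_pair() -> tuple[MemoryEndpoint, MemoryEndpoint]:
    # Cross-wire two queues: what one side sends, the other receives.
    a_to_b: asyncio.Queue = asyncio.Queue()
    b_to_a: asyncio.Queue = asyncio.Queue()
    return MemoryEndpoint(b_to_a, a_to_b), MemoryEndpoint(a_to_b, b_to_a)


async def demo() -> list[dict]:
    client_side, runtime_side = make_pair()
    await client_side.send({"type": "session.hello"})
    hello = await runtime_side.receive()
    await runtime_side.send({"type": "session.welcome"})
    welcome = await client_side.receive()
    return [hello, welcome]


frames = asyncio.run(demo())
print(frames)
```

Because nothing leaves the process, a pair like this lets tests exercise a client and a runtime together without sockets or subprocesses.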

API reference

Full API reference — every type, method, and event payload — is in docs/.

Versioning and compatibility

This SDK speaks ARCP v1.1 (draft). The SDK follows semantic versioning independently of the protocol; the protocol version it negotiates is shown above and in session.hello. A runtime advertising a different ARCP MAJOR is not guaranteed compatible. Feature mismatches degrade gracefully: the effective feature set is the intersection of what the client and runtime advertise, and the SDK will not use a feature outside it.
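
The intersection rule is simple enough to sketch directly. The functions below are illustrative only: `negotiate` and `same_major` are hypothetical names, and the assumption that protocol versions are dotted strings compared on their first component is mine, not something the SDK documents here.

```python
def negotiate(client_features: list[str], runtime_features: list[str]) -> list[str]:
    """Effective feature set: the intersection, kept in the client's order."""
    offered = set(runtime_features)
    return [f for f in client_features if f in offered]


def same_major(a: str, b: str) -> bool:
    """Compare the MAJOR component of two dotted protocol versions."""
    return a.split(".")[0] == b.split(".")[0]


effective = negotiate(
    ["heartbeat", "ack", "subscribe", "result_chunk"],
    ["heartbeat", "ack", "list_jobs"],
)
print(effective)
print(same_major("1.1", "1.0"), same_major("1.1", "2.0"))
```

In this example `subscribe` and `result_chunk` drop out because the runtime does not advertise them, and `list_jobs` drops out because the client did not ask for it.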

Contributing

See CONTRIBUTING.md. Protocol questions and proposed changes belong in the spec repository; SDK bugs and feature requests belong here.

License

Apache-2.0 — see LICENSE.
