agentfm-sdk

Official Python SDK for AgentFM, a peer-to-peer compute mesh for containerized AI agents.

pip install agentfm-sdk

Distribution name vs import name. The package is published as agentfm-sdk on PyPI (the bare name agentfm is taken by an unrelated project), but you import it as agentfm in code: pip install agentfm-sdk, then from agentfm import AgentFMClient.

Typed sync and async clients, full OpenAI-compatible namespace, scatter-gather batch dispatch, signed webhook callbacks, and strict mypy compliance.

Hello World

from agentfm import AgentFMClient

with AgentFMClient(gateway_url="http://127.0.0.1:8080") as client:
    # 1. Discover: see every candidate with full metadata
    workers = client.workers.list(model="llama3.2")
    for w in workers:
        print(f"{w.peer_id[:12]}...  {w.author!r}  load={w.current_tasks}/{w.max_tasks}")

    # 2. Pick: explicit choice (peer_id is the cryptographically verifiable identifier)
    worker = workers[0]

    # 3. Dispatch: by peer_id, always
    result = client.tasks.run(
        worker_id=worker.peer_id,
        prompt="Draft a 200-word leave policy. Save it as policy.md to /tmp/output.",
    )

    print(result.text)
    print(f"\nDuration: {result.duration_seconds:.1f}s")

    # Anything the agent wrote to /tmp/output is auto-zipped, transferred,
    # and extracted client-side. result.artifacts is a list of pathlib.Path.
    for path in result.artifacts:
        print(f"  artifact: {path} ({path.stat().st_size} bytes)")

tasks.run returns a TaskResult with text, artifacts, worker_id, and duration_seconds. For incremental output, use tasks.stream; for fire-and-forget jobs, use tasks.submit_async.

Streaming

for chunk in client.tasks.stream(worker_id=worker.peer_id, prompt="..."):
    if chunk.kind == "text":
        print(chunk.text, end="", flush=True)

chunk.kind is "text" for normal stdout or "marker" for internal sentinels (artifact-incoming notifications). Most callers only care about "text".
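
If you do want to surface markers (say, to log when an artifact transfer begins), branch on kind; a minimal sketch:

for chunk in client.tasks.stream(worker_id=worker.peer_id, prompt="..."):
    if chunk.kind == "text":
        print(chunk.text, end="", flush=True)
    else:
        # kind == "marker": internal sentinel, e.g. artifact-incoming
        print("\n[marker received]", flush=True)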

Batch dispatch (scatter-gather)

For batch workloads, tasks.scatter runs many prompts across many peers concurrently, with automatic failover and retry. Results come back in submission order; failures surface as ScatterResult(status="failed") rather than raised exceptions, so a single bad prompt never breaks the batch.

prompts = [f"Summarise document #{i}" for i in range(50)]

# Spread across an explicit peer list
results = client.tasks.scatter(
    prompts,
    peer_ids=[w.peer_id for w in client.workers.list(available_only=True)],
    max_concurrency=8,
    max_retries=2,
)

for i, r in enumerate(results):
    if r.status == "success":
        print(f"[{i}] ok  ({len(r.text)} chars)")
    else:
        print(f"[{i}] failed: {r.error}")

If you want the SDK to discover the peers for you, use scatter_by_model:

results = client.tasks.scatter_by_model(
    prompts,
    model="llama3.2",          # filters workers by their advertised model
    max_workers=4,             # use at most 4 of the matching workers
    max_concurrency=8,         # at most 8 concurrent in-flight tasks
    max_retries=2,
)

pick= accepts a custom callable for fancier worker selection (e.g. lowest GPU usage):

results = client.tasks.scatter_by_model(
    prompts,
    model="flux.2",
    pick=lambda ws: sorted(ws, key=lambda w: w.gpu_usage_pct)[:3],
)

The worker_id field on each ScatterResult records which peer ultimately served (or failed) that prompt — useful for debugging hot spots.
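
For example, you can tally outcomes per peer to spot a misbehaving worker; a small sketch using only the ScatterResult fields described above:

from collections import Counter

failures = Counter(r.worker_id for r in results if r.status == "failed")
successes = Counter(r.worker_id for r in results if r.status == "success")

for peer, n_failed in failures.most_common():
    total = n_failed + successes.get(peer, 0)
    print(f"{peer[:12]}...  {n_failed}/{total} failed")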

OpenAI-compatible

Drop-in replacement for the OpenAI API on /v1/chat/completions, /v1/completions, and /v1/models. Point any existing OpenAI SDK at AgentFM by changing only the base URL and key.

# AgentFM's typed namespace (recommended for new code)
resp = client.openai.chat.completions.create(
    model=worker.peer_id,                    # peer_id pin-routes; any string also works
    messages=[{"role": "user", "content": "hi"}],
)
print(resp.choices[0].message.content)

# Streaming
for chunk in client.openai.chat.completions.create(
    model=worker.peer_id,
    messages=[{"role": "user", "content": "hi"}],
    stream=True,
):
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

If you prefer the official openai package, it works unchanged:

from openai import OpenAI

client = OpenAI(base_url="http://127.0.0.1:8080/v1", api_key="your-key")
resp = client.chat.completions.create(
    model="llama3.2",
    messages=[{"role": "user", "content": "hi"}],
)

The hybrid model matcher tries three identifiers in order, first hit wins: PeerID exact match, AgentName (case-insensitive), Model engine (case-insensitive). Within a tier, least-loaded wins.
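
For intuition, the selection is roughly the following (a sketch, not the gateway's actual code; the agent_name and model attribute names mirror the worker fields discussed under "Why peer_id" below):

def match(workers, model: str):
    # Tier 1: exact peer_id match (pin-routing)
    tier = [w for w in workers if w.peer_id == model]
    if not tier:
        # Tier 2: agent name, case-insensitive
        tier = [w for w in workers if w.agent_name.lower() == model.lower()]
    if not tier:
        # Tier 3: model engine, case-insensitive
        tier = [w for w in workers if w.model.lower() == model.lower()]
    # Within the winning tier, least-loaded wins
    return min(tier, key=lambda w: w.current_tasks) if tier else None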

Authentication

from agentfm import AgentFMClient, AuthenticationError

# Three modes:
client = AgentFMClient(api_key="your-key")    # explicit
client = AgentFMClient()                       # falls back to AGENTFM_API_KEY env
client = AgentFMClient(api_key=None)           # explicit no-auth (skips env)

try:
    client.workers.list()
except AuthenticationError as e:
    print(e.code, e.status)  # "invalid_api_key" 401

with_options(api_key="other") derives a client with a different key; pass api_key=None to clear auth on the derived client. See the gateway-side Authentication docs for setting up AGENTFM_API_KEYS on the boss.

client.api_key is effectively read-only after construction. The token is baked into the underlying httpx.Client headers once at __init__, so mutating client.api_key = "new-key" afterwards does NOT update the request header. Use client.with_options(api_key="new-key") to derive a client with the new key. This matches the OpenAI Python SDK's behaviour.
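
A quick illustration of the derivation pattern:

base = AgentFMClient(api_key="key-a")

# Derived client with a different key; `base` keeps its original header.
other = base.with_options(api_key="key-b")

# Derived client with auth cleared entirely.
anon = base.with_options(api_key=None)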

Error handling

Every error is a typed subclass of AgentFMError. No raw httpx exceptions ever surface to user code, so a single except AgentFMError clause is sufficient if you don't need to discriminate:

from agentfm import (
    AgentFMError, AuthenticationError, GatewayConnectionError,
    ModelNotFoundError, MeshOverloadedError, WorkerNotFoundError,
    WorkerStreamError, WorkerUnreachableError, InvalidRequestError,
)

try:
    result = client.tasks.run(worker_id=peer_id, prompt="...")
except AuthenticationError:
    print("API key rejected; check AGENTFM_API_KEY")
except WorkerNotFoundError:
    print("That peer isn't in current telemetry")
except WorkerUnreachableError:
    print("Peer is online but the gateway can't dial it")
except MeshOverloadedError:
    print("All matching workers are at capacity; retry later")
except WorkerStreamError as e:
    print(f"Stream failed mid-task: {e.message}")
except AgentFMError as e:
    print(f"Other gateway error: code={e.code} status={e.status}")

Exception               Raised when
AuthenticationError     401 from the gateway (missing/wrong bearer)
GatewayConnectionError  Local gateway unreachable; mid-stream transport failure
GatewayInternalError    Gateway returned an internal_error envelope
GatewayProtocolError    Gateway response can't be decoded
WorkerNotFoundError     Peer ID not in current telemetry
WorkerStreamError       libp2p stream failed mid-task
WorkerUnreachableError  Boss couldn't dial the worker
ModelNotFoundError      No worker advertises the requested model
MeshOverloadedError     All matching workers at capacity
InvalidRequestError     4xx from gateway (bad model field, malformed URL, etc.)
ArtifactError           Zip extraction failed (corrupt / zip-slip / size cap)

Async

Mirror surface of the sync client. Every method exists with the same kwargs; just async/await instead of blocking.

import asyncio
from agentfm import AsyncAgentFMClient

async def main() -> None:
    async with AsyncAgentFMClient() as client:
        workers = await client.workers.list()
        worker = workers[0]
        async for chunk in client.tasks.stream(worker_id=worker.peer_id, prompt="hi"):
            if chunk.kind == "text":
                print(chunk.text, end="", flush=True)

asyncio.run(main())

Async streaming with the OpenAI namespace

The async OpenAI streaming pattern has one gotcha: create(stream=True) returns a coroutine you must await, then iterate with async for. Two steps, not one:

async with AsyncAgentFMClient() as client:
    stream = await client.openai.chat.completions.create(
        model=worker.peer_id,
        messages=[{"role": "user", "content": "hi"}],
        stream=True,
    )
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

For early-exit cleanup (break out of the loop before EOF), wrap the stream in contextlib.aclosing so the underlying httpx response is released promptly:

from contextlib import aclosing

async with aclosing(client.tasks.stream(worker_id=peer_id, prompt="...")) as stream:
    async for chunk in stream:
        if want_to_stop:
            break  # response cleaned up via aclosing

Fire-and-forget with webhook

Submit long-running jobs and receive HMAC-signed callbacks when they complete:

from agentfm import WebhookReceiver, WebhookPayload

def on_done(payload: WebhookPayload) -> None:
    print(f"task {payload.task_id} -> {payload.status}")

# Verify HMAC-SHA256 signatures by setting AGENTFM_WEBHOOK_SECRET on the gateway
# and passing the same secret here. Constant-time comparison.
with WebhookReceiver(port=8000, callback=on_done, secret="shared-secret") as rx:
    ack = client.tasks.submit_async(
        worker_id=worker.peer_id,
        prompt="Long-running batch job",
        webhook_url="http://my-host.example.com:8000/cb",
    )
    print(f"queued task {ack.task_id}")
    input("Press Enter to stop receiver...\n")

The receiver enforces a 64 KiB body cap, validates Content-Type, and verifies HMAC in constant time. The gateway-side validator refuses webhook URLs pointing at loopback / link-local / RFC1918 addresses (set AGENTFM_WEBHOOK_ALLOW_PRIVATE=1 to opt back in for trusted private deploys).
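
If you terminate the callback with your own HTTP server instead of WebhookReceiver, the verification amounts to the following sketch (the header name and hex encoding are assumptions here; check the gateway docs for the actual wire format):

import hashlib
import hmac

def verify_signature(secret: str, body: bytes, signature: str) -> bool:
    # Hypothetical wire format: hex-encoded HMAC-SHA256 over the raw body.
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # Constant-time comparison, as WebhookReceiver does internally.
    return hmac.compare_digest(expected, signature)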

Spin up an ephemeral gateway

For tests and notebooks, LocalMeshGateway boots a real agentfm -mode api subprocess, waits for it to become ready, and tears it down on context exit:

from agentfm import LocalMeshGateway, AgentFMClient

with LocalMeshGateway(port=8080) as gw:
    client = AgentFMClient(gateway_url=gw.url)
    workers = client.workers.list(model="llama3.2", wait_for_workers=1)
    print(workers)

If your gateway requires auth, pass api_key= to the gateway constructor; the readiness probe will use it.
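
For instance, assuming the spawned gateway enforces that key (e.g. it is listed in AGENTFM_API_KEYS):

with LocalMeshGateway(port=8080, api_key="test-key") as gw:
    client = AgentFMClient(gateway_url=gw.url, api_key="test-key")
    print(client.workers.list())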

Environment variables

Variable                       Used by                    Purpose
AGENTFM_API_KEY                SDK clients                Bearer token for outbound requests when api_key= is omitted
AGENTFM_API_KEYS               Gateway (server-side)      Comma-separated list of accepted bearer tokens
AGENTFM_ALLOW_UNAUTH_PUBLIC    Gateway (server-side)      1 allows non-loopback --api-bind without AGENTFM_API_KEYS
AGENTFM_WEBHOOK_SECRET         Gateway + WebhookReceiver  HMAC-SHA256 signing secret for async webhook callbacks
AGENTFM_WEBHOOK_ALLOW_PRIVATE  Gateway (server-side)      1 allows webhook URLs pointing at private/loopback addresses

CLI

agentfm-py models                          # list peers on the mesh
agentfm-py chat --peer 12D3KooW... --prompt "hi"

Why peer_id, not model name

In a federated mesh, anyone can advertise any name. agent_name and model are user-supplied strings with no uniqueness or authenticity guarantee. peer_id is the only cryptographically verifiable identifier. The SDK is built around this: tasks.run, tasks.stream, and tasks.submit_async accept peer_id only. The OpenAI namespace accepts any string for model (per OpenAI's spec), but emits a one-time warning recommending peer_id for production use.

The discovery → pick → dispatch flow (above) is the canonical pattern. See examples/06_list_and_pick_by_peer_id.py for three concrete patterns: browse-and-pick, fetch-by-ID, and pin-without-list-call.
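
The last of those — pinning a peer you already trust, with no discovery round-trip — is a one-liner over tasks.run (the peer ID below is a placeholder):

KNOWN_PEER = "12D3KooW..."  # placeholder: a peer_id from config or a previous run

with AgentFMClient(gateway_url="http://127.0.0.1:8080") as client:
    result = client.tasks.run(worker_id=KNOWN_PEER, prompt="hi")
    print(result.text)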

Python and license

  • Requires Python 3.10+.
  • Apache 2.0 (matches the parent project).
  • Tested on macOS and Linux against CPython 3.10, 3.11, 3.12, 3.13.
