Skip to main content

Out-of-the-box NATS-based agent-to-agent (A2A) protocol and Python client.

Project description

ana — out-of-the-box NATS agent-to-agent bus

A small Python package that gives you the boring parts of an A2A (agent-to-agent) protocol so you can focus on what your agents actually do:

  • Envelope schemas — typed Query / Reply / Ack / Discovery / Pulse pydantic models, one canonical wire format. Every envelope carries optional clock + host so receivers can anchor freshness without trusting conversation history.
  • Subject convention<prefix>.<agent>.<verb>.<topic>, prefix configurable per fleet, parseable both directions.
  • Async client (AgentBus) — thin wrapper on nats-py with publish auditing, a request/reply helper that dodges the core-NATS "no-sub means lost" gotcha, and an InboundContext that handlers can use to reply / ack without juggling state.
  • CLI (ana)listen / send / probe so shell scripts and operators don't need to import Python.

Built on top of existing NATS — bring your own cluster, anonymous or auth, core or JetStream. ana does not deploy anything.

Install

pip install a2o-ana          # PyPI distribution
# or, from source:
pip install -e .[dev]

(PyPI distribution is a2o-ana because the bare ana name was squatted in 2015. The import name stays anafrom ana import AgentBus.)

5-line bot

import asyncio
from ana import AgentBus

async def main():
    async with AgentBus("my-bot") as bus:
        await bus.query("other-bot", "what's your status?")

asyncio.run(main())

Responding bot

import asyncio
from ana import AgentBus, Query
from ana.client import InboundContext

async def handler(env, ctx: InboundContext):
    if isinstance(env, Query):
        await ctx.reply({"status": "alive", "answer": env.query})

async def main():
    async with AgentBus("my-bot") as bus:
        await bus.subscribe_to_me(handler)
        await bus.announce_discovery(role="worker")
        await asyncio.Event().wait()  # serve forever

asyncio.run(main())

Request/reply with timeout

query_and_wait subscribes to the responder's reply subject before publishing the query — so the core-NATS "publish-into-the-void" gotcha can't bite you here.

reply = await bus.query_and_wait("other-bot", "status?", timeout_s=5.0)
if reply is None:
    print("peer didn't answer in time")
else:
    print(reply.data)

CLI

# Listen for everything addressed to you
ana --identity miraku-home listen --scope self

# Send a query
ana --identity caller send query --to other-bot --query 'status?' --topic default

# Probe (send + wait for one reply)
ana --identity caller probe other-bot --query 'status?' --timeout 5

Pass --prefix myfleet.v1 to switch namespaces. Pass --nats nats://10.0.0.1:4222 to point at a non-localhost server. Pass --audit-log /var/log/ana.jsonl to record every publish.

Subject convention

Default scheme: <prefix>.<agent>.<verb>.<topic> where <topic> is optional. Verbs are query, reply, ack, discovery, policy.

cc.fleet.alice.query.status
cc.fleet.alice.reply.status
cc.fleet.alice.ack.status
cc.fleet.alice.discovery
cc.fleet.alice.policy

Pick any prefix — change SubjectScheme(prefix=...) and you're done.

Envelope schemas

All five shapes share from (sender identity), ts (ISO-8601 UTC), and type (used as the discriminator on parse), plus optional clock (local_time, uptime_s) and host auto-stamped by AgentBus. See docs/protocol.md for the full spec.

{"type": "query",     "from": "alice", "to": "bob", "query": "status?",
 "fields": ["uptime", "version"], "request_id": "ab12", "ts": "..."}

{"type": "reply",     "from": "bob", "reply_for": "cc.fleet.bob.query.status",
 "request_id": "ab12", "data": {"alive": true}, "ts": "..."}

{"type": "ack",       "from": "carol", "ack_for": "cc.fleet.carol.query.status",
 "alive": true, "ts": "..."}

{"type": "discovery", "from": "dave", "role": "worker",
 "subjects_owned": ["cc.fleet.dave.>"], "capabilities": {"gpu": "A100"}, "ts": "..."}

{"type": "pulse", "from": "eve", "activity": "training",
 "state": "step=4200,ce=0.83",
 "clock": {"local_time": "2026-05-14T15:35:00+09:00", "uptime_s": 3600},
 "host": "claw-0001", "ts": "..."}

Core NATS gotchas

ana defaults to core NATS (no JetStream). Operational implications worth knowing up front:

  • A publish made while the target has no active subscription is lost forever. There's no replay or store-and-forward.
  • query_and_wait() subscribes-then-publishes, which dodges this for the request/reply pattern.
  • For fire-and-forget broadcasts where loss matters, point ana at a JetStream-enabled NATS and ensure the target has a durable consumer before you publish. ana itself doesn't manage streams; do that with nats stream add.

See docs/protocol.md for envelope details and docs/nats-notes.md for deployment notes.

Examples

Tests

pip install -e .[dev]
pytest                          # unit tests (no NATS needed)
nats-server &                   # start a local server
ANA_NATS_URL=nats://127.0.0.1:4222 pytest tests/test_bus_integration.py

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

a2o_ana-0.5.3.tar.gz (25.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

a2o_ana-0.5.3-py3-none-any.whl (23.4 kB view details)

Uploaded Python 3

File details

Details for the file a2o_ana-0.5.3.tar.gz.

File metadata

  • Download URL: a2o_ana-0.5.3.tar.gz
  • Upload date:
  • Size: 25.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for a2o_ana-0.5.3.tar.gz
Algorithm Hash digest
SHA256 a1982b5e32ca1388eebcfa0f1e9efacd07fc1391b2451392b69fe8c67a1720a5
MD5 3ef728f5e90a9eacd4fd3bf17af5e177
BLAKE2b-256 ef1f04e29a0867037b9167608c7added5196a60ac79faf699ae5ee78f5c21a6d

See more details on using hashes here.

File details

Details for the file a2o_ana-0.5.3-py3-none-any.whl.

File metadata

  • Download URL: a2o_ana-0.5.3-py3-none-any.whl
  • Upload date:
  • Size: 23.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for a2o_ana-0.5.3-py3-none-any.whl
Algorithm Hash digest
SHA256 5b02c820a7d9b1687cc17e0603373f55317cae74fb650e9cba5ca278025486da
MD5 33ce63c08a374da4e468d0039b9e678c
BLAKE2b-256 f3b13edbdcbbbbedce0ba6cc5c3218793167c77214dd85321aa57fa9513f7e8e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page