Skip to main content

Out-of-the-box NATS-based agent-to-agent (A2A) protocol and Python client.

Project description

ana — out-of-the-box NATS agent-to-agent bus

A small Python package that gives you the boring parts of an A2A (agent-to-agent) protocol so you can focus on what your agents actually do:

  • Envelope schemas — typed Query / Reply / Ack / Discovery / Pulse pydantic models, one canonical wire format. Every envelope carries optional clock + host so receivers can anchor freshness without trusting conversation history.
  • Subject convention<prefix>.<agent>.<verb>.<topic>, prefix configurable per fleet, parseable both directions.
  • Async client (AgentBus) — thin wrapper on nats-py with publish auditing, a request/reply helper that dodges the core-NATS "no-sub means lost" gotcha, and an InboundContext that handlers can use to reply / ack without juggling state.
  • CLI (ana)listen / send / probe so shell scripts and operators don't need to import Python.

Built on top of existing NATS — bring your own cluster, anonymous or auth, core or JetStream. ana does not deploy anything.

Install

pip install a2o-ana          # PyPI distribution
# or, from source:
pip install -e .[dev]

(PyPI distribution is a2o-ana because the bare ana name was squatted in 2015. The import name stays anafrom ana import AgentBus.)

5-line bot

import asyncio
from ana import AgentBus

async def main():
    async with AgentBus("my-bot") as bus:
        await bus.query("other-bot", "what's your status?")

asyncio.run(main())

Responding bot

import asyncio
from ana import AgentBus, Query
from ana.client import InboundContext

async def handler(env, ctx: InboundContext):
    if isinstance(env, Query):
        await ctx.reply({"status": "alive", "answer": env.query})

async def main():
    async with AgentBus("my-bot") as bus:
        await bus.subscribe_to_me(handler)
        await bus.announce_discovery(role="worker")
        await asyncio.Event().wait()  # serve forever

asyncio.run(main())

Request/reply with timeout

query_and_wait subscribes to the responder's reply subject before publishing the query — so the core-NATS "publish-into-the-void" gotcha can't bite you here.

reply = await bus.query_and_wait("other-bot", "status?", timeout_s=5.0)
if reply is None:
    print("peer didn't answer in time")
else:
    print(reply.data)

CLI

# Listen for everything addressed to you
ana --identity miraku-home listen --scope self

# Send a query
ana --identity caller send query --to other-bot --query 'status?' --topic default

# Probe (send + wait for one reply)
ana --identity caller probe other-bot --query 'status?' --timeout 5

Pass --prefix myfleet.v1 to switch namespaces. Pass --nats nats://10.0.0.1:4222 to point at a non-localhost server. Pass --audit-log /var/log/ana.jsonl to record every publish.

Subject convention

Default scheme: <prefix>.<agent>.<verb>.<topic> where <topic> is optional. Verbs are query, reply, ack, discovery, policy.

cc.fleet.alice.query.status
cc.fleet.alice.reply.status
cc.fleet.alice.ack.status
cc.fleet.alice.discovery
cc.fleet.alice.policy

Pick any prefix — change SubjectScheme(prefix=...) and you're done.

Envelope schemas

All five shapes share from (sender identity), ts (ISO-8601 UTC), and type (used as the discriminator on parse), plus optional clock (local_time, uptime_s) and host auto-stamped by AgentBus. See docs/protocol.md for the full spec.

{"type": "query",     "from": "alice", "to": "bob", "query": "status?",
 "fields": ["uptime", "version"], "request_id": "ab12", "ts": "..."}

{"type": "reply",     "from": "bob", "reply_for": "cc.fleet.bob.query.status",
 "request_id": "ab12", "data": {"alive": true}, "ts": "..."}

{"type": "ack",       "from": "carol", "ack_for": "cc.fleet.carol.query.status",
 "alive": true, "ts": "..."}

{"type": "discovery", "from": "dave", "role": "worker",
 "subjects_owned": ["cc.fleet.dave.>"], "capabilities": {"gpu": "A100"}, "ts": "..."}

{"type": "pulse", "from": "eve", "activity": "training",
 "state": "step=4200,ce=0.83",
 "clock": {"local_time": "2026-05-14T15:35:00+09:00", "uptime_s": 3600},
 "host": "claw-0001", "ts": "..."}

Core NATS gotchas

ana defaults to core NATS (no JetStream). Operational implications worth knowing up front:

  • A publish made while the target has no active subscription is lost forever. There's no replay or store-and-forward.
  • query_and_wait() subscribes-then-publishes, which dodges this for the request/reply pattern.
  • For fire-and-forget broadcasts where loss matters, point ana at a JetStream-enabled NATS and ensure the target has a durable consumer before you publish. ana itself doesn't manage streams; do that with nats stream add.

See docs/protocol.md for envelope details and docs/nats-notes.md for deployment notes.

Examples

Tests

pip install -e .[dev]
pytest                          # unit tests (no NATS needed)
nats-server &                   # start a local server
ANA_NATS_URL=nats://127.0.0.1:4222 pytest tests/test_bus_integration.py

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

a2o_ana-0.3.0.tar.gz (19.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

a2o_ana-0.3.0-py3-none-any.whl (18.8 kB view details)

Uploaded Python 3

File details

Details for the file a2o_ana-0.3.0.tar.gz.

File metadata

  • Download URL: a2o_ana-0.3.0.tar.gz
  • Upload date:
  • Size: 19.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for a2o_ana-0.3.0.tar.gz
Algorithm Hash digest
SHA256 0b8acf03c2845a7889ab23151dc45e7f715610b874020f5d15cc1d2f8ec7ad02
MD5 5ea4d0e9a26a2d803274e5864cbd9aff
BLAKE2b-256 6962e3b139d64d43822590480224a89aedc7c9df69e58f5ceed19490b7829cf4

See more details on using hashes here.

File details

Details for the file a2o_ana-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: a2o_ana-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 18.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for a2o_ana-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f6cfd3f473464dbe9f3a5563c8ca771f634f65551f9ad680fc1405dfc05aa576
MD5 b290c908f1b2673fb2a2c2cbba5c89e4
BLAKE2b-256 0bf2ff58286b663f936654b8eab9f35b1637b9e8bd00ccf326a6477e9c636841

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page