Skip to main content

Out-of-the-box NATS-based agent-to-agent (A2A) protocol and Python client.

Project description

ana — out-of-the-box NATS agent-to-agent bus

A small Python package that gives you the boring parts of an A2A (agent-to-agent) protocol so you can focus on what your agents actually do:

  • Envelope schemas — typed Query / Reply / Ack / Discovery / Pulse pydantic models, one canonical wire format. Every envelope carries optional clock + host so receivers can anchor freshness without trusting conversation history.
  • Subject convention<prefix>.<agent>.<verb>.<topic>, prefix configurable per fleet, parseable both directions.
  • Async client (AgentBus) — thin wrapper on nats-py with publish auditing, a request/reply helper that dodges the core-NATS "no-sub means lost" gotcha, and an InboundContext that handlers can use to reply / ack without juggling state.
  • CLI (ana)listen / send / probe so shell scripts and operators don't need to import Python.

Built on top of existing NATS — bring your own cluster, anonymous or auth, core or JetStream. ana does not deploy anything.

Install

pip install a2o-ana          # PyPI distribution
# or, from source:
pip install -e .[dev]

(PyPI distribution is a2o-ana because the bare ana name was squatted in 2015. The import name stays anafrom ana import AgentBus.)

5-line bot

import asyncio
from ana import AgentBus

async def main():
    async with AgentBus("my-bot") as bus:
        await bus.query("other-bot", "what's your status?")

asyncio.run(main())

Responding bot

import asyncio
from ana import AgentBus, Query
from ana.client import InboundContext

async def handler(env, ctx: InboundContext):
    if isinstance(env, Query):
        await ctx.reply({"status": "alive", "answer": env.query})

async def main():
    async with AgentBus("my-bot") as bus:
        await bus.subscribe_to_me(handler)
        await bus.announce_discovery(role="worker")
        await asyncio.Event().wait()  # serve forever

asyncio.run(main())

Request/reply with timeout

query_and_wait subscribes to the responder's reply subject before publishing the query — so the core-NATS "publish-into-the-void" gotcha can't bite you here.

reply = await bus.query_and_wait("other-bot", "status?", timeout_s=5.0)
if reply is None:
    print("peer didn't answer in time")
else:
    print(reply.data)

CLI

# Listen for everything addressed to you
ana --identity miraku-home listen --scope self

# Send a query
ana --identity caller send query --to other-bot --query 'status?' --topic default

# Probe (send + wait for one reply)
ana --identity caller probe other-bot --query 'status?' --timeout 5

Pass --prefix myfleet.v1 to switch namespaces. Pass --nats nats://10.0.0.1:4222 to point at a non-localhost server. Pass --audit-log /var/log/ana.jsonl to record every publish.

Subject convention

Default scheme: <prefix>.<agent>.<verb>.<topic> where <topic> is optional. Verbs are query, reply, ack, discovery, policy.

cc.fleet.alice.query.status
cc.fleet.alice.reply.status
cc.fleet.alice.ack.status
cc.fleet.alice.discovery
cc.fleet.alice.policy

Pick any prefix — change SubjectScheme(prefix=...) and you're done.

Envelope schemas

All five shapes share from (sender identity), ts (ISO-8601 UTC), and type (used as the discriminator on parse), plus optional clock (local_time, uptime_s) and host auto-stamped by AgentBus. See docs/protocol.md for the full spec.

{"type": "query",     "from": "alice", "to": "bob", "query": "status?",
 "fields": ["uptime", "version"], "request_id": "ab12", "ts": "..."}

{"type": "reply",     "from": "bob", "reply_for": "cc.fleet.bob.query.status",
 "request_id": "ab12", "data": {"alive": true}, "ts": "..."}

{"type": "ack",       "from": "carol", "ack_for": "cc.fleet.carol.query.status",
 "alive": true, "ts": "..."}

{"type": "discovery", "from": "dave", "role": "worker",
 "subjects_owned": ["cc.fleet.dave.>"], "capabilities": {"gpu": "A100"}, "ts": "..."}

{"type": "pulse", "from": "eve", "activity": "training",
 "state": "step=4200,ce=0.83",
 "clock": {"local_time": "2026-05-14T15:35:00+09:00", "uptime_s": 3600},
 "host": "claw-0001", "ts": "..."}

Core NATS gotchas

ana defaults to core NATS (no JetStream). Operational implications worth knowing up front:

  • A publish made while the target has no active subscription is lost forever. There's no replay or store-and-forward.
  • query_and_wait() subscribes-then-publishes, which dodges this for the request/reply pattern.
  • For fire-and-forget broadcasts where loss matters, point ana at a JetStream-enabled NATS and ensure the target has a durable consumer before you publish. ana itself doesn't manage streams; do that with nats stream add.

See docs/protocol.md for envelope details and docs/nats-notes.md for deployment notes.

Examples

Tests

pip install -e .[dev]
pytest                          # unit tests (no NATS needed)
nats-server &                   # start a local server
ANA_NATS_URL=nats://127.0.0.1:4222 pytest tests/test_bus_integration.py

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

a2o_ana-0.5.2.tar.gz (22.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

a2o_ana-0.5.2-py3-none-any.whl (20.8 kB view details)

Uploaded Python 3

File details

Details for the file a2o_ana-0.5.2.tar.gz.

File metadata

  • Download URL: a2o_ana-0.5.2.tar.gz
  • Upload date:
  • Size: 22.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for a2o_ana-0.5.2.tar.gz
Algorithm Hash digest
SHA256 a8b6dbc46bc18fe31ada158efa6a839dbab89c6d83ec0bcaf3b9f16e8688c933
MD5 17132732ed9454ba5057f03fac5f44de
BLAKE2b-256 aee59225ad81c794440b44376992dc757cd7beba41d04e2c159a25bdbe8b5acb

See more details on using hashes here.

File details

Details for the file a2o_ana-0.5.2-py3-none-any.whl.

File metadata

  • Download URL: a2o_ana-0.5.2-py3-none-any.whl
  • Upload date:
  • Size: 20.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for a2o_ana-0.5.2-py3-none-any.whl
Algorithm Hash digest
SHA256 ea6654d8dd85e6a1d267bc251d44fe16f4f7a4116dad20672970d0997a165c72
MD5 9306eb1e62899c61efc99091cf7b91a1
BLAKE2b-256 3c4acd289e092d55d9af6d00e073d9aece2c86053dbd03d0e1b3b515960ca2b5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page