Out-of-the-box NATS-based agent-to-agent (A2A) protocol and Python client.
Project description
ana — out-of-the-box NATS agent-to-agent bus
A small Python package that gives you the boring parts of an A2A (agent-to-agent) protocol so you can focus on what your agents actually do:
- Envelope schemas — typed
Query/Reply/Ack/Discovery/Pulsepydantic models, one canonical wire format. Every envelope carries optionalclock+hostso receivers can anchor freshness without trusting conversation history. - Subject convention —
<prefix>.<agent>.<verb>.<topic>, prefix configurable per fleet, parseable both directions. - Async client (
AgentBus) — thin wrapper onnats-pywith publish auditing, a request/reply helper that dodges the core-NATS "no-sub means lost" gotcha, and anInboundContextthat handlers can use to reply / ack without juggling state. - CLI (
ana) —listen/send/probeso shell scripts and operators don't need to import Python.
Built on top of existing NATS — bring your own cluster, anonymous or auth, core or JetStream. ana does not deploy anything.
Install
pip install a2o-ana # PyPI distribution
# or, from source:
pip install -e .[dev]
(PyPI distribution is a2o-ana because the bare ana name was squatted in 2015.
The import name stays ana — from ana import AgentBus.)
5-line bot
import asyncio
from ana import AgentBus
async def main():
async with AgentBus("my-bot") as bus:
await bus.query("other-bot", "what's your status?")
asyncio.run(main())
Responding bot
import asyncio
from ana import AgentBus, Query
from ana.client import InboundContext
async def handler(env, ctx: InboundContext):
if isinstance(env, Query):
await ctx.reply({"status": "alive", "answer": env.query})
async def main():
async with AgentBus("my-bot") as bus:
await bus.subscribe_to_me(handler)
await bus.announce_discovery(role="worker")
await asyncio.Event().wait() # serve forever
asyncio.run(main())
Request/reply with timeout
query_and_wait subscribes to the responder's reply subject before
publishing the query — so the core-NATS "publish-into-the-void" gotcha
can't bite you here.
reply = await bus.query_and_wait("other-bot", "status?", timeout_s=5.0)
if reply is None:
print("peer didn't answer in time")
else:
print(reply.data)
CLI
# Listen for everything addressed to you
ana --identity miraku-home listen --scope self
# Send a query
ana --identity caller send query --to other-bot --query 'status?' --topic default
# Probe (send + wait for one reply)
ana --identity caller probe other-bot --query 'status?' --timeout 5
Pass --prefix myfleet.v1 to switch namespaces. Pass
--nats nats://10.0.0.1:4222 to point at a non-localhost server. Pass
--audit-log /var/log/ana.jsonl to record every publish.
Subject convention
Default scheme: <prefix>.<agent>.<verb>.<topic> where <topic> is
optional. Verbs are query, reply, ack, discovery, policy.
cc.fleet.alice.query.status
cc.fleet.alice.reply.status
cc.fleet.alice.ack.status
cc.fleet.alice.discovery
cc.fleet.alice.policy
Pick any prefix — change SubjectScheme(prefix=...) and you're done.
Envelope schemas
All five shapes share from (sender identity), ts (ISO-8601 UTC),
and type (used as the discriminator on parse), plus optional
clock (local_time, uptime_s) and host auto-stamped by
AgentBus. See docs/protocol.md for the full
spec.
{"type": "query", "from": "alice", "to": "bob", "query": "status?",
"fields": ["uptime", "version"], "request_id": "ab12", "ts": "..."}
{"type": "reply", "from": "bob", "reply_for": "cc.fleet.bob.query.status",
"request_id": "ab12", "data": {"alive": true}, "ts": "..."}
{"type": "ack", "from": "carol", "ack_for": "cc.fleet.carol.query.status",
"alive": true, "ts": "..."}
{"type": "discovery", "from": "dave", "role": "worker",
"subjects_owned": ["cc.fleet.dave.>"], "capabilities": {"gpu": "A100"}, "ts": "..."}
{"type": "pulse", "from": "eve", "activity": "training",
"state": "step=4200,ce=0.83",
"clock": {"local_time": "2026-05-14T15:35:00+09:00", "uptime_s": 3600},
"host": "claw-0001", "ts": "..."}
Core NATS gotchas
ana defaults to core NATS (no JetStream). Operational implications worth knowing up front:
- A publish made while the target has no active subscription is lost forever. There's no replay or store-and-forward.
query_and_wait()subscribes-then-publishes, which dodges this for the request/reply pattern.- For fire-and-forget broadcasts where loss matters, point ana at a
JetStream-enabled NATS and ensure the target has a durable consumer
before you publish. ana itself doesn't manage streams; do that with
nats stream add.
See docs/protocol.md for envelope details and
docs/nats-notes.md for deployment notes.
Examples
examples/echo_bot.py— minimal responder.examples/two_bot_dialogue.py— two bots in one process exchanging query/reply.examples/send_query.py— one-shot probe.
Tests
pip install -e .[dev]
pytest # unit tests (no NATS needed)
nats-server & # start a local server
ANA_NATS_URL=nats://127.0.0.1:4222 pytest tests/test_bus_integration.py
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file a2o_ana-0.5.0.tar.gz.
File metadata
- Download URL: a2o_ana-0.5.0.tar.gz
- Upload date:
- Size: 22.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fb661db49ed4d82a934a6337db53aedd2788e7c078cbddd5ec9d7bb7c3df944c
|
|
| MD5 |
a291a49adeeb8602ef4203a254089c3e
|
|
| BLAKE2b-256 |
a5aee5ff0b92520a668aa38f173d1a194d6687db24e4f7adfaac56257f97d777
|
File details
Details for the file a2o_ana-0.5.0-py3-none-any.whl.
File metadata
- Download URL: a2o_ana-0.5.0-py3-none-any.whl
- Upload date:
- Size: 20.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f8e9145ff0ad5e41394416b93ed42bb5b08035a9f85e660b677c047d39f2da2f
|
|
| MD5 |
e33dc6495de177ffac7a366b93d70dd0
|
|
| BLAKE2b-256 |
1f3e0c0eaa33ef113877165c28df570beee0273669862687f56368f9b738b881
|