Skip to main content

Out-of-the-box NATS-based agent-to-agent (A2A) protocol and Python client.

Project description

ana — out-of-the-box NATS agent-to-agent bus

A small Python package that gives you the boring parts of an A2A (agent-to-agent) protocol so you can focus on what your agents actually do:

  • Envelope schemas — typed Query / Reply / Ack / Discovery / Pulse pydantic models, one canonical wire format. Every envelope carries optional clock + host so receivers can anchor freshness without trusting conversation history.
  • Subject convention<prefix>.<agent>.<verb>.<topic>, prefix configurable per fleet, parseable both directions.
  • Async client (AgentBus) — thin wrapper on nats-py with publish auditing, a request/reply helper that dodges the core-NATS "no-sub means lost" gotcha, and an InboundContext that handlers can use to reply / ack without juggling state.
  • CLI (ana)listen / send / probe so shell scripts and operators don't need to import Python.

Built on top of existing NATS — bring your own cluster, anonymous or auth, core or JetStream. ana does not deploy anything.

Install

pip install a2o-ana          # PyPI distribution
# or, from source:
pip install -e .[dev]

(PyPI distribution is a2o-ana because the bare ana name was squatted in 2015. The import name stays anafrom ana import AgentBus.)

5-line bot

import asyncio
from ana import AgentBus

async def main():
    async with AgentBus("my-bot") as bus:
        await bus.query("other-bot", "what's your status?")

asyncio.run(main())

Responding bot

import asyncio
from ana import AgentBus, Query
from ana.client import InboundContext

async def handler(env, ctx: InboundContext):
    if isinstance(env, Query):
        await ctx.reply({"status": "alive", "answer": env.query})

async def main():
    async with AgentBus("my-bot") as bus:
        await bus.subscribe_to_me(handler)
        await bus.announce_discovery(role="worker")
        await asyncio.Event().wait()  # serve forever

asyncio.run(main())

Request/reply with timeout

query_and_wait subscribes to the responder's reply subject before publishing the query — so the core-NATS "publish-into-the-void" gotcha can't bite you here.

reply = await bus.query_and_wait("other-bot", "status?", timeout_s=5.0)
if reply is None:
    print("peer didn't answer in time")
else:
    print(reply.data)

CLI

# Listen for everything addressed to you
ana --identity miraku-home listen --scope self

# Send a query
ana --identity caller send query --to other-bot --query 'status?' --topic default

# Probe (send + wait for one reply)
ana --identity caller probe other-bot --query 'status?' --timeout 5

Pass --prefix myfleet.v1 to switch namespaces. Pass --nats nats://10.0.0.1:4222 to point at a non-localhost server. Pass --audit-log /var/log/ana.jsonl to record every publish.

Subject convention

Default scheme: <prefix>.<agent>.<verb>.<topic> where <topic> is optional. Verbs are query, reply, ack, discovery, policy.

cc.fleet.alice.query.status
cc.fleet.alice.reply.status
cc.fleet.alice.ack.status
cc.fleet.alice.discovery
cc.fleet.alice.policy

Pick any prefix — change SubjectScheme(prefix=...) and you're done.

Envelope schemas

All five shapes share from (sender identity), ts (ISO-8601 UTC), and type (used as the discriminator on parse), plus optional clock (local_time, uptime_s) and host auto-stamped by AgentBus. See docs/protocol.md for the full spec.

{"type": "query",     "from": "alice", "to": "bob", "query": "status?",
 "fields": ["uptime", "version"], "request_id": "ab12", "ts": "..."}

{"type": "reply",     "from": "bob", "reply_for": "cc.fleet.bob.query.status",
 "request_id": "ab12", "data": {"alive": true}, "ts": "..."}

{"type": "ack",       "from": "carol", "ack_for": "cc.fleet.carol.query.status",
 "alive": true, "ts": "..."}

{"type": "discovery", "from": "dave", "role": "worker",
 "subjects_owned": ["cc.fleet.dave.>"], "capabilities": {"gpu": "A100"}, "ts": "..."}

{"type": "pulse", "from": "eve", "activity": "training",
 "state": "step=4200,ce=0.83",
 "clock": {"local_time": "2026-05-14T15:35:00+09:00", "uptime_s": 3600},
 "host": "claw-0001", "ts": "..."}

Core NATS gotchas

ana defaults to core NATS (no JetStream). Operational implications worth knowing up front:

  • A publish made while the target has no active subscription is lost forever. There's no replay or store-and-forward.
  • query_and_wait() subscribes-then-publishes, which dodges this for the request/reply pattern.
  • For fire-and-forget broadcasts where loss matters, point ana at a JetStream-enabled NATS and ensure the target has a durable consumer before you publish. ana itself doesn't manage streams; do that with nats stream add.

See docs/protocol.md for envelope details and docs/nats-notes.md for deployment notes.

Examples

Tests

pip install -e .[dev]
pytest                          # unit tests (no NATS needed)
nats-server &                   # start a local server
ANA_NATS_URL=nats://127.0.0.1:4222 pytest tests/test_bus_integration.py

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

a2o_ana-0.5.0.tar.gz (22.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

a2o_ana-0.5.0-py3-none-any.whl (20.4 kB view details)

Uploaded Python 3

File details

Details for the file a2o_ana-0.5.0.tar.gz.

File metadata

  • Download URL: a2o_ana-0.5.0.tar.gz
  • Upload date:
  • Size: 22.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for a2o_ana-0.5.0.tar.gz
Algorithm Hash digest
SHA256 fb661db49ed4d82a934a6337db53aedd2788e7c078cbddd5ec9d7bb7c3df944c
MD5 a291a49adeeb8602ef4203a254089c3e
BLAKE2b-256 a5aee5ff0b92520a668aa38f173d1a194d6687db24e4f7adfaac56257f97d777

See more details on using hashes here.

File details

Details for the file a2o_ana-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: a2o_ana-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 20.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for a2o_ana-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f8e9145ff0ad5e41394416b93ed42bb5b08035a9f85e660b677c047d39f2da2f
MD5 e33dc6495de177ffac7a366b93d70dd0
BLAKE2b-256 1f3e0c0eaa33ef113877165c28df570beee0273669862687f56368f9b738b881

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page