Skip to main content

Out-of-the-box NATS-based agent-to-agent (A2A) protocol and Python client.

Project description

ana — out-of-the-box NATS agent-to-agent bus

A small Python package that gives you the boring parts of an A2A (agent-to-agent) protocol so you can focus on what your agents actually do:

  • Envelope schemas — typed Query / Reply / Ack / Discovery pydantic models, one canonical wire format.
  • Subject convention<prefix>.<agent>.<verb>.<topic>, prefix configurable per fleet, parseable both directions.
  • Async client (AgentBus) — thin wrapper on nats-py with publish auditing, a request/reply helper that dodges the core-NATS "no-sub means lost" gotcha, and an InboundContext that handlers can use to reply / ack without juggling state.
  • CLI (ana)listen / send / probe so shell scripts and operators don't need to import Python.

Built on top of existing NATS — bring your own cluster, anonymous or auth, core or JetStream. ana does not deploy anything.

Install

pip install a2o-ana          # PyPI distribution
# or, from source:
pip install -e .[dev]

(PyPI distribution is a2o-ana because the bare ana name was squatted in 2015. The import name stays anafrom ana import AgentBus.)

5-line bot

import asyncio
from ana import AgentBus

async def main():
    async with AgentBus("my-bot") as bus:
        await bus.query("other-bot", "what's your status?")

asyncio.run(main())

Responding bot

import asyncio
from ana import AgentBus, Query
from ana.client import InboundContext

async def handler(env, ctx: InboundContext):
    if isinstance(env, Query):
        await ctx.reply({"status": "alive", "answer": env.query})

async def main():
    async with AgentBus("my-bot") as bus:
        await bus.subscribe_to_me(handler)
        await bus.announce_discovery(role="worker")
        await asyncio.Event().wait()  # serve forever

asyncio.run(main())

Request/reply with timeout

query_and_wait subscribes to the responder's reply subject before publishing the query — so the core-NATS "publish-into-the-void" gotcha can't bite you here.

reply = await bus.query_and_wait("other-bot", "status?", timeout_s=5.0)
if reply is None:
    print("peer didn't answer in time")
else:
    print(reply.data)

CLI

# Listen for everything addressed to you
ana --identity miraku-home listen --scope self

# Send a query
ana --identity caller send query --to other-bot --query 'status?' --topic default

# Probe (send + wait for one reply)
ana --identity caller probe other-bot --query 'status?' --timeout 5

Pass --prefix myfleet.v1 to switch namespaces. Pass --nats nats://10.0.0.1:4222 to point at a non-localhost server. Pass --audit-log /var/log/ana.jsonl to record every publish.

Subject convention

Default scheme: <prefix>.<agent>.<verb>.<topic> where <topic> is optional. Verbs are query, reply, ack, discovery, policy.

cc.fleet.alice.query.status
cc.fleet.alice.reply.status
cc.fleet.alice.ack.status
cc.fleet.alice.discovery
cc.fleet.alice.policy

Pick any prefix — change SubjectScheme(prefix=...) and you're done.

Envelope schemas

All four shapes share from (sender identity), ts (ISO-8601 UTC), and type (used as the discriminator on parse). See docs/protocol.md for the full spec.

{"type": "query",     "from": "alice", "to": "bob", "query": "status?",
 "fields": ["uptime", "version"], "request_id": "ab12", "ts": "..."}

{"type": "reply",     "from": "bob", "reply_for": "cc.fleet.bob.query.status",
 "request_id": "ab12", "data": {"alive": true}, "ts": "..."}

{"type": "ack",       "from": "carol", "ack_for": "cc.fleet.carol.query.status",
 "alive": true, "ts": "..."}

{"type": "discovery", "from": "dave", "role": "worker",
 "subjects_owned": ["cc.fleet.dave.>"], "capabilities": {"gpu": "A100"}, "ts": "..."}

Core NATS gotchas

ana defaults to core NATS (no JetStream). Operational implications worth knowing up front:

  • A publish made while the target has no active subscription is lost forever. There's no replay or store-and-forward.
  • query_and_wait() subscribes-then-publishes, which dodges this for the request/reply pattern.
  • For fire-and-forget broadcasts where loss matters, point ana at a JetStream-enabled NATS and ensure the target has a durable consumer before you publish. ana itself doesn't manage streams; do that with nats stream add.

See docs/protocol.md for envelope details and docs/nats-notes.md for deployment notes.

Examples

Tests

pip install -e .[dev]
pytest                          # unit tests (no NATS needed)
nats-server &                   # start a local server
ANA_NATS_URL=nats://127.0.0.1:4222 pytest tests/test_bus_integration.py

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

a2o_ana-0.1.1.tar.gz (16.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

a2o_ana-0.1.1-py3-none-any.whl (15.5 kB view details)

Uploaded Python 3

File details

Details for the file a2o_ana-0.1.1.tar.gz.

File metadata

  • Download URL: a2o_ana-0.1.1.tar.gz
  • Upload date:
  • Size: 16.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for a2o_ana-0.1.1.tar.gz
Algorithm Hash digest
SHA256 3594fba09a9377fea8eb62e06ad8ac4106fdc28fc4ff982c515933a916979f33
MD5 26a1d168221d3e5eae80584f01c2c3ec
BLAKE2b-256 eed0d0b4522c17ea6c1aeb2b936881742542b983a09c95eae91fbd2a3e32e430

See more details on using hashes here.

File details

Details for the file a2o_ana-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: a2o_ana-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 15.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for a2o_ana-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 710d1451e9eb581559d13a392ea34331ffc7d9ea5bc9d35f70b239c1348188fc
MD5 5ffb3d1ad7f4deefe3d36e33e9aa181f
BLAKE2b-256 dfa248e306da5d30f6c55f90c8e171b5afed41edd0e86f9a98654853929be8b5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page