NATS JetStream task bus for distributed agent programs
agentbus
agentbus is a small NATS JetStream-based task bus for distributed agent programs.
It is designed for this architecture:
coordinator agent / human entry point
↓ publishes task
public NATS JetStream server
↓ durable delivery
agentbus worker long-running process on each worker machine
↓ invokes configured agent command
worker publishes a result message and acks, naks, or terms the task
Design goals
- No bot-to-bot chat dependency.
- No direct inbound access needed for worker machines.
- Generic agent command integration through TOML chat_cmd.
- Worker and publisher configuration via TOML files.
- NATS subjects keep routing explicit and permissionable.
- Durable task delivery through JetStream, not plain fire-and-forget pub/sub.
Requirements
Server side:
- nats-server with JetStream enabled.
- nats CLI for stream setup and debugging.
- A reachable TCP port for NATS clients. The examples use the non-default port 7422 instead of the NATS default 4222.
Worker side:
- Python >= 3.11.
- Network access from the worker machine to the NATS server.
- A one-shot agent command configured as TOML chat_cmd.
Layout
agentbus/
config.py TOML configuration
messages.py task/result schema and prompt builder
publish.py task publishing helpers
worker.py NATS JetStream worker runtime
cli.py agentbus command-line entrypoint
config/
agentbus.worker.example.toml
nats-server.conf
scripts/
stream-setup.sh
deploy/
systemd/agentbus-worker.service
launchd/com.agentbus.worker.plist
supervisor/agentbus-worker.conf
skills/
agentbus/SKILL.md
1. Configure the NATS server
Simple install examples:
# macOS
brew install nats-server nats-io/nats-tools/nats
# Go toolchain
go install github.com/nats-io/nats-server/v2@latest
go install github.com/nats-io/natscli/nats@latest
# Linux packages / Docker / binaries
# See the official NATS installation guide for the current commands.
Verify both commands are available:
nats-server --version
nats --version
Copy the sample config to the server's NATS config path. The upstream systemd example uses /etc/nats-server.conf; some distro packages may use a different path, so match the service you install.
sudo mkdir -p /data/jetstream /etc/nats/tls
sudo cp config/nats-server.conf /etc/nats-server.conf
sudo chmod 600 /etc/nats-server.conf
sudo chown -R nats:nats /data/jetstream 2>/dev/null || true
Edit the config before starting the server:
sudo $EDITOR /etc/nats-server.conf
If you prefer a project-specific filename such as /etc/nats/agentbus.conf, that is also fine, but then make sure your service or start command explicitly uses nats-server -c /etc/nats/agentbus.conf.
At minimum, change these values:
agent-main password
agent-code password
agent-doc password
client port, if `7422` is not appropriate
TLS cert/key paths, if public internet clients will connect
jetstream.store_dir, if /data/jetstream is not appropriate
The sample config defines three users:
agent-main publishes tasks and subscribes to central results
agent-code subscribes to agent.code.tasks and publishes results
agent-doc subscribes to agent.doc.tasks and publishes results
It also enables JetStream:
jetstream {
store_dir: "/data/jetstream"
max_mem_store: 256MiB
max_file_store: 10GiB
}
Domain and TLS
For public internet deployments, prefer a domain plus TLS. The domain is configured in DNS, not inside NATS. NATS only needs to know which certificate and key files to serve.
Example DNS setup:
agentbus.example.com. A <server_public_ipv4>
agentbus.example.com. AAAA <server_public_ipv6, optional>
Example Let's Encrypt certificate flow on the server:
sudo certbot certonly --standalone -d agentbus.example.com
sudo install -m 0644 /etc/letsencrypt/live/agentbus.example.com/fullchain.pem /etc/nats/tls/fullchain.pem
sudo install -m 0600 /etc/letsencrypt/live/agentbus.example.com/privkey.pem /etc/nats/tls/privkey.pem
sudo chown -R nats:nats /etc/nats/tls 2>/dev/null || true
Then enable this block in the NATS config file:
tls {
cert_file: "/etc/nats/tls/fullchain.pem"
key_file: "/etc/nats/tls/privkey.pem"
}
Clients should then use the domain and tls:// scheme:
tls://agent-main:agent_main_password@agentbus.example.com:7422
If you do not enable TLS, use nats://..., but avoid exposing that setup to the public internet.
Start the server with the config:
nats-server -c /etc/nats-server.conf
For a real deployment, run this under your service manager, for example systemd, Docker, or a managed NATS service.
Important network notes:
- Expose the NATS client port, 7422 in these examples, only to machines that need to connect. The NATS default is 4222; using a non-default port reduces scanner noise but is not a security boundary.
- Keep the monitoring port 8222 private or bind it only to localhost/VPN.
- Use TLS for public internet deployments. If TLS is enabled, clients should use a tls://... NATS URL or equivalent TLS client options.
The sample config includes a commented TLS block:
# tls {
# cert_file: "/etc/nats/tls/fullchain.pem"
# key_file: "/etc/nats/tls/privkey.pem"
# }
2. Create JetStream streams
After the NATS server is running, create the task and result streams.
Use a user with JetStream API permission. In the sample config, agent-main has $JS.API.> access:
./scripts/stream-setup.sh 'tls://agent-main:agent_main_password@agentbus.example.com:7422'
This creates:
AGENT_TASKS subjects: agent.*.tasks max age: 7d
AGENT_RESULTS subjects: agent.*.results max age: 30d
You can inspect the streams with:
nats --server 'tls://agent-main:agent_main_password@agentbus.example.com:7422' stream ls
nats --server 'tls://agent-main:agent_main_password@agentbus.example.com:7422' stream info AGENT_TASKS
nats --server 'tls://agent-main:agent_main_password@agentbus.example.com:7422' stream info AGENT_RESULTS
3. Install the worker
From PyPI:
python3 -m venv env
source env/bin/activate
pip install --upgrade pip
pip install agentbus
From a source checkout:
python3 -m venv env
source env/bin/activate
pip install --upgrade pip
pip install -e .
Requires Python >= 3.11. If python3 points to an older interpreter on your machine, use a versioned command such as python3.11 or python3.14.
For development and tests:
pip install -e ".[dev]"
python -m pytest tests -q
4. Configure the worker
Prefer a config file:
mkdir -p ~/.agentbus
cp config/agentbus.worker.example.toml ~/.agentbus/config.toml
$EDITOR ~/.agentbus/config.toml
chmod 600 ~/.agentbus/config.toml
If --config is omitted, agentbus worker run checks:
./agentbus.toml
~/.agentbus/config.toml
/etc/agentbus/agentbus.toml
Required worker fields:
[agent]
id = "code"
# {input} is required and marks where AgentBus inserts the generated prompt.
chat_cmd = ["agent-cli", "chat", "--oneshot", "{input}"]
[nats]
url = "tls://agent-code:agent_code_password@agentbus.example.com:7422"
A fuller example:
[agent]
id = "code"
chat_cmd = ["agent-cli", "chat", "--oneshot", "{input}"]
extra_instruction = ""
[worker]
task_timeout_seconds = 1800
max_task_bytes = 1048576
reconnect_time_wait_seconds = 2
max_reconnect_attempts = -1
[nats]
url = "tls://agent-code:agent_code_password@agentbus.example.com:7422"
stream = "AGENT_TASKS"
task_subject = "agent.code.tasks"
default_result_subject = "agent.main.results"
# Durable consumer name. Keep stable per worker identity so NATS remembers ack/progress.
durable = "agent-code"
[log]
dir = "~/.agentbus/logs"
max_bytes = 104857600
backup_count = 5
chat_cmd can also be a string, but list form is safer because {input} becomes one exact argument even when the generated prompt contains spaces or quotes.
# String form is accepted for simple commands.
chat_cmd = "agent-cli chat --oneshot {input}"
# Prefer list form when the prompt belongs between flags.
chat_cmd = ["agent-cli", "run", "--prompt", "{input}", "--json"]
# Hermes example.
chat_cmd = ["hermes", "chat", "-q", "-Q", "{input}"]
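To illustrate why list form is safer, here is a minimal sketch of placeholder expansion. It assumes the string form is split with `shlex.split` before substitution so that the prompt always stays a single argument; the helper name `build_argv` is hypothetical and the real worker may handle string commands differently.

```python
import shlex


def build_argv(chat_cmd, prompt: str) -> list[str]:
    """Expand {input} into an argv list suitable for subprocess.run(argv)."""
    if isinstance(chat_cmd, str):
        # Split the template first, then substitute, so quotes or spaces
        # inside the prompt are never re-parsed as shell syntax.
        parts = shlex.split(chat_cmd)
    else:
        parts = list(chat_cmd)
    return [part.replace("{input}", prompt) for part in parts]
```

With the list form `["agent-cli", "run", "--prompt", "{input}", "--json"]`, a prompt such as `say "hi" now` becomes exactly one argument between `--prompt` and `--json`. Substituting the prompt into a command string before splitting would instead break on the embedded quotes and spaces.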
durable is the NATS JetStream durable consumer name. It is not a password or a server address; it is the stable name NATS uses to remember this worker's delivery progress. If the worker restarts with the same durable name, NATS can continue from unacked / not-yet-delivered messages instead of treating it as a brand-new ephemeral consumer.
AgentBus intentionally does not use environment variables for worker configuration. Put worker settings in TOML and pass --config when you do not want the default path.
5. Run the worker
Foreground mode:
agentbus worker run --config ~/.agentbus/config.toml
For long-running deployment, use one of the included templates:
deploy/systemd/agentbus-worker.service
deploy/launchd/com.agentbus.worker.plist
deploy/supervisor/agentbus-worker.conf
Before installing a service, edit the template paths, user, working directory, and config path for the target machine.
6. Publish a test task
Read the latest result in one terminal:
agentbus result get \
--nats-url 'tls://agent-main:agent_main_password@agentbus.example.com:7422' \
--agent main
To keep watching after reading recent history, add --watch. --limit has the same meaning whether or not --watch is set: read the latest N stored results first.
agentbus result get \
--nats-url 'tls://agent-main:agent_main_password@agentbus.example.com:7422' \
--agent main \
--limit 20 \
--watch
Publish a test task in another terminal:
agentbus task publish \
--nats-url 'tls://agent-main:agent_main_password@agentbus.example.com:7422' \
--to code \
--to doc \
--from main \
--reply-to main \
--task-type ping \
'hello'
Publishing is intentionally configured with CLI arguments instead of a TOML file. Unlike the worker, it is a short one-shot command with only a few options.
Only the task content is positional; agent routing and task metadata are named options so the command remains readable.
Repeat --to to publish the same task content to multiple agents. AgentBus sends one task message per target.
--task-type names the kind of work to run, while the final positional argument is the task content.
--reply-to is an agent id, like --from and --to. It controls which agent result inbox receives the worker execution record; when omitted, it defaults to --from. AgentBus derives the result subject internally as agent.<reply_to>.results.
--payload-fmt defaults to text, which stores the positional argument as payload.fmt = "text" and payload.content string. With --payload-fmt json, the positional argument is parsed as JSON and wrapped as payload.fmt = "json" plus payload.content, so JSON objects, arrays, strings, numbers, booleans, and null are all accepted. Workers should treat missing, empty, null, or text payload format as text.
Examples:
agentbus task publish \
--nats-url 'tls://agent-main:agent_main_password@agentbus.example.com:7422' \
--to code \
--task-type ping \
--payload-fmt text \
'hello'
agentbus task publish \
--nats-url 'tls://agent-main:agent_main_password@agentbus.example.com:7422' \
--to code \
--task-type batch \
--payload-fmt json \
'[{"url":"https://example.com"}]'
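The --payload-fmt behavior described above can be sketched in a few lines. The publisher side follows the description directly. The worker-side rendering of JSON content through `json.dumps` is an assumption made for this example, and both function names are hypothetical.

```python
import json


def build_payload(content: str, payload_fmt: str = "text") -> dict:
    """Wrap the positional argument the way --payload-fmt is described."""
    if payload_fmt == "text":
        return {"fmt": "text", "content": content}
    if payload_fmt == "json":
        # Any valid JSON value is accepted: object, array, string,
        # number, boolean, or null.
        return {"fmt": "json", "content": json.loads(content)}
    raise ValueError(f"unsupported payload format: {payload_fmt!r}")


def payload_as_text(payload: dict) -> str:
    """Worker-side view: a missing, empty, null, or "text" fmt means text."""
    fmt = payload.get("fmt") or "text"
    content = payload.get("content")
    if fmt == "text":
        return "" if content is None else str(content)
    # Assumption: JSON content is rendered back to a JSON string.
    return json.dumps(content, ensure_ascii=False)
```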
The --to code and --to doc options map to these task subjects:
agent.code.tasks
agent.doc.tasks
If the target workers are running, agentbus result get --agent main should receive task.result messages from:
agent.main.results
Message subjects
Recommended convention:
agent.<agent_id>.tasks tasks for one worker agent
agent.<agent_id>.results optional direct result stream per agent
agent.main.results central result subject for the coordinator
agent.<agent_id>.heartbeat optional health events
Examples:
agent.code.tasks
agent.doc.tasks
agent.main.results
Task message
{
"id": "task-20260518-0001",
"from": "agent-main",
"to": "agent-code",
"reply_to": "agent-main",
"type": "task.request",
"task_type": "review_pr",
"payload": {
"fmt": "json",
"content": {
"repo": "org/repo",
"pr": 123
}
}
}
Result message
{
"id": "result-uuid",
"type": "task.result",
"status": "completed",
"task": {
"id": "task-20260518-0001",
"from": "agent-main",
"to": "agent-code",
"reply_to": "agent-main",
"type": "task.request",
"task_type": "review_pr",
"payload": {
"fmt": "json",
"content": {
"repo": "org/repo",
"pr": 123
}
},
"created_at": "2026-05-18T00:00:00+00:00"
},
"result": "...agent output...",
"completed_at": "2026-05-18T00:00:00+00:00"
}
result messages are worker-generated execution records. Agent-to-agent business replies should still be sent as new task messages. The original task is embedded whole under task; top-level duplicate routing fields such as request_id, from, to, worker, and reply_to are intentionally omitted.
Recommended status values:
completed
failed
needs_approval
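A minimal sketch of assembling a result record in the shape shown above: the original task is embedded whole and no routing fields are duplicated at the top level. The function name `build_result`, the use of a bare UUID for id, and rejecting statuses outside the recommended set are assumptions made for this example.

```python
import uuid
from datetime import datetime, timezone

RECOMMENDED_STATUSES = {"completed", "failed", "needs_approval"}


def build_result(task: dict, status: str, output: str) -> dict:
    """Build a task.result record that embeds the original task unchanged."""
    if status not in RECOMMENDED_STATUSES:
        # Assumption: this sketch is strict; the source only recommends these.
        raise ValueError(f"unexpected status: {status!r}")
    return {
        "id": str(uuid.uuid4()),
        "type": "task.result",
        "status": status,
        "task": task,  # embedded whole; from/to/reply_to live only here
        "result": output,
        "completed_at": datetime.now(timezone.utc)
        .replace(microsecond=0)
        .isoformat(),
    }
```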
Logs
The worker writes logs both to stderr and to a rotating file. The default log file is:
~/.agentbus/logs/agentbus-worker.log
Default rotation settings:
max file size: 100 MiB (104857600 bytes)
backup count: 5
The log directory is created automatically. Configure it with [log].dir, [log].max_bytes, and [log].backup_count in TOML.
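The logging behavior described above maps naturally onto the standard library's `RotatingFileHandler`. The sketch below assumes that approach; the function name `setup_logging`, the logger name, and the log line format are hypothetical, and the actual worker may be wired differently.

```python
import logging
import sys
from logging.handlers import RotatingFileHandler
from pathlib import Path


def setup_logging(log_dir: str = "~/.agentbus/logs",
                  max_bytes: int = 104857600,
                  backup_count: int = 5) -> logging.Logger:
    """Log to stderr and to a rotating file, creating the directory if needed."""
    directory = Path(log_dir).expanduser()
    directory.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger("agentbus.worker")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()  # avoid duplicate handlers on repeated setup

    formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    file_handler = RotatingFileHandler(
        directory / "agentbus-worker.log",
        maxBytes=max_bytes,
        backupCount=backup_count,
    )
    for handler in (logging.StreamHandler(sys.stderr), file_handler):
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
```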
Ack behavior
valid task + command succeeds and result is published → ack
valid task + command exits non-zero and result is published → ack
invalid JSON / invalid task schema → term if available, otherwise ack
worker crashes before result publish → nak if available, then raise
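The ack rules above can be read as a small decision function. This is a sketch of the table only, not the worker's actual code: the `Disposition` enum, the function name, and its parameters are hypothetical.

```python
from enum import Enum


class Disposition(Enum):
    ACK = "ack"
    NAK = "nak"
    TERM = "term"


def disposition(task_valid: bool, result_published: bool,
                term_supported: bool = True) -> Disposition:
    """Map a task outcome to the JetStream acknowledgement in the table above."""
    if not task_valid:
        # Invalid JSON or schema: redelivery cannot help, so stop delivery.
        return Disposition.TERM if term_supported else Disposition.ACK
    if result_published:
        # Ack whether the command succeeded or exited non-zero:
        # the result record already carries the outcome.
        return Disposition.ACK
    # No result was published (for example, a crash): ask for redelivery.
    return Disposition.NAK
```

The design point is that a non-zero exit is still a finished task from the bus's perspective, so only failures that prevent a result from being published lead to redelivery.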
Security notes
- Use one NATS user per agent.
- Replace all sample passwords before running in a shared or public environment.
- Restrict each user to only the subjects it needs.
- Use TLS for public NATS deployments.
- Keep monitoring/admin ports private.
- Store config files with chmod 600 if credentials are embedded in the NATS URL, for example in [nats].url.
- Do not put tokens, cookies, or authorization headers in task payloads unless strictly necessary.
- Treat tasks that delete data, send external messages, deploy code, merge PRs, or spend money as approval-required.
License
Apache License 2.0. See LICENSE.