Python SDK for stdio_bus - AI agent transport layer

stdiobus

Python SDK for building AI agents over stdio_bus.

stdiobus gives you a reliable transport layer for ACP/MCP-style workflows: route requests to agent workers, keep request/response correlation stable, handle streaming updates, and work with async or sync Python code.

Use it when you want to focus on agent logic, not process wiring and message transport.

Why use stdiobus

  • Build ACP/MCP agents without writing transport plumbing.
  • Send typed JSON-RPC requests with automatic session routing.
  • Receive streamed agent output aggregated into a single final text.
  • Use the same protocol model across Python, Node, and Rust SDKs.
  • Run locally with a binary, or via Docker when needed.

Features

  • Simple client API: AsyncStdioBus and StdioBus.
  • Programmatic config: define worker pools in Python or use a config file.
  • Session routing by default: clientSessionId is injected automatically.
  • Hello handshake: stdio_bus/hello negotiation support.
  • Protocol extensions: identity, audit metadata, and agentId routing.
  • Streaming support: agent_message_chunk aggregation into final response text.
  • Predictable cancellation: in-flight requests fail with TransportError on shutdown or crash.
  • Cross-platform: subprocess backend with Docker fallback.
  • Typed API: dataclasses and type hints for IDE support.
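The correlation and cancellation behaviour listed above can be pictured with a small stdlib-only sketch. It is not the SDK's implementation: PendingRequests is a hypothetical name, and TransportError here is a local stand-in rather than the class exported by stdiobus. The sketch only assumes that each request carries a JSON-RPC id and that a waiter is kept per id until a response arrives or the transport goes away.

```python
import asyncio
import itertools


class TransportError(Exception):
    """Local stand-in for the SDK's TransportError; not imported from stdiobus."""


class PendingRequests:
    """Hypothetical correlation table: JSON-RPC id -> Future awaiting the response."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._pending: dict[int, asyncio.Future] = {}

    def register(self) -> tuple[int, asyncio.Future]:
        """Allocate a fresh request id and the future that will carry its response."""
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return request_id, future

    def resolve(self, message: dict) -> bool:
        """Deliver a response to its waiter; return False if the id is unknown."""
        future = self._pending.pop(message.get("id"), None)
        if future is None or future.done():
            return False
        future.set_result(message.get("result"))
        return True

    def fail_all(self, reason: str) -> int:
        """On shutdown or crash, fail every in-flight request with TransportError."""
        failed = 0
        for future in self._pending.values():
            if not future.done():
                future.set_exception(TransportError(reason))
                failed += 1
        self._pending.clear()
        return failed


async def demo() -> tuple:
    table = PendingRequests()
    first_id, first = table.register()
    _second_id, second = table.register()
    # The first request gets its response; the second is still in flight at shutdown.
    table.resolve({"jsonrpc": "2.0", "id": first_id, "result": {"ok": True}})
    failed = table.fail_all("bus stopped")
    try:
        await second
        second_outcome = "completed"
    except TransportError as exc:
        second_outcome = f"TransportError: {exc}"
    return await first, failed, second_outcome


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping one future per id means a late or duplicate response cannot be delivered to the wrong caller, and a single fail_all call gives every waiter the same, predictable error.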

Installation

pip install stdiobus

Requirements:

  • Python 3.10+
  • stdio_bus binary in PATH (or Docker)

Quick Start (Async)

import asyncio
from stdiobus import AsyncStdioBus, BusConfig, PoolConfig

async def main():
    async with AsyncStdioBus(
        config=BusConfig(
            pools=[PoolConfig(id="echo", command="python", args=["./echo_worker.py"], instances=1)]
        )
    ) as bus:
        result = await bus.request("echo", {"message": "hello"})
        print(result)

asyncio.run(main())

Quick Start (Sync)

from stdiobus import StdioBus, BusConfig, PoolConfig

with StdioBus(
    config=BusConfig(
        pools=[PoolConfig(id="echo", command="python", args=["./echo_worker.py"], instances=1)]
    )
) as bus:
    result = bus.request("echo", {"message": "hello"})
    print(result)
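Both quick starts launch ./echo_worker.py, which is not shown here. The following is a minimal sketch of what such a worker might look like, assuming workers exchange newline-delimited JSON-RPC 2.0 messages on stdin and stdout. That wire format, the handle_line and serve helpers, and the choice to return params unchanged are illustrative assumptions; check the stdio_bus documentation for the actual worker contract.

```python
import json
import sys
from typing import Optional, TextIO


def handle_line(line: str) -> Optional[str]:
    """Turn one NDJSON request line into one NDJSON response line, or None."""
    line = line.strip()
    if not line:
        return None
    try:
        message = json.loads(line)
    except json.JSONDecodeError:
        error = {"code": -32700, "message": "Parse error"}
        return json.dumps({"jsonrpc": "2.0", "id": None, "error": error})
    if not isinstance(message, dict) or "id" not in message:
        return None  # notifications carry no id and get no response
    if message.get("method") != "echo":
        error = {"code": -32601, "message": "Method not found"}
        return json.dumps({"jsonrpc": "2.0", "id": message["id"], "error": error})
    # Echo the params back as the result, keeping the id for correlation.
    return json.dumps(
        {"jsonrpc": "2.0", "id": message["id"], "result": message.get("params", {})}
    )


def serve(stdin: TextIO, stdout: TextIO) -> None:
    """Read requests line by line and write one response line per request."""
    for line in stdin:
        response = handle_line(line)
        if response is not None:
            stdout.write(response + "\n")
            stdout.flush()  # flush per message so the reader is not left waiting


if __name__ == "__main__":
    serve(sys.stdin, sys.stdout)
```

The worker writes nothing but protocol messages to stdout; any diagnostics would belong on stderr so they cannot be mistaken for responses.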

Real Use Cases

ACP agent flow

import asyncio

from stdiobus import (
    AsyncStdioBus, BusConfig, PoolConfig,
    HelloParams, RequestOptions, Identity, AuditEvent,
)

async def main():
    bus = AsyncStdioBus(
        config=BusConfig(
            pools=[PoolConfig(id="acp-worker", command="python", args=["./acp_worker.py"], instances=1)]
        ),
        timeout_ms=60000,
    )

    await bus.start()
    try:
        # Optional protocol handshake
        hello = await bus.hello(HelloParams())
        print("Negotiated:", hello.negotiated_protocol_version)

        # Request with identity/audit metadata + agent routing
        result = await bus.request(
            "session/update",
            {"input": "Summarize latest incident report"},
            options=RequestOptions(
                agent_id="agent-42",
                identity=Identity(subject_id="user-123", role="operator"),
                audit=AuditEvent(event_id="evt-1001", action="session/update"),
            ),
        )

        # If stream chunks were received and the result is a dict,
        # result["text"] contains the aggregated output
        print(result.get("text", result) if isinstance(result, dict) else result)
    finally:
        # Release the bus even if the handshake or request fails
        await bus.stop()
        bus.destroy()

asyncio.run(main())

MCP tools call

import asyncio

from stdiobus import AsyncStdioBus, BusConfig, PoolConfig

async def main():
    async with AsyncStdioBus(
        config=BusConfig(
            pools=[PoolConfig(id="mcp-tools", command="python", args=["-m", "my_tools_worker"], instances=2)]
        )
    ) as bus:
        # Discover the tools exposed by the worker
        tools = await bus.request("tools/list")
        print("Tools:", tools)

        # Invoke one tool by name with its arguments
        output = await bus.request("tools/call", {
            "name": "search_docs",
            "arguments": {"query": "retry policy"},
        })
        print(output)

asyncio.run(main())

Configuration

Programmatic (recommended)

from stdiobus import BusConfig, PoolConfig, LimitsConfig

config = BusConfig(
    pools=[
        PoolConfig(id="agent-a", command="python", args=["./worker_a.py"], instances=2),
        PoolConfig(id="agent-b", command="python", args=["-m", "worker_b"], instances=1),
    ],
    limits=LimitsConfig(
        max_input_buffer=2_097_152,
        max_restarts=10,
    ),
)

File-based (legacy)

from stdiobus import StdioBus

bus = StdioBus(config_path="./stdio-bus-config.json")

config and config_path are mutually exclusive.

API Reference

Main classes

  • AsyncStdioBus(config=..., config_path=..., backend="auto", timeout_ms=...)
  • StdioBus(...) — sync wrapper

Lifecycle

Method                Description
start()               Start the bus and spawn workers
stop(timeout_sec=30)  Stop gracefully, cancel in-flight requests
connect(params)       Start + optional hello handshake
hello(params)         Perform stdio_bus/hello handshake
destroy()             Release all resources

Messaging

Method                        Description
request(method, params, ...)  Send request and wait for response
notify(method, params, ...)   Send notification (no response)
send(message)                 Send raw JSON-RPC message
on_message(handler)           Register handler for all inbound messages
on_notification(handler)      Register handler for notifications only
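The split between on_message and on_notification follows from JSON-RPC 2.0, where a notification is a message with a method but no id. The sketch below shows that classification and a toy dispatcher built on it. Dispatcher and classify are hypothetical names, and the single-argument handler signature is an assumption rather than the SDK's documented callback shape.

```python
from typing import Callable

Handler = Callable[[dict], None]


def classify(message: dict) -> str:
    """Classify a decoded JSON-RPC 2.0 message by the fields it carries."""
    has_id = "id" in message
    if "method" in message:
        return "request" if has_id else "notification"
    if has_id and ("result" in message or "error" in message):
        return "response"
    return "invalid"


class Dispatcher:
    """Toy dispatcher: every handler sees all messages or notifications only."""

    def __init__(self) -> None:
        self._message_handlers: list[Handler] = []
        self._notification_handlers: list[Handler] = []

    def on_message(self, handler: Handler) -> None:
        self._message_handlers.append(handler)

    def on_notification(self, handler: Handler) -> None:
        self._notification_handlers.append(handler)

    def dispatch(self, message: dict) -> str:
        kind = classify(message)
        for handler in self._message_handlers:
            handler(message)  # receives every inbound message
        if kind == "notification":
            for handler in self._notification_handlers:
                handler(message)  # receives id-less messages only
        return kind
```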

Properties

Property            Description
client_session_id   Auto-generated routing session ID
agent_session_id    Agent-returned session ID (after hello)
get_state()         Current bus state
get_stats()         Runtime statistics
get_backend_type()  Active backend: subprocess, native, docker

Protocol types

HelloParams, HelloResult, RequestOptions, Identity, AuditEvent, BusConfig, PoolConfig, LimitsConfig, SubprocessOptions

Errors

Exception             When
InvalidArgumentError  Bad parameter or config
InvalidStateError     Operation not valid in current state
TimeoutError          Request exceeded deadline
TransportError        Transport failure, shutdown, or crash
PolicyDeniedError     Operation denied by policy
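Since a TransportError means the bus instance is no longer usable, one way to handle it is to discard the instance and build a new one before retrying. The sketch below illustrates that pattern with a hypothetical RestartingClient wrapper. TransportError is a local stand-in so the example runs without stdiobus, and make_bus is assumed to return an already started bus exposing a synchronous request(method, params).

```python
from typing import Any, Callable


class TransportError(Exception):
    """Stand-in for the SDK exception so the sketch runs without stdiobus."""


class RestartingClient:
    """Hypothetical wrapper that replaces the bus after a transport failure."""

    def __init__(self, make_bus: Callable[[], Any], max_attempts: int = 3) -> None:
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        self._make_bus = make_bus
        self._max_attempts = max_attempts
        self._bus: Any = None

    def request(self, method: str, params: dict) -> Any:
        last_error: Exception | None = None
        for _ in range(self._max_attempts):
            if self._bus is None:
                self._bus = self._make_bus()  # fresh instance per recovery
            try:
                return self._bus.request(method, params)
            except TransportError as exc:
                last_error = exc
                # Drop the dead instance; a real client would also release
                # its resources before creating the replacement.
                self._bus = None
        assert last_error is not None
        raise last_error
```

Retrying like this is only safe for requests that can be repeated without side effects, because the failed attempt may already have reached the worker.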

Known Behavior

  • No automatic reconnect. If the bus process exits, pending requests fail with TransportError. Create a new instance to reconnect.
  • stop() cancels all in-flight requests with TransportError before stopping the backend.
  • Streaming chunks (agent_message_chunk) are aggregated into result["text"] when the response result is a dict.
  • stdout from the bus process is expected to carry NDJSON protocol messages only.
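The aggregation rule above can be sketched in a few lines. The chunk shape used here (a session/update notification whose params.update has sessionUpdate set to "agent_message_chunk" and a text content block) is an assumption based on ACP conventions, and aggregate_chunks is a hypothetical helper, not the SDK's internal function.

```python
from typing import Any


def aggregate_chunks(notifications: list[dict], response: dict) -> Any:
    """Join streamed text chunks and attach them to a dict result as "text"."""
    parts: list[str] = []
    for note in notifications:
        update = note.get("params", {}).get("update", {})
        if update.get("sessionUpdate") != "agent_message_chunk":
            continue  # ignore other update kinds
        content = update.get("content", {})
        if content.get("type") == "text":
            parts.append(content.get("text", ""))
    result = response.get("result")
    # Only dict results can carry the aggregated text; others pass through.
    if parts and isinstance(result, dict):
        return {**result, "text": "".join(parts)}
    return result


def chunk(text: str) -> dict:
    """Build one assumed-shape chunk notification for the demo."""
    return {
        "jsonrpc": "2.0",
        "method": "session/update",
        "params": {
            "update": {
                "sessionUpdate": "agent_message_chunk",
                "content": {"type": "text", "text": text},
            }
        },
    }


if __name__ == "__main__":
    final = {"jsonrpc": "2.0", "id": 1, "result": {"stopReason": "end_turn"}}
    print(aggregate_chunks([chunk("Hel"), chunk("lo")], final))
```

This is why callers are advised to read result.get("text", result) only after checking that the result is a dict: a string or list result is returned unchanged.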

Advanced: Backend Details

For most users, backend="auto" is the right choice. Details for those who need control:

Backend     When                                Config delivery
subprocess  stdio_bus binary in PATH (default)  --config-fd <N> pipe
native      libstdio_bus.a built with cffi      embed API (in-process)
docker      Docker available                    --config <mounted-file>

Auto-selection: subprocess → native → docker (Unix), subprocess → docker (Windows).
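The auto-selection order can be expressed as a small pure function. This is a sketch of the documented order only: select_backend and detect are hypothetical helpers, probing for Docker with shutil.which is an assumption, and native availability is left as a plain flag because how the SDK detects the cffi build is not described here.

```python
import shutil
import sys


def select_backend(platform: str, has_binary: bool, has_native: bool, has_docker: bool) -> str:
    """Pick the first available backend in the documented preference order."""
    if platform.startswith("win"):
        order = ["subprocess", "docker"]  # no native backend on Windows
    else:
        order = ["subprocess", "native", "docker"]
    available = {"subprocess": has_binary, "native": has_native, "docker": has_docker}
    for name in order:
        if available[name]:
            return name
    raise RuntimeError("no usable backend: install stdio_bus or Docker")


def detect() -> str:
    """Probe the local machine; the native check is a placeholder (always False)."""
    return select_backend(
        platform=sys.platform,
        has_binary=shutil.which("stdio_bus") is not None,
        has_native=False,
        has_docker=shutil.which("docker") is not None,
    )
```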

from stdiobus import StdioBus, SubprocessOptions

bus = StdioBus(
    config=config,
    backend="subprocess",
    subprocess=SubprocessOptions(
        binary_path="/usr/local/bin/stdio_bus",
        start_timeout_sec=10.0,
    ),
)

Development

pip install -e ".[dev]"
pytest -v

License

Apache-2.0
