Skip to main content

A multi-agent orchestration runtime

Project description

ramure

ramure is an opinionated and lightweight Python library for building reliable agent software. It makes it easy to define programs where agents communicate across environments to accomplish a task.

Agent software are complex distributed systems. The goal of ramure is to make it easier to build and robustify these systems, in 2 notable ways:

  • Infrastructure primitives: for agent communication, provisioning, and the software environments in which they run
  • Fault-tolerant and modular design: ramure's abstractions encourage modularity and fault-tolerance in the design of agent software, using ideas from distributed systems programming like Erlang. See [here] for the motivation behind ramure's design.

Here are some examples of tasks ramure makes easy with agents:

  • optimization
  • custom software generation pipelines with user input
  • data pipelines
  • worker pools, monitors, and supervisors

Install

pip install ramure

Python 3.11+ is required.

ramure depends on pi and tmux for the machines on which agents run.

Quick start

Here is an annotated single worker program:

import asyncio
from ramure import agent, agent_process, done, fail, wait

# registers the ramure runtime which will manage the agents and their machines, as well as controlling the lifecycle/cleanup
@agent_process
async def run_task(spec: str) -> str:
    # initialize an agent (either locally or on a remote sandbox)
    worker = await agent(f"worker")

    # register tools the agent can call in-harness
    @worker.on("finish")
    async def on_finish(summary: str) -> str:
        """Call with your result when the task is done."""
        done(summary)
        return "Recorded."

    @worker.on("give_up")
    async def on_give_up(reason: str) -> str:
        """Call if you cannot complete the task."""
        fail(f"gave_up: {reason}")
        return "Recorded."

    await worker.send(
        f"Task:\n\n{spec}\n\n"
        "When done, call finish(summary). If impossible, "
        "call give_up(reason)."
    )
    # wait for the done/fail lifecycle triggered by the agent events
    return await wait()


if __name__ == "__main__":
    print(asyncio.run(run_task("Write 10 diverse haikus about git rebase.")))

Run it with:

uv run your_program.py

Now you can connect to the agent via ramure connect worker to see what it's doing.

Core concepts

Agent processes

The central object of ramure is the agent_process (AP), defined by decorating an async function with @agent_process. Inside the function, you define agents and machines as well as how they should communicate.

When a root AP gets called, ramure initializes a runtime that is responsible for the lifecycle of the agents and machines it owns. Nested APs inherit the active runtime. To control the lifecycle of an AP, you define events that agents can call back into deterministic Python through @agent.on(...).

Structuring how information moves in your program makes it easier to reliably use agent labor, especially in more complex cases. You can also configure which image an AP runs from — your local machine by default, or another image backend such as Docker.

Composition

APs compose. An AP can call another AP the way you'd call any async function:

@agent_process
async def main():
    code = await write_code("fibonacci function")
    review = await review_code(code)
    return code

Or fan out concurrently with asyncio.gather:

@agent_process
async def main():
    results = await asyncio.gather(
        research("Rust"),
        research("Python"),
    )
    return results

Observation, bubbling, and retry

An AP can also spawn() and obtain a handle to the running AP whose events become observable in real time.

@agent_process
async def main():
    handle = spawn(flaky_task, "write a haiku")

    async for event in handle.events:
        if event.type == "failed":
            handle = spawn(flaky_task, "write a haiku")
        elif event.type == "done":
            return event.data

This handle holds on to the child AP's event stream, so you can observe it as it runs, retry if it fails, and pass events along to higher-level supervisors using bubble().

Processes can emit custom events with emit(type, data). If a supervisor wants child events to appear on its own event stream, use bubble():

@agent_process
async def worker_pool(specs: list[str]) -> None:
    for i, spec in enumerate(specs):
        tid = f"t{i:04d}"
        bubble(spawn(run_task, tid, spec), source=tid)
    await wait()

Now a parent observing spawn(worker_pool, specs).events can see child events too, tagged with source=tid.

Endpoints and afforded interfaces

APs can also encode specific ways in which they are interacted with, by exposing an API that can be called in code, or via another agent. To do this, use the @expose decorator:

@agent_process
async def worker_pool() -> None:
    specs: dict[str, str] = {}

    @expose
    async def add_task(spec: str) -> str:
        tid = f"t{len(specs):04d}"
        specs[tid] = spec
        emit("task_added", {"task_id": tid, "spec": spec})
        bubble(spawn(run_task, tid, spec), source=tid)
        return tid

    @expose
    async def tasks() -> dict[str, str]:
        return dict(specs)

    emit("ready", None)
    await wait()

You can then consume the exposed worker pool in various ways:

@agent_process
async def main():
    pool = spawn(worker_pool)

    async for event in pool.events:
        if event.type == "ready":
            break

    # call directly
    await pool.call("add_task", spec="Write a haiku about git rebase.")

    # or attach an agent to call the exposed functions on the pool,
    # which get exposed as tools
    monitor = await agent(
        "monitor",
        system_prompt="You run a pool of workers.",
    )
    await pool.attach(monitor, prefix="pool_")

This lets you give a component narrow affordances instead of ambient access to everything. Endpoints run inside the child process's scope, so calls to emit(), done(), and fail() inside an endpoint affect the child, not the caller. Child-owned agents are also visible through handle.agents once the child has created them.

API

Decorator

  • @agent_process(image=, timeout=, log_dir=, host=, port=, base_url=) — wrap an async function as a process

Ambient functions

  • await agent(name, system_prompt=, image=, machine=) — create an agent
  • await machine(image=) — spawn a standalone machine
  • connect(a, b, direction=) — allow agents to message/send files
  • done(result) — signal process success
  • fail(reason) — signal process failure
  • await wait() — block until done() or fail()
  • emit(type, data) — emit a process event
  • spawn(fn, *args, **kwargs) — run a process in the background, returns ProcessHandle
  • bubble(handle, source=) — forward a child process's events onto the current process stream
  • @expose — register an async function as an endpoint callable via handle.call() or attachable via handle.attach()
  • current_runtime() — access the active runtime (rarely needed)

Agent methods

  • agent.on(tool_name) — decorator to register an async tool handler
  • agent.send(message) — send a message to the agent
  • agent.exec(command) — run a shell command on the agent's machine
  • agent.events — async-iterable log of raw agent events

ProcessHandle

  • handle.events — async-iterable stream of process events
  • handle.agents — dict of the child's agents created so far
  • await handle.call(name, **kwargs) — call an endpoint
  • await handle.attach(agent, only=, prefix=) — register endpoints as tools on an agent
  • handle.cancel() — cancel the process

CLI

Running a root @agent_process opens a Unix socket at ~/.ramure/runtimes/{execution_id}.sock and writes a per-run log tree under ~/.ramure/logs/{execution_id}/. The ramure CLI uses these:

ramure ls                         # live runs
ramure status [--id <prefix>]     # agents, machines, connections
ramure send <agent> <msg> [--id <prefix>]
ramure connect <agent> [--id <prefix>]  # tmux attach
ramure ssh <agent> [--id <prefix>]      # shell on the agent's machine

--id takes an execution-id prefix. Omit it when there's only one live run. All commands require the run to be live (socket present). Finished-run logs remain under ~/.ramure/logs/{execution_id}/.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ramure-0.0.2.tar.gz (141.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ramure-0.0.2-py3-none-any.whl (44.1 kB view details)

Uploaded Python 3

File details

Details for the file ramure-0.0.2.tar.gz.

File metadata

  • Download URL: ramure-0.0.2.tar.gz
  • Upload date:
  • Size: 141.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for ramure-0.0.2.tar.gz
Algorithm Hash digest
SHA256 cdb95e344829f7624d4f4cd1b2cd815ca8d6efde87c251310d28bb107e8d1773
MD5 353e1330154f42b40e5386c0e36706d7
BLAKE2b-256 bb20549b409bb1cbad026828e50adc9ee209212103dfa6e91bf855eb0e7065c7

See more details on using hashes here.

File details

Details for the file ramure-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: ramure-0.0.2-py3-none-any.whl
  • Upload date:
  • Size: 44.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for ramure-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 4ca4367490da2592e9ca5faf68bbb866317049541e455145f62d9f8fb801d393
MD5 ff13d43c0a4e633bc2df8c622845caf4
BLAKE2b-256 37c09111b4f2c2273765deb9b9093927a032b067631e2cfc1a443d605d3832d8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page