A multi-agent orchestration runtime
Project description
ramure
ramure is an opinionated and lightweight Python library for building reliable agent software. It makes it easy to define programs where agents communicate across environments to accomplish a task.
Agent software are complex distributed systems. The goal of ramure is to make it easier to build and robustify these systems, in 2 notable ways:
- Infrastructure primitives: for agent communication, provisioning, and the software environments in which they run
- Fault-tolerant and modular design: ramure's abstractions encourage modularity and fault-tolerance in the design of agent software, using ideas from distributed systems programming like Erlang. See [here] for the motivation behind ramure's design.
Here are some examples of tasks ramure makes easy with agents:
- optimization
- custom software generation pipelines with user input
- data pipelines
- worker pools, monitors, and supervisors
Install
pip install ramure
Python 3.11+ is required.
ramure depends on pi and tmux for the machines on which agents run.
Quick start
Here is an annotated single worker program:
import asyncio
from ramure import agent, agent_process, done, fail, wait
# registers the ramure runtime which will manage the agents and their machines, as well as controlling the lifecycle/cleanup
@agent_process
async def run_task(spec: str) -> str:
# initialize an agent (either locally or on a remote sandbox)
worker = await agent(f"worker")
# register tools the agent can call in-harness
@worker.on("finish")
async def on_finish(summary: str) -> str:
"""Call with your result when the task is done."""
done(summary)
return "Recorded."
@worker.on("give_up")
async def on_give_up(reason: str) -> str:
"""Call if you cannot complete the task."""
fail(f"gave_up: {reason}")
return "Recorded."
await worker.send(
f"Task:\n\n{spec}\n\n"
"When done, call finish(summary). If impossible, "
"call give_up(reason)."
)
# wait for the done/fail lifecycle triggered by the agent events
return await wait()
if __name__ == "__main__":
print(asyncio.run(run_task("Write 10 diverse haikus about git rebase.")))
Run it with:
uv run your_program.py
Now you can connect to the agent via ramure connect worker to see what it's doing.
Core concepts
Agent processes
The central object of ramure is the agent_process (AP), defined by decorating an async function with @agent_process. Inside the function, you define agents and machines as well as how they should communicate.
When a root AP gets called, ramure initializes a runtime that is responsible for the lifecycle of the agents and machines it owns. Nested APs inherit the active runtime. To control the lifecycle of an AP, you define events that agents can call back into deterministic Python through @agent.on(...).
Structuring how information moves in your program makes it easier to reliably use agent labor, especially in more complex cases. You can also configure which image an AP runs from — your local machine by default, or another image backend such as Docker.
Composition
APs compose. An AP can call another AP the way you'd call any async function:
@agent_process
async def main():
code = await write_code("fibonacci function")
review = await review_code(code)
return code
Or fan out concurrently with asyncio.gather:
@agent_process
async def main():
results = await asyncio.gather(
research("Rust"),
research("Python"),
)
return results
Observation, bubbling, and retry
An AP can also spawn() and obtain a handle to the running AP whose events become observable in real time.
@agent_process
async def main():
handle = spawn(flaky_task, "write a haiku")
async for event in handle.events:
if event.type == "failed":
handle = spawn(flaky_task, "write a haiku")
elif event.type == "done":
return event.data
This handle holds on to the child AP's event stream, so you can observe it as it runs, retry if it fails, and pass events along to higher-level supervisors using bubble().
Processes can emit custom events with emit(type, data). If a supervisor wants child events to appear on its own event stream, use bubble():
@agent_process
async def worker_pool(specs: list[str]) -> None:
for i, spec in enumerate(specs):
tid = f"t{i:04d}"
bubble(spawn(run_task, tid, spec), source=tid)
await wait()
Now a parent observing spawn(worker_pool, specs).events can see child events too, tagged with source=tid.
Endpoints and afforded interfaces
APs can also encode specific ways in which they are interacted with, by exposing an API that can be called in code, or via another agent. To do this, use the @expose decorator:
@agent_process
async def worker_pool() -> None:
specs: dict[str, str] = {}
@expose
async def add_task(spec: str) -> str:
tid = f"t{len(specs):04d}"
specs[tid] = spec
emit("task_added", {"task_id": tid, "spec": spec})
bubble(spawn(run_task, tid, spec), source=tid)
return tid
@expose
async def tasks() -> dict[str, str]:
return dict(specs)
emit("ready", None)
await wait()
You can then consume the exposed worker pool in various ways:
@agent_process
async def main():
pool = spawn(worker_pool)
async for event in pool.events:
if event.type == "ready":
break
# call directly
await pool.call("add_task", spec="Write a haiku about git rebase.")
# or attach an agent to call the exposed functions on the pool,
# which get exposed as tools
monitor = await agent(
"monitor",
system_prompt="You run a pool of workers.",
)
await pool.attach(monitor, prefix="pool_")
This lets you give a component narrow affordances instead of ambient access to everything. Endpoints run inside the child process's scope, so calls to emit(), done(), and fail() inside an endpoint affect the child, not the caller. Child-owned agents are also visible through handle.agents once the child has created them.
API
Decorator
@agent_process(image=, timeout=, log_dir=, host=, port=, base_url=)— wrap an async function as a process
Ambient functions
await agent(name, system_prompt=, image=, machine=)— create an agentawait machine(image=)— spawn a standalone machineconnect(a, b, direction=)— allow agents to message/send filesdone(result)— signal process successfail(reason)— signal process failureawait wait()— block untildone()orfail()emit(type, data)— emit a process eventspawn(fn, *args, **kwargs)— run a process in the background, returnsProcessHandlebubble(handle, source=)— forward a child process's events onto the current process stream@expose— register an async function as an endpoint callable viahandle.call()or attachable viahandle.attach()current_runtime()— access the active runtime (rarely needed)
Agent methods
agent.on(tool_name)— decorator to register an async tool handleragent.send(message)— send a message to the agentagent.exec(command)— run a shell command on the agent's machineagent.events— async-iterable log of raw agent events
ProcessHandle
handle.events— async-iterable stream of process eventshandle.agents— dict of the child's agents created so farawait handle.call(name, **kwargs)— call an endpointawait handle.attach(agent, only=, prefix=)— register endpoints as tools on an agenthandle.cancel()— cancel the process
CLI
Running a root @agent_process opens a Unix socket at
~/.ramure/runtimes/{execution_id}.sock and writes a per-run log tree
under ~/.ramure/logs/{execution_id}/. The ramure CLI uses these:
ramure ls # live runs
ramure status [--id <prefix>] # agents, machines, connections
ramure send <agent> <msg> [--id <prefix>]
ramure connect <agent> [--id <prefix>] # tmux attach
ramure ssh <agent> [--id <prefix>] # shell on the agent's machine
--id takes an execution-id prefix. Omit it when there's only one live run.
All commands require the run to be live (socket present). Finished-run logs
remain under ~/.ramure/logs/{execution_id}/.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file ramure-0.0.2.tar.gz.
File metadata
- Download URL: ramure-0.0.2.tar.gz
- Upload date:
- Size: 141.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cdb95e344829f7624d4f4cd1b2cd815ca8d6efde87c251310d28bb107e8d1773
|
|
| MD5 |
353e1330154f42b40e5386c0e36706d7
|
|
| BLAKE2b-256 |
bb20549b409bb1cbad026828e50adc9ee209212103dfa6e91bf855eb0e7065c7
|
File details
Details for the file ramure-0.0.2-py3-none-any.whl.
File metadata
- Download URL: ramure-0.0.2-py3-none-any.whl
- Upload date:
- Size: 44.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4ca4367490da2592e9ca5faf68bbb866317049541e455145f62d9f8fb801d393
|
|
| MD5 |
ff13d43c0a4e633bc2df8c622845caf4
|
|
| BLAKE2b-256 |
37c09111b4f2c2273765deb9b9093927a032b067631e2cfc1a443d605d3832d8
|