codex workflows — multi-agent orchestration over the codex app-server: fan out, pipeline, and build DAGs of agents

cowo

codex workflows. orchestrate many codex agents from plain python. each agent is a real codex thread — tools, sandbox, structured output — and N agents run as N threads multiplexed over one codex app-server process. fan out, pipeline, build DAGs.

from cowo import agent, parallel
caps = await parallel([(lambda c=c: agent(f"capital of {c}?")) for c in countries])

install

uv add cowo

cowo drives your codex CLI over its app-server JSON-RPC — it bundles nothing, so it runs anywhere codex runs (no platform-locked wheels). put codex on PATH, auth once:

bun install -g @openai/codex   # or any codex install
codex login                    # or set OPENAI_API_KEY
cowo doctor

quickstart

import asyncio
from pydantic import BaseModel
from cowo import agent, parallel, spent

class Fact(BaseModel):
    name: str
    year: int

async def main():
    pong = await agent("Reply with one word: pong")
    fact = await agent("Founding year of Tokyo.", schema=Fact)   # Fact(name='Tokyo', year=1457)
    caps = await parallel([(lambda c=c: agent(f"Capital of {c}? one word")) for c in ("France", "Japan")])
    print(pong, fact, caps, "·", spent.spent(), "tokens")

asyncio.run(main())
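
a note on the lambda c=c: idiom used in the parallel call: python closures read a loop variable when they are called, not when they are defined, so without the default argument every thunk would see the last value. the sketch below is plain python with no cowo calls; it only illustrates the binding rule.

```python
countries = ["France", "Japan"]

# Late binding: each lambda looks up `c` when called, after the loop has finished.
late = [lambda: c for c in countries]

# Default-argument binding: `c=c` captures the current value at definition time.
bound = [lambda c=c: c for c in countries]

print([f() for f in late])   # ['Japan', 'Japan']
print([f() for f in bound])  # ['France', 'Japan']
```

with the default argument in place, each thunk passed to parallel carries its own country.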

pass a pydantic model as schema and codex enforces it (outputSchema) — a schema'd agent cannot return malformed JSON; you get the parsed instance back.
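
schema may also be a plain dict (see the api section). a minimal sketch of what a dict equivalent of Fact could look like, and of parsing a conforming reply with the standard library only. FACT_SCHEMA, parse_fact and the sample reply are illustrative names and data, not part of cowo, and whether agent() hands back a dict or JSON text for a dict schema is not stated here.

```python
import json
from dataclasses import dataclass

# Hypothetical JSON Schema mirroring the pydantic Fact model from the quickstart.
FACT_SCHEMA = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "year": {"type": "integer"},
    },
    "required": ["name", "year"],
    "additionalProperties": False,
}

@dataclass
class Fact:
    name: str
    year: int

def parse_fact(reply_text):
    """Turn a schema-conforming JSON reply into a Fact instance."""
    return Fact(**json.loads(reply_text))

# Hand-written sample reply; a real one would come from a schema'd agent call.
sample = '{"name": "Tokyo", "year": 1457}'
print(parse_fact(sample))  # Fact(name='Tokyo', year=1457)
```

because the reply is constrained to the schema, the parse step needs no fallback branch for free-form text.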

DAGs of agents

declare the graph with airflow-style >>; it runs over a tunable concurrency limiter:

from cowo import dag, task

a = task("Analyze module A.")
b = task("Analyze module B.")
synth = task(lambda ups: f"Synthesize:\n{ups}")

[a, b] >> synth                          # a, b run concurrently; synth receives both results
result = dag(synth).run(concurrency=8)   # drain the graph through 8 live-adjustable slots

the graph is data before it runs: inspectable, scheduled, journalable. hold on to the dag object (d = dag(synth)) and set d.concurrency = 32 mid-run to drain a big queue faster.
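
how a >> graph like this can work in plain python, as a rough sketch: list has no __rshift__, so [a, b] >> synth falls through to the right operand's __rrshift__. Node and run_graph are hypothetical names for illustration, the agents are replaced by local coroutines, and this is not cowo's actual scheduler.

```python
import asyncio

class Node:
    """Hypothetical stand-in for a task: a coroutine function plus upstream nodes."""

    def __init__(self, fn):
        self.fn = fn
        self.upstream = []

    def __rshift__(self, other):      # node >> other
        other.upstream.append(self)
        return other

    def __rrshift__(self, others):    # [n1, n2] >> node
        self.upstream.extend(others)
        return self

async def run_graph(terminal, concurrency=8):
    slots = asyncio.Semaphore(concurrency)
    started = {}  # node -> future, so a shared upstream runs only once

    async def resolve(node):
        # Wait for dependencies first, then take a slot only for the node's own work.
        ups = await asyncio.gather(*(start(u) for u in node.upstream))
        async with slots:
            return await node.fn(list(ups))

    def start(node):
        if node not in started:
            started[node] = asyncio.ensure_future(resolve(node))
        return started[node]

    return await start(terminal)

def const(value):
    async def fn(_ups):
        await asyncio.sleep(0)        # pretend to do some work
        return value
    return fn

async def join(ups):
    return "+".join(ups)

a, b, synth = Node(const("A")), Node(const("B")), Node(join)
[a, b] >> synth
print(asyncio.run(run_graph(synth)))  # A+B
```

slots are acquired only after the upstream results are in, so a node waiting on its dependencies never holds a slot they need.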

recursion

not a primitive — a recursive function whose leaves are agent() calls:

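from cowo import agent, parallel

async def summarize(chunks):
    if not chunks:                      # empty input would otherwise recurse forever
        return ""
    if len(chunks) == 1:                # leaf: one agent call per chunk
        return await agent(f"Summarize:\n{chunks[0]}")
    mid = len(chunks) // 2
    parts = await parallel([(lambda h=h: summarize(h)) for h in (chunks[:mid], chunks[mid:])])
    return await agent("Merge:\n" + "\n".join(parts))   # internal node: merge both halves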
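
to see what this recursion costs, here is a self-contained dry run with stubs in place of cowo. the agent and parallel functions below are stand-ins written for this sketch (the stub parallel assumes the real one calls each thunk and returns results in order), and summarize carries an extra guard for empty input.

```python
import asyncio

calls = []  # every prompt the stub agent receives

async def agent(prompt):
    # Stub standing in for cowo's agent(): records the prompt, returns a short tag.
    calls.append(prompt)
    return f"s{len(calls)}"

async def parallel(thunks):
    # Assumed behaviour: call every thunk and await the results together, in order.
    return await asyncio.gather(*(t() for t in thunks))

async def summarize(chunks):
    if not chunks:                      # guard so empty input does not recurse forever
        return ""
    if len(chunks) == 1:
        return await agent(f"Summarize:\n{chunks[0]}")
    mid = len(chunks) // 2
    parts = await parallel([(lambda h=h: summarize(h)) for h in (chunks[:mid], chunks[mid:])])
    return await agent("Merge:\n" + "\n".join(parts))

asyncio.run(summarize(["a", "b", "c", "d"]))
print(len(calls))  # 7
```

n chunks produce n leaf calls plus n-1 merge calls, so 2n-1 agent calls in total, arranged in a tree about log2(n) levels deep.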

api

agent(prompt, *, schema, model, effort, sandbox, cwd, label)
    one ephemeral thread + one turn. schema = dict or pydantic BaseModel. sandbox=None is yolo; pass "read-only" or "workspace-write" to restrict.
session(*, model, sandbox, cwd)
    a persistent thread that remembers; drive it with .send(prompt, schema=, effort=).
parallel(thunks) / pipeline(items, *stages)
    imperative fan-out / staged flow.
dag(*terminals).run(concurrency=N) + task + >>
    declarative agent DAG.
spent
    budget: .spent() / .remaining() / .total, summed across all agents.
run(coro)
    asyncio.run that also closes the app-server.
COWO_CONCURRENCY
    global concurrency cap (default min(16, cpu-2)).
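
a rough sketch of a concurrency cap that can be resized while work is in flight, in the spirit of the live-adjustable slots mentioned earlier. Limiter, resize and default_limit are hypothetical names, not cowo's API; reading COWO_CONCURRENCY as an integer and flooring the default at 1 are assumptions made for the example.

```python
import asyncio
import os

def default_limit():
    # Assumption: the env var holds an integer; the floor of 1 is also an assumption.
    raw = os.environ.get("COWO_CONCURRENCY")
    if raw:
        return int(raw)
    return max(1, min(16, (os.cpu_count() or 1) - 2))

class Limiter:
    """Async context manager that admits at most `limit` holders at a time."""

    def __init__(self, limit):
        self.limit = limit
        self._active = 0
        self._cond = asyncio.Condition()

    async def __aenter__(self):
        async with self._cond:
            await self._cond.wait_for(lambda: self._active < self.limit)
            self._active += 1

    async def __aexit__(self, *exc):
        async with self._cond:
            self._active -= 1
            self._cond.notify_all()

    async def resize(self, limit):
        # Raising the limit wakes waiters immediately; lowering it takes effect
        # as current holders leave.
        async with self._cond:
            self.limit = limit
            self._cond.notify_all()

async def demo():
    lim = Limiter(2)
    release = asyncio.Event()
    running = 0

    async def job():
        nonlocal running
        async with lim:
            running += 1
            await release.wait()       # hold the slot until released
            running -= 1

    tasks = [asyncio.ensure_future(job()) for _ in range(6)]
    await asyncio.sleep(0.05)
    before = running                   # capped at the initial limit
    await lim.resize(6)
    await asyncio.sleep(0.05)
    after = running                    # waiters admitted after the resize
    release.set()
    await asyncio.gather(*tasks)
    return before, after

print(asyncio.run(demo()))  # (2, 6)
```

a condition variable is used instead of asyncio.Semaphore because a semaphore's capacity cannot be changed after construction.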

how it works

one codex app-server subprocess; agent() = thread/start + turn/start, events demuxed by threadId; budget summed from token-usage notifications. ~120 lines over the protocol — see client.py.
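
the demux idea can be sketched in a few lines: one reader feeds every incoming notification to a router that looks at threadId and drops the event on that thread's queue, while usage notifications add to a shared total. this is an illustration only; the Demux class, the example/text and example/tokenUsage method names, and the payload fields are made up for the sketch and are not the real app-server protocol.

```python
import asyncio
import json

class Demux:
    """Routes JSON-RPC-style notifications to per-thread queues."""

    def __init__(self):
        self.queues = {}   # threadId -> asyncio.Queue of events
        self.tokens = 0    # running total across every thread

    def subscribe(self, thread_id):
        return self.queues.setdefault(thread_id, asyncio.Queue())

    def feed(self, line):
        msg = json.loads(line)
        params = msg.get("params", {})
        if msg.get("method") == "example/tokenUsage":   # hypothetical method name
            self.tokens += params.get("tokens", 0)
            return
        queue = self.queues.get(params.get("threadId"))
        if queue is not None:                           # ignore unknown threads
            queue.put_nowait(msg)

async def demo():
    demux = Demux()
    q1, q2 = demux.subscribe("t1"), demux.subscribe("t2")
    # Hand-written stand-ins for lines read from the subprocess's stdout.
    wire = [
        {"method": "example/text", "params": {"threadId": "t1", "text": "Paris"}},
        {"method": "example/text", "params": {"threadId": "t2", "text": "Tokyo"}},
        {"method": "example/tokenUsage", "params": {"threadId": "t1", "tokens": 12}},
        {"method": "example/tokenUsage", "params": {"threadId": "t2", "tokens": 30}},
    ]
    for msg in wire:
        demux.feed(json.dumps(msg))
    first, second = await q1.get(), await q2.get()
    return first["params"]["text"], second["params"]["text"], demux.tokens

print(asyncio.run(demo()))  # ('Paris', 'Tokyo', 42)
```

each agent only ever awaits its own queue, which is what lets N threads share one process and one pipe.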

roadmap

codex ships a native actor runtime (multi_agents_v2: spawn/send/wait/close, mailboxes, supervision). v0.2 exposes it as a message-passing actor substrate — see ACTORS.md.

MIT
