Orchestrate multi-agent workflows with autonomous reasoning, parallelism, rate limiting, and LLM integration
Project description
orchestrator (Python reference SDK)
In-process workflow orchestration engine + authoring API. Zero required dependencies
(stdlib asyncio); pyyaml is optional (for .yaml specs), anthropic optional (for the
real LLM adapter).
Concepts
- Workflow — a DAG of tasks loaded from a template (
Workflow.from_file/from_dict/from_json). - Skill — a registered handler
(ctx, inputs) -> output, sync orasync. - Tool — a registered callable a skill invokes via
ctx.call_tool(...). - Orchestrator — runs a workflow, enforcing concurrency + rate limits, and returns a
Report.
Minimal usage
from orchestrator import Workflow, Orchestrator, Registry, MockLLM
reg = Registry()
@reg.tool("http.get")
def http_get(url):
return {"url": url, "ok": True}
@reg.skill("market.fetch")
def fetch(ctx, inputs):
ctx.call_tool("http.get", url=f"/prices/{inputs['ticker']}")
ctx.record_decision("selected primary feed", rationale="lowest latency")
return {"ticker": inputs["ticker"], "last": 199.4}
@reg.skill("llm.classify")
def classify(ctx, inputs):
r = ctx.llm(messages=[{"role": "user", "content": inputs["text"]}], model="claude-opus-4-8")
return r.text
wf = Workflow.from_file("../spec/examples/market-analysis.json")
report = Orchestrator(reg, llm=MockLLM()).run_sync(wf, inputs={"ticker": "AAPL"})
print(report.to_json())
The ctx (SkillContext) API
| Member | Purpose |
|---|---|
ctx.inputs |
resolved task inputs |
ctx.call_tool(name, **kwargs) |
invoke a registered tool |
ctx.llm(messages, tools=, model=, **opts) |
call the configured LLM provider; usage is recorded |
ctx.record_decision(summary, rationale=, data=) |
append a critical decision to the trace |
ctx.log(msg), ctx.attempt, ctx.cancelled |
logging / retry attempt / cooperative cancel flag |
Report
report.to_dict() / report.to_json() give: status, duration_ms, per-task
{status, duration_ms, attempts, llm, decisions, error}, critical_path (bottleneck chain),
totals (llm tokens, tool calls, retries), errors, and the resolved output.
Run
python3 examples/run_market.py
python3 tests/test_engine.py
Using a real LLM
from orchestrator import AnthropicProvider, Orchestrator
orch = Orchestrator(reg, llm=AnthropicProvider(model="claude-opus-4-8")) # needs ANTHROPIC_API_KEY + `pip install anthropic`
Confirm current Claude model IDs/limits from the Claude API reference before pinning them.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file agentic_workflow_orchestrator-1.1.0.tar.gz.
File metadata
- Download URL: agentic_workflow_orchestrator-1.1.0.tar.gz
- Upload date:
- Size: 28.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e95d5cbc5849b8541692c328fb24ec906805002f8f55830fcb79ad517371b99d
|
|
| MD5 |
a45dbff611e855b06513d4677cca9b8d
|
|
| BLAKE2b-256 |
bd3b1ff2f68ab74f68bdc1e610e001e9e479046a6d7c47b08647e3e3d5ab31eb
|
File details
Details for the file agentic_workflow_orchestrator-1.1.0-py3-none-any.whl.
File metadata
- Download URL: agentic_workflow_orchestrator-1.1.0-py3-none-any.whl
- Upload date:
- Size: 21.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
34fa05b76681937cc33c9bbf708e95666019f324b268971ea52da2c85be19a7d
|
|
| MD5 |
5a97c8be0c8a16180dd9c24e285a345a
|
|
| BLAKE2b-256 |
a3e3432c2dd9515c3ada20317d298432856b0a6a7146f92d22ef4a7f6d60f2b7
|