Skip to main content

Orchestrate multi-agent workflows with autonomous reasoning, parallelism, rate limiting, and LLM integration

Project description

orchestrator (Python reference SDK)

In-process workflow orchestration engine + authoring API. Zero required dependencies (stdlib asyncio); pyyaml is optional (for .yaml specs), anthropic optional (for the real LLM adapter).

Concepts

  • Workflow — a DAG of tasks loaded from a template (Workflow.from_file/from_dict/from_json).
  • Skill — a registered handler (ctx, inputs) -> output, sync or async.
  • Tool — a registered callable a skill invokes via ctx.call_tool(...).
  • Orchestrator — runs a workflow, enforcing concurrency + rate limits, and returns a Report.

Minimal usage

from orchestrator import Workflow, Orchestrator, Registry, MockLLM

reg = Registry()

@reg.tool("http.get")
def http_get(url):
    return {"url": url, "ok": True}

@reg.skill("market.fetch")
def fetch(ctx, inputs):
    ctx.call_tool("http.get", url=f"/prices/{inputs['ticker']}")
    ctx.record_decision("selected primary feed", rationale="lowest latency")
    return {"ticker": inputs["ticker"], "last": 199.4}

@reg.skill("llm.classify")
def classify(ctx, inputs):
    r = ctx.llm(messages=[{"role": "user", "content": inputs["text"]}], model="claude-opus-4-8")
    return r.text

wf = Workflow.from_file("../spec/examples/market-analysis.json")
report = Orchestrator(reg, llm=MockLLM()).run_sync(wf, inputs={"ticker": "AAPL"})
print(report.to_json())

The ctx (SkillContext) API

Member Purpose
ctx.inputs resolved task inputs
ctx.call_tool(name, **kwargs) invoke a registered tool
ctx.llm(messages, tools=, model=, **opts) call the configured LLM provider; usage is recorded
ctx.record_decision(summary, rationale=, data=) append a critical decision to the trace
ctx.log(msg), ctx.attempt, ctx.cancelled logging / retry attempt / cooperative cancel flag

Report

report.to_dict() / report.to_json() give: status, duration_ms, per-task {status, duration_ms, attempts, llm, decisions, error}, critical_path (bottleneck chain), totals (llm tokens, tool calls, retries), errors, and the resolved output.

Run

python3 examples/run_market.py
python3 tests/test_engine.py

Using a real LLM

from orchestrator import AnthropicProvider, Orchestrator
orch = Orchestrator(reg, llm=AnthropicProvider(model="claude-opus-4-8"))  # needs ANTHROPIC_API_KEY + `pip install anthropic`

Confirm current Claude model IDs/limits from the Claude API reference before pinning them.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agentic_workflow_orchestrator-1.1.0.tar.gz (28.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agentic_workflow_orchestrator-1.1.0-py3-none-any.whl (21.6 kB view details)

Uploaded Python 3

File details

Details for the file agentic_workflow_orchestrator-1.1.0.tar.gz.

File metadata

File hashes

Hashes for agentic_workflow_orchestrator-1.1.0.tar.gz
Algorithm Hash digest
SHA256 e95d5cbc5849b8541692c328fb24ec906805002f8f55830fcb79ad517371b99d
MD5 a45dbff611e855b06513d4677cca9b8d
BLAKE2b-256 bd3b1ff2f68ab74f68bdc1e610e001e9e479046a6d7c47b08647e3e3d5ab31eb

See more details on using hashes here.

File details

Details for the file agentic_workflow_orchestrator-1.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for agentic_workflow_orchestrator-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 34fa05b76681937cc33c9bbf708e95666019f324b268971ea52da2c85be19a7d
MD5 5a97c8be0c8a16180dd9c24e285a345a
BLAKE2b-256 a3e3432c2dd9515c3ada20317d298432856b0a6a7146f92d22ef4a7f6d60f2b7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page