
Orchestrate multi-agent workflows with autonomous reasoning, parallelism, rate limiting, and LLM integration

orchestrator (Python reference SDK)

An in-process workflow orchestration engine and authoring API. It has no required dependencies (it runs on stdlib asyncio). Two packages are optional: pyyaml, for .yaml specs, and anthropic, for the real LLM adapter.

Concepts

  • Workflow — a DAG of tasks loaded from a template (Workflow.from_file/from_dict/from_json).
  • Skill — a registered handler (ctx, inputs) -> output, sync or async.
  • Tool — a registered callable a skill invokes via ctx.call_tool(...).
  • Orchestrator — runs a workflow, enforcing concurrency + rate limits, and returns a Report.
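To make the "DAG of tasks under a concurrency limit" idea concrete, here is a minimal stdlib-only sketch. It is not the SDK's engine: `run_dag`, its arguments, and the sample tasks are hypothetical names chosen for illustration, and the sketch assumes the dependency graph has no cycles.

```python
import asyncio


async def run_dag(tasks, deps, max_concurrency=2):
    """Run async callables in dependency order (illustrative only).

    tasks: {name: async function taking a dict of upstream results}
    deps:  {name: [names it depends on]}  (assumed acyclic)
    """
    sem = asyncio.Semaphore(max_concurrency)  # caps how many tasks execute at once
    futures = {}

    async def run_one(name):
        # Wait for every upstream task before starting this one.
        upstream = {d: await futures[d] for d in deps.get(name, [])}
        async with sem:
            return await tasks[name](upstream)

    # Schedule everything up front; each task waits on its own dependencies.
    for name in tasks:
        futures[name] = asyncio.ensure_future(run_one(name))
    values = await asyncio.gather(*futures.values())
    return dict(zip(futures, values))


async def fetch(_upstream):
    return 100


async def analyze(upstream):
    return upstream["fetch"] + 1


async def report(upstream):
    return f"value={upstream['analyze']}"


results = asyncio.run(
    run_dag(
        {"fetch": fetch, "analyze": analyze, "report": report},
        {"analyze": ["fetch"], "report": ["analyze"]},
    )
)
print(results)
```

Independent tasks run in parallel up to `max_concurrency`, while dependent tasks wait for their inputs. The real Orchestrator also handles rate limits and retries and builds a Report, none of which this sketch attempts.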

Minimal usage

from orchestrator import Workflow, Orchestrator, Registry, MockLLM

# Registry of named tools and skills.
reg = Registry()

# Tool: a plain callable that skills reach through ctx.call_tool(...).
@reg.tool("http.get")
def http_get(url):
    return {"url": url, "ok": True}

# Skill: a handler (ctx, inputs) -> output.
@reg.skill("market.fetch")
def fetch(ctx, inputs):
    ctx.call_tool("http.get", url=f"/prices/{inputs['ticker']}")
    ctx.record_decision("selected primary feed", rationale="lowest latency")
    return {"ticker": inputs["ticker"], "last": 199.4}

# Skill that calls the configured LLM provider (MockLLM in this example).
@reg.skill("llm.classify")
def classify(ctx, inputs):
    r = ctx.llm(messages=[{"role": "user", "content": inputs["text"]}], model="claude-opus-4-8")
    return r.text

# Load the workflow template, run it synchronously, and print the report.
wf = Workflow.from_file("../spec/examples/market-analysis.json")
report = Orchestrator(reg, llm=MockLLM()).run_sync(wf, inputs={"ticker": "AAPL"})
print(report.to_json())

The ctx (SkillContext) API

Member                                           Purpose
ctx.inputs                                       resolved task inputs
ctx.call_tool(name, **kwargs)                    invoke a registered tool
ctx.llm(messages, tools=, model=, **opts)        call the configured LLM provider; usage is recorded
ctx.record_decision(summary, rationale=, data=)  append a critical decision to the trace
ctx.log(msg), ctx.attempt, ctx.cancelled         logging / retry attempt / cooperative cancel flag
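Because a skill only touches the members listed above, it can be exercised in isolation with a small stand-in object. The sketch below is a hypothetical test double, not the SDK's SkillContext: `FakeContext` is an invented name, the attempt counter starting at 1 is an assumption, and how the real engine sets `cancelled` is not covered here.

```python
class FakeContext:
    """Hypothetical stand-in that mirrors the ctx member names for unit tests."""

    def __init__(self, inputs, tools=None):
        self.inputs = inputs
        self.attempt = 1        # assumption: first attempt is numbered 1
        self.cancelled = False  # cooperative cancel flag, never set by this stub
        self.decisions = []
        self.logs = []
        self._tools = tools or {}

    def call_tool(self, name, **kwargs):
        # Look the tool up by its registered name and forward the keyword arguments.
        return self._tools[name](**kwargs)

    def record_decision(self, summary, rationale=None, data=None):
        self.decisions.append({"summary": summary, "rationale": rationale, "data": data})

    def log(self, msg):
        self.logs.append(msg)


def fetch(ctx, inputs):
    # Check the cancel flag before doing any work.
    if ctx.cancelled:
        return None
    quote = ctx.call_tool("http.get", url=f"/prices/{inputs['ticker']}")
    ctx.record_decision("selected primary feed", rationale="lowest latency")
    ctx.log(f"attempt {ctx.attempt} ok={quote['ok']}")
    return {"ticker": inputs["ticker"], "ok": quote["ok"]}


ctx = FakeContext(
    {"ticker": "AAPL"},
    tools={"http.get": lambda url: {"url": url, "ok": True}},
)
result = fetch(ctx, ctx.inputs)
print(result, ctx.decisions, ctx.logs)
```

The stub omits `ctx.llm`; a test that needs it could add a method returning a canned response object with a `text` attribute.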

Report

report.to_dict() / report.to_json() give: status, duration_ms, per-task {status, duration_ms, attempts, llm, decisions, error}, critical_path (bottleneck chain), totals (llm tokens, tool calls, retries), errors, and the resolved output.
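The critical_path field is described as the bottleneck chain. As a rough illustration of how such a chain could be derived from per-task durations, the sketch below finds the dependency chain with the largest summed duration. This is an assumed approach, not the SDK's documented algorithm. The `critical_path` helper, the dependency map shape, and the sample numbers are all made up for the example.

```python
def critical_path(durations, deps):
    """Return (chain, total_ms) for the longest-duration dependency chain.

    durations: {task_name: duration_ms}
    deps:      {task_name: [upstream task names]}  (assumed acyclic)
    """
    best = {}  # task name -> (total_ms, chain ending at that task)

    def visit(name):
        if name not in best:
            # Extend the slowest upstream chain, or start a new chain at this task.
            prev_total, prev_chain = max(
                (visit(d) for d in deps.get(name, [])),
                default=(0, []),
                key=lambda item: item[0],
            )
            best[name] = (prev_total + durations[name], prev_chain + [name])
        return best[name]

    total, chain = max((visit(n) for n in durations), key=lambda item: item[0])
    return chain, total


# Invented sample data: two parallel fetches feed an analysis, then a report.
durations = {"fetch": 120, "news": 300, "analyze": 80, "report": 10}
deps = {"analyze": ["fetch", "news"], "report": ["analyze"]}

chain, total_ms = critical_path(durations, deps)
print(chain, total_ms)
```

In this sample the slower `news` branch determines the chain, so speeding up `fetch` would not shorten the run.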

Run

python3 examples/run_market.py
python3 tests/test_engine.py

Using a real LLM

from orchestrator import AnthropicProvider, Orchestrator
orch = Orchestrator(reg, llm=AnthropicProvider(model="claude-opus-4-8"))  # needs ANTHROPIC_API_KEY + `pip install anthropic`

Confirm current Claude model IDs/limits from the Claude API reference before pinning them.
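Since the real adapter needs both the ANTHROPIC_API_KEY environment variable and the anthropic package, a small preflight check can decide which provider to construct. The helper below is a hypothetical convenience, not part of the SDK; it uses only the standard library and returns a label rather than a provider object.

```python
import importlib.util
import os


def pick_llm_backend(env=None):
    """Return "anthropic" when the key and the package are both available, else "mock"."""
    env = os.environ if env is None else env
    has_key = bool(env.get("ANTHROPIC_API_KEY"))
    # find_spec returns None when the top-level package is not installed.
    has_pkg = importlib.util.find_spec("anthropic") is not None
    return "anthropic" if has_key and has_pkg else "mock"


# With an empty environment the check always falls back to the mock.
backend = pick_llm_backend(env={})
print(backend)
```

A caller could then pass `AnthropicProvider(...)` to the Orchestrator when the result is "anthropic" and `MockLLM()` otherwise.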

Distribution files (version 1.1.1)

  • Source distribution: agentic_workflow_orchestrator-1.1.1.tar.gz (29.4 kB)
  • Built distribution (wheel, Python 3): agentic_workflow_orchestrator-1.1.1-py3-none-any.whl (21.6 kB)
