Skip to main content

Orchestrate multi-agent workflows with autonomous reasoning, parallelism, rate limiting, and LLM integration

Project description

orchestrator (Python reference SDK)

In-process workflow orchestration engine + authoring API. Zero required dependencies (stdlib asyncio); pyyaml is optional (for .yaml specs), anthropic optional (for the real LLM adapter).

Concepts

  • Workflow — a DAG of tasks loaded from a template (Workflow.from_file/from_dict/from_json).
  • Skill — a registered handler (ctx, inputs) -> output, sync or async.
  • Tool — a registered callable a skill invokes via ctx.call_tool(...).
  • Orchestrator — runs a workflow, enforcing concurrency + rate limits, and returns a Report.

Minimal usage

from orchestrator import Workflow, Orchestrator, Registry, MockLLM

reg = Registry()

@reg.tool("http.get")
def http_get(url):
    return {"url": url, "ok": True}

@reg.skill("market.fetch")
def fetch(ctx, inputs):
    ctx.call_tool("http.get", url=f"/prices/{inputs['ticker']}")
    ctx.record_decision("selected primary feed", rationale="lowest latency")
    return {"ticker": inputs["ticker"], "last": 199.4}

@reg.skill("llm.classify")
def classify(ctx, inputs):
    r = ctx.llm(messages=[{"role": "user", "content": inputs["text"]}], model="claude-opus-4-8")
    return r.text

wf = Workflow.from_file("../spec/examples/market-analysis.json")
report = Orchestrator(reg, llm=MockLLM()).run_sync(wf, inputs={"ticker": "AAPL"})
print(report.to_json())

The ctx (SkillContext) API

Member Purpose
ctx.inputs resolved task inputs
ctx.call_tool(name, **kwargs) invoke a registered tool
ctx.llm(messages, tools=, model=, **opts) call the configured LLM provider; usage is recorded
ctx.record_decision(summary, rationale=, data=) append a critical decision to the trace
ctx.log(msg), ctx.attempt, ctx.cancelled logging / retry attempt / cooperative cancel flag

Report

report.to_dict() / report.to_json() give: status, duration_ms, per-task {status, duration_ms, attempts, llm, decisions, error}, critical_path (bottleneck chain), totals (llm tokens, tool calls, retries), errors, and the resolved output.

Run

python3 examples/run_market.py
python3 tests/test_engine.py

Using a real LLM

from orchestrator import AnthropicProvider, Orchestrator
orch = Orchestrator(reg, llm=AnthropicProvider(model="claude-opus-4-8"))  # needs ANTHROPIC_API_KEY + `pip install anthropic`

Confirm current Claude model IDs/limits from the Claude API reference before pinning them.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agentic_workflow_orchestrator-1.0.0.tar.gz (26.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agentic_workflow_orchestrator-1.0.0-py3-none-any.whl (19.6 kB view details)

Uploaded Python 3

File details

Details for the file agentic_workflow_orchestrator-1.0.0.tar.gz.

File metadata

File hashes

Hashes for agentic_workflow_orchestrator-1.0.0.tar.gz
Algorithm Hash digest
SHA256 aa1b5cd054c7c1f42b28499e22fed6acdee8c3058ef7a73e90c247c9b7e1c17b
MD5 ce01be4f2f7ada3dfb7fb20dfba9b074
BLAKE2b-256 336d0c6521eb02d6ab9290083a6bb39ceea459480caf91a7eebfc561c785cfcd

See more details on using hashes here.

File details

Details for the file agentic_workflow_orchestrator-1.0.0-py3-none-any.whl.

File metadata

File hashes

Hashes for agentic_workflow_orchestrator-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 34985d8f0f4f2240184ad0aabbd26578399f79cc25d3368c5e101cff6061fb77
MD5 3d89d5d8894afa26a9b20d8f3058670a
BLAKE2b-256 2ff010ada094fb7142d0a795fae84f14aebc3977712c5c680b77304c83e86805

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page