Skip to main content

Orchestrate multi-agent workflows with autonomous reasoning, parallelism, rate limiting, and LLM integration

Project description

orchestrator (Python reference SDK)

In-process workflow orchestration engine + authoring API. Zero required dependencies (stdlib asyncio); pyyaml is optional (for .yaml specs), anthropic optional (for the real LLM adapter).

Concepts

  • Workflow — a DAG of tasks loaded from a template (Workflow.from_file/from_dict/from_json).
  • Skill — a registered handler (ctx, inputs) -> output, sync or async.
  • Tool — a registered callable a skill invokes via ctx.call_tool(...).
  • Orchestrator — runs a workflow, enforcing concurrency + rate limits, and returns a Report.

Minimal usage

from orchestrator import Workflow, Orchestrator, Registry, MockLLM

reg = Registry()

@reg.tool("http.get")
def http_get(url):
    return {"url": url, "ok": True}

@reg.skill("market.fetch")
def fetch(ctx, inputs):
    ctx.call_tool("http.get", url=f"/prices/{inputs['ticker']}")
    ctx.record_decision("selected primary feed", rationale="lowest latency")
    return {"ticker": inputs["ticker"], "last": 199.4}

@reg.skill("llm.classify")
def classify(ctx, inputs):
    r = ctx.llm(messages=[{"role": "user", "content": inputs["text"]}], model="claude-opus-4-8")
    return r.text

wf = Workflow.from_file("../spec/examples/market-analysis.json")
report = Orchestrator(reg, llm=MockLLM()).run_sync(wf, inputs={"ticker": "AAPL"})
print(report.to_json())

The ctx (SkillContext) API

Member Purpose
ctx.inputs resolved task inputs
ctx.call_tool(name, **kwargs) invoke a registered tool
ctx.llm(messages, tools=, model=, **opts) call the configured LLM provider; usage is recorded
ctx.record_decision(summary, rationale=, data=) append a critical decision to the trace
ctx.log(msg), ctx.attempt, ctx.cancelled logging / retry attempt / cooperative cancel flag

Report

report.to_dict() / report.to_json() give: status, duration_ms, per-task {status, duration_ms, attempts, llm, decisions, error}, critical_path (bottleneck chain), totals (llm tokens, tool calls, retries), errors, and the resolved output.

Run

python3 examples/run_market.py
python3 tests/test_engine.py

Using a real LLM

from orchestrator import AnthropicProvider, Orchestrator
orch = Orchestrator(reg, llm=AnthropicProvider(model="claude-opus-4-8"))  # needs ANTHROPIC_API_KEY + `pip install anthropic`

Confirm current Claude model IDs/limits from the Claude API reference before pinning them.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agentic_workflow_orchestrator-1.1.2.tar.gz (2.2 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agentic_workflow_orchestrator-1.1.2-py3-none-any.whl (2.1 MB view details)

Uploaded Python 3

File details

Details for the file agentic_workflow_orchestrator-1.1.2.tar.gz.

File metadata

File hashes

Hashes for agentic_workflow_orchestrator-1.1.2.tar.gz
Algorithm Hash digest
SHA256 599372e9aed13ef6109c408cbb356a6535fea5fc0a43fb60d59194291c145f9f
MD5 73c697cdbe8b2496de1e47746e843b9f
BLAKE2b-256 261d3b0c9ed3004167f6d067563bd7864ebde3adf253b995545a13687a409a5a

See more details on using hashes here.

File details

Details for the file agentic_workflow_orchestrator-1.1.2-py3-none-any.whl.

File metadata

File hashes

Hashes for agentic_workflow_orchestrator-1.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 f58089c607f0a4de9322494ffa22fdfc66e919654f8c5608b2f64406da0edaf1
MD5 f34b96dc144d1d66f9f6f6d144906691
BLAKE2b-256 a289a52429aa8edce6199c0f048b4b483e11f3399ede8c5faf35eb3ff443a260

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page