Skip to main content

Config-agnostic LLM pipeline graph executor — turn skills into flows

Project description

Skillflow

Config-agnostic LLM pipeline graph executor. Define multi-agent pipelines as YAML DAGs — skillflow handles traversal, tool execution, checkpoints, recovery, and event streaming on SQLite.

Install

pip install skillflow-py      # PyPI
pip install -e ~/skillflow    # from repo (editable)

Or clone and use the install script, which also registers CLI commands:

git clone https://github.com/linxuhao/SkillFlow.git
bash skillflow/scripts/install.sh

CLI commands registered in ~/.local/bin/:

Command Description
skillflow-lint Validate pipeline YAML files (one-shot)
skillflow-run Stateless pipeline runner (agent calls via CLI)
skillflow-convert Convert a skill description → pipeline YAML
skillflow-lint configs/*.yaml               # one-shot validation
skillflow-run --graph pipeline.yaml --action next   # start a pipeline (returns JSON)
skillflow-run --action submit --run-id <id> --result '{"key": "val"}'
skillflow-convert my_skill.md -o pipeline.yaml

PyPI publish

pip install build twine
python3 -m build
twine upload dist/*

Getting Started

Skillflow runs pipelines in two modes.

Framework Mode

Skillflow is embedded in a host application. The host drives the loop — skillflow handles traversal, tool execution, and state. The host only executes agent steps via StepRunner.

from skillflow import SkillFlow, PipelineGraph, StepResult

graph = PipelineGraph.from_yaml("tests/fixtures/minimal_1step.yaml")

sf = SkillFlow(":memory:")
sf.register_graph(graph)
sf.register_agent_config("echo_agent", model="host")

run_id = sf.create_run("minimal_1step")
sf.start_run(run_id)

while True:
    sf.advance_run(run_id)
    claimed = sf.claim_next_step(run_id)
    if claimed is None:
        break  # completed or paused
    # Host StepRunner executes the agent step here
    sf.confirm_step(claimed.token, StepResult(outputs={}, flags={}))

In framework mode, all tools auto-execute inline — skillflow runs native tools and custom tools without involving the host agent.

Config reference: tests/fixtures/minimal_1step.yaml.

Runner Mode

Skillflow is driven interactively via SkillTool. The pipeline exposes steps as instructions — a human or LLM agent calls action="next" / "submit" / "approve" / "reject".

from skillflow import SkillFlow, PipelineGraph
from skillflow.plugins.skill_runner import SkillTool

graph = PipelineGraph.from_yaml("tests/fixtures/skill_review.yaml")
sf = SkillFlow(":memory:", delegate_tools_to_agent=True)
sf.register_graph(graph)
sf.register_agent_config("review_analyst", model="host")
# ... register other agent configs referenced by the graph

tool = SkillTool(sf, "skill_review")
resp = tool(action="next")

while resp.status == "in_progress":
    print(resp.instruction)
    # Agent does work, produces output...
    resp = tool(action="submit", result={"findings": {...}})
# resp.status == "completed"

In runner mode, native tools auto-execute but custom and unknown tools are delegated to the agent (via resp.tool_name / resp.tool_params). Use skillflow-convert to generate a pipeline, then drive it via SkillTool.

Node Types

Type Description
agent LLM step — host app executes via StepRunner protocol
tool Auto-executed by skillflow (native), or delegated to agent in runner mode (custom)
gate Auto-resolved using match conditions against step output flags
loop Iterates over a JSON list from a workspace file, instantiating sub-steps per item

Transition Matching

Five match strategies. See tests/fixtures/dpe_full.yaml for a complete pipeline using all of them:

match: { field: "passed", value: true }                          # step output flags
match: { from_file: "review_verdict.json", field: "passed", value: true }  # output file
match: { from: "checkpoint", value: "approved" }                 # checkpoint routing
match: { _error: true }                                          # error handler
# (no match key)                                                 # always match

Context Injection

context:
  - source: { step: "1" }
  - source: { step: "2", mode: "interfaces" }
  - source: { config: "meta", output: "brief.md" }
  - source: { tool: "dir_tree" }

Checkpoints

Agent steps can pause for human approval (tests/fixtures/checkpoint_cycle.yaml):

sf.reject_checkpoint(run_id, "draft", "Add more detail to the analysis")

Output Validation

Steps declare validation specs auto-executed by skillflow. See tests/fixtures/skill_review.yaml for inline JSON Schema validation, or tests/fixtures/lifecycle_hooks.yaml for syntax_lint + py_compile validators.

Available validators: json_schema, syntax_lint, py_compile, pytest, file_exists.

Lifecycle Hooks

Steps with output.mode: "write" can trigger deliver and post-deliver hooks. See tests/fixtures/lifecycle_hooks.yaml:

lifecycle:
  on_deliver:
    tool: "repo_apply"
    params:
      source_dir: "$STEP_DIR"
    on_failure: "retry"
    max_retries: 2
  after_deliver:
    - tool: "syntax_lint"
      files: ["*.py"]

Error Handling

Steps declare max_retries and an _error transition. See tests/fixtures/error_handler.yaml.

Feedback Loopback

Tool failures can inject output into the next step's inputs (feedback: true). See plugins/skill_converter/skill_converter.yaml — the validate_design step feeds lint errors into fix_issues.

End Conditions

Four termination strategies, combined with and/or. See tests/fixtures/end_conditions.yaml and tests/fixtures/dpe_full.yaml:

end_conditions:
  combinator: or
  conditions:
    - type: node_reached
      node: "5_review"
      result: "completed"
    - type: max_total_steps
      limit: 200
    - type: max_run_duration_seconds
      limit: 3600
    - type: flag_match
      flag: { fatal_error: true }

Stale Claim Recovery

Built into advance_run. Claims older than stale_threshold_seconds (default 300) are auto-reset:

sf = SkillFlow("pipeline.db", stale_threshold_seconds=300)

Event Streaming

All state transitions are written to skillflow_outbox. Poll for real-time notifications:

events = sf.drain_outbox(batch_size=50)
for event in events:
    print(event.event_type, event.payload)
sf.ack_outbox([e.id for e in events])

In-process subscribers via NotificationBus:

from skillflow import NotificationBus

bus = NotificationBus()
bus.subscribe("step_completed", lambda n: print(n.payload))
sf = SkillFlow(":memory:", notification_bus=bus)

Tools

Native (13 built-in)

Tool Description
read_file Read a file with line numbers
write Write content to workspace
list_tree List directory structure
dir_tree Context tree for prompt injection
json_schema Validate JSON against inline schema
syntax_lint Syntax check via ruff
py_compile Python bytecode compile
pytest Run pytest on test files
repo_apply Copy files to repo + git commit
repo_validate Multi-tool repo validation
draft_commit Move draft files to final dir + commit
file_exists Check files matching glob patterns
notify Send user-visible notifications

Custom tools

Host apps add tool directories. Each tool: {name}/tool.yaml + {name}/impl.py. Function name must match directory name.

from skillflow.tool_loader import ToolLoader

loader = ToolLoader()
loader.add_tools_dir("my_app/tools")
sf = SkillFlow(":memory:", tool_loader=loader)

Use Cases

1. Framework mode — embed skillflow in your app

Use skillflow as a library. Read the Getting Started section above and the fixture examples in tests/fixtures/.

from skillflow import SkillFlow, PipelineGraph
graph = PipelineGraph.from_yaml("my_pipeline.yaml")
sf = SkillFlow(":memory:")
sf.register_graph(graph)
# ... drive the loop with claim_next_step / confirm_step

2. Agent mode — convert skills to pipelines

LLM agents use the converter + runner plugins to turn skill descriptions into skillflow pipelines. The agent calls a tool named run_skill repeatedly — the runner tells it what to do at each step.

from skillflow.plugins.skill_converter import setup_converter
from skillflow.plugins.skill_runner import load_agent_guide

# Give the agent its user manual
system_prompt = load_agent_guide()  # ← includes protocol, response format, rules

tool = setup_converter(sf, description_file="my_skill.md")
resp = tool(action="next")
# ... agent loops: submit result → get next instruction → ...
# resp.status == "completed" → pipeline YAML ready

Agent manuals are shipped in the package:

Plugin Manual How to load
skill_runner Agent protocol — actions, SkillResponse format, rules load_agent_guide() from skillflow.plugins.skill_runner
skill_converter Step-by-step guide — analyze → design → lint → fix load_agent_guide() from skillflow.plugins.skill_converter

Inject the runner manual into the agent's system prompt. Inject the converter manual when the agent is asked to convert a skill.

CLI tools for manual use:

skillflow-lint pipeline.yaml                    # one-shot config validation
skillflow-run --graph pipeline.yaml --action next  # start a pipeline
skillflow-convert my_skill.md -o out.yaml       # convert a skill description

Linter (skillflow.plugins.linter)

Framework utility. Validates pipeline YAML — used as a skillflow tool (skillflow_lint) inside the converter's feedback loop, or standalone:

skillflow-lint tests/fixtures/skill_review.yaml
skillflow-lint configs/*.yaml

Package

src/skillflow/
├── core.py              # SkillFlow orchestrator (create/claim/confirm/advance)
├── graph.py             # PipelineGraph, StepNode, Transition, GraphResolver
├── tool_loader.py       # Dynamic tool schema + implementation loading
├── context.py           # ContextResolver: cross-config, step, tool sources
├── step_validation.py   # StepValidator: multi-tool output validation
├── write_tools.py       # Constrained write tool generation from output.fixed
├── workspace.py         # Per-step atomic staging directories
├── validation.py        # Optional external-schema output validation
├── recovery.py          # Stale claim recovery
├── schema.py            # SQLite DDL + migrations
├── exceptions.py        # SkillFlowError hierarchy
├── outbox.py            # OutboxConsumer for event polling
├── notifications.py     # NotificationBus for in-process subscribers
├── agent_registry.py    # Agent config registry + schema resolution
├── plugins/             # Built-in plugins
│   ├── linter/          # Config validator + skillflow_lint tool
│   ├── skill_runner/    # SkillTool — interactive pipeline facade
│   └── skill_converter/ # Skill description → pipeline YAML
└── tools/               # Native tools (13)
    ├── read_file/       ├── write/          ├── list_tree/
    ├── dir_tree/        ├── json_schema/    ├── syntax_lint/
    ├── py_compile/      ├── pytest/         ├── repo_apply/
    ├── repo_validate/   ├── draft_commit/   ├── file_exists/
    └── notify/

Tests

pytest tests/ -v       # 306 tests
pytest plugins/ -v     # 21 plugin tests

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

skillflow_py-1.0.2.tar.gz (99.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

skillflow_py-1.0.2-py3-none-any.whl (84.6 kB view details)

Uploaded Python 3

File details

Details for the file skillflow_py-1.0.2.tar.gz.

File metadata

  • Download URL: skillflow_py-1.0.2.tar.gz
  • Upload date:
  • Size: 99.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for skillflow_py-1.0.2.tar.gz
Algorithm Hash digest
SHA256 952d9a2a9bea4663e6d45d13a50ca562e7c4764de589c28a71441da12368fc62
MD5 7dd9236e268140f6a9d84e0191047579
BLAKE2b-256 86fb4f220e8528b24da62487a4821c218b99781e99f70afe06113e8fb1b41366

See more details on using hashes here.

File details

Details for the file skillflow_py-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: skillflow_py-1.0.2-py3-none-any.whl
  • Upload date:
  • Size: 84.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for skillflow_py-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 1195a60a9751fae96c6a81d834ffc60863bf2b4491c905c57a6de6b2dd0f4555
MD5 c12768ee9c5620945310a157b8bc6c0c
BLAKE2b-256 5493b3fe2341da5d44e92a8ecdd60e45fe99610fe7fffd8f7a5da4d90ab7c3f9

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page