Skip to main content

Config-agnostic LLM pipeline graph executor — turn skills into flows

Project description

Skillflow

Config-agnostic LLM pipeline graph executor. Define multi-agent pipelines as YAML DAGs — skillflow handles traversal, tool execution, checkpoints, recovery, and event streaming on SQLite.

Install

pip install skillflow-py      # PyPI
pip install -e ~/skillflow    # from repo (editable)

Or clone and use the install script, which also registers CLI commands:

git clone https://github.com/linxuhao/SkillFlow.git
bash skillflow/scripts/install.sh

CLI commands registered in ~/.local/bin/:

Command Description
skillflow-lint Validate pipeline YAML files (one-shot)
skillflow-run Stateless pipeline runner (agent calls via CLI)
skillflow-convert Convert a skill description → pipeline YAML
skillflow-lint configs/*.yaml               # one-shot validation
skillflow-run --graph pipeline.yaml --action next   # start a pipeline (returns JSON)
skillflow-run --action submit --run-id <id> --result '{"key": "val"}'
skillflow-convert my_skill.md -o pipeline.yaml

PyPI publish

pip install build twine
python3 -m build
twine upload dist/*

Getting Started

Skillflow runs pipelines in two modes.

Framework Mode

Skillflow is embedded in a host application. The host drives the loop — skillflow handles traversal, tool execution, and state. The host only executes agent steps via StepRunner.

from skillflow import SkillFlow, PipelineGraph, StepResult

graph = PipelineGraph.from_yaml("tests/fixtures/minimal_1step.yaml")

sf = SkillFlow(":memory:")
sf.register_graph(graph)
sf.register_agent_config("echo_agent", model="host")

run_id = sf.create_run("minimal_1step")
sf.start_run(run_id)

while True:
    sf.advance_run(run_id)
    claimed = sf.claim_next_step(run_id)
    if claimed is None:
        break  # completed or paused
    # Host StepRunner executes the agent step here
    sf.confirm_step(claimed.token, StepResult(outputs={}, flags={}))

In framework mode, all tools auto-execute inline — skillflow runs native tools and custom tools without involving the host agent.

Config reference: tests/fixtures/minimal_1step.yaml.

Runner Mode

Runner mode is the language-agnostic interface. An LLM agent (or any program) drives a pipeline by calling skillflow-run as a CLI tool. Each invocation is a fresh process — only the SQLite DB persists state.

Pass --graph once with --action start. The graph path is stored in the DB. All subsequent calls use --run-id to reconnect — no --graph needed.

# 1. Start a pipeline — pass --graph once, get the first step back
$ skillflow-run --graph pipeline.yaml --action start
{"status": "in_progress", "run_id": "abc123", "step": "analyze", "instruction": "..."}

# 2. Submit work for the current step (no --graph needed)
$ skillflow-run --action submit --run-id abc123 \
    --result '{"issues": [{"file": "app.py", "severity": "high"}]}'
{"status": "in_progress", "run_id": "abc123", "step": "summarize", "instruction": "..."}

# 3. When a checkpoint step completes, the run pauses
{"status": "paused", "checkpoint_label": "Review Summary — approve to commit, reject to revise"}

# 3a. Human approves (no --graph needed)
$ skillflow-run --action approve --run-id abc123
{"status": "in_progress", "run_id": "abc123", "step": "apply_fixes", ...}

# 3b. Or human rejects with feedback
$ skillflow-run --action reject --run-id abc123 \
    --feedback "Severity of bare except should be high, not medium"

# 4. Loop continues until the pipeline completes
{"status": "completed", "steps_completed": 3, "outputs": {...}}

The agent loop in pseudocode:

resp = run("--graph", graph, "--action", "start")
while resp.status == "in_progress":
    # Steps with output.fixed require files written to output_dir before submit
    if resp.expected_files:
        for fname in resp.expected_files:
            write_file(resp.output_dir / fname, generate_content(resp))
    work = do_the_work(resp.instruction, resp.tools)
    resp = run("--action", "submit", "--run-id", resp.run_id,
               "--result", json(work))
    # If validation failed, resp.validation_error is set and resp.step repeats
    if resp.validation_error:
        fix_the_error(resp.validation_error)
        continue
    if resp.status == "paused":
        show_checkpoint_to_human(resp.checkpoint_label)
        if human_approves:
            resp = run("--action", "approve", "--run-id", resp.run_id)
        else:
            resp = run("--action", "reject", "--run-id", resp.run_id,
                       "--feedback", human_feedback)
# resp.status == "completed" — pipeline done

Response fields beyond status:

Field When present Meaning
output_dir Steps with output.fixed Write expected files here before calling submit
expected_files Steps with output.fixed File names to create (e.g. ["findings.json"])
validation_error Submit rejected by validator Why the previous submit failed — fix and re-submit
tools Agent steps Write helpers (write_*, create_*, append_*) with format specs

Runner mode delegates unknown tools to the agent (resp.tool_name / resp.tool_params). For programmatic use, the same API is available via SkillTool.

Node Types

Type Description
agent LLM step — host app executes via StepRunner protocol
tool Auto-executed by skillflow (native), or delegated to agent in runner mode (custom)
gate Auto-resolved using match conditions against step output flags
loop Iterates over a JSON list from a workspace file, instantiating sub-steps per item

Transition Matching

Five match strategies. See tests/fixtures/dpe_full.yaml for a complete pipeline using all of them:

match: { field: "passed", value: true }                          # step output flags
match: { from_file: "review_verdict.json", field: "passed", value: true }  # output file
match: { from: "checkpoint", value: "approved" }                 # checkpoint routing
match: { _error: true }                                          # error handler
# (no match key)                                                 # always match

Context Injection

context:
  - source: { step: "1" }
  - source: { step: "2", mode: "interfaces" }
  - source: { config: "meta", output: "brief.md" }
  - source: { tool: "dir_tree" }

Checkpoints

Agent steps can pause for human approval (tests/fixtures/checkpoint_cycle.yaml):

sf.reject_checkpoint(run_id, "draft", "Add more detail to the analysis")

Output Validation

Steps declare validation specs auto-executed by skillflow. See tests/fixtures/skill_review.yaml for inline JSON Schema validation, or tests/fixtures/lifecycle_hooks.yaml for syntax_lint + py_compile validators.

Available validators: json_schema, syntax_lint, py_compile, pytest, file_exists.

Lifecycle Hooks

Steps with output.mode: "write" can trigger deliver and post-deliver hooks. See tests/fixtures/lifecycle_hooks.yaml:

lifecycle:
  on_deliver:
    tool: "repo_apply"
    params:
      source_dir: "$STEP_DIR"
    on_failure: "retry"
    max_retries: 2
  after_deliver:
    - tool: "syntax_lint"
      files: ["*.py"]

Error Handling

Steps declare max_retries and an _error transition. See tests/fixtures/error_handler.yaml.

Feedback Loopback

Tool failures can inject output into the next step's inputs (feedback: true). See plugins/skill_converter/skill_converter.yaml — the validate_design step feeds lint errors into fix_issues.

End Conditions

Four termination strategies, combined with and/or. See tests/fixtures/end_conditions.yaml and tests/fixtures/dpe_full.yaml:

end_conditions:
  combinator: or
  conditions:
    - type: node_reached
      node: "5_review"
      result: "completed"
    - type: max_total_steps
      limit: 200
    - type: max_run_duration_seconds
      limit: 3600
    - type: flag_match
      flag: { fatal_error: true }

Stale Claim Recovery

Built into advance_run. Claims older than stale_threshold_seconds (default 300) are auto-reset:

sf = SkillFlow("pipeline.db", stale_threshold_seconds=300)

Event Streaming

All state transitions are written to skillflow_outbox. Poll for real-time notifications:

events = sf.drain_outbox(batch_size=50)
for event in events:
    print(event.event_type, event.payload)
sf.ack_outbox([e.id for e in events])

In-process subscribers via NotificationBus:

from skillflow import NotificationBus

bus = NotificationBus()
bus.subscribe("step_completed", lambda n: print(n.payload))
sf = SkillFlow(":memory:", notification_bus=bus)

Tools

Native (13 built-in)

Tool Description
read_file Read a file with line numbers
write Write content to workspace
list_tree List directory structure
dir_tree Context tree for prompt injection
json_schema Validate JSON against inline schema
syntax_lint Syntax check via ruff
py_compile Python bytecode compile
pytest Run pytest on test files
repo_apply Copy files to repo + git commit
repo_validate Multi-tool repo validation
draft_commit Move draft files to final dir + commit
file_exists Check files matching glob patterns
notify Send user-visible notifications

Custom tools

Host apps add tool directories. Each tool: {name}/tool.yaml + {name}/impl.py. Function name must match directory name.

from skillflow.tool_loader import ToolLoader

loader = ToolLoader()
loader.add_tools_dir("my_app/tools")
sf = SkillFlow(":memory:", tool_loader=loader)

Use Cases

1. Framework mode — embed skillflow in your app

Use skillflow as a library. Read the Getting Started section above and the fixture examples in tests/fixtures/.

from skillflow import SkillFlow, PipelineGraph
graph = PipelineGraph.from_yaml("my_pipeline.yaml")
sf = SkillFlow(":memory:")
sf.register_graph(graph)
# ... drive the loop with claim_next_step / confirm_step

2. Agent mode — convert skills to pipelines

skillflow-convert is a thin wrapper that calls skillflow-run with the built-in converter pipeline. The agent drives it the same way:

# Start conversion with a skill description
$ skillflow-convert --desc "Code review skill..." --action start
{"status": "in_progress", "run_id": "abc123", "step": "analyze_skill", "instruction": "..."}

# Submit analysis, continue through design → lint → fix → done (no --desc needed)
$ skillflow-convert --action submit --run-id abc123 --result '{"analysis": {...}}'

On completion, the generated pipeline YAML is at ~/.skillflow/workspaces/skill-converter/.../skill_pipeline.yaml.

Agent manuals (the tool schema + rules) are shipped in the package:

Plugin Manual Load via
skill_runner Actions, response format, rules load_agent_guide() from skillflow.plugins.skill_runner
skill_converter Step-by-step: analyze → design → lint → fix load_agent_guide() from skillflow.plugins.skill_converter

Inject these into the agent's system prompt so it knows how to call the CLI tools.

skillflow-lint pipeline.yaml                             # one-shot config validation
skillflow-run --graph pipeline.yaml --action start        # start a pipeline (returns JSON, --graph only once)
skillflow-convert --desc "..." --action start             # start a conversion

Linter (skillflow.plugins.linter)

Framework utility. Validates pipeline YAML — used as a skillflow tool (skillflow_lint) inside the converter's feedback loop, or standalone:

skillflow-lint tests/fixtures/skill_review.yaml
skillflow-lint configs/*.yaml

Package

src/skillflow/
├── core.py              # SkillFlow orchestrator (create/claim/confirm/advance)
├── graph.py             # PipelineGraph, StepNode, Transition, GraphResolver
├── tool_loader.py       # Dynamic tool schema + implementation loading
├── context.py           # ContextResolver: cross-config, step, tool sources
├── step_validation.py   # StepValidator: multi-tool output validation
├── write_tools.py       # Constrained write tool generation from output.fixed
├── workspace.py         # Per-step atomic staging directories
├── validation.py        # Optional external-schema output validation
├── recovery.py          # Stale claim recovery
├── schema.py            # SQLite DDL + migrations
├── exceptions.py        # SkillFlowError hierarchy
├── outbox.py            # OutboxConsumer for event polling
├── notifications.py     # NotificationBus for in-process subscribers
├── agent_registry.py    # Agent config registry + schema resolution
├── plugins/             # Built-in plugins
│   ├── linter/          # Config validator + skillflow_lint tool
│   ├── skill_runner/    # SkillTool — interactive pipeline facade
│   └── skill_converter/ # Skill description → pipeline YAML
└── tools/               # Native tools (13)
    ├── read_file/       ├── write/          ├── list_tree/
    ├── dir_tree/        ├── json_schema/    ├── syntax_lint/
    ├── py_compile/      ├── pytest/         ├── repo_apply/
    ├── repo_validate/   ├── draft_commit/   ├── file_exists/
    └── notify/

Tests

pytest tests/ -v       # 306 tests
pytest plugins/ -v     # 21 plugin tests

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

skillflow_py-1.0.3.tar.gz (103.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

skillflow_py-1.0.3-py3-none-any.whl (88.4 kB view details)

Uploaded Python 3

File details

Details for the file skillflow_py-1.0.3.tar.gz.

File metadata

  • Download URL: skillflow_py-1.0.3.tar.gz
  • Upload date:
  • Size: 103.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for skillflow_py-1.0.3.tar.gz
Algorithm Hash digest
SHA256 96487aa733c030f25eebe6342531d15c48c11fdd477b3783148c03f2e89a8143
MD5 e55ce881f5d645d263339a1851fc454f
BLAKE2b-256 52483515c19aaf462fd9d89c2bb657a8f3c86fd53ac7881804b8c2660e41fe93

See more details on using hashes here.

File details

Details for the file skillflow_py-1.0.3-py3-none-any.whl.

File metadata

  • Download URL: skillflow_py-1.0.3-py3-none-any.whl
  • Upload date:
  • Size: 88.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for skillflow_py-1.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 28f6f9864101c4b0ae5134c72d18755ccd45299a0ac8822d368e4f40e94a92ae
MD5 c8468f4dda3035c563053cb547bff13a
BLAKE2b-256 766b707c1b1b0cd6ad0c746b687278617b7a9d1f34a395e5c09064bdc0698fef

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page