StageFlow: Pipeline framework for stages
Project description
StageFlow
StageFlow is a lightweight framework for describing and running JSON pipelines with nodes, stages, and schema validation.
Features
- Node types:
stage,condition,parallel,map,subpipeline,terminal. - Custom stages (async
run) with documented arguments/config/outputs. - Input waiting (
wait_input), event history, session snapshots. - Built-in stages for dict/list/string/logic utilities.
- Pipeline JSON Schema (stage enum injected on the fly) and HTML doc generator.
Installation
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install stageflow-framework
# or for local dev:
# pip install -e .
Requires Python 3.10+.
Quickstart
- Register your stage:
from stageflow.core.stage import BaseStage, register_stage
@register_stage("HelloStage")
class HelloStage(BaseStage):
"""
description: "Custom stage example"
arguments:
name: string
outputs:
greeting: string
"""
async def run(self):
name = self.get_arguments().get("name", "world")
self.set_outputs({"greeting": f"Hello, {name}!"})
- Describe a pipeline in JSON:
pipeline_dict = {
"entry": "start",
"nodes": [
{
"id": "start",
"type": "stage",
"stage": "HelloStage",
"arguments": {"name": "user_name"},
"outputs": {"greeting": "greeting"},
"next": "finish",
},
{
"id": "finish",
"type": "terminal",
"result": {"status": "ok"},
"artifacts": ["greeting"],
},
],
}
- Run:
import asyncio
from stageflow.core.pipeline import Pipeline
from stageflow.core.session import Session
from stageflow.core.context import Context
async def main():
pipe = Pipeline.from_dict(pipeline_dict)
session = Session(id="demo", pipeline=pipe, context=Context(payload={"user_name": "Alice"}))
result = await session.run()
print(result.result) # {'status': 'ok'}
print(result.artifacts) # {'greeting': 'Hello, Alice!'}
asyncio.run(main())
Docs and schema generation
- Programmatic:
from stageflow.docs.html import generate_docs_assets
html_page, pipeline_schema, stages_json = generate_docs_assets()
# html_page — ready-to-serve HTML
# pipeline_schema — dict with JSON Schema (stage enum injected)
# stages_json — raw JSON with stage specs
- CLI (prints JSON payload with
html,pipeline_schema,stages_jsonto stdout):
python scripts/generate_docs_html.py > docs_assets.json
If needed, persist the html from the JSON payload yourself.
Tests
python -m unittest
Node types overview
stage: Executes a registered Stage. Fields:stage(name),config/arguments/outputsmappings,next, optionalfallback.condition: List of{if, then}branches (JsonLogic conditions), optionalelse.parallel: Runs several children;policy=all|any,cancel_on_errorflag, optionalnext.map: Iterates over items path, runs body node for each item (sequential/parallel), collects mapped outputs.subpipeline: Invokes nested pipeline by id, withinputsmapping,artifact_outputs, optionalresult_output,next.terminal: Ends execution, returnsresultand selectsartifacts.
Node types in detail
- Stage node
- Required:
id,type="stage",stage(registered stage name). - Data flow:
argumentsmap context paths -> stage inputs;outputsmap stage return keys -> context paths. - Control flow:
nextpoints to following node; optionalfallbacknode is used on stage exceptions. - Stage class declares
skipable,allowed_events,allowed_inputs,timeout,retries.
- Required:
- Condition node
- Required:
id,type="condition", non-emptyconditionslist. - Each condition:
{ "if": <JsonLogic>, "then": "<node id>" }; first matched branch wins. - Optional
elsetarget if no branch matches.
- Required:
- Parallel node
- Required:
id,type="parallel", non-emptychildren(node ids). policy:all(default) waits all children;anycompletes on first success.cancel_on_error: whether to cancel siblings on failure.next: node id to continue after parallel finishes.
- Required:
- Map node
- Required:
id,type="map",items(context path with iterable),body(node id). mode:sequential(default) orparallelfor body executions.output_map: map of body outputs -> target context paths for aggregated results.item_path/index_path: where to inject current item/index into context for body runs.cancel_on_error: abort remaining iterations on error.next: node id after all iterations finish.
- Required:
- Subpipeline node
- Required:
id,type="subpipeline",subpipeline_id(key insubpipelinesdict of pipeline). inputs: context mapping passed into subpipeline root context.artifact_outputs: map subpipeline artifacts -> parent context paths.result_output: path in parent context to store subpipelineresult.next: next node in parent pipeline.
- Required:
- Terminal node
- Required:
id,type="terminal". result: arbitrary object returned as final result.artifacts: list of context paths that will be returned as artifacts map.
- Required:
Built-in stages
- Vars:
SetValueStage,CopyValueStage,IncrementStage,MergeDictStage. - Dicts:
PickKeysStage,DropKeysStage. - Lists:
AppendListStage,ExtendListStage,FilterListStage,UniqueListStage,PopListStage. - Strings:
ConcatStage,TemplateStage. - Logic:
AssertStage,FailStage,LogStage,SleepStage.
Each stage docstring describes arguments/config/outputs; they are also available via generate_docs_assets() / stages_json.
Useful refs
stageflow/core— core (pipeline, session, nodes, context).stageflow/builtins— built-in stages.stageflow/docs— schema/HTML generation helpers.scripts/— helpers (docs generator wrapper).tests/— unit tests.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file stageflow_framework-0.1.1.tar.gz.
File metadata
- Download URL: stageflow_framework-0.1.1.tar.gz
- Upload date:
- Size: 31.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2fea0d231926376bd739920a3471e53e1d990065ab423257a1f442d5627c236c
|
|
| MD5 |
2a47671b9536a4f60b4a1bee4a16898c
|
|
| BLAKE2b-256 |
df6837e23d7f3b2c2f9a8ac9fa3b542c5829db33918c0dbe8a902547713dd305
|
File details
Details for the file stageflow_framework-0.1.1-py3-none-any.whl.
File metadata
- Download URL: stageflow_framework-0.1.1-py3-none-any.whl
- Upload date:
- Size: 29.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
055f441c51dc467b3d4eb5b4b4b0fd8299c96be2d56968eb430e031cc45bf178
|
|
| MD5 |
aeb779d0504952445ecf1520595e14a9
|
|
| BLAKE2b-256 |
298b88f89bc0e8dbcccd65257d7ee5be45b90d4917f15d090dac7abce1b8ac52
|