StageFlow: Pipeline framework for stages

StageFlow

StageFlow is a lightweight framework for describing pipelines in JSON and running them. A pipeline is a graph of nodes that execute stages, and its structure can be validated against a JSON Schema.

Features

  • Node types: stage, condition, parallel, map, subpipeline, terminal.
  • Custom stages (async run) with documented arguments/config/outputs.
  • Input waiting (wait_input), event history, session snapshots.
  • Built-in stages for dict/list/string/logic utilities.
  • Pipeline JSON Schema (the stage enum is injected on the fly) and an HTML doc generator.

Installation

python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install stageflow-framework
# or for local dev:
# pip install -e .

Requires Python 3.10+.

Quickstart

  1. Register your stage:
from stageflow.core.stage import BaseStage, register_stage

@register_stage("HelloStage")
class HelloStage(BaseStage):
    """
    description: "Custom stage example"
    arguments:
      name: string
    outputs:
      greeting: string
    """
    async def run(self):
        name = self.get_arguments().get("name", "world")
        self.set_outputs({"greeting": f"Hello, {name}!"})
  2. Describe a pipeline in JSON (shown here as a Python dict):
pipeline_dict = {
    "entry": "start",
    "nodes": [
        {
            "id": "start",
            "type": "stage",
            "stage": "HelloStage",
            "arguments": {"name": "user_name"},
            "outputs": {"greeting": "greeting"},
            "next": "finish",
        },
        {
            "id": "finish",
            "type": "terminal",
            "result": {"status": "ok"},
            "artifacts": ["greeting"],
        },
    ],
}
  3. Run the pipeline:
import asyncio
from stageflow.core.pipeline import Pipeline
from stageflow.core.session import Session
from stageflow.core.context import Context

async def main():
    pipe = Pipeline.from_dict(pipeline_dict)
    session = Session(id="demo", pipeline=pipe, context=Context(payload={"user_name": "Alice"}))
    result = await session.run()
    print(result.result)        # {'status': 'ok'}
    print(result.artifacts)     # {'greeting': 'Hello, Alice!'}

asyncio.run(main())
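
The data flow in the quickstart can be traced by hand. The sketch below mimics the mapping described in this README (arguments map context paths to stage inputs, outputs map stage output keys to context paths) in plain Python. It is not StageFlow's implementation: resolve_arguments and apply_outputs are hypothetical helpers, and context paths are assumed to be flat top-level keys.

```python
# Illustrative only: mimics the mapping semantics described in this README,
# not StageFlow's internals. Context paths are treated as flat top-level
# keys here, which is an assumption.


def resolve_arguments(arguments: dict, context: dict) -> dict:
    """Build stage inputs as {stage_input_name: context[context_path]}."""
    return {name: context[path] for name, path in arguments.items() if path in context}


def apply_outputs(outputs: dict, stage_outputs: dict, context: dict) -> dict:
    """Return a new context with context[context_path] = stage_outputs[key]."""
    updated = dict(context)
    for key, path in outputs.items():
        if key in stage_outputs:
            updated[path] = stage_outputs[key]
    return updated


# The mappings of the "start" node from the quickstart pipeline.
node = {"arguments": {"name": "user_name"}, "outputs": {"greeting": "greeting"}}
context = {"user_name": "Alice"}

stage_inputs = resolve_arguments(node["arguments"], context)
# Stand-in for what HelloStage.run computes from its arguments.
stage_outputs = {"greeting": f"Hello, {stage_inputs.get('name', 'world')}!"}
context = apply_outputs(node["outputs"], stage_outputs, context)

print(context)  # {'user_name': 'Alice', 'greeting': 'Hello, Alice!'}
```

The terminal node then lists "greeting" under artifacts, which is why that context value appears in result.artifacts.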

Docs and schema generation

  • Programmatic:
from stageflow.docs.html import generate_docs_assets

html_page, pipeline_schema, stages_json = generate_docs_assets()
# html_page — ready-to-serve HTML
# pipeline_schema — dict with JSON Schema (stage enum injected)
# stages_json — raw JSON with stage specs
  • CLI (prints a JSON payload with html, pipeline_schema, and stages_json keys to stdout):
python scripts/generate_docs_html.py > docs_assets.json

The CLI only prints the payload. To keep the HTML as a file, extract the html value from the JSON payload and write it to disk yourself.
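
One way to split the saved payload into files is sketched below. It assumes the payload is a JSON object whose html key holds a string and whose pipeline_schema key holds a JSON object, as the key names above suggest. The function name persist_docs_assets and the output file names are hypothetical, not part of StageFlow.

```python
import json
from pathlib import Path


def persist_docs_assets(payload_path, out_dir):
    """Split the CLI's JSON payload into separate files and return their paths.

    Assumes the payload has an "html" string and a "pipeline_schema" object.
    """
    payload = json.loads(Path(payload_path).read_text(encoding="utf-8"))
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)

    # File names below are arbitrary choices for this sketch.
    html_file = out / "index.html"
    html_file.write_text(payload["html"], encoding="utf-8")

    schema_file = out / "pipeline.schema.json"
    schema_file.write_text(json.dumps(payload["pipeline_schema"], indent=2), encoding="utf-8")

    return html_file, schema_file
```

After running the CLI command above, calling persist_docs_assets("docs_assets.json", "site") would write site/index.html and site/pipeline.schema.json.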

Tests

python -m unittest

Node types overview

  • stage: Executes a registered Stage. Fields: stage (name), config/arguments/outputs mappings, next, optional fallback.
  • condition: List of {if, then} branches (JsonLogic conditions), optional else.
  • parallel: Runs several children; policy = all|any, cancel_on_error flag, optional next.
  • map: Iterates over the iterable at the items path, runs the body node for each item (sequentially or in parallel), and collects the mapped outputs.
  • subpipeline: Invokes a nested pipeline by id, with an inputs mapping, artifact_outputs, an optional result_output, and next.
  • terminal: Ends execution, returns result and selects artifacts.
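
Because nodes refer to each other by id, a mistyped target is an easy mistake to make. The sketch below defines a small pipeline with a condition node and checks that every referenced id exists. It is plain Python and does not use StageFlow. referenced_ids and dangling_targets are hypothetical helpers, and placing else at the top level of the condition node, as well as resolving {"var": "age"} against the context, are assumptions based on the field list above.

```python
# Hypothetical example pipeline built only from fields named in this README.
pipeline_dict = {
    "entry": "check_age",
    "nodes": [
        {
            "id": "check_age",
            "type": "condition",
            "conditions": [
                {"if": {">=": [{"var": "age"}, 18]}, "then": "adult"},
            ],
            "else": "minor",  # assumed to sit at the top level of the node
        },
        {"id": "adult", "type": "terminal", "result": {"status": "adult"}},
        {"id": "minor", "type": "terminal", "result": {"status": "minor"}},
    ],
}


def referenced_ids(node: dict) -> list[str]:
    """Collect every node id that a node points at."""
    refs = []
    for key in ("next", "fallback", "else", "body"):
        if node.get(key):
            refs.append(node[key])
    refs.extend(node.get("children", []))
    refs.extend(branch["then"] for branch in node.get("conditions", []))
    return refs


def dangling_targets(pipeline: dict) -> set[str]:
    """Return referenced ids (including the entry) that no node defines."""
    known = {node["id"] for node in pipeline["nodes"]}
    wanted = {pipeline["entry"]}
    for node in pipeline["nodes"]:
        wanted.update(referenced_ids(node))
    return wanted - known


print(dangling_targets(pipeline_dict))  # set() means every reference resolves
```

A check like this complements schema validation, which can confirm the shape of each node but not necessarily that a target id is defined elsewhere in the same pipeline.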

Node types in detail

  • Stage node
    • Required: id, type="stage", stage (registered stage name).
    • Data flow: arguments map context paths -> stage inputs; outputs map stage return keys -> context paths.
    • Control flow: next points to following node; optional fallback node is used on stage exceptions.
    • Stage class declares skipable, allowed_events, allowed_inputs, timeout, retries.
  • Condition node
    • Required: id, type="condition", non-empty conditions list.
    • Each condition: { "if": <JsonLogic>, "then": "<node id>" }; first matched branch wins.
    • Optional else target if no branch matches.
  • Parallel node
    • Required: id, type="parallel", non-empty children (node ids).
    • policy: all (default) waits for all children; any completes on the first success.
    • cancel_on_error: whether to cancel siblings on failure.
    • next: node id to continue after parallel finishes.
  • Map node
    • Required: id, type="map", items (context path with iterable), body (node id).
    • mode: sequential (default) or parallel for body executions.
    • output_map: map of body outputs -> target context paths for aggregated results.
    • item_path/index_path: context paths where the current item and index are injected for each body run.
    • cancel_on_error: abort remaining iterations on error.
    • next: node id after all iterations finish.
  • Subpipeline node
    • Required: id, type="subpipeline", subpipeline_id (a key in the pipeline's subpipelines dict).
    • inputs: context mapping passed into subpipeline root context.
    • artifact_outputs: map subpipeline artifacts -> parent context paths.
    • result_output: path in parent context to store subpipeline result.
    • next: next node in parent pipeline.
  • Terminal node
    • Required: id, type="terminal".
    • result: arbitrary object returned as final result.
    • artifacts: list of context paths that will be returned as artifacts map.
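
The "first matched branch wins" rule for condition nodes can be illustrated with a toy evaluator. The sketch below supports only a small JsonLogic subset ("var", "==", ">=", "and") and looks up var as a flat context key. It is an illustration under those assumptions, not StageFlow's evaluator, and evaluate and select_target are hypothetical names.

```python
# A tiny evaluator for a small JsonLogic subset, written only to illustrate
# branch selection. StageFlow's own evaluator may behave differently.


def evaluate(rule, data):
    if not isinstance(rule, dict):
        return rule  # literals evaluate to themselves
    ((op, args),) = rule.items()  # a rule is a single-key dict: {operator: arguments}
    if op == "var":
        return data.get(args)  # flat key lookup only (assumption)
    values = [evaluate(arg, data) for arg in args]
    if op == "==":
        return values[0] == values[1]
    if op == ">=":
        return values[0] >= values[1]
    if op == "and":
        return all(values)
    raise ValueError(f"unsupported operator: {op}")


def select_target(node, context):
    """Return the `then` of the first branch whose `if` is truthy, else `else`."""
    for branch in node["conditions"]:
        if evaluate(branch["if"], context):
            return branch["then"]
    return node.get("else")


node = {
    "id": "route",
    "type": "condition",
    "conditions": [
        {"if": {">=": [{"var": "score"}, 90]}, "then": "gold"},
        {"if": {">=": [{"var": "score"}, 50]}, "then": "silver"},
    ],
    "else": "retry",
}

print(select_target(node, {"score": 95}))  # gold (both branches match, the first wins)
print(select_target(node, {"score": 60}))  # silver
print(select_target(node, {"score": 10}))  # retry
```

Ordering branches from most to least specific, as here, keeps a broad condition from shadowing a narrower one listed after it.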

Built-in stages

  • Vars: SetValueStage, CopyValueStage, IncrementStage, MergeDictStage.
  • Dicts: PickKeysStage, DropKeysStage.
  • Lists: AppendListStage, ExtendListStage, FilterListStage, UniqueListStage, PopListStage.
  • Strings: ConcatStage, TemplateStage.
  • Logic: AssertStage, FailStage, LogStage, SleepStage.

Each stage's docstring describes its arguments, config, and outputs; the same specs are also available as stages_json from generate_docs_assets().

Useful refs

  • stageflow/core — core (pipeline, session, nodes, context).
  • stageflow/builtins — built-in stages.
  • stageflow/docs — schema/HTML generation helpers.
  • scripts/ — helpers (docs generator wrapper).
  • tests/ — unit tests.
