StageFlow: Pipeline framework for stages

StageFlow

StageFlow is a lightweight framework for describing and running JSON pipelines with nodes, stages, and schema validation.

Features

  • Node types: stage, condition, parallel, map, subpipeline, terminal.
  • Custom stages with an async run method and documented arguments/config/outputs.
  • Input waiting (wait_input), event history, session snapshots.
  • Built-in stages for dict/list/string/logic utilities.
  • Pipeline JSON Schema (with the stage enum injected on the fly) and an HTML docs generator.

Installation

python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install stageflow-framework
# or for local dev:
# pip install -e .

Requires Python 3.10+.

Quickstart

  1. Register your stage:
from stageflow.core.stage import BaseStage, register_stage

@register_stage("HelloStage")
class HelloStage(BaseStage):
    """
    description: "Custom stage example"
    arguments:
      name: string
    outputs:
      greeting: string
    """
    async def run(self):
        name = self.get_arguments().get("name", "world")
        self.set_outputs({"greeting": f"Hello, {name}!"})
  2. Describe a pipeline in JSON:
pipeline_dict = {
    "entry": "start",
    "nodes": [
        {
            "id": "start",
            "type": "stage",
            "stage": "HelloStage",
            "arguments": {"name": "user_name"},
            "outputs": {"greeting": "greeting"},
            "next": "finish",
        },
        {
            "id": "finish",
            "type": "terminal",
            "result": {"status": "ok"},
            "artifacts": ["greeting"],
        },
    ],
}
  3. Run:
import asyncio
from stageflow.core.pipeline import Pipeline
from stageflow.core.session import Session
from stageflow.core.context import Context

async def main():
    pipe = Pipeline.from_dict(pipeline_dict)
    session = Session(id="demo", pipeline=pipe, context=Context(payload={"user_name": "Alice"}))
    result = await session.run()
    print(result.result)        # {'status': 'ok'}
    print(result.artifacts)     # {'greeting': 'Hello, Alice!'}

asyncio.run(main())
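
The quickstart moves data through the context in three hops: the stage reads user_name, writes greeting, and the terminal node returns greeting as an artifact. The stand-alone sketch below replays those hops without StageFlow so the direction of each mapping is easy to see. It is a model built on assumptions, not the framework's runtime: the flat-dict context, the STAGES registry and run_pipeline are hypothetical, and only stage and terminal nodes are handled.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical stage signature for this sketch: resolved arguments in, outputs out.
StageFn = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def hello_stage(args: dict[str, Any]) -> dict[str, Any]:
    # Same behaviour as HelloStage.run above, written as a plain coroutine.
    name = args.get("name", "world")
    return {"greeting": f"Hello, {name}!"}


STAGES: dict[str, StageFn] = {"HelloStage": hello_stage}

PIPELINE = {
    "entry": "start",
    "nodes": [
        {"id": "start", "type": "stage", "stage": "HelloStage",
         "arguments": {"name": "user_name"}, "outputs": {"greeting": "greeting"},
         "next": "finish"},
        {"id": "finish", "type": "terminal", "result": {"status": "ok"},
         "artifacts": ["greeting"]},
    ],
}


async def run_pipeline(pipeline: dict[str, Any],
                       payload: dict[str, Any]) -> tuple[Any, dict[str, Any]]:
    nodes = {node["id"]: node for node in pipeline["nodes"]}
    context = dict(payload)  # flat dict standing in for Context(payload=...)
    node_id = pipeline["entry"]
    while True:
        node = nodes[node_id]
        if node["type"] == "terminal":
            # "artifacts" lists the context keys returned alongside "result".
            artifacts = {k: context[k] for k in node.get("artifacts", []) if k in context}
            return node.get("result"), artifacts
        if node["type"] != "stage":
            raise NotImplementedError(f"sketch only handles stage/terminal, got {node['type']!r}")
        # "arguments": stage argument name -> context key it is read from.
        args = {arg: context[key] for arg, key in node.get("arguments", {}).items()
                if key in context}
        outputs = await STAGES[node["stage"]](args)
        # "outputs": stage output key -> context key it is written to.
        for out_key, ctx_key in node.get("outputs", {}).items():
            if out_key in outputs:
                context[ctx_key] = outputs[out_key]
        if node.get("next") is None:
            raise ValueError(f"stage node {node_id!r} has no next node")
        node_id = node["next"]


result, artifacts = asyncio.run(run_pipeline(PIPELINE, {"user_name": "Alice"}))
print(result)     # {'status': 'ok'}
print(artifacts)  # {'greeting': 'Hello, Alice!'}
```

If user_name is missing from the payload, the argument is simply not passed and the stage falls back to "world", mirroring the .get("name", "world") default in HelloStage.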

Docs and schema generation

  • Programmatic:
from stageflow.docs.html import generate_docs_assets

html_page, pipeline_schema, stages_json = generate_docs_assets()
# html_page — ready-to-serve HTML
# pipeline_schema — dict with JSON Schema (stage enum injected)
# stages_json — raw JSON with stage specs
  • CLI (prints a JSON payload with the keys html, pipeline_schema and stages_json to stdout):
python scripts/generate_docs_html.py > docs_assets.json

If you need the HTML page as a file, extract the html field from the JSON payload and save it yourself.
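
One way to do that is a few lines of standard-library Python. This sketch assumes the payload printed by scripts/generate_docs_html.py is a JSON object whose html key holds the page as a string, as the CLI description states; save_docs_html and the file names are illustrative, not part of StageFlow.

```python
import json
from pathlib import Path


def save_docs_html(payload_path: str, html_path: str) -> int:
    """Write the "html" field of the docs payload to an HTML file.

    Returns the number of characters written.
    """
    payload = json.loads(Path(payload_path).read_text(encoding="utf-8"))
    html = payload["html"]  # assumed to be a string, per the CLI description
    if not isinstance(html, str):
        raise TypeError("expected the 'html' field to be a string")
    return Path(html_path).write_text(html, encoding="utf-8")
```

For example, after redirecting the CLI output to docs_assets.json, save_docs_html("docs_assets.json", "stageflow_docs.html") writes the page next to the payload.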

Tests

python -m unittest

Node types overview

  • stage: Executes a registered Stage. Fields: stage (name), config/arguments/outputs mappings, next, optional fallback.
  • condition: List of {if, then} branches (JsonLogic conditions), optional else.
  • parallel: Runs several children; policy = all|any, cancel_on_error flag, optional next.
  • map: Iterates over the iterable at the items context path, runs the body node for each item (sequentially or in parallel) and collects the mapped outputs.
  • subpipeline: Invokes a nested pipeline by id, with an inputs mapping, artifact_outputs, an optional result_output, and next.
  • terminal: Ends execution, returning the result and the selected artifacts.
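
The parallel node's policy and cancel_on_error options correspond closely to asyncio primitives. The sketch below is one plausible reading of those semantics, not StageFlow's implementation: run_parallel, the zero-argument child callables, and the choice to return exceptions in place of results when cancel_on_error is off are all assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable

Child = Callable[[], Awaitable[Any]]  # hypothetical: a zero-argument async child


async def run_parallel(children: list[Child], policy: str = "all",
                       cancel_on_error: bool = True) -> Any:
    """Sketch of parallel-node semantics (assumed, not StageFlow's code).

    policy="all": return every child's result in order. With cancel_on_error,
    the first failure is re-raised and unfinished siblings are cancelled;
    without it, all children run to completion and exceptions are returned
    in place of results.
    policy="any": return the first successful result and cancel the rest;
    raise only if every child fails.
    """
    if not children:
        raise ValueError("a parallel node needs at least one child")
    tasks = [asyncio.create_task(child()) for child in children]
    try:
        if policy == "all":
            return await asyncio.gather(*tasks, return_exceptions=not cancel_on_error)
        if policy == "any":
            errors: list[BaseException] = []
            for next_done in asyncio.as_completed(tasks):
                try:
                    return await next_done  # first success wins
                except Exception as exc:  # remember the failure, keep waiting
                    errors.append(exc)
            raise RuntimeError(f"all {len(tasks)} children failed") from errors[-1]
        raise ValueError(f"unknown policy: {policy!r}")
    finally:
        # Cancel whatever is still running and wait for it to settle.
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
```

With policy="any", cancel_on_error has no effect in this sketch, because a failing child should not stop the search for a successful one.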

Node types in detail

  • Stage node
    • Required: id, type="stage", stage (registered stage name).
    • Data flow: arguments maps each stage argument name to the context path it is read from; outputs maps each stage output key to the context path it is written to.
    • Control flow: next points to the following node; the optional fallback node is used when the stage raises an exception.
    • The stage class can also declare skipable, allowed_events, allowed_inputs, timeout and retries.
  • Condition node
    • Required: id, type="condition", non-empty conditions list.
    • Each condition: { "if": <JsonLogic>, "then": "<node id>" }; the first matching branch wins.
    • Optional else target if no branch matches.
  • Parallel node
    • Required: id, type="parallel", non-empty children (node ids).
    • policy: all (default) waits for all children; any completes on the first successful child.
    • cancel_on_error: whether to cancel the remaining children when one fails.
    • next: node id to continue after parallel finishes.
  • Map node
    • Required: id, type="map", items (context path to an iterable), body (node id).
    • mode: sequential (default) or parallel execution of the body.
    • output_map: maps body outputs to the target context paths where the aggregated results are stored.
    • item_path/index_path: context paths where the current item and its index are injected for each body run.
    • cancel_on_error: abort remaining iterations on error.
    • next: node id after all iterations finish.
  • Subpipeline node
    • Required: id, type="subpipeline", subpipeline_id (key in subpipelines dict of pipeline).
    • inputs: mapping that passes values from the parent context into the subpipeline's root context.
    • artifact_outputs: map subpipeline artifacts -> parent context paths.
    • result_output: path in parent context to store subpipeline result.
    • next: next node in parent pipeline.
  • Terminal node
    • Required: id, type="terminal".
    • result: arbitrary object returned as final result.
    • artifacts: list of context paths that will be returned as artifacts map.

Built-in stages

  • Vars: SetValueStage, CopyValueStage, IncrementStage, MergeDictStage.
  • Dicts: PickKeysStage, DropKeysStage.
  • Lists: AppendListStage, ExtendListStage, FilterListStage, UniqueListStage, PopListStage.
  • Strings: ConcatStage, TemplateStage.
  • Logic: AssertStage, FailStage, LogStage, SleepStage.

Each stage's docstring describes its arguments/config/outputs; the same specs are also available programmatically via generate_docs_assets() (the stages_json value).
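
Stage docstrings such as the one on HelloStage use a simple two-level key: value layout (a top-level description plus arguments/config/outputs sections). The sketch below reads that layout into a dict to show how the documented fields can be consumed programmatically. It is only an illustration: parse_stage_doc is hypothetical, it handles just the flat layout shown in the quickstart (no deeper nesting, lists or other YAML features), and StageFlow's own parsing may differ.

```python
import inspect
from typing import Any


def parse_stage_doc(doc: str) -> dict[str, Any]:
    """Parse the two-level "key: value" docstring layout used by HelloStage."""
    spec: dict[str, Any] = {}
    section: dict[str, str] | None = None
    for line in inspect.cleandoc(doc).splitlines():
        if not line.strip():
            continue
        key, _, value = line.strip().partition(":")
        value = value.strip().strip('"')
        if not line[0].isspace():  # top-level key such as description/arguments
            if value:
                spec[key] = value
                section = None
            else:
                section = spec.setdefault(key, {})
        elif section is not None:  # indented "name: type" entry
            section[key] = value
    return spec


HELLO_DOC = """
    description: "Custom stage example"
    arguments:
      name: string
    outputs:
      greeting: string
    """
print(parse_stage_doc(HELLO_DOC))
# {'description': 'Custom stage example', 'arguments': {'name': 'string'}, 'outputs': {'greeting': 'string'}}
```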

Project layout

  • stageflow/core: core runtime (pipeline, session, nodes, context).
  • stageflow/builtins: built-in stages.
  • stageflow/docs: schema/HTML generation helpers.
  • scripts/: helper scripts (docs generator wrapper).
  • tests/: unit tests.

Download files

Source Distribution

stageflow_framework-0.1.5.tar.gz (31.8 kB)

Uploaded Source

Built Distribution

stageflow_framework-0.1.5-py3-none-any.whl (30.0 kB)

Uploaded Python 3

File details

Details for the file stageflow_framework-0.1.5.tar.gz.

File metadata

  • Download URL: stageflow_framework-0.1.5.tar.gz
  • Size: 31.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for stageflow_framework-0.1.5.tar.gz
Algorithm    Hash digest
SHA256       696392ada5fa27afb8b6f850e09f2978b1d49c94b65784a35222b5ebaf1600c1
MD5          e55779b367fca99549eb0467330be727
BLAKE2b-256  9b62f3e99e7a0e8927e1cb1fbb41c747bea084a1b8292c4ff486e36a35fb1ddf

File details

Details for the file stageflow_framework-0.1.5-py3-none-any.whl.

File metadata

File hashes

Hashes for stageflow_framework-0.1.5-py3-none-any.whl
Algorithm    Hash digest
SHA256       7c48a6931aabc9add0462f0f08f11a3dcb33a9fddc25bb6e8fa43b80e4be2706
MD5          98d6e68feb3b404a1e9a24b2022276f3
BLAKE2b-256  156eb430d32ff958c20c8774c62fcca93aac68568026f873cbb358a518d5d8d3
