Skip to main content

StageFlow: Pipeline framework for stages

Project description

StageFlow

StageFlow is a lightweight framework for describing and running JSON pipelines with nodes, stages, and schema validation.

Features

  • Node types: stage, condition, parallel, map, subpipeline, terminal.
  • Custom stages (async run) with documented arguments/config/outputs.
  • Input waiting (wait_input), event history, session snapshots.
  • Built-in stages for dict/list/string/logic utilities.
  • Pipeline JSON Schema (stage enum injected on the fly) and HTML doc generator.

Installation

python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install stageflow-framework
# or for local dev:
# pip install -e .

Requires Python 3.10+.

Quickstart

  1. Register your stage:
from stageflow.core.stage import BaseStage, register_stage

@register_stage("HelloStage")
class HelloStage(BaseStage):
    """
    description: "Custom stage example"
    arguments:
      name: string
    outputs:
      greeting: string
    """
    async def run(self):
        name = self.get_arguments().get("name", "world")
        self.set_outputs({"greeting": f"Hello, {name}!"})
  1. Describe a pipeline in JSON:
pipeline_dict = {
    "entry": "start",
    "nodes": [
        {
            "id": "start",
            "type": "stage",
            "stage": "HelloStage",
            "arguments": {"name": "user_name"},
            "outputs": {"greeting": "greeting"},
            "next": "finish",
        },
        {
            "id": "finish",
            "type": "terminal",
            "result": {"status": "ok"},
            "artifacts": ["greeting"],
        },
    ],
}
  1. Run:
import asyncio
from stageflow.core.pipeline import Pipeline
from stageflow.core.session import Session
from stageflow.core.context import Context

async def main():
    pipe = Pipeline.from_dict(pipeline_dict)
    session = Session(id="demo", pipeline=pipe, context=Context(payload={"user_name": "Alice"}))
    result = await session.run()
    print(result.result)        # {'status': 'ok'}
    print(result.artifacts)     # {'greeting': 'Hello, Alice!'}

asyncio.run(main())

Docs and schema generation

  • Programmatic:
from stageflow.docs.html import generate_docs_assets

html_page, pipeline_schema, stages_json = generate_docs_assets()
# html_page — ready-to-serve HTML
# pipeline_schema — dict with JSON Schema (stage enum injected)
# stages_json — raw JSON with stage specs
  • CLI (prints JSON payload with html, pipeline_schema, stages_json to stdout):
python scripts/generate_docs_html.py > docs_assets.json

If needed, persist the html from the JSON payload yourself.

Tests

python -m unittest

Node types overview

  • stage: Executes a registered Stage. Fields: stage (name), config/arguments/outputs mappings, next, optional fallback.
  • condition: List of {if, then} branches (JsonLogic conditions), optional else.
  • parallel: Runs several children; policy = all|any, cancel_on_error flag, optional next.
  • map: Iterates over items path, runs body node for each item (sequential/parallel), collects mapped outputs.
  • subpipeline: Invokes nested pipeline by id, with inputs mapping, artifact_outputs, optional result_output, next.
  • terminal: Ends execution, returns result and selects artifacts.

Node types in detail

  • Stage node
    • Required: id, type="stage", stage (registered stage name).
    • Data flow: arguments map context paths -> stage inputs; outputs map stage return keys -> context paths.
    • Control flow: next points to following node; optional fallback node is used on stage exceptions.
    • Stage class declares skipable, allowed_events, allowed_inputs, timeout, retries.
  • Condition node
    • Required: id, type="condition", non-empty conditions list.
    • Each condition: { "if": <JsonLogic>, "then": "<node id>" }; first matched branch wins.
    • Optional else target if no branch matches.
  • Parallel node
    • Required: id, type="parallel", non-empty children (node ids).
    • policy: all (default) waits all children; any completes on first success.
    • cancel_on_error: whether to cancel siblings on failure.
    • next: node id to continue after parallel finishes.
  • Map node
    • Required: id, type="map", items (context path with iterable), body (node id).
    • mode: sequential (default) or parallel for body executions.
    • output_map: map of body outputs -> target context paths for aggregated results.
    • item_path/index_path: where to inject current item/index into context for body runs.
    • cancel_on_error: abort remaining iterations on error.
    • next: node id after all iterations finish.
  • Subpipeline node
    • Required: id, type="subpipeline", subpipeline_id (key in subpipelines dict of pipeline).
    • inputs: context mapping passed into subpipeline root context.
    • artifact_outputs: map subpipeline artifacts -> parent context paths.
    • result_output: path in parent context to store subpipeline result.
    • next: next node in parent pipeline.
  • Terminal node
    • Required: id, type="terminal".
    • result: arbitrary object returned as final result.
    • artifacts: list of context paths that will be returned as artifacts map.

Built-in stages

  • Vars: SetValueStage, CopyValueStage, IncrementStage, MergeDictStage.
  • Dicts: PickKeysStage, DropKeysStage.
  • Lists: AppendListStage, ExtendListStage, FilterListStage, UniqueListStage, PopListStage.
  • Strings: ConcatStage, TemplateStage.
  • Logic: AssertStage, FailStage, LogStage, SleepStage.

Each stage docstring describes arguments/config/outputs; they are also available via generate_docs_assets() / stages_json.

Useful refs

  • stageflow/core — core (pipeline, session, nodes, context).
  • stageflow/builtins — built-in stages.
  • stageflow/docs — schema/HTML generation helpers.
  • scripts/ — helpers (docs generator wrapper).
  • tests/ — unit tests.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

stageflow_framework-0.1.4.tar.gz (31.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

stageflow_framework-0.1.4-py3-none-any.whl (29.9 kB view details)

Uploaded Python 3

File details

Details for the file stageflow_framework-0.1.4.tar.gz.

File metadata

  • Download URL: stageflow_framework-0.1.4.tar.gz
  • Upload date:
  • Size: 31.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for stageflow_framework-0.1.4.tar.gz
Algorithm Hash digest
SHA256 63e8c7f5e780f6be0df271db47e7de99ec555dabeb6d39d94cfa896a857e917e
MD5 61d5404282cfb09ae289625eea18146a
BLAKE2b-256 551ae125918acfeb0fc057c5ae847fbb6d651e551091e41e4a5e7d3950245aff

See more details on using hashes here.

File details

Details for the file stageflow_framework-0.1.4-py3-none-any.whl.

File metadata

File hashes

Hashes for stageflow_framework-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 5c5a91f1e46b88beb5c4f08f22fbe2470ca0de884f3fb6f8981bdd7554d51722
MD5 518bc19856a5764e6583fedbdfbc64e8
BLAKE2b-256 1b8e0e1f2f3ca902679ece2d83b5ce670b4766cbc937b3e16e9f4417948bd683

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page