
StageFlow: Pipeline framework for stages


StageFlow

StageFlow is a lightweight framework for describing and running JSON pipelines with nodes, stages, and schema validation.

Features

  • Node types: stage, condition, parallel, map, subpipeline, terminal.
  • Custom stages: implement an async run method and document arguments/config/outputs in the class docstring.
  • Input waiting (wait_input), event history, session snapshots.
  • Built-in stages for dict/list/string/logic utilities.
  • Pipeline JSON Schema (with the stage-name enum injected on the fly) and an HTML docs generator.

Installation

python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install stageflow-framework
# or for local dev:
# pip install -e .

Requires Python 3.10+.

Quickstart

  1. Register your stage:
from stageflow.core.stage import BaseStage, register_stage

@register_stage("HelloStage")
class HelloStage(BaseStage):
    """
    description: "Custom stage example"
    arguments:
      name: string
    outputs:
      greeting: string
    """
    async def run(self):
        name = self.get_arguments().get("name", "world")
        self.set_outputs({"greeting": f"Hello, {name}!"})
  2. Describe a pipeline as a JSON-compatible dict:
pipeline_dict = {
    "entry": "start",
    "nodes": [
        {
            "id": "start",
            "type": "stage",
            "stage": "HelloStage",
            "arguments": {"name": "user_name"},
            "outputs": {"greeting": "greeting"},
            "next": "finish",
        },
        {
            "id": "finish",
            "type": "terminal",
            "result": {"status": "ok"},
            "artifacts": ["greeting"],
        },
    ],
}
  3. Run it:
import asyncio
from stageflow.core.pipeline import Pipeline
from stageflow.core.session import Session
from stageflow.core.context import Context

async def main():
    pipe = Pipeline.from_dict(pipeline_dict)
    session = Session(id="demo", pipeline=pipe, context=Context(payload={"user_name": "Alice"}))
    result = await session.run()
    print(result.result)        # {'status': 'ok'}
    print(result.artifacts)     # {'greeting': 'Hello, Alice!'}

asyncio.run(main())
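For readers who want the data flow of this quickstart spelled out, here is a small stdlib-only model of the run. It assumes that arguments maps each stage input name to the context path it is read from, that outputs maps each stage output key to the context path it is written to, and that the terminal node copies its listed context paths into the artifacts map. run_linear, hello_stage, StageFn and demo_pipeline are hypothetical names; this is not StageFlow's implementation, and it only handles stage and terminal nodes with flat (top-level) context keys.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical stand-in for a registered stage (not StageFlow's API): it receives
# the resolved arguments and returns its outputs as a plain dict.
StageFn = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def hello_stage(args: dict[str, Any]) -> dict[str, Any]:
    name = args.get("name", "world")
    return {"greeting": f"Hello, {name}!"}


async def run_linear(
    pipeline: dict[str, Any],
    stages: dict[str, StageFn],
    payload: dict[str, Any],
) -> tuple[Any, dict[str, Any]]:
    """Hypothetical runner: follow 'next' links from 'entry' until a terminal node."""
    nodes = {node["id"]: node for node in pipeline["nodes"]}
    context = dict(payload)  # flat context: keys are top-level paths
    node = nodes[pipeline["entry"]]
    while node["type"] != "terminal":
        # arguments: stage input name -> context path it is read from
        args = {name: context[path]
                for name, path in node.get("arguments", {}).items() if path in context}
        produced = await stages[node["stage"]](args)
        # outputs: stage output key -> context path it is written to
        for key, path in node.get("outputs", {}).items():
            context[path] = produced[key]
        node = nodes[node["next"]]
    # The terminal node returns its result plus the selected context paths.
    artifacts = {path: context[path] for path in node.get("artifacts", [])}
    return node.get("result"), artifacts


demo_pipeline = {
    "entry": "start",
    "nodes": [
        {"id": "start", "type": "stage", "stage": "HelloStage",
         "arguments": {"name": "user_name"}, "outputs": {"greeting": "greeting"},
         "next": "finish"},
        {"id": "finish", "type": "terminal", "result": {"status": "ok"},
         "artifacts": ["greeting"]},
    ],
}

if __name__ == "__main__":
    print(asyncio.run(run_linear(demo_pipeline, {"HelloStage": hello_stage},
                                 {"user_name": "Alice"})))
```

Running it as a script prints ({'status': 'ok'}, {'greeting': 'Hello, Alice!'}), the same result and artifacts as the StageFlow run; with an empty payload the stage falls back to "world".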

Docs and schema generation

  • Programmatic:
from stageflow.docs.html import generate_docs_assets

html_page, pipeline_schema, stages_json = generate_docs_assets()
# html_page — ready-to-serve HTML
# pipeline_schema — dict with JSON Schema (stage enum injected)
# stages_json — raw JSON with stage specs
  • CLI (prints a JSON payload containing html, pipeline_schema, and stages_json to stdout):
python scripts/generate_docs_html.py > docs_assets.json

The script only writes to stdout; if you need the HTML page as a file, extract the html field from the payload and save it yourself.
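A minimal way to do that in Python, assuming the payload is a JSON object whose html field holds the page as a string (as the CLI description suggests). save_html and the file names are illustrative, not part of StageFlow.

```python
import json
from pathlib import Path


def save_html(payload_path: str, out_path: str) -> int:
    """Hypothetical helper: write the 'html' field of the CLI payload to a file.

    Returns the number of characters written.
    """
    payload = json.loads(Path(payload_path).read_text(encoding="utf-8"))
    html = payload["html"]  # assumed key, per the CLI description
    return Path(out_path).write_text(html, encoding="utf-8")
```

Usage: save_html("docs_assets.json", "docs.html") after running the generator command shown above.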
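"Stage enum injected" presumably means the allowed values of a stage node's stage field are filled in from the registered stages when the schema is generated, instead of being hard-coded. A sketch of that idea, assuming a hypothetical schema layout where stage nodes live under $defs.StageNode; the real schema returned by generate_docs_assets() may be organized differently, and BASE_SCHEMA and inject_stage_enum are illustrative names.

```python
import copy
from typing import Any

# Hypothetical base schema; StageFlow's real pipeline schema may differ.
BASE_SCHEMA: dict[str, Any] = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$defs": {
        "StageNode": {
            "type": "object",
            "required": ["id", "type", "stage"],
            "properties": {
                "id": {"type": "string"},
                "type": {"const": "stage"},
                "stage": {"type": "string"},
            },
        }
    },
}


def inject_stage_enum(schema: dict[str, Any], stage_names: list[str]) -> dict[str, Any]:
    """Return a copy of schema whose StageNode.stage accepts only the given names."""
    result = copy.deepcopy(schema)  # leave the base schema untouched
    stage_prop = result["$defs"]["StageNode"]["properties"]["stage"]
    stage_prop["enum"] = sorted(set(stage_names))  # de-duplicated, stable order
    return result
```

With a JSON Schema validator (for example the third-party jsonschema package), a stage node naming an unregistered stage would then fail validation against the injected enum.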

Tests

python -m unittest

Node types overview

  • stage: Executes a registered stage. Fields: stage (name), config/arguments/outputs mappings, next, and an optional fallback.
  • condition: A list of {if, then} branches with JsonLogic conditions, plus an optional else.
  • parallel: Runs several children; policy = all|any, a cancel_on_error flag, and an optional next.
  • map: Iterates over the items at a context path and runs the body node for each item (sequentially or in parallel), collecting the mapped outputs.
  • subpipeline: Invokes a nested pipeline by id, with an inputs mapping, artifact_outputs, an optional result_output, and next.
  • terminal: Ends execution, returning result and the selected artifacts.
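Because nodes refer to each other by id (next, fallback, branch targets, parallel children, map bodies), a quick pre-flight check for dangling references can catch typos before a run. The sketch below assumes the field names listed in this README (next, fallback, conditions[].then, else, children, body); node_refs and find_dangling_refs are hypothetical helpers, and StageFlow may already perform similar validation itself.

```python
from typing import Any, Iterator


def node_refs(node: dict[str, Any]) -> Iterator[str]:
    """Yield every node id that node points to (subpipeline_id is not a node id)."""
    for key in ("next", "fallback", "else", "body"):
        if node.get(key):
            yield node[key]
    for branch in node.get("conditions", []):
        yield branch["then"]
    yield from node.get("children", [])


def find_dangling_refs(pipeline: dict[str, Any]) -> list[tuple[str, str]]:
    """Return (source node id, missing target id) pairs, including a bad entry."""
    ids = {node["id"] for node in pipeline["nodes"]}
    problems: list[tuple[str, str]] = []
    if pipeline["entry"] not in ids:
        problems.append(("<entry>", pipeline["entry"]))
    for node in pipeline["nodes"]:
        for target in node_refs(node):
            if target not in ids:
                problems.append((node["id"], target))
    return problems
```

For example, a stage node with "next": "finsh" instead of "finish" would be reported as (its id, "finsh").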

Node types in detail

  • Stage node
    • Required: id, type="stage", stage (registered stage name).
    • Data flow: arguments maps stage input names to the context paths they are read from; outputs maps stage output keys to the context paths they are written to.
    • Control flow: next points to the following node; the optional fallback node is used when the stage raises an exception.
    • The stage class declares skipable, allowed_events, allowed_inputs, timeout, and retries.
  • Condition node
    • Required: id, type="condition", non-empty conditions list.
    • Each condition: { "if": <JsonLogic>, "then": "<node id>" }; first matched branch wins.
    • Optional else: target node id used when no branch matches.
  • Parallel node
    • Required: id, type="parallel", non-empty children (node ids).
    • policy: all (default) waits for all children; any completes on the first success.
    • cancel_on_error: whether to cancel the remaining siblings when a child fails.
    • next: node id to continue after parallel finishes.
  • Map node
    • Required: id, type="map", items (context path to an iterable), body (node id).
    • mode: sequential (default) or parallel execution of the body.
    • output_map: maps body outputs to the target context paths that receive the aggregated results.
    • item_path/index_path: context paths where the current item and index are injected for each body run.
    • cancel_on_error: abort remaining iterations on error.
    • next: node id after all iterations finish.
  • Subpipeline node
    • Required: id, type="subpipeline", subpipeline_id (a key in the pipeline's subpipelines dict).
    • inputs: context mapping passed into the subpipeline's root context.
    • artifact_outputs: map subpipeline artifacts -> parent context paths.
    • result_output: path in parent context to store subpipeline result.
    • next: next node in parent pipeline.
  • Terminal node
    • Required: id, type="terminal".
    • result: arbitrary object returned as final result.
    • artifacts: list of context paths that will be returned as artifacts map.
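The stage node's error handling (timeout and retries declared on the stage class, then a fallback node on failure) can be pictured as follows. This is a hypothetical sketch: run_stage_with_policy is an illustrative name, and the assumptions that retries counts extra attempts after the first and that timeout is a per-attempt limit in seconds are not confirmed by StageFlow.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def run_stage_with_policy(
    make_run: Callable[[], Awaitable[Any]],
    node: dict[str, Any],
    timeout: float | None = None,
    retries: int = 0,
) -> tuple[str | None, Any]:
    """Hypothetical helper: return (next node id, stage result).

    make_run must create a fresh awaitable per attempt, because a coroutine can
    only be awaited once. Each attempt is bounded by timeout seconds (None means
    no limit); retries extra attempts follow the first one.
    """
    last_error: Exception | None = None
    for _ in range(max(retries, 0) + 1):
        try:
            value = await asyncio.wait_for(make_run(), timeout)
            return node.get("next"), value
        except Exception as exc:  # also catches asyncio.TimeoutError
            last_error = exc
    if node.get("fallback"):
        return node["fallback"], None  # route to the fallback node instead of failing
    assert last_error is not None  # the loop always runs at least once
    raise last_error
```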
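Condition branches use JsonLogic, and the first branch whose if evaluates truthy wins, falling back to else. The sketch below evaluates a deliberately tiny JsonLogic subset (var with dotted paths, ==, !=, >, <, and, or, !) to show that selection rule; json_logic and choose_branch are hypothetical names, this is not the evaluator StageFlow uses, and a real project would rely on a complete JsonLogic implementation.

```python
from __future__ import annotations

from typing import Any


def _var(path: str, data: dict[str, Any]) -> Any:
    """Resolve a dotted path such as "user.age"; a missing key yields None."""
    value: Any = data
    for part in path.split("."):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value


def json_logic(rule: Any, data: dict[str, Any]) -> Any:
    """Evaluate a tiny JsonLogic subset; anything that is not a dict is a literal."""
    if not isinstance(rule, dict):
        return rule
    if len(rule) != 1:
        raise ValueError("a JsonLogic rule has exactly one operator")
    op, raw_args = next(iter(rule.items()))
    if op == "var":
        return _var(raw_args if isinstance(raw_args, str) else raw_args[0], data)
    args = raw_args if isinstance(raw_args, list) else [raw_args]
    if op == "and":  # simplified: returns a bool, not the last evaluated operand
        return all(json_logic(a, data) for a in args)
    if op == "or":
        return any(json_logic(a, data) for a in args)
    values = [json_logic(a, data) for a in args]
    if op == "!":
        return not values[0]
    if op == "==":  # strict Python equality, unlike JsonLogic's loose "=="
        return values[0] == values[1]
    if op == "!=":
        return values[0] != values[1]
    if op in (">", "<"):
        if values[0] is None or values[1] is None:  # missing data never compares
            return False
        return values[0] > values[1] if op == ">" else values[0] < values[1]
    raise ValueError(f"unsupported operator: {op}")


def choose_branch(node: dict[str, Any], context: dict[str, Any]) -> str | None:
    """Return the target of the first matching branch, else node["else"] (or None)."""
    for branch in node["conditions"]:
        if json_logic(branch["if"], context):
            return branch["then"]
    return node.get("else")
```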
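The two parallel policies map naturally onto asyncio.wait: all waits for every child, while any returns as soon as one child succeeds and cancels the rest. This is a hypothetical sketch of those semantics over plain awaitables; run_parallel is an illustrative name, and the reading of cancel_on_error as "cancel the remaining children and re-raise on the first failure" is an assumption, since how StageFlow reports partial failures is not described here.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable


async def run_parallel(
    children: list[Awaitable[Any]],
    policy: str = "all",
    cancel_on_error: bool = True,
) -> list[Any]:
    """Hypothetical model of a parallel node.

    policy="all": wait for every child; return results in input order.
    policy="any": return [result] of the first child that succeeds.
    cancel_on_error=True: on the first failure, cancel the others and re-raise.
    cancel_on_error=False: keep going; the first error is raised only if the
    policy cannot be satisfied (any failure for "all", no success for "any").
    """
    if policy not in ("all", "any"):
        raise ValueError(f"unknown policy: {policy}")
    tasks = [asyncio.ensure_future(child) for child in children]
    pending = set(tasks)
    first_error: BaseException | None = None
    try:
        while pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                exc = task.exception()
                if exc is not None:
                    if cancel_on_error:
                        raise exc
                    if first_error is None:
                        first_error = exc
                elif policy == "any":
                    return [task.result()]
        if first_error is not None:
            raise first_error
        return [task.result() for task in tasks]
    finally:
        for task in pending:
            task.cancel()
        # Let cancellations settle and mark every child's exception as retrieved.
        await asyncio.gather(*tasks, return_exceptions=True)
```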
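A map node runs its body once per item, placing the current item and index at item_path/index_path and collecting body outputs through output_map. The sketch below models that with a body coroutine that stands in for the body node: it receives a per-iteration copy of the context and returns its outputs. run_map and Body are hypothetical names; the choice that each output_map target receives a list of per-item values in item order, the flat (non-dotted) context paths, and the omission of cancel_on_error are illustrative simplifications, not documented StageFlow behavior.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical stand-in for the body node: takes the per-iteration context,
# returns the body's outputs.
Body = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def run_map(node: dict[str, Any], context: dict[str, Any], body: Body) -> dict[str, Any]:
    """Run body once per item and return a new context with aggregated outputs."""
    items = list(context[node["items"]])

    async def run_one(index: int, item: Any) -> dict[str, Any]:
        local = dict(context)  # each iteration works on its own shallow copy
        if "item_path" in node:
            local[node["item_path"]] = item
        if "index_path" in node:
            local[node["index_path"]] = index
        return await body(local)

    if node.get("mode", "sequential") == "parallel":
        # gather() keeps results in item order even if bodies finish out of order
        outputs = await asyncio.gather(*(run_one(i, it) for i, it in enumerate(items)))
    else:
        outputs = [await run_one(i, it) for i, it in enumerate(items)]

    result = dict(context)
    for body_key, target_path in node.get("output_map", {}).items():
        result[target_path] = [out[body_key] for out in outputs]  # one value per item
    return result
```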

Built-in stages

  • Vars: SetValueStage, CopyValueStage, IncrementStage, MergeDictStage.
  • Dicts: PickKeysStage, DropKeysStage.
  • Lists: AppendListStage, ExtendListStage, FilterListStage, UniqueListStage, PopListStage.
  • Strings: ConcatStage, TemplateStage.
  • Logic: AssertStage, FailStage, LogStage, SleepStage.

Each stage's docstring describes its arguments, config, and outputs; the same specs are also available via generate_docs_assets() (in stages_json).
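Since stage specs live in class docstrings, a name-based registry is enough to collect them. The following is a hypothetical, stdlib-only sketch of a register_stage-style decorator (here called register) and a collect_specs helper that gathers each registered class's cleaned docstring into JSON. STAGES, register and collect_specs are illustrative names; this only mirrors the idea behind stages_json and leaves the YAML-like docstring body unparsed.

```python
import inspect
import json
from typing import Callable, TypeVar

T = TypeVar("T", bound=type)

STAGES: dict[str, type] = {}  # hypothetical registry: stage name -> class


def register(name: str) -> Callable[[T], T]:
    """Class decorator that records a stage class under name."""
    def decorator(cls: T) -> T:
        if name in STAGES:
            raise ValueError(f"stage {name!r} is already registered")
        STAGES[name] = cls
        return cls
    return decorator


def collect_specs() -> str:
    """Return a JSON string mapping each stage name to its cleaned docstring."""
    specs = {name: inspect.getdoc(cls) or "" for name, cls in sorted(STAGES.items())}
    return json.dumps(specs, indent=2)


@register("HelloStage")
class HelloStage:
    """
    description: "Custom stage example"
    arguments:
      name: string
    outputs:
      greeting: string
    """
```

inspect.getdoc strips the common indentation, so the spec text keeps its nested YAML-like structure.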

Useful refs

  • stageflow/core — core (pipeline, session, nodes, context).
  • stageflow/builtins — built-in stages.
  • stageflow/docs — schema/HTML generation helpers.
  • scripts/ — helpers (docs generator wrapper).
  • tests/ — unit tests.

Download files

Source Distribution

stageflow_framework-0.1.6.tar.gz (32.2 kB)

Uploaded Source

Built Distribution


stageflow_framework-0.1.6-py3-none-any.whl (30.3 kB)

Uploaded Python 3

File details

Details for the file stageflow_framework-0.1.6.tar.gz.

File metadata

  • Download URL: stageflow_framework-0.1.6.tar.gz
  • Size: 32.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for stageflow_framework-0.1.6.tar.gz
Algorithm Hash digest
SHA256 77d2a4c33e5024406f9e68757f9c14f097721be3ad674170c6e13e06ef14ca2a
MD5 ddfd3709565af5c5d45289b956de6a93
BLAKE2b-256 891d0506ee4716f73d01589bad2fdf7bf4c2ac10b7b7d3f789cb798e4962fc22


File details

Details for the file stageflow_framework-0.1.6-py3-none-any.whl.

File hashes

Hashes for stageflow_framework-0.1.6-py3-none-any.whl
Algorithm Hash digest
SHA256 be313f47b7ecb15a3c88958ff20574f5a073f73c0d760f861b5a06e26609d193
MD5 6c7fa29c79b71c58500aecfe8ae48ceb
BLAKE2b-256 44340eb21a8c5ad33a8ace17f4c75d3aab5b8d8920ffa8e7e6e6849f3b719318

