
StageFlow: Pipeline framework for stages


StageFlow

StageFlow is a lightweight framework for describing and running JSON pipelines with nodes, stages, and schema validation.

Features

  • Node types: stage, condition, parallel, map, subpipeline, terminal.
  • Custom stages: subclass BaseStage, implement an async run(), and document arguments/config/outputs in the class docstring.
  • Input waiting (wait_input), event history, session snapshots.
  • Built-in stages for variable, dict, list, string, and logic utilities.
  • Pipeline JSON Schema (stage enum injected on the fly) and HTML doc generator.

Installation

python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install stageflow-framework
# or for local dev:
# pip install -e .

Requires Python 3.10+.

Quickstart

  1. Register your stage:
from stageflow.core.stage import BaseStage, register_stage

@register_stage("HelloStage")
class HelloStage(BaseStage):
    """
    description: "Custom stage example"
    arguments:
      name: string
    outputs:
      greeting: string
    """
    async def run(self):
        name = self.get_arguments().get("name", "world")
        self.set_outputs({"greeting": f"Hello, {name}!"})

  2. Describe a pipeline in JSON:
pipeline_dict = {
    "entry": "start",
    "nodes": [
        {
            "id": "start",
            "type": "stage",
            "stage": "HelloStage",
            "arguments": {"name": "user_name"},
            "outputs": {"greeting": "greeting"},
            "next": "finish",
        },
        {
            "id": "finish",
            "type": "terminal",
            "result": {"status": "ok"},
            "artifacts": ["greeting"],
        },
    ],
}
  3. Run:
import asyncio
from stageflow.core.pipeline import Pipeline
from stageflow.core.session import Session
from stageflow.core.context import Context

async def main():
    pipe = Pipeline.from_dict(pipeline_dict)
    session = Session(id="demo", pipeline=pipe, context=Context(payload={"user_name": "Alice"}))
    result = await session.run()
    print(result.result)        # {'status': 'ok'}
    print(result.artifacts)     # {'greeting': 'Hello, Alice!'}

asyncio.run(main())
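
To make the data flow above concrete, the following toy re-implements the three steps with the standard library only. It is not StageFlow's code: STAGES, register, get_path, set_path, ToyHelloStage, and run_pipeline are hypothetical names. The context is a plain dict rather than Context(payload=...), only stage and terminal nodes are handled, and it assumes that arguments maps each stage input to a dotted context path to read and that outputs maps each stage output key to a context path to write, which is how the pipeline JSON reads.

```python
import asyncio
from typing import Any, Callable

# Hypothetical registry: name used in pipeline JSON ("stage": "...") -> stage class.
STAGES: dict[str, type] = {}


def register(name: str) -> Callable[[type], type]:
    def decorator(cls: type) -> type:
        STAGES[name] = cls
        return cls
    return decorator


def get_path(ctx: dict, path: str) -> Any:
    """Resolve a dotted path such as "user.name" against nested dicts."""
    value: Any = ctx
    for key in path.split("."):
        value = value[key]
    return value


def set_path(ctx: dict, path: str, value: Any) -> None:
    """Assign a dotted path, creating intermediate dicts as needed."""
    *parents, leaf = path.split(".")
    for key in parents:
        ctx = ctx.setdefault(key, {})
    ctx[leaf] = value


@register("HelloStage")
class ToyHelloStage:
    def __init__(self, arguments: dict) -> None:
        self.arguments = arguments
        self.outputs: dict = {}

    async def run(self) -> None:
        name = self.arguments.get("name", "world")
        self.outputs = {"greeting": f"Hello, {name}!"}


async def run_pipeline(pipeline: dict, context: dict) -> tuple[Any, dict]:
    nodes = {node["id"]: node for node in pipeline["nodes"]}
    node = nodes[pipeline["entry"]]
    while node["type"] == "stage":
        # arguments: stage input name -> context path to read from
        args = {arg: get_path(context, path) for arg, path in node.get("arguments", {}).items()}
        stage = STAGES[node["stage"]](args)
        await stage.run()
        # outputs: stage output key -> context path to write to
        for key, path in node.get("outputs", {}).items():
            set_path(context, path, stage.outputs[key])
        node = nodes[node["next"]]
    if node["type"] != "terminal":
        raise NotImplementedError(f"toy engine does not handle {node['type']!r} nodes")
    # terminal: fixed result plus the selected context paths as artifacts
    artifacts = {path: get_path(context, path) for path in node.get("artifacts", [])}
    return node.get("result"), artifacts


demo_pipeline = {
    "entry": "start",
    "nodes": [
        {"id": "start", "type": "stage", "stage": "HelloStage",
         "arguments": {"name": "user_name"}, "outputs": {"greeting": "greeting"},
         "next": "finish"},
        {"id": "finish", "type": "terminal", "result": {"status": "ok"},
         "artifacts": ["greeting"]},
    ],
}

result, artifacts = asyncio.run(run_pipeline(demo_pipeline, {"user_name": "Alice"}))
print(result)     # {'status': 'ok'}
print(artifacts)  # {'greeting': 'Hello, Alice!'}
```

The registry indirection is what lets the JSON refer to a stage by name ("stage": "HelloStage") while the class itself lives in Python code.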

Docs and schema generation

  • Programmatic:
from stageflow.docs.html import generate_docs_assets

html_page, pipeline_schema, stages_json = generate_docs_assets()
# html_page — ready-to-serve HTML
# pipeline_schema — dict with JSON Schema (stage enum injected)
# stages_json — raw JSON with stage specs
  • CLI (prints a JSON payload with html, pipeline_schema, and stages_json to stdout):
python scripts/generate_docs_html.py > docs_assets.json

The script only prints; to get a standalone HTML page, extract the html field from the JSON payload and save it yourself.
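
A minimal sketch of that step, assuming the CLI output was redirected to docs_assets.json as above and that pipeline_schema and stages_json may each arrive either as a parsed object or as a JSON string. The function save_docs and the output file names are illustrative, not part of StageFlow.

```python
import json
from pathlib import Path
from typing import Any


def _as_text(value: Any) -> str:
    # A field may already be a JSON string or a parsed object; handle both.
    return value if isinstance(value, str) else json.dumps(value, indent=2, ensure_ascii=False)


def save_docs(payload_file: str, out_dir: str = "docs") -> Path:
    """Split the generator's JSON payload into standalone files; return the HTML path."""
    payload = json.loads(Path(payload_file).read_text(encoding="utf-8"))
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    html_path = out / "index.html"
    html_path.write_text(payload["html"], encoding="utf-8")
    (out / "pipeline.schema.json").write_text(_as_text(payload["pipeline_schema"]), encoding="utf-8")
    (out / "stages.json").write_text(_as_text(payload["stages_json"]), encoding="utf-8")
    return html_path


# Usage, after: python scripts/generate_docs_html.py > docs_assets.json
# save_docs("docs_assets.json")  # writes docs/index.html, docs/pipeline.schema.json, docs/stages.json
```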

Tests

python -m unittest

Node types overview

  • stage: Executes a registered Stage. Fields: stage (name), config/arguments/outputs mappings, next, optional fallback.
  • condition: List of {if, then} branches (JsonLogic conditions), optional else.
  • parallel: Runs several children; policy = all|any, cancel_on_error flag, optional next.
  • map: Iterates over the iterable at the items path, runs the body node for each item (sequentially or in parallel), and collects the mapped outputs.
  • subpipeline: Invokes a nested pipeline by id, with an inputs mapping, artifact_outputs, an optional result_output, and next.
  • terminal: Ends execution, returns result, and selects artifacts.
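
The condition rule (branches checked in order, the first truthy if wins, otherwise else) can be illustrated with a tiny evaluator for a JsonLogic subset. This is a sketch, not StageFlow's evaluator: json_logic and pick_branch are hypothetical, only var, ==, !=, >, <, and, or, and ! are supported, var is assumed to resolve dotted paths against a plain context dict, and the node ids in route are made up.

```python
from typing import Any, Optional


def json_logic(rule: Any, data: dict) -> Any:
    """Evaluate a small JsonLogic subset: var, ==, !=, >, <, and, or, !."""
    if not isinstance(rule, dict):
        return rule  # literals evaluate to themselves
    op, raw_args = next(iter(rule.items()))
    args = raw_args if isinstance(raw_args, list) else [raw_args]
    if op == "var":
        # {"var": "a.b"} or {"var": ["a.b", default]}: dotted lookup into data.
        value: Any = data
        for key in str(args[0]).split("."):
            if not isinstance(value, dict) or key not in value:
                return args[1] if len(args) > 1 else None
            value = value[key]
        return value
    if op == "and":  # truthiness only; real JsonLogic returns the deciding operand
        return all(json_logic(a, data) for a in args)
    if op == "or":
        return any(json_logic(a, data) for a in args)
    left, *rest = [json_logic(a, data) for a in args]
    if op == "!":
        return not left
    right = rest[0]
    if op == "==":  # Python equality, stricter than JsonLogic's type-coercing ==
        return left == right
    if op == "!=":
        return left != right
    if op == ">":
        return left > right
    if op == "<":  # two-argument form only (no "between" form)
        return left < right
    raise ValueError(f"unsupported operator: {op}")


def pick_branch(node: dict, context: dict) -> Optional[str]:
    """Return the `then` target of the first truthy branch, else `else` (or None)."""
    for branch in node["conditions"]:
        if json_logic(branch["if"], context):
            return branch["then"]
    return node.get("else")


route = {
    "id": "route",
    "type": "condition",
    "conditions": [
        {"if": {">": [{"var": "order.total"}, 100]}, "then": "manual_review"},
        {"if": {"==": [{"var": "order.country"}, "US"]}, "then": "ship_domestic"},
    ],
    "else": "ship_international",
}
print(pick_branch(route, {"order": {"total": 250, "country": "US"}}))  # manual_review
print(pick_branch(route, {"order": {"total": 20, "country": "US"}}))   # ship_domestic
print(pick_branch(route, {"order": {"total": 20, "country": "FR"}}))   # ship_international
```

In the first call both branches match, and the earlier one wins.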

Node types in detail

  • Stage node
    • Required: id, type="stage", stage (registered stage name).
    • Data flow: arguments map context paths -> stage inputs; outputs map stage return keys -> context paths.
    • Control flow: next points to following node; optional fallback node is used on stage exceptions.
    • The stage class itself declares skipable, allowed_events, allowed_inputs, timeout, and retries.
  • Condition node
    • Required: id, type="condition", non-empty conditions list.
    • Each condition: { "if": <JsonLogic>, "then": "<node id>" }; first matched branch wins.
    • Optional else target if no branch matches.
  • Parallel node
    • Required: id, type="parallel", non-empty children (node ids).
    • policy: all (default) waits for all children; any completes on the first successful child.
    • cancel_on_error: whether to cancel siblings on failure.
    • next: node id to continue after parallel finishes.
  • Map node
    • Required: id, type="map", items (context path to an iterable), body (node id).
    • mode: sequential (default) or parallel execution of the body.
    • output_map: maps body outputs to target context paths for the aggregated results.
    • item_path/index_path: context paths where the current item and index are injected for each body run.
    • cancel_on_error: abort the remaining iterations on error.
    • next: node id after all iterations finish.
  • Subpipeline node
    • Required: id, type="subpipeline", subpipeline_id (key in the pipeline's subpipelines dict).
    • inputs: context mapping passed into the subpipeline's root context.
    • artifact_outputs: map subpipeline artifacts -> parent context paths.
    • result_output: path in parent context to store subpipeline result.
    • next: next node in parent pipeline.
  • Terminal node
    • Required: id, type="terminal".
    • result: arbitrary object returned as final result.
    • artifacts: list of context paths whose values are returned in the artifacts map.
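
The stage node fields above say a stage may declare timeout and retries and that fallback handles stage exceptions, but not how the three interact. The sketch below shows one plausible policy using plain asyncio, under our own assumptions: each attempt is bounded by timeout seconds, a failed or timed-out attempt is retried up to retries more times, and only after the last failure does control move to fallback (raising if there is none). run_stage_node and StageFailed are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class StageFailed(Exception):
    """Raised when every attempt failed and the node has no fallback."""


async def run_stage_node(
    node: dict,
    attempt: Callable[[], Awaitable[None]],
    timeout: Optional[float] = None,
    retries: int = 0,
) -> Optional[str]:
    """Run one stage with a per-attempt timeout and retries; return the next node id."""
    last_error: Optional[Exception] = None
    for _ in range(retries + 1):
        try:
            # `attempt` is a factory: a coroutine object can only be awaited once,
            # so each retry needs a fresh one. timeout=None means no limit.
            await asyncio.wait_for(attempt(), timeout)
            return node.get("next")
        except Exception as exc:  # stage errors and asyncio.TimeoutError alike
            last_error = exc
    if node.get("fallback"):
        return node["fallback"]
    raise StageFailed(node["id"]) from last_error


async def demo() -> None:
    node = {"id": "fetch", "next": "parse", "fallback": "use_cache"}
    calls = 0

    async def flaky() -> None:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise RuntimeError("transient failure")

    async def too_slow() -> None:
        await asyncio.sleep(10)

    print(await run_stage_node(node, flaky, timeout=1.0, retries=2))       # parse
    print(await run_stage_node(node, too_slow, timeout=0.01, retries=1))  # use_cache


asyncio.run(demo())
```

The flaky stage succeeds on its third attempt (one initial try plus two retries), while the slow one times out twice and falls through to the fallback node.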

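The all/any policies and cancel_on_error flag of the parallel node, and the sequential/parallel mode of the map node, map naturally onto asyncio primitives. The sketch below is one way to express them, not StageFlow's implementation: run_parallel and run_map are hypothetical, a child counts as successful when it returns without raising, any keeps waiting past failed children and raises only if all of them fail, and the map body receives the item and index as arguments instead of having them injected at item_path/index_path. output_map and the map node's cancel_on_error are not modeled.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable


async def _first_success(tasks: list[asyncio.Future]) -> list[Any]:
    """policy="any": return the first child result that did not raise."""
    errors: list[Exception] = []
    try:
        for next_done in asyncio.as_completed(tasks):
            try:
                return [await next_done]
            except Exception as exc:
                errors.append(exc)
        raise RuntimeError("every parallel child failed") from errors[-1]
    finally:
        for task in tasks:
            task.cancel()  # no-op for finished tasks; stops the losers
        await asyncio.gather(*tasks, return_exceptions=True)


async def run_parallel(children: list[Awaitable[Any]], policy: str = "all",
                       cancel_on_error: bool = True) -> list[Any]:
    if not children:
        raise ValueError("a parallel node needs at least one child")
    tasks = [asyncio.ensure_future(child) for child in children]
    if policy == "any":
        return await _first_success(tasks)
    # policy="all": stop at the first failure, or let every sibling finish.
    when = asyncio.FIRST_EXCEPTION if cancel_on_error else asyncio.ALL_COMPLETED
    done, pending = await asyncio.wait(tasks, return_when=when)
    for task in pending:  # non-empty only after a failure with cancel_on_error
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    for task in tasks:
        error = task.exception() if task in done else None
        if error is not None:
            raise error  # first failing child, in declaration order
    return [task.result() for task in tasks]


async def run_map(items: Iterable[Any], body: Callable[[Any, int], Awaitable[Any]],
                  mode: str = "sequential") -> list[Any]:
    """Run body(item, index) for each item; results keep the input order."""
    if mode == "parallel":
        return list(await asyncio.gather(*(body(item, i) for i, item in enumerate(items))))
    return [await body(item, i) for i, item in enumerate(items)]


async def demo() -> None:
    async def child(name: str, delay: float, fail: bool = False) -> str:
        await asyncio.sleep(delay)
        if fail:
            raise RuntimeError(name)
        return name

    print(await run_parallel([child("a", 0.02), child("b", 0.01)]))  # ['a', 'b']
    print(await run_parallel(
        [child("slow", 0.05), child("bad", 0.0, fail=True), child("fast", 0.01)],
        policy="any",
    ))  # ['fast']

    async def square(item: int, index: int) -> int:
        return item * item

    print(await run_map([1, 2, 3], square, mode="parallel"))  # [1, 4, 9]


asyncio.run(demo())
```

With cancel_on_error=False under all, every sibling runs to completion before the first error is re-raised, which keeps side effects predictable at the cost of latency.
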
Built-in stages

  • Vars: SetValueStage, CopyValueStage, IncrementStage, MergeDictStage.
  • Dicts: PickKeysStage, DropKeysStage.
  • Lists: AppendListStage, ExtendListStage, FilterListStage, UniqueListStage, PopListStage.
  • Strings: ConcatStage, TemplateStage.
  • Logic: AssertStage, FailStage, LogStage, SleepStage.

Each stage's docstring describes its arguments/config/outputs; the same specs are available via generate_docs_assets() in stages_json.

Useful refs

  • stageflow/core: core runtime (pipeline, session, nodes, context).
  • stageflow/builtins: built-in stages.
  • stageflow/docs: schema/HTML generation helpers.
  • scripts/: helper scripts (docs generator wrapper).
  • tests/: unit tests.
