Skip to main content

Composable chain-of-steps orchestrator with context passing, logging, retries, and validation.

Project description

StepChain 🪢

Composable step orchestrator for Python.
Write clean, declarative pipelines for synchronous and asynchronous workflows — with retries, validation, logging, and hooks — all in a tiny, dependency-free package.


Why StepChain?

Every real system has orchestration glue code:

  • “extract → transform → load” ETL jobs
  • “fetch → enrich → push” API calls
  • “validate → save → notify” business workflows

Without structure, these pipelines quickly become:

  • ❌ Nested try/except spaghetti
  • ❌ Repeated logging boilerplate
  • ❌ Retry logic scattered everywhere
  • ❌ Hard to test, hard to extend

StepChain exists to fix that.
It gives you a fluent API to compose steps into a pipeline that is:

  • 🔒 Safe — retries, validation hooks, deadlines, error classes
  • 📊 Transparent — precompiled logging, redaction, before/after hooks
  • ⚡ Fast — near-zero runtime overhead, tiny dependency footprint (great for Lambdas & microservices)
  • 🧩 Universal — works in basic Python, FastAPI/asyncio, or inside your ETL jobs

Use Cases

  • Data pipelines
    ETL/ELT flows with retries, validation, and logging baked in.

  • APIs & async services
    Orchestrate async calls (HTTP, DB, queues) with retries and deadlines.

  • Business workflows
    Chaining validation → save → publish → notify in web apps.

  • Serverless (AWS Lambda, GCP Cloud Functions)
    Tiny cold-start friendly orchestrator (no heavy deps).

  • Testing & prototyping
    Express multi-step flows declaratively, without external schedulers.


Install

pip install py-stepchain

Usage

Sync example (basic Python ETL)

from stepchain import Chain

data = [
    {"id": 1, "name": "alice", "active": True},
    {"id": 2, "name": "bob",   "active": False},
]

def extract():
    return data

def transform(rows):
    return [r["name"].upper() for r in rows if r["active"]]

def load(names):
    print("Loaded:", names)
    return {"count": len(names)}

ctx = (
    Chain()
    .next(extract, out="raw", log_fmt="raw_n={raw.__len__}")
    .next(transform, out="clean", args=["raw"], log_fmt="clean_n={clean.__len__}")
    .next(load, out="loadres", args=["clean"], log_fmt="loaded={loadres.count}")
    .run()
)

print(ctx["loadres"])
# → {"count": 1}

Async example (FastAPI)

from fastapi import FastAPI
from stepchain.chain.async_chain import AsyncChain

app = FastAPI()

async def fetch_user(user_id: int):
    return {"id": user_id, "name": "Alice"}

async def enrich(user):
    user["greeting"] = f"Hello {user['name']}!"
    return user

@app.get("/hello/{user_id}")
async def hello(user_id: int):
    ctx = await (
        AsyncChain()
        .put("id", user_id)
        .next(fetch_user, out="user", args=["id"])
        .next(enrich, out="enriched", args=["user"], log_fmt="User={enriched.name}")
        .run()
    )
    return ctx["enriched"]

Features

✅ Sync + Async APIs

✅ Retries with backoff + jitter

✅ Validation hooks per step

✅ Logging templates with {dotted.refs} and JSON serialization

✅ Before/After hooks for metrics and tracing

✅ Context redaction for secrets

✅ Strict mode: unresolved refs cause errors

✅ 100% type hints (mypy-friendly)

✅ Dependency-free (only stdlib)


Why not Airflow/Prefect/dbt/etc?

  • Those are heavy DAG engines for distributed orchestration.

  • StepChain is for the inner loop: inside your function, microservice, or Lambda.

  • It complements those tools, not replaces them.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

py_stepchain-0.1.0.tar.gz (14.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

py_stepchain-0.1.0-py3-none-any.whl (12.4 kB view details)

Uploaded Python 3

File details

Details for the file py_stepchain-0.1.0.tar.gz.

File metadata

  • Download URL: py_stepchain-0.1.0.tar.gz
  • Upload date:
  • Size: 14.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for py_stepchain-0.1.0.tar.gz
Algorithm Hash digest
SHA256 291fc74fc5f7139ea1f92fbc5bf95ae4e07a066e78795a8691e523a864ccb891
MD5 1febd02efb90c5e8645959e232a104e9
BLAKE2b-256 e541594b0e2a93a9563236b09b1cf226f8503f22b04f89ca3d82b0989da31c54

See more details on using hashes here.

Provenance

The following attestation bundles were made for py_stepchain-0.1.0.tar.gz:

Publisher: publish.yml on inderdevbhumi82-prog/stepchain

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file py_stepchain-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: py_stepchain-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 12.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for py_stepchain-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 c5b81e96324013c3eb3a71c32420a2c3763e799f5b8280c43f2fde8a6bccf567
MD5 49779233624379bf4a0c9133e30d841f
BLAKE2b-256 b59735063457a9ea8080a5ddbffda1bc161fb9fce6d5f4ddc0e5a4463b10e1d1

See more details on using hashes here.

Provenance

The following attestation bundles were made for py_stepchain-0.1.0-py3-none-any.whl:

Publisher: publish.yml on inderdevbhumi82-prog/stepchain

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page