Skip to main content

Python SDK for Cascade workflow orchestration

Project description

Cascade SDK

Python SDK for Cascade workflow orchestration APIs.

Cascade is a product by Noir Stack LLC.

Status

noirstack-cascade-sdk is currently in early public release (0.1.x). The API surface may evolve as orchestration and standards integrations mature.

Installation

pip install noirstack-cascade-sdk

Canonical Usage

Install from PyPI using:

pip install noirstack-cascade-sdk

Import in code using:

from cascade_sdk import CascadeClient, flow, task

Quick Start

1. Define Tasks and Flows

from cascade_sdk import task, flow

@task
def fetch_data(url: str) -> dict:
    """Fetch data from API."""
    import requests
    return requests.get(url).json()

@task
def process_data(data: dict) -> list:
    """Transform data."""
    return [item['id'] for item in data['results']]

@task
def save_results(ids: list) -> str:
    """Save to database."""
    # ... database logic ...
    return f"Saved {len(ids)} items"

@flow
def data_pipeline(api_url: str) -> str:
    """Complete ETL pipeline."""
    raw_data = fetch_data(api_url)
    processed = process_data(raw_data)
    return save_results(processed)

2. Compile to DAG

from cascade_sdk.compiler import build_dag_from_flow

dag = build_dag_from_flow(data_pipeline)
print(dag)
# {
#   'nodes': [
#     {'id': 'node-0', 'task': 'fetch_data'},
#     {'id': 'node-1', 'task': 'process_data', 'depends_on': ['node-0']},
#     {'id': 'node-2', 'task': 'save_results', 'depends_on': ['node-1']}
#   ],
#   'edges': [
#     {'from': 'node-0', 'to': 'node-1'},
#     {'from': 'node-1', 'to': 'node-2'}
#   ],
#   'return_node_id': 'node-2'
# }

3. Register and Execute

from cascade_sdk import CascadeClient, wait_for_completion

# Connect to Cascade server
client = CascadeClient(
    base_url="http://localhost:3000",
    api_key="your_api_key"
)

# Register flow
flow_id = client.register_flow("data_pipeline", dag)
print(f"Registered flow: {flow_id}")

# Trigger execution
run_id = client.trigger_flow(flow_id, {
    "api_url": "https://api.example.com/data"
})
print(f"Started run: {run_id}")

# Wait for completion
result = wait_for_completion(client, run_id, timeout=300)

if result['status'] == 'completed':
    print("Success:", result['result'])
else:
    print("Failed:", result.get('error'))

Features

Decorators

  • @task: Mark function as executable task
  • @flow: Mark function as workflow orchestration

Compiler

  • build_dag_from_flow(flow_func): Extract DAG from flow
  • build_dag_from_flow_json(flow_func): Get canonical JSON
  • canonical_json(obj): Deterministic serialization
  • canonicalize_dag(dag): Normalize DAG structure

Client

  • CascadeClient: HTTP client for Cascade API

    • register_flow(name, dag): Register flow definition
    • trigger_flow(flow_id, inputs): Start execution
    • get_run(run_id): Get run status
    • get_flow_graph(flow_id): Get flow DAG
    • get_run_graph(run_id): Get execution graph
  • wait_for_completion(client, run_id): Poll until done

Error Handling

from cascade_sdk import (
    AuthenticationError,
    ValidationError,
    NotFoundError,
    RateLimitError,
    OrchestrationError,
    NetworkError,
    TimeoutError,
)

try:
    flow_id = client.register_flow("my_flow", dag)
except ValidationError as e:
    print(f"Invalid DAG: {e}")
    print(f"Details: {e.response_body}")
except AuthenticationError:
    print("Check your API key")

Advanced Usage

Ecosystem Compatibility

If you are also using hexarch-guardrails in your stack, this SDK can be used alongside it:

  • keep guardrail/policy logic in hexarch-guardrails
  • keep DAG compilation + Cascade API transport in cascade-sdk

This separation helps maintain clean orchestration boundaries.

Standards-Aligned Workflow Metadata

Cascade SDK includes helpers aligned to workflow/orchestration standards:

  • BPMN 2.0: design-time mapping to executable DAG shape
  • OpenTelemetry: trace context propagation (traceparent, tracestate)
  • W3C PROV: provenance bundle helpers for entities/activities/agents
  • CNCF CloudEvents: event envelope helpers for run and task events
  • NIST SP 800-204: boundary and microservice security guidance profile

Install optional standards extras:

pip install -e ".[standards]"

Use helpers:

from cascade_sdk import build_cloudevent, build_prov_bundle, build_trace_context_headers

event = build_cloudevent(
    event_type="cascade.flow.run.started",
    source="urn:cascade:orchestrator",
    data={"run_id": "run-123"},
    extensions=build_trace_context_headers(),
)

prov = build_prov_bundle()

Validate standards artifacts before transport/storage:

from cascade_sdk import validate_cloudevent, validate_dag

validate_cloudevent(event)
validate_dag(dag)

See WORKFLOW_STANDARDS_PROFILE.md for the full profile.

BPMN 2.0 to DAG Mapping

from cascade_sdk import bpmn_xml_to_dag

with open("process.bpmn", "r", encoding="utf-8") as f:
    dag = bpmn_xml_to_dag(f.read())

This maps BPMN task-like nodes and sequence flows into the SDK DAG shape.

Apache Airflow DAG Adapter

from cascade_sdk import airflow_dag_to_dag

# airflow_dag is an Airflow DAG instance
dag = airflow_dag_to_dag(airflow_dag)

Install Airflow interop extras when needed:

pip install -e ".[airflow]"

Multi-Orchestrator Topology Adapters

Dependency-light adapters are available for topology reuse from:

  • Flyte
  • MLRun
  • Metaflow
  • Apache NiFi
  • Shipyard
  • Mage
  • Argo Workflows
  • Kestra
  • Luigi
  • Dagster

Example:

from cascade_sdk import argo_workflow_to_dag

dag = argo_workflow_to_dag(argo_workflow_dict)

These adapters map task nodes + dependencies into the standard Cascade DAG shape.

Metadata Hooks on Trigger/Poll

def metadata_hook(cloudevent, provenance):
    print(cloudevent["type"], cloudevent["id"])

client = CascadeClient(
    base_url="http://localhost:3000",
    api_key="your_api_key",
    metadata_hook=metadata_hook,
)

The hook is emitted for:

  • run trigger (cascade.flow.run.triggered)
  • terminal states (e.g. cascade.flow.run.completed, cascade.flow.run.failed)

Product-Focused Examples

See runnable examples in examples/:

  • examples/hexarch_forensics_provenance.py
  • examples/agentation_human_in_loop.py
  • examples/hexarch_ingest_events.py

Type Hints (Optional)

from cascade_sdk.types import DAGDefinition, FlowRun

dag: DAGDefinition = build_dag_from_flow(my_flow)
run: FlowRun = client.get_run(run_id)

Custom Polling

def on_status_change(status: str):
    print(f"Status: {status}")

result = wait_for_completion(
    client,
    run_id,
    timeout=600,
    poll_interval=2.0,
    on_status_change=on_status_change
)

SSL Configuration

client = CascadeClient(
    base_url="https://cascade.example.com",
    api_key="key",
    timeout=60,
    verify_ssl=False  # Disable SSL verification (not recommended)
)

Architecture

Deterministic Compilation

The @flow decorator enables capture mode: task calls are intercepted to build the DAG structure without executing the actual task logic.

@flow
def example_flow():
    x = task_a()      # Captured as node-0
    y = task_b(x)     # Captured as node-1, depends on node-0
    return task_c(y)  # Captured as node-2, marked as return

This produces a deterministic DAG every time compilation runs.

Thin Client Design

The SDK has zero orchestration logic:

  • No retries
  • No caching
  • No async task execution
  • No agent integration

All orchestration is server-side. The SDK only compiles DAGs and calls HTTP APIs.

Error Envelopes

All errors follow RFC 7807 Problem Details:

{
  "type": "https://cascade.dev/errors/validation",
  "title": "Invalid DAG structure",
  "status": 400,
  "detail": "Node 'node-1' depends on non-existent node 'node-0'",
  "instance": "/api/flows"
}

Requirements

  • Python >= 3.8
  • requests >= 2.25.0

Development

# Clone repository
git clone https://github.com/no1rstack/cascade-sdk-public.git
cd cascade-sdk-public

# Install in editable mode
pip install -e .

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest -q

# Type check
mypy cascade_sdk

# Run micro-benchmarks
python scripts/bench_compile.py

Release Process

  • Push a semantic version tag like v0.1.0 to trigger the release workflow.
  • The release workflow builds sdist/wheel and publishes to PyPI using trusted publishing.
  • Configure PyPI trusted publisher for this GitHub repository before first publish.

Integration Parity Testing

An integration parity script is included at test_parity.py for local SDK/control-plane checks. Detailed setup and local runtime notes are documented in CONTRIBUTING.md.

License

MIT

Changelog

See CHANGELOG.md for release history.

Support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

noirstack_cascade_sdk-0.1.1.tar.gz (27.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

noirstack_cascade_sdk-0.1.1-py3-none-any.whl (28.3 kB view details)

Uploaded Python 3

File details

Details for the file noirstack_cascade_sdk-0.1.1.tar.gz.

File metadata

  • Download URL: noirstack_cascade_sdk-0.1.1.tar.gz
  • Upload date:
  • Size: 27.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for noirstack_cascade_sdk-0.1.1.tar.gz
Algorithm Hash digest
SHA256 e366ca7607794f07c1dc01616a7514d8e936fe5bf8d0165faec7cc389e1b65b5
MD5 b9f80aef4b39c111debac92ce80fa91a
BLAKE2b-256 1b10e97df9f0fe1f778b31a42e0687b47bbd471317683682d12a8e9acd3891af

See more details on using hashes here.

File details

Details for the file noirstack_cascade_sdk-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for noirstack_cascade_sdk-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 9548c12e82eb708ac60fae5c5169d6e6f06236e866cb455ef13b9ad41c273688
MD5 2f3caa2ea55df79cf1df9efb0607487f
BLAKE2b-256 0aa8ad77acab847219a7705383e17b0095ffc1a1f8e5409d00ea7adbf955628a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page