
Cascade SDK

Python SDK for Cascade workflow orchestration APIs.

Cascade is a product by Noir Stack LLC.

Status

noirstack-cascade-sdk is currently in early public release (0.1.x). The API surface may evolve as orchestration and standards integrations mature.

Installation

pip install noirstack-cascade-sdk

Quick Start

1. Define Tasks and Flows

from cascade_sdk import task, flow

@task
def fetch_data(url: str) -> dict:
    """Fetch data from an API."""
    import requests
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.json()

@task
def process_data(data: dict) -> list:
    """Transform data."""
    return [item['id'] for item in data['results']]

@task
def save_results(ids: list) -> str:
    """Save to database."""
    # ... database logic ...
    return f"Saved {len(ids)} items"

@flow
def data_pipeline(api_url: str) -> str:
    """Complete ETL pipeline."""
    raw_data = fetch_data(api_url)
    processed = process_data(raw_data)
    return save_results(processed)

2. Compile to DAG

from cascade_sdk.compiler import build_dag_from_flow

dag = build_dag_from_flow(data_pipeline)
print(dag)
# {
#   'nodes': [
#     {'id': 'node-0', 'task': 'fetch_data'},
#     {'id': 'node-1', 'task': 'process_data', 'depends_on': ['node-0']},
#     {'id': 'node-2', 'task': 'save_results', 'depends_on': ['node-1']}
#   ],
#   'edges': [
#     {'from': 'node-0', 'to': 'node-1'},
#     {'from': 'node-1', 'to': 'node-2'}
#   ],
#   'return_node_id': 'node-2'
# }
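Given this shape, an execution order falls out of the edges via a topological sort. The sketch below is plain Python for illustration, not the server's scheduler:

```python
from collections import deque

def topo_order(dag: dict) -> list:
    """Return node ids in an order that respects the DAG's edges."""
    indegree = {n['id']: 0 for n in dag['nodes']}
    children = {n['id']: [] for n in dag['nodes']}
    for e in dag['edges']:
        indegree[e['to']] += 1
        children[e['from']].append(e['to'])
    ready = deque(nid for nid, d in indegree.items() if d == 0)
    order = []
    while ready:
        nid = ready.popleft()
        order.append(nid)
        for child in children[nid]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    return order

dag = {
    'nodes': [
        {'id': 'node-0', 'task': 'fetch_data'},
        {'id': 'node-1', 'task': 'process_data', 'depends_on': ['node-0']},
        {'id': 'node-2', 'task': 'save_results', 'depends_on': ['node-1']},
    ],
    'edges': [
        {'from': 'node-0', 'to': 'node-1'},
        {'from': 'node-1', 'to': 'node-2'},
    ],
    'return_node_id': 'node-2',
}
print(topo_order(dag))  # ['node-0', 'node-1', 'node-2']
```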

3. Register and Execute

from cascade_sdk import CascadeClient, wait_for_completion

# Connect to Cascade server
client = CascadeClient(
    base_url="http://localhost:3000",
    api_key="your_api_key"
)

# Register flow
flow_id = client.register_flow("data_pipeline", dag)
print(f"Registered flow: {flow_id}")

# Trigger execution
run_id = client.trigger_flow(flow_id, {
    "api_url": "https://api.example.com/data"
})
print(f"Started run: {run_id}")

# Wait for completion
result = wait_for_completion(client, run_id, timeout=300)

if result['status'] == 'completed':
    print("Success:", result['result'])
else:
    print("Failed:", result.get('error'))

Features

Decorators

  • @task: Mark a function as an executable task
  • @flow: Mark a function as a workflow that orchestrates tasks

Compiler

  • build_dag_from_flow(flow_func): Extract DAG from flow
  • build_dag_from_flow_json(flow_func): Get canonical JSON
  • canonical_json(obj): Deterministic serialization
  • canonicalize_dag(dag): Normalize DAG structure
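The idea behind deterministic serialization can be sketched in plain Python (an illustration of the general technique, not the SDK's internals): sorting keys and fixing separators makes output independent of dict insertion order.

```python
import json

def to_canonical_json(obj) -> str:
    """Deterministic JSON: sorted keys, fixed separators, no whitespace variation."""
    return json.dumps(obj, sort_keys=True, separators=(',', ':'))

# Same content, different insertion order -> identical serialization
a = {'nodes': [], 'edges': [], 'return_node_id': None}
b = {'return_node_id': None, 'edges': [], 'nodes': []}
print(to_canonical_json(a) == to_canonical_json(b))  # True
```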

Client

  • CascadeClient: HTTP client for Cascade API

    • register_flow(name, dag): Register flow definition
    • trigger_flow(flow_id, inputs): Start execution
    • get_run(run_id): Get run status
    • get_flow_graph(flow_id): Get flow DAG
    • get_run_graph(run_id): Get execution graph
  • wait_for_completion(client, run_id): Poll until done
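Conceptually, wait_for_completion is a poll loop over get_run. The stand-in below illustrates the shape of such a loop, assuming a run dict with a 'status' field and a hypothetical set of terminal statuses:

```python
import time

# Assumed terminal statuses for illustration
TERMINAL = {'completed', 'failed', 'cancelled'}

def poll_until_done(client, run_id, timeout=300, poll_interval=2.0):
    """Poll client.get_run(run_id) until a terminal status or timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        run = client.get_run(run_id)
        if run['status'] in TERMINAL:
            return run
        time.sleep(poll_interval)
    raise TimeoutError(f"run {run_id} did not finish within {timeout}s")

# Stub client that completes on the third poll
class StubClient:
    def __init__(self):
        self._calls = 0
    def get_run(self, run_id):
        self._calls += 1
        status = 'completed' if self._calls >= 3 else 'running'
        return {'run_id': run_id, 'status': status}

result = poll_until_done(StubClient(), 'run-123', timeout=10, poll_interval=0.01)
print(result['status'])  # completed
```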

Error Handling

from cascade_sdk import (
    AuthenticationError,
    ValidationError,
    NotFoundError,
    RateLimitError,
    OrchestrationError,
    NetworkError,
    TimeoutError,
)

try:
    flow_id = client.register_flow("my_flow", dag)
except ValidationError as e:
    print(f"Invalid DAG: {e}")
    print(f"Details: {e.response_body}")
except AuthenticationError:
    print("Check your API key")

Advanced Usage

Ecosystem Compatibility

If you are also using hexarch-guardrails in your stack, this SDK can be used alongside it:

  • keep guardrail/policy logic in hexarch-guardrails
  • keep DAG compilation + Cascade API transport in cascade-sdk

This separation helps maintain clean orchestration boundaries.

Standards-Aligned Workflow Metadata

Cascade SDK includes helpers aligned to workflow/orchestration standards:

  • BPMN 2.0: design-time mapping to executable DAG shape
  • OpenTelemetry: trace context propagation (traceparent, tracestate)
  • W3C PROV: provenance bundle helpers for entities/activities/agents
  • CNCF CloudEvents: event envelope helpers for run and task events
  • NIST SP 800-204: boundary and microservice security guidance profile

Install optional standards extras:

pip install "noirstack-cascade-sdk[standards]"

Use helpers:

from cascade_sdk import build_cloudevent, build_prov_bundle, build_trace_context_headers

event = build_cloudevent(
    event_type="cascade.flow.run.started",
    source="urn:cascade:orchestrator",
    data={"run_id": "run-123"},
    extensions=build_trace_context_headers(),
)

prov = build_prov_bundle()

Validate standards artifacts before transport/storage:

from cascade_sdk import validate_cloudevent, validate_dag

validate_cloudevent(event)
validate_dag(dag)
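As a reference point, CloudEvents 1.0 requires the id, source, specversion, and type context attributes. A minimal structural check of that rule can be written in plain Python (an illustration, not the SDK's validate_cloudevent):

```python
# Required context attributes per the CloudEvents 1.0 specification
REQUIRED_ATTRS = ('id', 'source', 'specversion', 'type')

def check_cloudevent(event: dict) -> list:
    """Return the required CloudEvents 1.0 attributes missing from the event."""
    return [attr for attr in REQUIRED_ATTRS if attr not in event]

event = {
    'specversion': '1.0',
    'id': 'evt-1',
    'source': 'urn:cascade:orchestrator',
    'type': 'cascade.flow.run.started',
    'data': {'run_id': 'run-123'},
}
print(check_cloudevent(event))  # []
```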

See WORKFLOW_STANDARDS_PROFILE.md for the full profile.

BPMN 2.0 to DAG Mapping

from cascade_sdk import bpmn_xml_to_dag

with open("process.bpmn", "r", encoding="utf-8") as f:
    dag = bpmn_xml_to_dag(f.read())

This maps BPMN task-like nodes and sequence flows into the SDK DAG shape.
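The kind of mapping involved can be illustrated with the standard library's XML parser. This is a simplified sketch that handles only task elements and sequence flows, not the SDK's bpmn_xml_to_dag:

```python
import xml.etree.ElementTree as ET

# BPMN 2.0 model namespace
BPMN_NS = '{http://www.omg.org/spec/BPMN/20100524/MODEL}'

def simple_bpmn_to_dag(xml_text: str) -> dict:
    """Map BPMN <task> elements and <sequenceFlow> edges into a DAG-like dict."""
    root = ET.fromstring(xml_text)
    tasks = {t.get('id'): t.get('name') for t in root.iter(BPMN_NS + 'task')}
    edges = [
        {'from': f.get('sourceRef'), 'to': f.get('targetRef')}
        for f in root.iter(BPMN_NS + 'sequenceFlow')
        if f.get('sourceRef') in tasks and f.get('targetRef') in tasks
    ]
    nodes = [{'id': tid, 'task': name} for tid, name in tasks.items()]
    return {'nodes': nodes, 'edges': edges}

xml_text = """<?xml version="1.0"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL">
  <process id="p1">
    <task id="t1" name="fetch"/>
    <task id="t2" name="process"/>
    <sequenceFlow id="f1" sourceRef="t1" targetRef="t2"/>
  </process>
</definitions>
"""
dag = simple_bpmn_to_dag(xml_text)
print(dag['edges'])  # [{'from': 't1', 'to': 't2'}]
```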

Apache Airflow DAG Adapter

from cascade_sdk import airflow_dag_to_dag

# airflow_dag is an Airflow DAG instance
dag = airflow_dag_to_dag(airflow_dag)

Install Airflow interop extras when needed:

pip install "noirstack-cascade-sdk[airflow]"

Multi-Orchestrator Topology Adapters

Dependency-light adapters are available for topology reuse from:

  • Flyte
  • MLRun
  • Metaflow
  • Apache NiFi
  • Shipyard
  • Mage
  • Argo Workflows
  • Kestra
  • Luigi
  • Dagster

Example:

from cascade_sdk import argo_workflow_to_dag

dag = argo_workflow_to_dag(argo_workflow_dict)

These adapters map task nodes + dependencies into the standard Cascade DAG shape.
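The general shape of such an adapter can be sketched against an Argo-style DAG template, where tasks carry a name and a dependencies list. This is a simplified illustration, not the SDK's argo_workflow_to_dag:

```python
def argo_like_to_dag(workflow: dict) -> dict:
    """Map Argo-style DAG-template tasks/dependencies to a node/edge DAG shape."""
    nodes, edges = [], []
    for template in workflow.get('spec', {}).get('templates', []):
        for t in template.get('dag', {}).get('tasks', []):
            node = {'id': t['name'], 'task': t['name']}
            deps = t.get('dependencies', [])
            if deps:
                node['depends_on'] = deps
            nodes.append(node)
            edges.extend({'from': d, 'to': t['name']} for d in deps)
    return {'nodes': nodes, 'edges': edges}

workflow = {
    'spec': {'templates': [{'dag': {'tasks': [
        {'name': 'fetch'},
        {'name': 'process', 'dependencies': ['fetch']},
    ]}}]}
}
print(argo_like_to_dag(workflow)['edges'])  # [{'from': 'fetch', 'to': 'process'}]
```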

Metadata Hooks on Trigger/Poll

def metadata_hook(cloudevent, provenance):
    print(cloudevent["type"], cloudevent["id"])

client = CascadeClient(
    base_url="http://localhost:3000",
    api_key="your_api_key",
    metadata_hook=metadata_hook,
)

The hook is invoked for:

  • run trigger (cascade.flow.run.triggered)
  • terminal states (e.g. cascade.flow.run.completed, cascade.flow.run.failed)

Product-Focused Examples

See runnable examples in examples/:

  • examples/hexarch_forensics_provenance.py
  • examples/agentation_human_in_loop.py
  • examples/hexarch_ingest_events.py

Type Hints (Optional)

from cascade_sdk.types import DAGDefinition, FlowRun

dag: DAGDefinition = build_dag_from_flow(my_flow)
run: FlowRun = client.get_run(run_id)

Custom Polling

def on_status_change(status: str):
    print(f"Status: {status}")

result = wait_for_completion(
    client,
    run_id,
    timeout=600,
    poll_interval=2.0,
    on_status_change=on_status_change
)

SSL Configuration

client = CascadeClient(
    base_url="https://cascade.example.com",
    api_key="key",
    timeout=60,
    verify_ssl=False  # Disable SSL verification (not recommended)
)

Architecture

Deterministic Compilation

The @flow decorator enables capture mode: task calls are intercepted to build the DAG structure without executing the actual task logic.

@flow
def example_flow():
    x = task_a()      # Captured as node-0
    y = task_b(x)     # Captured as node-1, depends on node-0
    return task_c(y)  # Captured as node-2, marked as return

This produces a deterministic DAG every time compilation runs.
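The capture mechanism can be approximated in a few lines. The toy compiler below is an illustrative re-implementation of the technique, not the SDK source: each decorated call records a node and returns a placeholder reference that later calls can depend on.

```python
def build_capture_compiler():
    """Toy capture-mode compiler: task calls record nodes instead of executing."""
    state = {'nodes': [], 'edges': []}

    class NodeRef:
        def __init__(self, node_id):
            self.node_id = node_id

    def task(fn):
        def wrapper(*args):
            node_id = f"node-{len(state['nodes'])}"
            deps = [a.node_id for a in args if isinstance(a, NodeRef)]
            node = {'id': node_id, 'task': fn.__name__}
            if deps:
                node['depends_on'] = deps
            state['nodes'].append(node)
            state['edges'].extend({'from': d, 'to': node_id} for d in deps)
            return NodeRef(node_id)
        return wrapper

    def compile_flow(flow_fn):
        state['nodes'].clear()
        state['edges'].clear()
        result = flow_fn()  # runs in capture mode: no task bodies execute
        return {**state, 'return_node_id': result.node_id}

    return task, compile_flow

task, compile_flow = build_capture_compiler()

@task
def task_a(): ...

@task
def task_b(x): ...

@task
def task_c(y): ...

def example_flow():
    x = task_a()
    y = task_b(x)
    return task_c(y)

dag = compile_flow(example_flow)
print(dag['return_node_id'])  # node-2
```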

Thin Client Design

The SDK has zero orchestration logic:

  • No retries
  • No caching
  • No async task execution
  • No agent integration

All orchestration is server-side. The SDK only compiles DAGs and calls HTTP APIs.

Error Envelopes

All errors follow RFC 7807 Problem Details:

{
  "type": "https://cascade.dev/errors/validation",
  "title": "Invalid DAG structure",
  "status": 400,
  "detail": "Node 'node-1' depends on non-existent node 'node-0'",
  "instance": "/api/flows"
}
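A response in this envelope can be mapped back to typed exceptions on the client side. The sketch below uses a hypothetical status-to-exception table for illustration; the SDK's actual dispatch may differ:

```python
class CascadeAPIError(Exception):
    """Base error carrying the RFC 7807 problem document."""
    def __init__(self, problem: dict):
        super().__init__(problem.get('detail', problem.get('title', 'API error')))
        self.problem = problem
        self.response_body = problem

# Hypothetical subclasses and mapping, for illustration only
class ValidationError(CascadeAPIError): ...
class AuthenticationError(CascadeAPIError): ...
class NotFoundError(CascadeAPIError): ...
class RateLimitError(CascadeAPIError): ...

STATUS_MAP = {400: ValidationError, 401: AuthenticationError,
              404: NotFoundError, 429: RateLimitError}

def raise_for_problem(problem: dict):
    """Raise the exception class matching the problem document's 'status' member."""
    exc_cls = STATUS_MAP.get(problem.get('status'), CascadeAPIError)
    raise exc_cls(problem)

problem = {
    'type': 'https://cascade.dev/errors/validation',
    'title': 'Invalid DAG structure',
    'status': 400,
    'detail': "Node 'node-1' depends on non-existent node 'node-0'",
    'instance': '/api/flows',
}
try:
    raise_for_problem(problem)
except ValidationError as e:
    print(type(e).__name__, '-', e)
```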

Requirements

  • Python >= 3.8
  • requests >= 2.25.0

Development

# Clone repository
git clone https://github.com/no1rstack/cascade-sdk-public.git
cd cascade-sdk-public

# Install in editable mode
pip install -e .

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest -q

# Type check
mypy cascade_sdk

# Run micro-benchmarks
python scripts/bench_compile.py

Release Process

  • Push a semantic version tag like v0.1.0 to trigger the release workflow.
  • The release workflow builds sdist/wheel and publishes to PyPI using trusted publishing.
  • Configure PyPI trusted publisher for this GitHub repository before first publish.

Integration Parity Testing

An integration parity script is included at test_parity.py for local SDK/control-plane checks. Detailed setup and local runtime notes are documented in CONTRIBUTING.md.

License

MIT

Changelog

See CHANGELOG.md for release history.
