# Cascade SDK

Python SDK for the Cascade workflow orchestration APIs. Cascade is a product by Noir Stack LLC.
## Status
noirstack-cascade-sdk is currently in early public release (0.1.x).
The API surface may evolve as orchestration and standards integrations mature.
## Installation

```bash
pip install noirstack-cascade-sdk
```
## Canonical Usage

Install from PyPI:

```bash
pip install noirstack-cascade-sdk
```

Import in code:

```python
from cascade_sdk import CascadeClient, flow, task
```
## Quick Start

### 1. Define Tasks and Flows

```python
from cascade_sdk import task, flow

@task
def fetch_data(url: str) -> dict:
    """Fetch data from an API."""
    import requests
    return requests.get(url).json()

@task
def process_data(data: dict) -> list:
    """Transform data."""
    return [item['id'] for item in data['results']]

@task
def save_results(ids: list) -> str:
    """Save to database."""
    # ... database logic ...
    return f"Saved {len(ids)} items"

@flow
def data_pipeline(api_url: str) -> str:
    """Complete ETL pipeline."""
    raw_data = fetch_data(api_url)
    processed = process_data(raw_data)
    return save_results(processed)
```
### 2. Compile to DAG

```python
from cascade_sdk.compiler import build_dag_from_flow

dag = build_dag_from_flow(data_pipeline)
print(dag)
# {
#     'nodes': [
#         {'id': 'node-0', 'task': 'fetch_data'},
#         {'id': 'node-1', 'task': 'process_data', 'depends_on': ['node-0']},
#         {'id': 'node-2', 'task': 'save_results', 'depends_on': ['node-1']}
#     ],
#     'edges': [
#         {'from': 'node-0', 'to': 'node-1'},
#         {'from': 'node-1', 'to': 'node-2'}
#     ],
#     'return_node_id': 'node-2'
# }
```
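The compiled DAG is plain dict data, so you can sanity-check it yourself before registering it. The following is a minimal sketch of such a structural check (`check_dag_shape` is a hypothetical helper written for this README, not part of the SDK):

```python
def check_dag_shape(dag: dict) -> None:
    """Raise ValueError if the DAG references unknown nodes."""
    node_ids = {n["id"] for n in dag["nodes"]}
    for node in dag["nodes"]:
        for dep in node.get("depends_on", []):
            if dep not in node_ids:
                raise ValueError(f"{node['id']} depends on unknown node {dep}")
    for edge in dag["edges"]:
        if edge["from"] not in node_ids or edge["to"] not in node_ids:
            raise ValueError(f"edge {edge} references an unknown node")
    if dag["return_node_id"] not in node_ids:
        raise ValueError("return_node_id is not a known node")

dag = {
    "nodes": [
        {"id": "node-0", "task": "fetch_data"},
        {"id": "node-1", "task": "process_data", "depends_on": ["node-0"]},
    ],
    "edges": [{"from": "node-0", "to": "node-1"}],
    "return_node_id": "node-1",
}
check_dag_shape(dag)  # passes silently for a well-formed DAG
```

The SDK's own `validate_dag` helper (described under Standards-Aligned Workflow Metadata below) is the supported way to do this.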
### 3. Register and Execute

```python
from cascade_sdk import CascadeClient, wait_for_completion

# Connect to the Cascade server
client = CascadeClient(
    base_url="http://localhost:3000",
    api_key="your_api_key"
)

# Register the flow
flow_id = client.register_flow("data_pipeline", dag)
print(f"Registered flow: {flow_id}")

# Trigger execution
run_id = client.trigger_flow(flow_id, {
    "api_url": "https://api.example.com/data"
})
print(f"Started run: {run_id}")

# Wait for completion
result = wait_for_completion(client, run_id, timeout=300)
if result['status'] == 'completed':
    print("Success:", result['result'])
else:
    print("Failed:", result.get('error'))
```
## Features

### Decorators

- `@task`: Mark a function as an executable task
- `@flow`: Mark a function as a workflow orchestration
### Compiler

- `build_dag_from_flow(flow_func)`: Extract a DAG from a flow
- `build_dag_from_flow_json(flow_func)`: Get canonical JSON
- `canonical_json(obj)`: Deterministic serialization
- `canonicalize_dag(dag)`: Normalize the DAG structure
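`canonical_json` is the SDK's own implementation; as a rough illustration of what deterministic serialization means, the stdlib can already produce a stable encoding by sorting keys and fixing separators (this sketch is not the SDK source):

```python
import json

def canonical_json_sketch(obj) -> str:
    # Sorted keys + fixed separators yield byte-identical output
    # for equal dicts, regardless of key insertion order.
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))

a = {"nodes": [], "edges": [], "return_node_id": None}
b = {"return_node_id": None, "edges": [], "nodes": []}
assert canonical_json_sketch(a) == canonical_json_sketch(b)
```

Deterministic serialization matters here because the same flow must compile to the same registered artifact every time.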
### Client

- `CascadeClient`: HTTP client for the Cascade API
  - `register_flow(name, dag)`: Register a flow definition
  - `trigger_flow(flow_id, inputs)`: Start execution
  - `get_run(run_id)`: Get run status
  - `get_flow_graph(flow_id)`: Get the flow DAG
  - `get_run_graph(run_id)`: Get the execution graph
- `wait_for_completion(client, run_id)`: Poll until done
## Error Handling

```python
from cascade_sdk import (
    AuthenticationError,
    ValidationError,
    NotFoundError,
    RateLimitError,
    OrchestrationError,
    NetworkError,
    TimeoutError,
)

try:
    flow_id = client.register_flow("my_flow", dag)
except ValidationError as e:
    print(f"Invalid DAG: {e}")
    print(f"Details: {e.response_body}")
except AuthenticationError:
    print("Check your API key")
```
## Advanced Usage

### Ecosystem Compatibility

If you also use hexarch-guardrails in your stack, this SDK works alongside it:

- keep guardrail/policy logic in `hexarch-guardrails`
- keep DAG compilation and Cascade API transport in `cascade-sdk`

This separation helps maintain clean orchestration boundaries.
### Standards-Aligned Workflow Metadata

Cascade SDK includes helpers aligned with workflow/orchestration standards:

- **BPMN 2.0**: design-time mapping to an executable DAG shape
- **OpenTelemetry**: trace context propagation (`traceparent`, `tracestate`)
- **W3C PROV**: provenance bundle helpers for entities/activities/agents
- **CNCF CloudEvents**: event envelope helpers for run and task events
- **NIST SP 800-204**: boundary and microservice security guidance profile
Install the optional standards extras:

```bash
pip install -e ".[standards]"
```

Use the helpers:

```python
from cascade_sdk import build_cloudevent, build_prov_bundle, build_trace_context_headers

event = build_cloudevent(
    event_type="cascade.flow.run.started",
    source="urn:cascade:orchestrator",
    data={"run_id": "run-123"},
    extensions=build_trace_context_headers(),
)
prov = build_prov_bundle()
```
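To see what an envelope like this looks like without the SDK installed, a minimal CloudEvents 1.0-style dict can be built with the stdlib. The attribute names follow the CloudEvents specification; the helper name itself is ours, not the SDK's:

```python
import uuid
from datetime import datetime, timezone

def sketch_cloudevent(event_type: str, source: str, data: dict) -> dict:
    # Required CloudEvents 1.0 context attributes: specversion, id, source, type.
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": source,
        "type": event_type,
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,
    }

event = sketch_cloudevent(
    "cascade.flow.run.started", "urn:cascade:orchestrator", {"run_id": "run-123"}
)
```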
Validate standards artifacts before transport/storage:

```python
from cascade_sdk import validate_cloudevent, validate_dag

validate_cloudevent(event)
validate_dag(dag)
```
See WORKFLOW_STANDARDS_PROFILE.md for the full profile.
### BPMN 2.0 to DAG Mapping

```python
from cascade_sdk import bpmn_xml_to_dag

with open("process.bpmn", "r", encoding="utf-8") as f:
    dag = bpmn_xml_to_dag(f.read())
```

This maps BPMN task-like nodes and sequence flows into the SDK DAG shape.
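The general idea behind this mapping can be sketched with the stdlib XML parser on a tiny BPMN document: tasks become nodes and `sequenceFlow` elements become edges. This is a simplified illustration, not the SDK's implementation (which also handles gateways and other task-like elements):

```python
import xml.etree.ElementTree as ET

BPMN_NS = "{http://www.omg.org/spec/BPMN/20100524/MODEL}"

def sketch_bpmn_to_dag(xml_text: str) -> dict:
    root = ET.fromstring(xml_text)
    # BPMN <task> elements become DAG nodes.
    tasks = {t.get("id"): t.get("name") for t in root.iter(f"{BPMN_NS}task")}
    # <sequenceFlow> elements between two tasks become edges.
    edges = [
        {"from": f.get("sourceRef"), "to": f.get("targetRef")}
        for f in root.iter(f"{BPMN_NS}sequenceFlow")
        if f.get("sourceRef") in tasks and f.get("targetRef") in tasks
    ]
    return {"nodes": [{"id": i, "task": n} for i, n in tasks.items()], "edges": edges}

xml_text = """<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL">
  <process id="p1">
    <task id="t1" name="fetch"/>
    <task id="t2" name="save"/>
    <sequenceFlow id="f1" sourceRef="t1" targetRef="t2"/>
  </process>
</definitions>"""
dag = sketch_bpmn_to_dag(xml_text)
```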
### Apache Airflow DAG Adapter

```python
from cascade_sdk import airflow_dag_to_dag

# airflow_dag is an Airflow DAG instance
dag = airflow_dag_to_dag(airflow_dag)
```

Install the Airflow interop extras when needed:

```bash
pip install -e ".[airflow]"
```
### Multi-Orchestrator Topology Adapters
Dependency-light adapters are available for topology reuse from:
- Flyte
- MLRun
- Metaflow
- Apache NiFi
- Shipyard
- Mage
- Argo Workflows
- Kestra
- Luigi
- Dagster
Example:

```python
from cascade_sdk import argo_workflow_to_dag

dag = argo_workflow_to_dag(argo_workflow_dict)
```

These adapters map task nodes and dependencies into the standard Cascade DAG shape.
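The pattern shared by all of these adapters is small: walk the source tool's task list, carry over names and dependencies, and emit the Cascade node/edge shape. Here is a stripped-down sketch for an Argo-style DAG template (the `dag.tasks`/`dependencies` fields follow Argo's workflow spec; the function is illustrative, not the SDK adapter):

```python
def sketch_argo_to_dag(workflow: dict) -> dict:
    # Argo DAG templates list tasks, each with an optional `dependencies` list.
    tasks = workflow["spec"]["templates"][0]["dag"]["tasks"]
    nodes, edges = [], []
    for t in tasks:
        deps = t.get("dependencies", [])
        node = {"id": t["name"], "task": t["template"]}
        if deps:
            node["depends_on"] = deps
        nodes.append(node)
        edges.extend({"from": d, "to": t["name"]} for d in deps)
    return {"nodes": nodes, "edges": edges}

workflow = {
    "spec": {
        "templates": [
            {"dag": {"tasks": [
                {"name": "a", "template": "fetch"},
                {"name": "b", "template": "save", "dependencies": ["a"]},
            ]}}
        ]
    }
}
dag = sketch_argo_to_dag(workflow)
```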
### Metadata Hooks on Trigger/Poll

```python
def metadata_hook(cloudevent, provenance):
    print(cloudevent["type"], cloudevent["id"])

client = CascadeClient(
    base_url="http://localhost:3000",
    api_key="your_api_key",
    metadata_hook=metadata_hook,
)
```

The hook is invoked for:

- run trigger (`cascade.flow.run.triggered`)
- terminal states (e.g. `cascade.flow.run.completed`, `cascade.flow.run.failed`)
### Product-Focused Examples

See the runnable examples in `examples/`:

- `examples/hexarch_forensics_provenance.py`
- `examples/agentation_human_in_loop.py`
- `examples/hexarch_ingest_events.py`
### Type Hints (Optional)

```python
from cascade_sdk.types import DAGDefinition, FlowRun

dag: DAGDefinition = build_dag_from_flow(my_flow)
run: FlowRun = client.get_run(run_id)
```
### Custom Polling

```python
def on_status_change(status: str):
    print(f"Status: {status}")

result = wait_for_completion(
    client,
    run_id,
    timeout=600,
    poll_interval=2.0,
    on_status_change=on_status_change
)
```
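`wait_for_completion` is the supported entry point; conceptually, a poll loop of roughly this shape is all that is needed (a simplified sketch, not the SDK source — `get_status` stands in for a status fetch such as `client.get_run(run_id)['status']`):

```python
import time

TERMINAL = {"completed", "failed", "cancelled"}

def poll_until_done(get_status, timeout=600.0, poll_interval=2.0,
                    on_status_change=None, _sleep=time.sleep):
    """Poll get_status() until a terminal state or the timeout elapses."""
    deadline = time.monotonic() + timeout
    last = None
    while time.monotonic() < deadline:
        status = get_status()
        if status != last and on_status_change:
            on_status_change(status)  # fire the callback on each transition
        last = status
        if status in TERMINAL:
            return status
        _sleep(poll_interval)
    raise TimeoutError(f"run still {last!r} after {timeout}s")

# Simulate a run progressing through states (sleep patched out for the demo).
statuses = iter(["pending", "running", "completed"])
final = poll_until_done(lambda: next(statuses), _sleep=lambda s: None)
# final == "completed"
```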
### SSL Configuration

```python
client = CascadeClient(
    base_url="https://cascade.example.com",
    api_key="key",
    timeout=60,
    verify_ssl=False  # Disable SSL verification (not recommended)
)
```
## Architecture

### Deterministic Compilation

The `@flow` decorator enables capture mode: task calls are intercepted to build the DAG structure without executing the actual task logic.
```python
@flow
def example_flow():
    x = task_a()          # Captured as node-0
    y = task_b(x)         # Captured as node-1, depends on node-0
    return task_c(y)      # Captured as node-2, marked as return
```
This produces a deterministic DAG every time compilation runs.
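The capture technique itself is easy to demonstrate in miniature. In this toy reimplementation (illustrative only, not the SDK's code), `@task` returns node ids instead of running the function, and `@flow` runs the body once to record the call graph:

```python
def make_capture():
    captured = {"nodes": [], "return_node_id": None}

    def task(fn):
        def record(*args):
            node_id = f"node-{len(captured['nodes'])}"
            # Any node id passed in as an argument is a dependency.
            deps = [a for a in args if isinstance(a, str) and a.startswith("node-")]
            node = {"id": node_id, "task": fn.__name__}
            if deps:
                node["depends_on"] = deps
            captured["nodes"].append(node)
            return node_id  # stand-in value threaded through the flow body
        return record

    def flow(fn):
        def compile_flow():
            captured["nodes"].clear()
            captured["return_node_id"] = fn()  # run body once in capture mode
            return dict(captured)
        return compile_flow

    return task, flow

task, flow = make_capture()

@task
def task_a(): ...

@task
def task_b(x): ...

@flow
def example_flow():
    x = task_a()
    return task_b(x)

dag = example_flow()
```

Because capture depends only on the flow body's call order, not on any task's runtime behavior, repeated compilation yields the same DAG.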
### Thin Client Design
The SDK has zero orchestration logic:
- No retries
- No caching
- No async task execution
- No agent integration
All orchestration is server-side. The SDK only compiles DAGs and calls HTTP APIs.
### Error Envelopes

All errors follow RFC 7807 Problem Details:

```json
{
  "type": "https://cascade.dev/errors/validation",
  "title": "Invalid DAG structure",
  "status": 400,
  "detail": "Node 'node-1' depends on non-existent node 'node-0'",
  "instance": "/api/flows"
}
```
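Problem Details payloads are plain JSON, so surfacing them in client code is one parse away. A minimal sketch (the SDK exceptions already carry the raw body as `response_body`; `problem_summary` is a hypothetical helper for this README):

```python
import json

def problem_summary(body: str) -> str:
    """Format an RFC 7807 problem document into a one-line message."""
    problem = json.loads(body)
    title = problem.get("title", "Unknown error")
    status = problem.get("status", "?")
    detail = problem.get("detail", "")
    return f"{status} {title}: {detail}" if detail else f"{status} {title}"

body = json.dumps({
    "type": "https://cascade.dev/errors/validation",
    "title": "Invalid DAG structure",
    "status": 400,
    "detail": "Node 'node-1' depends on non-existent node 'node-0'",
})
msg = problem_summary(body)
# msg == "400 Invalid DAG structure: Node 'node-1' depends on non-existent node 'node-0'"
```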
## Requirements

- Python >= 3.8
- `requests` >= 2.25.0
## Development

```bash
# Clone the repository
git clone https://github.com/no1rstack/cascade-sdk-public.git
cd cascade-sdk-public

# Install in editable mode
pip install -e .

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest -q

# Type check
mypy cascade_sdk

# Run micro-benchmarks
python scripts/bench_compile.py
```
## Release Process

1. Push a semantic version tag like `v0.1.0` to trigger the release workflow.
2. The release workflow builds the sdist/wheel and publishes to PyPI using trusted publishing.
3. Configure a PyPI trusted publisher for this GitHub repository before the first publish.
## Integration Parity Testing

An integration parity script is included at `test_parity.py` for local SDK/control-plane checks. Detailed setup and local runtime notes are documented in CONTRIBUTING.md.
## License

MIT

## Changelog

See CHANGELOG.md for release history.