Declarative DAG workflows with WAIT/RESUME, priority scheduling, and distributed rehydration
Project description
nodus-workflow
Declarative DAG workflows with WAIT/RESUME, priority scheduling, and distributed rehydration for Nodus AI systems.
Standalone workflow primitives: define DAGs, execute them with priority-queued scheduling, suspend nodes on events, and rehydrate WAITING runs after process restart. No required external dependencies — pure stdlib.
Note on naming: This is the standalone
nodus-workflowpackage (C:\dev\nodus-workflow). The nodus-lang runtime also ships an in-treenodus_workflowpackage (src/nodus_workflow/) with a full HTTP/CLI server, SQLite store, and nodus-lang integration. The two are distinct.
Status: v0.1.0 — prepared, not yet published.
Install
pip install nodus-workflow
What it provides
| Component | Purpose |
|---|---|
FlowDefinition |
DAG: nodes, edges, default retry, timeout |
FlowNode |
One node with handler_id, config, optional retry override |
FlowEdge |
Directed edge with optional condition function |
FlowRun / InMemoryRunStore |
Run state + thread-safe in-memory store |
SchedulerEngine |
Priority queue (high/normal/low) + WAIT/RESUME |
FlowExecutor |
Orchestrates start(), resume(), handler registration |
FlowRehydrator |
Re-registers WAITING runs after process restart |
WorkflowWaitSignal |
Raise inside a handler to suspend node execution |
Quick start
import asyncio
from nodus_workflow import (
FlowDefinition, FlowNode, FlowEdge, FlowExecutor,
InMemoryRunStore, SchedulerEngine,
)
# Define handlers
async def fetch_data(ctx):
return {"data": "fetched"}
async def process_data(ctx):
return {"processed": ctx["state"].get("data")}
# Build the DAG
flow = FlowDefinition(
name="my-pipeline",
nodes=[
FlowNode(id="fetch", handler_id="fetch_data"),
FlowNode(id="process", handler_id="process_data"),
],
edges=[
FlowEdge(from_node="fetch", to_node="process"),
],
)
# Execute
store = InMemoryRunStore()
scheduler = SchedulerEngine()
executor = FlowExecutor(store=store, scheduler=scheduler)
executor.register_handler("fetch_data", fetch_data)
executor.register_handler("process_data", process_data)
run = await executor.start(flow, initial_state={})
print(run.status) # FlowStatus.COMPLETED
WAIT/RESUME semantics
from nodus_workflow import WorkflowWaitSignal
async def approval_node(ctx):
raise WorkflowWaitSignal(
event_type="approval.granted",
correlation_key=ctx["run_id"],
)
# Later, when the event fires:
await executor.resume(run_id, event_payload={"approver": "alice"})
When a node raises WorkflowWaitSignal, the run transitions to WAITING
and is parked in the scheduler until notify_event or resume is called.
SchedulerEngine
from nodus_workflow import SchedulerEngine
from nodus_workflow.run import FlowStatus
scheduler = SchedulerEngine()
# Schedule with priority
scheduler.schedule(run_id, priority="high") # high / normal / low
scheduler.schedule(run_id, priority="normal")
next_run_id = scheduler.pop() # returns highest-priority pending run | None
# WAIT/RESUME
scheduler.wait_for_event(run_id, event_type="approval.granted", key="k")
scheduler.notify_event(event_type="approval.granted", key="k") # re-queues run
scheduler.cancel_wait(run_id)
FlowRehydrator
from nodus_workflow import FlowRehydrator, InMemoryRunStore
store = InMemoryRunStore()
rehydrator = FlowRehydrator(store=store, scheduler=scheduler)
# On process startup — re-register all WAITING runs
rehydrator.rehydrate()
FlowStatus transitions
PENDING → RUNNING → WAITING → (event fires) → EXECUTING → COMPLETED
↘ FAILED
Design
- No required dependencies. Pure stdlib (
asyncio,threading,heapq,dataclasses,datetime,uuid). - Protocol-based handlers. Any async callable
(context: dict) → dictsatisfiesNodeHandler. - Thread-safe.
SchedulerEngineandInMemoryRunStoreusethreading.Lock. - Separate from nodus-lang. No nodus-lang import required — use as a standalone workflow engine or integrate with any Python application.
Development
pip install -e ".[dev]"
pytest tests/ -q
License
MIT — see LICENSE.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file nodus_workflow-0.1.0.tar.gz.
File metadata
- Download URL: nodus_workflow-0.1.0.tar.gz
- Upload date:
- Size: 13.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
157b3850a288a1f17c62cfb1b944920f6de3609c3d8f9e5662bd384bfbcd464c
|
|
| MD5 |
22e64f0a8e8e32892879af5ae938dab4
|
|
| BLAKE2b-256 |
e1d7efd60333b10b2cf8d282db760a75ddf1f5936735430e602b2bcd6f4a2a10
|
File details
Details for the file nodus_workflow-0.1.0-py3-none-any.whl.
File metadata
- Download URL: nodus_workflow-0.1.0-py3-none-any.whl
- Upload date:
- Size: 12.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
60dcc2e33a6c12540bda4f7246ab8c01fa73c6cdf4e31027790bad3582fc88e7
|
|
| MD5 |
cf15a881a1d75dd5e3a48d71e97da4ac
|
|
| BLAKE2b-256 |
3d48c957a3fc7d0c522f7efc1479065faeb7730e95b2d7b0487093f728058ef2
|