Skip to main content

Declarative DAG workflows with WAIT/RESUME, priority scheduling, and distributed rehydration

Project description

nodus-workflow

Declarative DAG workflows with WAIT/RESUME, priority scheduling, and distributed rehydration for Nodus AI systems.

Standalone workflow primitives: define DAGs, execute them with priority-queued scheduling, suspend nodes on events, and rehydrate WAITING runs after process restart. No required external dependencies — pure stdlib.

Note on naming: This is the standalone nodus-workflow package (C:\dev\nodus-workflow). The nodus-lang runtime also ships an in-tree nodus_workflow package (src/nodus_workflow/) with a full HTTP/CLI server, SQLite store, and nodus-lang integration. The two are distinct.

Status: v0.1.0 — prepared, not yet published.


Install

pip install nodus-workflow

What it provides

Component Purpose
FlowDefinition DAG: nodes, edges, default retry, timeout
FlowNode One node with handler_id, config, optional retry override
FlowEdge Directed edge with optional condition function
FlowRun / InMemoryRunStore Run state + thread-safe in-memory store
SchedulerEngine Priority queue (high/normal/low) + WAIT/RESUME
FlowExecutor Orchestrates start(), resume(), handler registration
FlowRehydrator Re-registers WAITING runs after process restart
WorkflowWaitSignal Raise inside a handler to suspend node execution

Quick start

import asyncio
from nodus_workflow import (
    FlowDefinition, FlowNode, FlowEdge, FlowExecutor,
    InMemoryRunStore, SchedulerEngine,
)

# Define handlers
async def fetch_data(ctx):
    return {"data": "fetched"}

async def process_data(ctx):
    return {"processed": ctx["state"].get("data")}

# Build the DAG
flow = FlowDefinition(
    name="my-pipeline",
    nodes=[
        FlowNode(id="fetch",   handler_id="fetch_data"),
        FlowNode(id="process", handler_id="process_data"),
    ],
    edges=[
        FlowEdge(from_node="fetch", to_node="process"),
    ],
)

# Execute
store = InMemoryRunStore()
scheduler = SchedulerEngine()
executor = FlowExecutor(store=store, scheduler=scheduler)
executor.register_handler("fetch_data",   fetch_data)
executor.register_handler("process_data", process_data)

run = await executor.start(flow, initial_state={})
print(run.status)   # FlowStatus.COMPLETED

WAIT/RESUME semantics

from nodus_workflow import WorkflowWaitSignal

async def approval_node(ctx):
    raise WorkflowWaitSignal(
        event_type="approval.granted",
        correlation_key=ctx["run_id"],
    )

# Later, when the event fires:
await executor.resume(run_id, event_payload={"approver": "alice"})

When a node raises WorkflowWaitSignal, the run transitions to WAITING and is parked in the scheduler until notify_event or resume is called.


SchedulerEngine

from nodus_workflow import SchedulerEngine
from nodus_workflow.run import FlowStatus

scheduler = SchedulerEngine()

# Schedule with priority
scheduler.schedule(run_id, priority="high")      # high / normal / low
scheduler.schedule(run_id, priority="normal")

next_run_id = scheduler.pop()   # returns highest-priority pending run | None

# WAIT/RESUME
scheduler.wait_for_event(run_id, event_type="approval.granted", key="k")
scheduler.notify_event(event_type="approval.granted", key="k")  # re-queues run
scheduler.cancel_wait(run_id)

FlowRehydrator

from nodus_workflow import FlowRehydrator, InMemoryRunStore

store = InMemoryRunStore()
rehydrator = FlowRehydrator(store=store, scheduler=scheduler)

# On process startup — re-register all WAITING runs
rehydrator.rehydrate()

FlowStatus transitions

PENDING → RUNNING → WAITING → (event fires) → EXECUTING → COMPLETED
                                                          ↘ FAILED

Design

  • No required dependencies. Pure stdlib (asyncio, threading, heapq, dataclasses, datetime, uuid).
  • Protocol-based handlers. Any async callable (context: dict) → dict satisfies NodeHandler.
  • Thread-safe. SchedulerEngine and InMemoryRunStore use threading.Lock.
  • Separate from nodus-lang. No nodus-lang import required — use as a standalone workflow engine or integrate with any Python application.

Development

pip install -e ".[dev]"
pytest tests/ -q

License

MIT — see LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

nodus_workflow-0.1.0.tar.gz (13.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

nodus_workflow-0.1.0-py3-none-any.whl (12.8 kB view details)

Uploaded Python 3

File details

Details for the file nodus_workflow-0.1.0.tar.gz.

File metadata

  • Download URL: nodus_workflow-0.1.0.tar.gz
  • Upload date:
  • Size: 13.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.9

File hashes

Hashes for nodus_workflow-0.1.0.tar.gz
Algorithm Hash digest
SHA256 157b3850a288a1f17c62cfb1b944920f6de3609c3d8f9e5662bd384bfbcd464c
MD5 22e64f0a8e8e32892879af5ae938dab4
BLAKE2b-256 e1d7efd60333b10b2cf8d282db760a75ddf1f5936735430e602b2bcd6f4a2a10

See more details on using hashes here.

File details

Details for the file nodus_workflow-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: nodus_workflow-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 12.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.9

File hashes

Hashes for nodus_workflow-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 60dcc2e33a6c12540bda4f7246ab8c01fa73c6cdf4e31027790bad3582fc88e7
MD5 cf15a881a1d75dd5e3a48d71e97da4ac
BLAKE2b-256 3d48c957a3fc7d0c522f7efc1479065faeb7730e95b2d7b0487093f728058ef2

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page