Dependency-aware async task scheduling with reconciliation — DAGExecutor, WaveReconciler, MetaDAG

Project description

wave-executor

Dependency-aware async task scheduling for Python, with three execution modes that compose into a four-level hierarchy.

Task (atomic) → DAGExecutor → WaveReconciler → MetaDAG

Zero required dependencies. Pure asyncio. Python 3.9+.


The problem with level-based task execution

Most async task runners group tasks into "waves" by dependency depth:

A (10s) ─┐
          ├─→ C (1s)
B  (1s) ─┘
D  (1s) ─→ E (1s)

A level-based executor sees depth-0 = {A, B, D} and depth-1 = {C, E}. E must wait 10 seconds for A, even though E only depends on D.

DAGExecutor starts each task the moment its specific dependencies complete. E starts at t=1, completes at t=2 — 9 seconds earlier.
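The timing difference can be checked with a small back-of-the-envelope model (a standalone sketch, not the library's scheduler):

```python
def dep_finish_times(durations, deps):
    # Dependency-aware: a task starts the moment its own deps finish.
    memo = {}
    def fin(t):
        if t not in memo:
            start = max((fin(d) for d in deps.get(t, ())), default=0.0)
            memo[t] = start + durations[t]
        return memo[t]
    return {t: fin(t) for t in durations}

def level_finish_times(durations, deps):
    # Level-based: every task in wave N waits for ALL of wave N-1.
    depth = {}
    def d(t):
        if t not in depth:
            depth[t] = 1 + max((d(p) for p in deps.get(t, ())), default=-1)
        return depth[t]
    for t in durations:
        d(t)
    finish, wave_end = {}, 0.0
    for lvl in sorted(set(depth.values())):
        wave = [t for t in durations if depth[t] == lvl]
        for t in wave:
            finish[t] = wave_end + durations[t]
        wave_end = max(finish[t] for t in wave)
    return finish

durations = {"A": 10, "B": 1, "C": 1, "D": 1, "E": 1}
deps = {"C": {"A", "B"}, "E": {"D"}}
print(dep_finish_times(durations, deps)["E"])    # 2.0
print(level_finish_times(durations, deps)["E"])  # 11.0
```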


Installation

pip install wave-executor

DAGExecutor — one-shot dependency graph

import asyncio
from wave_executor import DAGExecutor, WaveConfig

async def process(task_id: str, spec: dict):
    # Your actual work here
    return f"result-{task_id}"

async def main():
    dependencies = {
        "analyze": {"fetch_a", "fetch_b"},  # waits for both fetches
        "report":  {"analyze"},
    }
    executor = DAGExecutor(config=WaveConfig(max_parallel_per_wave=20))
    result = await executor.execute(
        tasks={
            "fetch_a": {"url": "..."},
            "fetch_b": {"url": "..."},
            "analyze": {"model": "fast"},
            "report":  {},
        },
        dependencies=dependencies,
        task_executor=process,
    )

    print(f"{result.total_successful}/{result.total_tasks} succeeded")
    print(f"Critical path: {result.get_critical_path(dependencies)}")

asyncio.run(main())
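With dep-output injection enabled, a downstream task executor can read the results of its completed dependencies from its spec. `_dep_outputs` is the documented key; the dependency-id → output mapping shape in this sketch is an assumption for illustration:

```python
import asyncio

async def analyze(task_id: str, spec: dict):
    # Assumed shape: {"_dep_outputs": {dep_id: output, ...}}
    upstream = spec.get("_dep_outputs", {})
    return {"inputs_seen": sorted(upstream)}

# Standalone check of the pattern, outside the executor:
out = asyncio.run(analyze("analyze", {"_dep_outputs": {"fetch_a": "r1", "fetch_b": "r2"}}))
print(out)  # {'inputs_seen': ['fetch_a', 'fetch_b']}
```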

Key features:

  • asyncio.Event per task — zero-overhead dependency signalling
  • AdaptiveSemaphore — AIMD-controlled concurrency window (self-adjusts under load)
  • Failed deps automatically skip all downstream tasks
  • Optional dep-output injection: completed results are injected into downstream specs under _dep_outputs
  • Exponential-backoff retry, per-task timeout, total execution timeout
  • Cycle detection (Kahn's algorithm) before execution starts
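The per-task Event idea can be sketched in a few lines (illustrative only, not the library's internals): each task awaits the Events of its own dependencies, runs, then sets its own Event.

```python
import asyncio

async def run_graph(durations: dict, deps: dict) -> list:
    done = {t: asyncio.Event() for t in durations}
    order = []

    async def run(t):
        # Wait for this task's own deps -- not for a whole "wave".
        await asyncio.gather(*(done[d].wait() for d in deps.get(t, ())))
        await asyncio.sleep(durations[t])
        order.append(t)
        done[t].set()

    await asyncio.gather(*(run(t) for t in durations))
    return order

order = asyncio.run(run_graph(
    {"A": 0.05, "B": 0.01, "C": 0.01, "D": 0.01, "E": 0.01},
    {"C": {"A", "B"}, "E": {"D"}},
))
print(order)  # E completes well before C, which waits on A
```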

WaveReconciler — desired-state reconciliation

Inspired by stereOS agentd's reconciliation loop. Instead of running once and returning, the reconciler holds a desired state and continuously ensures tasks match it.

from wave_executor import WaveReconciler, WaveConfig
from wave_executor.reconciler import RestartPolicy

async def run_job(task_id: str, spec: dict):
    # Work that might crash and needs restarting
    ...

reconciler = WaveReconciler(
    task_executor=run_job,
    config=WaveConfig(max_parallel_per_wave=10),
    poll_interval_s=2.0,
)

await reconciler.start()

await reconciler.update(
    tasks={
        "validator":  {"port": 8080},
        "aggregator": {"upstream": "validator"},
    },
    dependencies={"aggregator": {"validator"}},
    policies={
        "validator":  "always",      # restart after any exit
        "aggregator": "on-failure",  # restart only on crash
    },
    max_restart_attempts={"validator": 100, "aggregator": 5},
    restart_backoff_base=3.0,   # exponential: 3s, 6s, 12s, 24s...
)

# Hot-swap the spec while running — unchanged tasks are undisturbed
await reconciler.update(tasks={"validator": {"port": 9090}, "aggregator": ...})

# Wait for all non-ALWAYS tasks to reach terminal state
ok = await reconciler.wait_until_complete(timeout=60)
await reconciler.stop()

Key features:

  • Three restart policies: no / on-failure / always
  • Exponential backoff: base * 2^(n-1) seconds between retries
  • Hot-update desired state without stopping everything
  • max_restart_attempts ceiling — task marked "blocked" when exceeded
  • Dependency-aware: tasks only start once upstream deps have "success" status
  • Dep-output injection: outputs from completed deps injected into downstream specs
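The backoff formula above works out to the following delays; a one-line sketch (with the `restart_backoff_base=3.0` value from the example):

```python
def restart_delay(attempt: int, base: float = 3.0) -> float:
    # Delay before restart attempt n (n >= 1): base * 2^(n-1) seconds.
    return base * 2 ** (attempt - 1)

print([restart_delay(n) for n in (1, 2, 3, 4)])  # [3.0, 6.0, 12.0, 24.0]
```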

MetaDAG — DAG of DAGs

Orchestrate multiple DAGExecutors and WaveReconcilers as nodes in a higher-level dependency graph. Upstream node outputs are injected into downstream nodes' task specs.

from wave_executor import MetaDAG
from wave_executor.meta import MetaDAGNodeSpec

async def fetch(task_id, spec): ...
async def analyze(task_id, spec): ...
async def store(task_id, spec): ...

meta = (
    MetaDAG(inject_dep_outputs=True)
    .add_node(MetaDAGNodeSpec(
        node_id="ingest",
        tasks={"page_a": {"url": "..."}, "page_b": {"url": "..."}},
        task_executor=fetch,
    ))
    .add_node(MetaDAGNodeSpec(
        node_id="analyze",
        tasks={"report": {}},
        task_executor=analyze,   # receives _upstream_outputs["ingest"]
    ), depends_on=["ingest"])
    .add_node(MetaDAGNodeSpec(
        node_id="store",
        tasks={"db_write": {}},
        task_executor=store,
    ), depends_on=["analyze"])
)

result = await meta.execute()
print(result.to_dict())

Nodes can also be WaveReconciler instances (set node_type="reconciler"), which run until all their tasks complete before the node is considered done.


Configuration

from wave_executor import WaveConfig

config = WaveConfig(
    max_parallel_per_wave=50,      # global concurrency cap
    default_task_timeout=30.0,     # per-task timeout (seconds)
    total_execution_timeout=600.0, # hard limit for the whole execution
    max_retries=3,                 # exponential-backoff retries
    retry_backoff_factor=2.0,
    adaptive_batch_window=True,    # AIMD concurrency auto-scaling
    min_parallel=2,                # floor for adaptive window
    saturation_threshold=20,       # waiting tasks that trigger shrink
)
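AIMD here means additive increase, multiplicative decrease: grow the window by one while progress is smooth, cut it sharply once the waiting-task count crosses saturation_threshold. A toy model of one adjustment step (the halving factor and step size are assumptions, not the library's tuning):

```python
def aimd_step(window: int, waiting: int, *,
              saturation_threshold: int = 20,
              min_parallel: int = 2,
              max_parallel: int = 50) -> int:
    if waiting >= saturation_threshold:
        return max(min_parallel, window // 2)  # multiplicative decrease
    return min(max_parallel, window + 1)       # additive increase

print(aimd_step(40, waiting=25))  # 20 (saturated: halve)
print(aimd_step(40, waiting=5))   # 41 (healthy: grow by one)
print(aimd_step(3, waiting=25))   # 2  (never below min_parallel)
```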

Observability

Subscribe to execution events on any level:

executor = DAGExecutor()

@executor.on_event
async def handle(event_type: str, event: dict):
    print(event_type, event["task_id"], event.get("duration_ms"))

# Events: EXECUTION_STARTED, TASK_STARTED, TASK_COMPLETED,
#         TASK_FAILED, TASK_SKIPPED, EXECUTION_COMPLETED

MetaDAG fires additional events: META_EXECUTION_STARTED, NODE_STARTED, NODE_COMPLETED, NODE_SKIPPED, META_EXECUTION_COMPLETED.


Running tests

pip install wave-executor[dev]
pytest

Changelog

v0.3.0 (2026-03-08)

Structured escalation reports

When a task exhausts all retries or hits a timeout, DAGExecutor now emits an EscalationReport instead of a bare error string. The report is stored under task_result.output["escalation"] and contains:

  • failure_history — error message from every attempt
  • root_cause — heuristic: consistent errors across attempts → "code bug or bad input spec"; circuit trip → "downstream service unavailable"
  • resolution_options — suggested next actions
  • circuit_state — circuit breaker state at escalation time
  • escalated_at — ISO timestamp

A TASK_ESCALATED event is also fired to all registered on_event callbacks.

Circuit breaker HALF_OPEN pre-entry guard

_check_circuit_breaker_state() is now called at the top of every retry iteration (attempt > 0), before the task executor and before any backoff sleep. It fast-fails if the circuit is already HALF_OPEN (a probe is in flight — additional retries must not pile on as extra probes) or OPEN with recovery timeout not yet elapsed. This makes the single-probe recovery semantics strict rather than advisory.
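The guard's decision table can be sketched like this (illustrative, not the library's _check_circuit_breaker_state implementation):

```python
import time
from enum import Enum

class Circuit(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

def may_attempt(state, opened_at, recovery_timeout_s, now=None):
    now = time.monotonic() if now is None else now
    if state is Circuit.HALF_OPEN:
        return False  # a probe is already in flight; extra retries fast-fail
    if state is Circuit.OPEN:
        # Only once the recovery timeout elapses may a single probe proceed.
        return now - opened_at >= recovery_timeout_s
    return True  # CLOSED: normal operation

print(may_attempt(Circuit.OPEN, opened_at=0.0, recovery_timeout_s=30.0, now=10.0))  # False
print(may_attempt(Circuit.OPEN, opened_at=0.0, recovery_timeout_s=30.0, now=31.0))  # True
```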

EscalationReport is now exported from the top-level package.


v0.2.0

  • Circuit breaker (CLOSED → OPEN → HALF_OPEN → CLOSED lifecycle) on DAGExecutor
  • Checkpoint persistence on task failure
  • AdaptiveSemaphore AIMD concurrency control

v0.1.0

  • Initial release: DAGExecutor, WaveReconciler, MetaDAG
  • Kahn's algorithm cycle detection
  • Dep-output injection, exponential-backoff retry, per-task timeout
  • on_event observability callbacks

License

MIT
