wave-executor
Dependency-aware async task scheduling for Python, with three execution modes that compose into a four-level hierarchy.
Task (atomic) → DAGExecutor → WaveReconciler → MetaDAG
Zero required dependencies. Pure asyncio. Python 3.9+.
The problem with level-based task execution
Most async task runners group tasks into "waves" by dependency depth:
```
A (10s) ─┐
         ├─→ C (1s)
B (1s) ──┘
D (1s) ─→ E (1s)
```
A level-based executor sees depth-0 = {A, B, D} and depth-1 = {C, E}. E must wait 10 seconds for A, even though E only depends on D.
DAGExecutor starts each task the moment its specific dependencies complete.
E starts at t=1, completes at t=2 — 9 seconds earlier.
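Independent of this library, the core idea is simple to sketch with plain asyncio: give each task an `asyncio.Event`, and have every task wait only on the events of its direct dependencies, never on a whole depth level. A minimal sketch (not the library's implementation):

```python
import asyncio

async def run_graph(tasks, deps):
    """Start each task the moment its own dependencies finish."""
    done = {name: asyncio.Event() for name in tasks}
    finished = []  # completion order, for demonstration

    async def run(name):
        # Block only on this task's direct dependencies, not its wave.
        await asyncio.gather(*(done[d].wait() for d in deps.get(name, ())))
        await tasks[name]()      # do the actual work
        finished.append(name)
        done[name].set()         # unblock downstream tasks immediately

    await asyncio.gather(*(run(n) for n in tasks))
    return finished

# The example graph above, scaled down 100x: A is slow, everything else fast.
durations = {"A": 0.10, "B": 0.01, "C": 0.01, "D": 0.01, "E": 0.01}
tasks = {n: (lambda d=d: asyncio.sleep(d)) for n, d in durations.items()}
deps = {"C": {"A", "B"}, "E": {"D"}}

order = asyncio.run(run_graph(tasks, deps))
# E (depends only on D) finishes long before C (which waits on slow A).
assert order.index("E") < order.index("C")
```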
Installation
```
pip install wave-executor
```
DAGExecutor — one-shot dependency graph
```python
import asyncio
from wave_executor import DAGExecutor, WaveConfig

async def process(task_id: str, spec: dict):
    # Your actual work here
    return f"result-{task_id}"

async def main():
    executor = DAGExecutor(config=WaveConfig(max_parallel_per_wave=20))
    dependencies = {
        "analyze": {"fetch_a", "fetch_b"},  # waits for both fetches
        "report": {"analyze"},
    }
    result = await executor.execute(
        tasks={
            "fetch_a": {"url": "..."},
            "fetch_b": {"url": "..."},
            "analyze": {"model": "fast"},
            "report": {},
        },
        dependencies=dependencies,
        task_executor=process,
    )
    print(f"{result.total_successful}/{result.total_tasks} succeeded")
    print(f"Critical path: {result.get_critical_path(dependencies)}")

asyncio.run(main())
```
Key features:
- `asyncio.Event` per task — zero-overhead dependency signalling
- `AdaptiveSemaphore` — AIMD-controlled concurrency window (self-adjusts under load)
- Failed deps automatically skip all downstream tasks
- Optional dep-output injection: completed results are injected into downstream specs under `_dep_outputs`
- Exponential-backoff retry, per-task timeout, total execution timeout
- Cycle detection (Kahn's algorithm) before execution starts
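Cycle detection via Kahn's algorithm can be sketched independently of the library: repeatedly peel off tasks with no unmet dependencies; whatever remains must sit on (or behind) a cycle. A self-contained sketch:

```python
from collections import deque

def find_cycle_nodes(tasks, dependencies):
    """Kahn's algorithm: topologically peel the graph; leftovers are cyclic.

    `dependencies` maps task -> set of tasks it depends on.
    Returns the set of tasks involved in (or blocked behind) a cycle.
    """
    indegree = {t: len(dependencies.get(t, set())) for t in tasks}
    dependents = {t: set() for t in tasks}
    for t, deps in dependencies.items():
        for d in deps:
            dependents[d].add(t)

    ready = deque(t for t, n in indegree.items() if n == 0)
    while ready:
        t = ready.popleft()
        for child in dependents[t]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Any task whose indegree never reached zero is stuck behind a cycle.
    return {t for t, n in indegree.items() if n > 0}

print(sorted(find_cycle_nodes({"a", "b", "c"}, {"a": {"b"}, "b": {"a"}})))  # ['a', 'b']
```

Running this check before execution means a bad graph fails fast with a useful node list instead of deadlocking mid-run.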
WaveReconciler — desired-state reconciliation
Inspired by stereOS agentd's reconciliation loop. Instead of running once and returning, the reconciler holds a desired state and continuously ensures tasks match it.
from wave_executor import WaveReconciler, WaveConfig
from wave_executor.reconciler import RestartPolicy
```python
async def run_job(task_id: str, spec: dict):
    # Work that might crash and needs restarting
    ...

reconciler = WaveReconciler(
    task_executor=run_job,
    config=WaveConfig(max_parallel_per_wave=10),
    poll_interval_s=2.0,
)
await reconciler.start()

await reconciler.update(
    tasks={
        "validator": {"port": 8080},
        "aggregator": {"upstream": "validator"},
    },
    dependencies={"aggregator": {"validator"}},
    policies={
        "validator": "always",       # restart after any exit
        "aggregator": "on-failure",  # restart only on crash
    },
    max_restart_attempts={"validator": 100, "aggregator": 5},
    restart_backoff_base=3.0,        # exponential: 3s, 6s, 12s, ...
)

# Hot-swap the spec while running — unchanged tasks are undisturbed
await reconciler.update(tasks={"validator": {"port": 9090}, "aggregator": ...})

# Wait for all non-ALWAYS tasks to reach terminal state
ok = await reconciler.wait_until_complete(timeout=60)
await reconciler.stop()
```
Key features:
- Three restart policies: `no` / `on-failure` / `always`
- Exponential backoff: `base * 2^(n-1)` seconds between retries
- Hot-update desired state without stopping everything
- `max_restart_attempts` ceiling — task marked `"blocked"` when exceeded
- Dependency-aware: tasks only start once upstream deps have `"success"` status
- Dep-output injection: outputs from completed deps injected into downstream specs
MetaDAG — DAG of DAGs
Orchestrate multiple DAGExecutors and WaveReconcilers as nodes in a higher-level dependency graph. Upstream node outputs are injected into downstream nodes' task specs.
from wave_executor import MetaDAG
from wave_executor.meta import MetaDAGNodeSpec
```python
async def fetch(task_id, spec): ...
async def analyze(task_id, spec): ...
async def store(task_id, spec): ...

meta = (
    MetaDAG(inject_dep_outputs=True)
    .add_node(MetaDAGNodeSpec(
        node_id="ingest",
        tasks={"page_a": {"url": "..."}, "page_b": {"url": "..."}},
        task_executor=fetch,
    ))
    .add_node(MetaDAGNodeSpec(
        node_id="analyze",
        tasks={"report": {}},
        task_executor=analyze,  # receives _upstream_outputs["ingest"]
    ), depends_on=["ingest"])
    .add_node(MetaDAGNodeSpec(
        node_id="store",
        tasks={"db_write": {}},
        task_executor=store,
    ), depends_on=["analyze"])
)

result = await meta.execute()
print(result.to_dict())
```
Nodes can also be WaveReconciler instances (set node_type="reconciler"), which run until all their tasks complete before the node is considered done.
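The injection pattern described above — upstream outputs merged into downstream task specs under a reserved key — can be sketched as a pure function. This is an illustration of the shape, not the library's code; the key name `_upstream_outputs` comes from the example above:

```python
def inject_upstream_outputs(spec: dict, upstream_results: dict) -> dict:
    """Return a copy of a task spec with upstream node outputs attached.

    `upstream_results` maps upstream node_id -> that node's outputs.
    The original spec is left untouched.
    """
    enriched = dict(spec)
    enriched["_upstream_outputs"] = dict(upstream_results)
    return enriched

spec = {"model": "fast"}
enriched = inject_upstream_outputs(spec, {"ingest": {"page_a": "<html>"}})
assert enriched["_upstream_outputs"]["ingest"]["page_a"] == "<html>"
assert "_upstream_outputs" not in spec  # original spec unmodified
```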
Configuration
from wave_executor import WaveConfig
```python
config = WaveConfig(
    max_parallel_per_wave=50,        # global concurrency cap
    default_task_timeout=30.0,       # per-task timeout (seconds)
    total_execution_timeout=600.0,   # hard limit for the whole execution
    max_retries=3,                   # exponential-backoff retries
    retry_backoff_factor=2.0,
    adaptive_batch_window=True,      # AIMD concurrency auto-scaling
    min_parallel=2,                  # floor for adaptive window
    saturation_threshold=20,         # waiting tasks that trigger shrink
)
```
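AIMD (additive-increase / multiplicative-decrease, the scheme TCP uses for its congestion window) applied to a concurrency limit can be sketched as follows. This illustrates the idea behind the `adaptive_batch_window` options, not the library's `AdaptiveSemaphore` itself:

```python
class AIMDWindow:
    """Concurrency window: grow by one slot on success, halve on saturation."""

    def __init__(self, limit: int, floor: int = 2, ceiling: int = 50):
        self.limit, self.floor, self.ceiling = limit, floor, ceiling

    def on_success(self):
        # Additive increase: probe for more capacity, one slot at a time.
        self.limit = min(self.limit + 1, self.ceiling)

    def on_saturation(self):
        # Multiplicative decrease: back off sharply when waiters pile up.
        self.limit = max(self.limit // 2, self.floor)

w = AIMDWindow(limit=16, floor=2, ceiling=50)
w.on_success(); w.on_success()  # 16 -> 17 -> 18
w.on_saturation()               # 18 -> 9
print(w.limit)  # 9
```

The asymmetry is the point: cautious growth quickly finds spare capacity, while the sharp halving stops a saturated backend from being hammered further.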
Observability
Subscribe to execution events on any level:
```python
executor = DAGExecutor()

@executor.on_event
async def handle(event_type: str, event: dict):
    print(event_type, event["task_id"], event.get("duration_ms"))

# Events: EXECUTION_STARTED, TASK_STARTED, TASK_COMPLETED,
#         TASK_FAILED, TASK_SKIPPED, EXECUTION_COMPLETED
```
MetaDAG fires additional events: META_EXECUTION_STARTED, NODE_STARTED, NODE_COMPLETED, NODE_SKIPPED, META_EXECUTION_COMPLETED.
Running tests
```
pip install wave-executor[dev]
pytest
```
Changelog
v0.3.0 (2026-03-08)
Structured escalation reports
When a task exhausts all retries or hits a timeout, DAGExecutor now emits an EscalationReport instead of a bare error string. The report is stored under task_result.output["escalation"] and contains:
- `failure_history` — error message from every attempt
- `root_cause` — heuristic: consistent errors across attempts → "code bug or bad input spec"; circuit trip → "downstream service unavailable"
- `resolution_options` — suggested next actions
- `circuit_state` — circuit breaker state at escalation time
- `escalated_at` — ISO timestamp
A TASK_ESCALATED event is also fired to all registered on_event callbacks.
Circuit breaker HALF_OPEN pre-entry guard
_check_circuit_breaker_state() is now called at the top of every retry iteration (attempt > 0), before the task executor and before any backoff sleep. It fast-fails if the circuit is already HALF_OPEN (a probe is in flight — additional retries must not pile on as extra probes) or OPEN with recovery timeout not yet elapsed. This makes the single-probe recovery semantics strict rather than advisory.
EscalationReport is now exported from the top-level package.
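The `CLOSED → OPEN → HALF_OPEN` lifecycle with a single-probe guard, as described above, can be sketched as a small state machine. This is an illustration of the semantics, not the library's implementation; the `clock` parameter is added here to make the demo deterministic:

```python
import time

class CircuitBreaker:
    """CLOSED -> OPEN on repeated failures; OPEN -> HALF_OPEN after a cooldown;
    the single HALF_OPEN probe decides: success -> CLOSED, failure -> OPEN."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 clock=time.monotonic):
        self.state = "CLOSED"
        self.failures = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.opened_at = None
        self.clock = clock

    def allow_request(self) -> bool:
        if self.state == "OPEN":
            if self.clock() - self.opened_at >= self.recovery_timeout:
                self.state = "HALF_OPEN"  # let exactly one probe through
                return True
            return False                  # still cooling down: fast-fail
        if self.state == "HALF_OPEN":
            return False                  # a probe is in flight; don't pile on
        return True                       # CLOSED: normal operation

    def record_success(self):
        self.state, self.failures = "CLOSED", 0

    def record_failure(self):
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state, self.opened_at = "OPEN", self.clock()

# Demo with a fake clock so the transitions are deterministic.
t = {"now": 0.0}
cb = CircuitBreaker(failure_threshold=2, recovery_timeout=10.0,
                    clock=lambda: t["now"])
cb.record_failure(); cb.record_failure()              # trips: CLOSED -> OPEN
assert not cb.allow_request()                         # cooling down: fast-fail
t["now"] = 11.0
assert cb.allow_request() and cb.state == "HALF_OPEN" # single probe allowed
assert not cb.allow_request()                         # extra retries blocked
cb.record_success()                                   # probe succeeded
assert cb.state == "CLOSED"
```

A pre-entry guard in the sense of v0.3.0 amounts to calling something like `allow_request()` at the top of every retry iteration, before any backoff sleep or executor call.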
v0.2.0
- Circuit breaker (`CLOSED → OPEN → HALF_OPEN → CLOSED` lifecycle) on `DAGExecutor`
- Checkpoint persistence on task failure
- `AdaptiveSemaphore` AIMD concurrency control
v0.1.0
- Initial release: `DAGExecutor`, `WaveReconciler`, `MetaDAG`
- Kahn's algorithm cycle detection
- Dep-output injection, exponential-backoff retry, per-task timeout
- `on_event` observability callbacks
License
MIT