# wave-executor
Dependency-aware async task scheduling for Python, with three execution modes that compose into a four-level hierarchy.
Task (atomic) → DAGExecutor → WaveReconciler → MetaDAG
Zero required dependencies. Pure asyncio. Python 3.9+.
## The problem with level-based task execution
Most async task runners group tasks into "waves" by dependency depth:
```
A (10s) ─┐
         ├─→ C (1s)
B (1s)  ─┘

D (1s) ─→ E (1s)
```
A level-based executor sees depth-0 = {A, B, D} and depth-1 = {C, E}. E must wait 10 seconds for A, even though E only depends on D.
DAGExecutor starts each task the moment its specific dependencies complete.
E starts at t=1, completes at t=2 — 9 seconds earlier.
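The timing claim is easy to verify with plain asyncio primitives. Here is a minimal sketch of the per-task `asyncio.Event` idea (illustrative only, not the library's internals), using a scaled-down version of the diagram above:

```python
import asyncio

async def run_graph(durations, deps):
    # Each task waits only on its *direct* dependencies, signalled via
    # one asyncio.Event per task -- no wave barrier between depth levels.
    done = {t: asyncio.Event() for t in durations}
    finished_at = {}

    async def run(task):
        await asyncio.gather(*(done[d].wait() for d in deps.get(task, ())))
        await asyncio.sleep(durations[task])
        finished_at[task] = asyncio.get_running_loop().time()
        done[task].set()

    start = asyncio.get_running_loop().time()
    await asyncio.gather(*(run(t) for t in durations))
    return {t: finished_at[t] - start for t in durations}

# 10s -> 0.1s, 1s -> 0.01s to keep the demo fast
times = asyncio.run(run_graph(
    {"A": 0.10, "B": 0.01, "C": 0.01, "D": 0.01, "E": 0.01},
    {"C": {"A", "B"}, "E": {"D"}},
))
# E finishes while A is still running; C still waits for both A and B.
```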
## Installation

```bash
pip install wave-executor
```
## DAGExecutor — one-shot dependency graph
```python
import asyncio

from wave_executor import DAGExecutor, WaveConfig

async def process(task_id: str, spec: dict):
    # Your actual work here
    return f"result-{task_id}"

async def main():
    executor = DAGExecutor(config=WaveConfig(max_parallel_per_wave=20))
    dependencies = {
        "analyze": {"fetch_a", "fetch_b"},  # waits for both fetches
        "report": {"analyze"},
    }
    result = await executor.execute(
        tasks={
            "fetch_a": {"url": "..."},
            "fetch_b": {"url": "..."},
            "analyze": {"model": "fast"},
            "report": {},
        },
        dependencies=dependencies,
        task_executor=process,
    )
    print(f"{result.total_successful}/{result.total_tasks} succeeded")
    print(f"Critical path: {result.get_critical_path(dependencies)}")

asyncio.run(main())
```
Key features:

- `asyncio.Event` per task — zero-overhead dependency signalling
- `AdaptiveSemaphore` — AIMD-controlled concurrency window (self-adjusts under load)
- Failed deps automatically skip all downstream tasks
- Optional dep-output injection: completed results are injected into downstream specs under `_dep_outputs`
- Exponential-backoff retry, per-task timeout, total execution timeout
- Cycle detection (Kahn's algorithm) before execution starts
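The pre-flight cycle check can be pictured with a standard Kahn's-algorithm pass. This is a sketch of the technique under the assumption that the library's check works similarly; `find_cycle` itself is not part of the API:

```python
from collections import deque

def find_cycle(tasks, dependencies):
    # Kahn's algorithm: repeatedly pop tasks with no unmet dependencies.
    # Anything left with a positive in-degree is stuck in a cycle.
    indegree = {t: len(dependencies.get(t, ())) for t in tasks}
    dependents = {t: [] for t in tasks}
    for t, deps in dependencies.items():
        for d in deps:
            dependents[d].append(t)
    queue = deque(t for t, n in indegree.items() if n == 0)
    while queue:
        t = queue.popleft()
        for child in dependents[t]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)
    return sorted(t for t, n in indegree.items() if n > 0)

find_cycle({"a", "b", "c"}, {"b": {"a"}, "c": {"b"}})  # [] -- acyclic
find_cycle({"a", "b"}, {"a": {"b"}, "b": {"a"}})       # ["a", "b"] -- cycle
```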
## WaveReconciler — desired-state reconciliation
Inspired by stereOS agentd's reconciliation loop. Instead of running once and returning, the reconciler holds a desired state and continuously ensures tasks match it.
```python
from wave_executor import WaveReconciler, WaveConfig
from wave_executor.reconciler import RestartPolicy

async def run_job(task_id: str, spec: dict):
    # Work that might crash and needs restarting
    ...

reconciler = WaveReconciler(
    task_executor=run_job,
    config=WaveConfig(max_parallel_per_wave=10),
    poll_interval_s=2.0,
)
await reconciler.start()
await reconciler.update(
    tasks={
        "validator": {"port": 8080},
        "aggregator": {"upstream": "validator"},
    },
    dependencies={"aggregator": {"validator"}},
    policies={
        "validator": "always",       # restart after any exit
        "aggregator": "on-failure",  # restart only on crash
    },
    max_restart_attempts={"validator": 100, "aggregator": 5},
    restart_backoff_base=3.0,  # exponential: 3s, 6s, 12s... between restarts
)

# Hot-swap the spec while running — unchanged tasks are undisturbed
await reconciler.update(tasks={"validator": {"port": 9090}, "aggregator": ...})

# Wait for all non-ALWAYS tasks to reach terminal state
ok = await reconciler.wait_until_complete(timeout=60)
await reconciler.stop()
```
Key features:

- Three restart policies: `no` / `on-failure` / `always`
- Exponential backoff: `base * 2^(n-1)` seconds between retries
- Hot-update desired state without stopping everything
- `max_restart_attempts` ceiling — task marked `"blocked"` when exceeded
- Dependency-aware: tasks only start once upstream deps have `"success"` status
- Dep-output injection: outputs from completed deps injected into downstream specs
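The documented `base * 2^(n-1)` formula works out to doubling delays between restarts. A quick sketch (the `restart_delay` helper is illustrative, not part of the API):

```python
def restart_delay(attempt: int, base: float = 3.0) -> float:
    # Delay before restart attempt n (1-indexed), per base * 2^(n-1);
    # the very first start of a task is immediate.
    return base * 2 ** (attempt - 1)

delays = [restart_delay(n) for n in (1, 2, 3, 4)]  # [3.0, 6.0, 12.0, 24.0]
```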
## MetaDAG — DAG of DAGs
Orchestrate multiple DAGExecutors and WaveReconcilers as nodes in a higher-level dependency graph. Upstream node outputs are injected into downstream nodes' task specs.
```python
from wave_executor import MetaDAG
from wave_executor.meta import MetaDAGNodeSpec

async def fetch(task_id, spec): ...
async def analyze(task_id, spec): ...
async def store(task_id, spec): ...

url_a = "https://example.com/a"  # placeholder specs for the ingest tasks
url_b = "https://example.com/b"

meta = (
    MetaDAG(inject_dep_outputs=True)
    .add_node(MetaDAGNodeSpec(
        node_id="ingest",
        tasks={"page_a": url_a, "page_b": url_b},
        task_executor=fetch,
    ))
    .add_node(MetaDAGNodeSpec(
        node_id="analyze",
        tasks={"report": {}},
        task_executor=analyze,  # receives _upstream_outputs["ingest"]
    ), depends_on=["ingest"])
    .add_node(MetaDAGNodeSpec(
        node_id="store",
        tasks={"db_write": {}},
        task_executor=store,
    ), depends_on=["analyze"])
)

result = await meta.execute()
print(result.to_dict())
```
Nodes can also be WaveReconciler instances (set node_type="reconciler"), which run until all their tasks complete before the node is considered done.
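The injection contract can be pictured as a simple merge into the downstream spec. A sketch of the pattern (the `inject_upstream` helper is hypothetical; only the `_upstream_outputs` key comes from the API above):

```python
def inject_upstream(spec: dict, upstream_outputs: dict) -> dict:
    # The original spec keys are preserved; upstream node results arrive
    # under the reserved key, grouped by upstream node_id.
    return {**spec, "_upstream_outputs": upstream_outputs}

spec = inject_upstream({"model": "fast"}, {"ingest": {"page_a": "<html>"}})
# spec["model"] is untouched; spec["_upstream_outputs"]["ingest"] holds
# the per-task results of the "ingest" node.
```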
## Configuration
```python
from wave_executor import WaveConfig

config = WaveConfig(
    max_parallel_per_wave=50,       # global concurrency cap
    default_task_timeout=30.0,      # per-task timeout (seconds)
    total_execution_timeout=600.0,  # hard limit for the whole execution
    max_retries=3,                  # exponential-backoff retries
    retry_backoff_factor=2.0,
    adaptive_batch_window=True,     # AIMD concurrency auto-scaling
    min_parallel=2,                 # floor for adaptive window
    saturation_threshold=20,        # waiting tasks that trigger shrink
)
```
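The AIMD behaviour behind `adaptive_batch_window` can be sketched as one adjustment step: grow the window additively while healthy, halve it when the waiting queue exceeds `saturation_threshold`. This is an illustration of the idea, not the library's `AdaptiveSemaphore`:

```python
def aimd_step(window: int, waiting: int, *, min_parallel: int = 2,
              max_parallel: int = 50, saturation_threshold: int = 20) -> int:
    if waiting > saturation_threshold:
        return max(min_parallel, window // 2)  # multiplicative decrease
    return min(max_parallel, window + 1)       # additive increase

w = aimd_step(40, waiting=5)   # healthy: 40 -> 41
w = aimd_step(w, waiting=30)   # saturated: 41 -> 20
```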
## Observability
Subscribe to execution events at any level:
```python
executor = DAGExecutor()

@executor.on_event
async def handle(event_type: str, event: dict):
    print(event_type, event["task_id"], event.get("duration_ms"))

# Events: EXECUTION_STARTED, TASK_STARTED, TASK_COMPLETED,
#         TASK_FAILED, TASK_SKIPPED, EXECUTION_COMPLETED
```
MetaDAG fires additional events: META_EXECUTION_STARTED, NODE_STARTED, NODE_COMPLETED, NODE_SKIPPED, META_EXECUTION_COMPLETED.
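A handler can do more than print. For instance, it can roll completion events up into a per-task timing summary. A sketch using only the payload keys shown above; in real use the handler would be registered with `@executor.on_event`, whereas here events are fed in manually:

```python
import asyncio
from collections import defaultdict

durations_ms = defaultdict(float)

async def handle(event_type: str, event: dict):
    # Accumulate wall time per task from TASK_COMPLETED events.
    if event_type == "TASK_COMPLETED":
        durations_ms[event["task_id"]] += event.get("duration_ms") or 0.0

asyncio.run(handle("TASK_COMPLETED", {"task_id": "fetch_a", "duration_ms": 12.5}))
asyncio.run(handle("TASK_COMPLETED", {"task_id": "fetch_a", "duration_ms": 2.5}))
```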
## Running tests

```bash
pip install wave-executor[dev]
pytest
```
## License
MIT