Dependency-aware async task scheduling with reconciliation — DAGExecutor, WaveReconciler, MetaDAG

wave-executor

Dependency-aware async task scheduling for Python, with three execution modes that compose into a four-level hierarchy.

Task (atomic) → DAGExecutor → WaveReconciler → MetaDAG

Zero required dependencies. Pure asyncio. Python 3.9+.


The problem with level-based task execution

Most async task runners group tasks into "waves" by dependency depth:

A (10s) ─┐
         ├─→ C (1s)
B (1s)  ─┘
D (1s)  ─→ E (1s)

A level-based executor sees depth-0 = {A, B, D} and depth-1 = {C, E}. E must wait 10 seconds for A, even though E only depends on D.

DAGExecutor starts each task the moment its specific dependencies complete. E starts at t=1, completes at t=2 — 9 seconds earlier.
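The difference can be sketched with plain asyncio: one `Event` per task, where each task waits only on its own dependencies. Durations mirror the diagram above, scaled down 100x; the names here are illustrative and not part of wave-executor's API.

```python
import asyncio

DURATIONS = {"A": 0.10, "B": 0.01, "C": 0.01, "D": 0.01, "E": 0.01}
DEPS = {"C": {"A", "B"}, "E": {"D"}}

async def demo():
    loop = asyncio.get_running_loop()
    done = {t: asyncio.Event() for t in DURATIONS}  # one Event per task
    finished_at = {}
    start = loop.time()

    async def run(task):
        # Wait only on this task's own dependencies, not a whole wave
        await asyncio.gather(*(done[d].wait() for d in DEPS.get(task, ())))
        await asyncio.sleep(DURATIONS[task])        # simulated work
        finished_at[task] = loop.time() - start
        done[task].set()                            # unblock downstream tasks

    await asyncio.gather(*(run(t) for t in DURATIONS))
    return finished_at

times = asyncio.run(demo())
assert times["E"] < times["C"]  # E never waited on A
```

A level-based runner would hold E until the whole depth-0 wave (including A) drained; here E's only gate is D's event.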


Installation

pip install wave-executor

DAGExecutor — one-shot dependency graph

import asyncio
from wave_executor import DAGExecutor, WaveConfig

async def process(task_id: str, spec: dict):
    # Your actual work here
    return f"result-{task_id}"

async def main():
    executor = DAGExecutor(config=WaveConfig(max_parallel_per_wave=20))
    dependencies = {
        "analyze": {"fetch_a", "fetch_b"},  # waits for both fetches
        "report":  {"analyze"},
    }
    result = await executor.execute(
        tasks={
            "fetch_a": {"url": "..."},
            "fetch_b": {"url": "..."},
            "analyze": {"model": "fast"},
            "report":  {},
        },
        dependencies=dependencies,
        task_executor=process,
    )

    print(f"{result.total_successful}/{result.total_tasks} succeeded")
    print(f"Critical path: {result.get_critical_path(dependencies)}")

asyncio.run(main())

Key features:

  • asyncio.Event per task — dependency signalling with no polling or busy-waiting
  • AdaptiveSemaphore — AIMD-controlled concurrency window (self-adjusts under load)
  • Failed deps automatically skip all downstream tasks
  • Optional dep-output injection: completed results are injected into downstream specs under _dep_outputs
  • Exponential-backoff retry, per-task timeout, total execution timeout
  • Cycle detection (Kahn's algorithm) before execution starts
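The cycle check named above can be sketched with Kahn's algorithm: repeatedly remove zero-in-degree nodes; any leftovers mean a cycle. This `has_cycle` helper is illustrative, not the library's API.

```python
from collections import deque

def has_cycle(tasks, dependencies):
    """Kahn's algorithm: a topological order exists iff the graph is acyclic."""
    indegree = {t: len(dependencies.get(t, ())) for t in tasks}
    downstream = {t: [] for t in tasks}
    for task, deps in dependencies.items():
        for dep in deps:
            downstream[dep].append(task)
    queue = deque(t for t, d in indegree.items() if d == 0)
    visited = 0
    while queue:
        node = queue.popleft()
        visited += 1
        for nxt in downstream[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    return visited != len(tasks)  # unvisited nodes => cycle

assert not has_cycle({"a", "b", "c"}, {"c": {"a", "b"}})
assert has_cycle({"a", "b"}, {"a": {"b"}, "b": {"a"}})
```

Running this check up front is cheap (linear in tasks plus edges), which is why it can happen before any task starts.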

WaveReconciler — desired-state reconciliation

Inspired by stereOS agentd's reconciliation loop. Instead of running once and returning, the reconciler holds a desired state and continuously ensures tasks match it.

from wave_executor import WaveReconciler, WaveConfig
from wave_executor.reconciler import RestartPolicy

async def run_job(task_id: str, spec: dict):
    # Work that might crash and needs restarting
    ...

reconciler = WaveReconciler(
    task_executor=run_job,
    config=WaveConfig(max_parallel_per_wave=10),
    poll_interval_s=2.0,
)

await reconciler.start()

await reconciler.update(
    tasks={
        "validator":  {"port": 8080},
        "aggregator": {"upstream": "validator"},
    },
    dependencies={"aggregator": {"validator"}},
    policies={
        "validator":  "always",      # restart after any exit
        "aggregator": "on-failure",  # restart only on crash
    },
    max_restart_attempts={"validator": 100, "aggregator": 5},
    restart_backoff_base=3.0,   # exponential: 3s, 6s, 12s, 24s...
)

# Hot-swap the spec while running — unchanged tasks are undisturbed
await reconciler.update(tasks={"validator": {"port": 9090}, "aggregator": ...})

# Wait for all non-ALWAYS tasks to reach terminal state
ok = await reconciler.wait_until_complete(timeout=60)
await reconciler.stop()

Key features:

  • Three restart policies: no / on-failure / always
  • Exponential backoff: base * 2^(n-1) seconds between retries
  • Hot-update desired state without stopping everything
  • max_restart_attempts ceiling — task marked "blocked" when exceeded
  • Dependency-aware: tasks only start once upstream deps have "success" status
  • Dep-output injection: outputs from completed deps injected into downstream specs
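The restart decision and backoff from the list above can be condensed into a few lines. These helpers are illustrative of the stated behaviour, not the reconciler's actual internals.

```python
def should_restart(policy: str, exit_ok: bool) -> bool:
    # "no" never restarts; "on-failure" restarts only crashed tasks;
    # "always" restarts after any exit
    return policy == "always" or (policy == "on-failure" and not exit_ok)

def restart_delay(attempt: int, base: float = 3.0) -> float:
    # attempt is 1-indexed: base * 2**(attempt - 1) seconds
    return base * 2 ** (attempt - 1)

assert should_restart("always", exit_ok=True)
assert not should_restart("on-failure", exit_ok=True)
assert should_restart("on-failure", exit_ok=False)
assert not should_restart("no", exit_ok=False)
assert [restart_delay(n) for n in (1, 2, 3)] == [3.0, 6.0, 12.0]
```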

MetaDAG — DAG of DAGs

Orchestrate multiple DAGExecutors and WaveReconcilers as nodes in a higher-level dependency graph. Upstream node outputs are injected into downstream nodes' task specs.

from wave_executor import MetaDAG
from wave_executor.meta import MetaDAGNodeSpec

async def fetch(task_id, spec): ...
async def analyze(task_id, spec): ...
async def store(task_id, spec): ...

meta = (
    MetaDAG(inject_dep_outputs=True)
    .add_node(MetaDAGNodeSpec(
        node_id="ingest",
        tasks={"page_a": {"url": "..."}, "page_b": {"url": "..."}},
        task_executor=fetch,
    ))
    .add_node(MetaDAGNodeSpec(
        node_id="analyze",
        tasks={"report": {}},
        task_executor=analyze,   # receives _upstream_outputs["ingest"]
    ), depends_on=["ingest"])
    .add_node(MetaDAGNodeSpec(
        node_id="store",
        tasks={"db_write": {}},
        task_executor=store,
    ), depends_on=["analyze"])
)

result = await meta.execute()
print(result.to_dict())

Nodes can also be WaveReconciler instances (set node_type="reconciler"), which run until all their tasks complete before the node is considered done.


Configuration

from wave_executor import WaveConfig

config = WaveConfig(
    max_parallel_per_wave=50,      # global concurrency cap
    default_task_timeout=30.0,     # per-task timeout (seconds)
    total_execution_timeout=600.0, # hard limit for the whole execution
    max_retries=3,                 # exponential-backoff retries
    retry_backoff_factor=2.0,
    adaptive_batch_window=True,    # AIMD concurrency auto-scaling
    min_parallel=2,                # floor for adaptive window
    saturation_threshold=20,       # waiting tasks that trigger shrink
)
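How the adaptive window might behave can be sketched as a toy AIMD controller: additive increase while healthy, multiplicative decrease once waiting tasks cross saturation_threshold. This `AIMDWindow` class is a hypothetical illustration, not the real AdaptiveSemaphore.

```python
class AIMDWindow:
    def __init__(self, limit=50, floor=2, saturation=20):
        self.limit = limit          # current concurrency cap
        self.floor = floor          # never shrink below this (min_parallel)
        self.saturation = saturation  # waiting tasks that trigger shrink

    def on_task_done(self, waiting: int) -> None:
        if waiting >= self.saturation:
            # multiplicative decrease: halve the window under load
            self.limit = max(self.floor, self.limit // 2)
        else:
            # additive increase: probe for more headroom
            self.limit += 1

w = AIMDWindow(limit=50, floor=2, saturation=20)
w.on_task_done(waiting=25)   # saturated: window halves
assert w.limit == 25
w.on_task_done(waiting=0)    # healthy: window grows by one
assert w.limit == 26
```

The same additive-increase/multiplicative-decrease shape underlies TCP congestion control: it converges to a stable window under contention while still probing upward when load is light.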

Observability

Subscribe to execution events on any level:

executor = DAGExecutor()

@executor.on_event
async def handle(event_type: str, event: dict):
    print(event_type, event["task_id"], event.get("duration_ms"))

# Events: EXECUTION_STARTED, TASK_STARTED, TASK_COMPLETED,
#         TASK_FAILED, TASK_SKIPPED, EXECUTION_COMPLETED

MetaDAG fires additional events: META_EXECUTION_STARTED, NODE_STARTED, NODE_COMPLETED, NODE_SKIPPED, META_EXECUTION_COMPLETED.


Running tests

pip install wave-executor[dev]
pytest

License

MIT

