Async Support for Dagster

dagster-async-executor

An executor for Dagster that adds native asyncio support for ops and assets.

dagster-async-executor lets you:

  • Run async def ops without manually managing event loops or thread pools.
  • Mix sync and async ops in the same job.
  • Use dynamic / fan-out graphs with async upstream and downstream dependencies.
  • Keep the familiar Dagster executor interface while enabling more scalable, concurrency‑friendly workloads (e.g. I/O‑heavy tasks, service calls, and streaming patterns).

This integration is a community‑maintained port of the original core PR: dagster-io/dagster#32833.


Installation

pip install dagster-async-executor

Quickstart

Use the async_executor when defining your job and write your ops as async def:

import anyio
import dagster as dg
from dagster_async_executor import async_executor

NUM_FANOUTS = 300
SLEEP_SECONDS = 3


@dg.op(out=dg.DynamicOut())
async def create_dynamic_outputs():
    """Creates a dynamic number of outputs."""
    for i in range(NUM_FANOUTS):
        yield dg.DynamicOutput(value=f"item_{i}", mapping_key=f"key_{i}")


@dg.op
async def process_item(context: dg.OpExecutionContext, item: str):
    """Process each item from the fan-out."""
    context.log.info(f"[{context.op_handle}] sleeping...")
    await anyio.sleep(SLEEP_SECONDS)
    context.log.info(f"[{context.op_handle}] completed")
    return item


@dg.op
async def collect_results(context: dg.OpExecutionContext, results: list):
    """Collect all results from the fan-out."""
    context.log.info(f"[{context.op_handle}] collected {len(results)} results")
    return results


@dg.job(executor_def=async_executor)
def simple_fanout_job():
    # the job body stays ordinary synchronous composition; no await needed
    dynamic_items = create_dynamic_outputs()
    processed = dynamic_items.map(process_item)
    collect_results(processed.collect())

Run the job as usual (e.g. via Dagit, dagster job execute, or your orchestration environment). From the outside, this executor behaves like a standard Dagster executor; internally, it orchestrates steps asynchronously.


How it works

At a high level, dagster-async-executor introduces an AsyncExecutor that:

  • Reuses Dagster’s existing execution plan machinery.
  • Runs steps in an async orchestration loop backed by an anyio.TaskGroup.
  • Bridges between async step execution and Dagster’s synchronous executor interface via a queue‑based sync–async bridge.

Execution model

Conceptually, execution looks like this:

  1. Plan creation (sync)
    The run, plan, and context are created synchronously, just like with in_process and other executors.

  2. Async orchestration loop
    Once the plan is ready, an async orchestrator:

    • Schedules each ready step as an async task.
    • Uses dagster_event_sequence_for_step to obtain an async sequence of DagsterEvents for each step.
    • Sends those events through async streams.
  3. Sync–async event bridge
    A synchronous wrapper:

    • Starts the async orchestrator inside an anyio BlockingPortal.
    • Streams DagsterEvents into a queue.Queue.
    • Exposes a standard Iterator[DagsterEvent] to Dagster’s core execution machinery.
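
The bridge described above can be sketched roughly as follows. This is an illustrative, dependency‑free sketch that uses asyncio with a background thread in place of anyio's BlockingPortal; the names (produce_events, iter_events) are hypothetical stand‑ins, not the executor's actual API.

```python
import asyncio
import queue
import threading
from typing import AsyncIterator, Iterator

_SENTINEL = object()


async def produce_events() -> AsyncIterator[str]:
    # Stand-in for the async per-step DagsterEvent sequence.
    for name in ("STEP_START", "STEP_OUTPUT", "STEP_SUCCESS"):
        await asyncio.sleep(0)  # yield control, as real I/O would
        yield name


def iter_events() -> Iterator[str]:
    """Expose the async event stream as a plain synchronous iterator."""
    q: "queue.Queue[object]" = queue.Queue()

    async def pump() -> None:
        try:
            async for event in produce_events():
                q.put(event)
        finally:
            q.put(_SENTINEL)  # always unblock the sync consumer

    # Run the async orchestrator on a background event-loop thread.
    thread = threading.Thread(target=lambda: asyncio.run(pump()), daemon=True)
    thread.start()

    while True:
        item = q.get()
        if item is _SENTINEL:
            break
        yield item
    thread.join()


print(list(iter_events()))  # ['STEP_START', 'STEP_OUTPUT', 'STEP_SUCCESS']
```

The key property is that the consumer sees only a blocking Iterator, so the surrounding synchronous executor machinery never needs to know an event loop is running.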

From the rest of the system’s perspective, this executor still “looks like” a normal synchronous executor, which keeps:

  • Resource initialization behavior consistent.
  • Logging and event semantics unchanged.
  • Compatibility with existing Dagster entrypoints and tooling.

Sync + async ops

Per-step behavior:

  • Async orchestration drives all steps.
  • Each step:
    • Builds a step_context.
    • Iterates over dagster_event_sequence_for_step(step_context) in an async for loop.
    • Sends each DagsterEvent back through the async stream → queue → iterator bridge.

Because the core execution semantics are reused, you can mix sync and async ops in the same graph:

  • Async upstream → sync downstream
  • Sync upstream → async downstream
  • Dynamic outputs and mapped steps that interleave async work
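
A minimal sketch of how a single orchestration loop can drive both kinds of op bodies: async callables are awaited directly, while sync callables are offloaded to a worker thread so they don't block the loop. The helper names (run_step, fetch, summarize) are illustrative, not part of the executor's API.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_step(fn: Callable[..., Any], *args: Any) -> Any:
    """Await async op bodies directly; run sync ones in a thread."""
    if inspect.iscoroutinefunction(fn):
        return await fn(*args)
    return await asyncio.to_thread(fn, *args)


async def fetch(item: str) -> str:  # "async upstream"
    await asyncio.sleep(0.01)
    return item.upper()


def summarize(values: list) -> str:  # "sync downstream"
    return ",".join(values)


async def main() -> str:
    # Fan out the async steps concurrently, then feed the sync step.
    upstream = await asyncio.gather(*(run_step(fetch, i) for i in ("a", "b")))
    return await run_step(summarize, list(upstream))


print(asyncio.run(main()))  # A,B
```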

Performance

The executor is designed for I/O‑bound and highly concurrent workloads. An initial performance test (test_async_executor_performance.py::test_async_performance_basic) shows improved scaling with increased parallelism.

Example results (fan‑out of async ops sleeping for 3 seconds):

Number of ops   Sleep (seconds)   Job duration (seconds)
1               3                 3.25
5               3                 3.51
20              3                 4.23
100             3                 6.86
300             3                 14.17

These numbers are illustrative; real‑world performance depends on your environment, I/O characteristics, and concurrency limits.
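
The scaling pattern in the table (job duration growing far more slowly than ops × sleep) can be reproduced with a toy measurement. The short sleep below is arbitrary, chosen so the snippet runs quickly; it stands in for I/O‑bound op work.

```python
import asyncio
import time

SLEEP = 0.05  # arbitrary short sleep standing in for I/O


async def fan_out(num_ops: int) -> float:
    """Run num_ops concurrent sleeps and return wall-clock duration."""
    start = time.perf_counter()
    await asyncio.gather(*(asyncio.sleep(SLEEP) for _ in range(num_ops)))
    return time.perf_counter() - start


for n in (1, 20, 100):
    duration = asyncio.run(fan_out(n))
    # Concurrent duration stays close to SLEEP, not n * SLEEP.
    print(f"{n:>3} ops: {duration:.3f}s")
```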


Testing & behavior guarantees

The test suite focuses on validating behavior across a representative set of job shapes:

  • Basic async jobs

    • Single async op producing a simple output.
    • Multiple async ops with dependencies and parallelism where possible.
  • Mixed sync/async graphs

    • Async upstream feeding into sync downstream.
    • Sync upstream feeding into async downstream.
    • Ensuring consistent materializations, events, and success/failure semantics across both kinds of ops.
  • Dynamic / fan‑out graphs

    • Async producers yielding dynamic outputs.
    • Downstream mapping over dynamic keys.
    • Interleaving async mapped steps and verifying all mapped outputs are awaited and collected correctly.
  • Error handling

    • Exceptions raised from async ops (including inside dynamic maps).
    • Failures reported on the correct steps.
    • Downstream steps cancelled or skipped according to normal Dagster rules.

Limitations & notes

  • This executor has not been tested under Python 3.14's free-threaded mode.
  • The executor targets I/O‑bound concurrency; CPU‑bound workloads should still be offloaded to processes or threads.
  • Cancellation, backpressure, and resource lifetime semantics follow Dagster’s existing execution model, but async nuances may still evolve.
  • This is a community‑maintained integration; behavior may change more rapidly than core Dagster executors as we iterate.
