
A simple library for running complex DAGs of async tasks

Project description

async-dag


Use case example

Let's assume that you have the following task dependency graph:

graph TD;
    FastTask_A-->SlowTask_B;
    SlowTask_B-->EndTask;

    SlowTask_A-->FastTask_C;
    FastTask_B-->FastTask_C;
    FastTask_C-->EndTask;

The optimal way to run this flow would be:

  1. Run FastTask_A, FastTask_B, and SlowTask_A all at once.
  2. As soon as FastTask_A ends, start executing SlowTask_B.
  3. As soon as SlowTask_A and FastTask_B end, start executing FastTask_C.
  4. As soon as SlowTask_B and FastTask_C end, start executing EndTask.
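A quick way to see what this ordering buys is to compute the timeline. The sketch below is plain Python using the standard-library `graphlib` module (it does not use async-dag). It assumes the durations match the delays in the code example (0.1 s for the fast tasks, 0.5 s for SlowTask_A, 1 s for SlowTask_B); `eager_schedule` and `gather_by_level_total` are hypothetical helpers written only for this illustration.

```python
from graphlib import TopologicalSorter

# Assumed durations in seconds, taken from the delays in the code example below.
DURATIONS = {
    "FastTask_A": 0.1,
    "FastTask_B": 0.1,
    "SlowTask_A": 0.5,
    "SlowTask_B": 1.0,
    "FastTask_C": 0.1,
    "EndTask": 0.1,
}

# Each task maps to the set of tasks it waits for (the arrows of the graph above).
DEPS = {
    "FastTask_A": set(),
    "FastTask_B": set(),
    "SlowTask_A": set(),
    "SlowTask_B": {"FastTask_A"},
    "FastTask_C": {"SlowTask_A", "FastTask_B"},
    "EndTask": {"SlowTask_B", "FastTask_C"},
}


def eager_schedule(deps, durations):
    """Return {task: (start, end)} when every task starts as soon as its own deps finish."""
    schedule = {}
    # static_order() yields every task after all of its dependencies.
    for task in TopologicalSorter(deps).static_order():
        start = max((schedule[d][1] for d in deps[task]), default=0.0)
        schedule[task] = (start, start + durations[task])
    return schedule


def gather_by_level_total(deps, durations):
    """Total time when each batch of ready tasks is awaited with a single gather()."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    total = 0.0
    while sorter.is_active():
        level = sorter.get_ready()  # every task whose dependencies are all done
        total += max(durations[t] for t in level)  # a gather() waits for its slowest task
        sorter.done(*level)
    return total


schedule = eager_schedule(DEPS, DURATIONS)
for task, (start, end) in sorted(schedule.items(), key=lambda kv: kv[1]):
    print(f"{task:<10} {start:.1f}s -> {end:.1f}s")
print(f"eager total:           {max(end for _, end in schedule.values()):.1f}s")  # 1.2s
print(f"gather-by-level total: {gather_by_level_total(DEPS, DURATIONS):.1f}s")  # 1.6s
```

Under these assumed durations, starting each task as soon as its own dependencies finish ends at about 1.2 s, while gathering level by level ends at about 1.6 s, because SlowTask_B has to wait for SlowTask_A even though it does not depend on it.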

Creating this flow in code isn't trivial, since it requires managing tasks manually. In my experience, most people miss the performance benefit of starting SlowTask_B as early as possible, because it's easy to just write gather(FastTask_A, SlowTask_A, FastTask_B) and only move on once all three are done.
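For comparison, here is what the two approaches look like when wired by hand with plain asyncio (no async-dag). `naive` is the level-by-level `gather` approach; `eager` creates tasks manually so that SlowTask_B starts right after FastTask_A. The task functions mirror the ones in the code example; the shorter delays and the `events` log are assumptions added so the ordering can be checked quickly.

```python
import asyncio

events: list[str] = []  # shared log used to compare the two flows


async def inc_task(n: int, name: str, delay: float) -> int:
    events.append(f"{name} started")
    await asyncio.sleep(delay)
    events.append(f"{name} done")
    return n + 1


async def add_task(a: int, b: int, name: str, delay: float) -> int:
    events.append(f"{name} started")
    await asyncio.sleep(delay)
    events.append(f"{name} done")
    return a + b


async def naive(n: int) -> int:
    # Level by level: slow_task_b waits for slow_task_a even though it doesn't need it.
    a, slow_a, b = await asyncio.gather(
        inc_task(n, "fast_task_a", 0.01),
        inc_task(n, "slow_task_a", 0.2),
        inc_task(n, "fast_task_b", 0.01),
    )
    slow_b, c = await asyncio.gather(
        inc_task(a, "slow_task_b", 0.3),
        add_task(slow_a, b, "fast_task_c", 0.01),
    )
    return await add_task(c, slow_b, "end_task", 0.01)


async def eager(n: int) -> int:
    # Start the three roots right away as independent tasks.
    fast_a = asyncio.create_task(inc_task(n, "fast_task_a", 0.01))
    slow_a = asyncio.create_task(inc_task(n, "slow_task_a", 0.2))
    fast_b = asyncio.create_task(inc_task(n, "fast_task_b", 0.01))

    # Each dependent task awaits only the tasks it actually needs.
    async def slow_task_b() -> int:
        return await inc_task(await fast_a, "slow_task_b", 0.3)

    async def fast_task_c() -> int:
        a, b = await asyncio.gather(slow_a, fast_b)
        return await add_task(a, b, "fast_task_c", 0.01)

    slow_b = asyncio.create_task(slow_task_b())
    fast_c = asyncio.create_task(fast_task_c())
    return await add_task(await fast_c, await slow_b, "end_task", 0.01)


for flow in (naive, eager):
    events.clear()
    result = asyncio.run(flow(0))
    early = events.index("slow_task_b started") < events.index("slow_task_a done")
    print(f"{flow.__name__}: result={result}, slow_task_b started early: {early}")
# naive: result=4, slow_task_b started early: False
# eager: result=4, slow_task_b started early: True
```

Even in this small graph, `eager` needs two helper coroutines to express the dependencies, and every additional edge means more of this bookkeeping.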

This library provides a simple interface for building the optimal execution path for async tasks that form a DAG.
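One way an interface like this could work is sketched below in plain asyncio. It is a hypothetical illustration, not async-dag's implementation: `run_dag`, the `(function, dependency names)` node format, and the `params` node are assumptions made for this example. Each node becomes one `asyncio.Task` that first awaits the tasks of its own dependencies, so a node starts the moment its inputs are ready rather than when a whole "level" is done.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Any, Awaitable, Callable

# Hypothetical node format: (async function, names of the nodes whose results it takes).
Node = tuple[Callable[..., Awaitable[Any]], tuple[str, ...]]


async def run_dag(nodes: dict[str, Node]) -> dict[str, Any]:
    """Run each node as soon as all of its dependencies have finished (illustrative sketch)."""
    # Reject cycles up front: prepare() raises graphlib.CycleError (a ValueError subclass).
    TopologicalSorter({name: deps for name, (_, deps) in nodes.items()}).prepare()

    tasks: dict[str, asyncio.Task] = {}

    def task_for(name: str) -> asyncio.Task:
        # One task per node; every dependent awaits the same task object.
        if name not in tasks:
            fn, deps = nodes[name]

            async def run() -> Any:
                args = await asyncio.gather(*(task_for(dep) for dep in deps))
                return await fn(*args)

            tasks[name] = asyncio.create_task(run())
        return tasks[name]

    for name in nodes:
        task_for(name)
    results = await asyncio.gather(*tasks.values())
    return dict(zip(tasks, results))


async def const_zero() -> int:
    return 0


async def inc(n: int) -> int:
    await asyncio.sleep(0.01)
    return n + 1


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0.01)
    return a + b


# Same shape as the use-case graph, with "params" standing in for the invoke value.
dag: dict[str, Node] = {
    "params": (const_zero, ()),
    "fast_task_a": (inc, ("params",)),
    "slow_task_b": (inc, ("fast_task_a",)),
    "slow_task_a": (inc, ("params",)),
    "fast_task_b": (inc, ("params",)),
    "fast_task_c": (add, ("slow_task_a", "fast_task_b")),
    "end_task": (add, ("fast_task_c", "slow_task_b")),
}

results = asyncio.run(run_dag(dag))
print(results["fast_task_a"], results["end_task"])  # 1 4
```

Sharing one task per node means a result that several nodes depend on is computed only once, and the `graphlib` check turns a cycle into an immediate error instead of a deadlock.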

Code example

import asyncio

from async_dag import build_dag


async def inc_task(n: int, name: str, delay: float) -> int:
    print(f"{name} task started...")
    await asyncio.sleep(delay)
    print(f"{name} task done!")

    return n + 1


async def add_task(a: int, b: int, name: str, delay: float) -> int:
    print(f"{name} task started...")
    await asyncio.sleep(delay)
    print(f"{name} task done!")

    return a + b


# Define the DAG
with build_dag(int) as tm:
    # tm.parameters_node is a special node that gets resolved into the invoke parameters (the value passed to `tm.invoke`)
    # tm.add_immediate_node(...) defines a graph node that resolves immediately to its value; this is useful for passing constants to tasks
    fast_task_a = tm.add_node(
        inc_task,
        tm.parameters_node,
        tm.add_immediate_node("fast_task_a"),
        tm.add_immediate_node(0.1),
    )

    # here we pass the result of fast_task_a as the `n` parameter of the inc_task node
    slow_task_b = tm.add_node(
        inc_task,
        fast_task_a,
        tm.add_immediate_node("slow_task_b"),
        tm.add_immediate_node(1),
    )

    slow_task_a = tm.add_node(
        inc_task,
        tm.parameters_node,
        tm.add_immediate_node("slow_task_a"),
        tm.add_immediate_node(0.5),
    )
    fast_task_b = tm.add_node(
        inc_task,
        tm.parameters_node,
        tm.add_immediate_node("fast_task_b"),
        tm.add_immediate_node(0.1),
    )
    fast_task_c = tm.add_node(
        add_task,
        slow_task_a,
        fast_task_b,
        tm.add_immediate_node("fast_task_c"),
        tm.add_immediate_node(0.1),
    )

    end_task = tm.add_node(
        add_task,
        fast_task_c,
        slow_task_b,
        tm.add_immediate_node("end_task"),
        tm.add_immediate_node(0.1),
    )


# Invoke the DAG
async def main():
    # prints:
    # fast_task_a task started...
    # slow_task_a task started...
    # fast_task_b task started...
    # fast_task_a task done!
    # fast_task_b task done!
    # slow_task_b task started...
    # slow_task_a task done!
    # fast_task_c task started...
    # fast_task_c task done!
    # slow_task_b task done!
    # end_task task started...
    # end_task task done!
    execution_result = await tm.invoke(0)

    # we can extract each node's return value
    print(fast_task_a.extract_result(execution_result))  # 1
    print(end_task.extract_result(execution_result))  # 4


if __name__ == "__main__":
    asyncio.run(main())



Download files


Source Distribution

async_dag-0.2.0.tar.gz (13.5 kB)

Uploaded Source

Built Distribution


async_dag-0.2.0-py3-none-any.whl (7.0 kB)

Uploaded Python 3

File details

Details for the file async_dag-0.2.0.tar.gz.

File metadata

  • Download URL: async_dag-0.2.0.tar.gz
  • Upload date:
  • Size: 13.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.6.9

File hashes

Hashes for async_dag-0.2.0.tar.gz
Algorithm Hash digest
SHA256 8b9bcc1c6e62b210256d9482c60b15509293fc03db2d30271824c2445934cca3
MD5 0ebd35f4e5b66b057fed5856b9f29162
BLAKE2b-256 e78766a1f3355ec3d39c508026079021b659c19a741f591cdba55172885943e0


File details

Details for the file async_dag-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: async_dag-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 7.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.6.9

File hashes

Hashes for async_dag-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ac7acf5c84d0404cba1da393b38cfadaeb6e5559cbf74c54b4891c5e5d5a7ac3
MD5 935aead38d488ee6ccd7dea6e4a8e127
BLAKE2b-256 03ffc83738fe047d2c96e081ef517984963a425f0d619f75f0f6d9f2c658c131

