
async-dag

A simple library for running complex DAGs of async tasks.

Use case example

Let's assume that you have the following task dependency graph:

graph TD;
    FastTask_A-->SlowTask_B;
    SlowTask_B-->EndTask;

    SlowTask_A-->FastTask_C;
    FastTask_B-->FastTask_C;
    FastTask_C-->EndTask;
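For reference, the same graph can be written as a plain mapping from each task to the tasks it waits on. The sketch below is a hand transcription of the diagram (the DEPS name is illustrative, not part of async-dag); Python's standard-library graphlib rejects cycles and yields one valid sequential order:

```python
from graphlib import TopologicalSorter

# Illustrative transcription of the diagram: each task maps to the tasks it waits on.
DEPS: dict[str, set[str]] = {
    "FastTask_A": set(),
    "SlowTask_A": set(),
    "FastTask_B": set(),
    "SlowTask_B": {"FastTask_A"},
    "FastTask_C": {"SlowTask_A", "FastTask_B"},
    "EndTask": {"SlowTask_B", "FastTask_C"},
}

# static_order() raises graphlib.CycleError if the graph is not a DAG.
order = list(TopologicalSorter(DEPS).static_order())
print(order)
```

A topological order only fixes a valid sequence; it does not say which tasks may overlap, and that overlap is where the time savings come from.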

The optimal way to run this flow would be:

  1. Run FastTask_A, FastTask_B, and SlowTask_A all at once.
  2. As soon as FastTask_A ends, start executing SlowTask_B.
  3. As soon as both SlowTask_A and FastTask_B end, start executing FastTask_C.
  4. As soon as both SlowTask_B and FastTask_C end, start executing EndTask.
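Written by hand with plain asyncio, the four steps above could look like the following sketch. The step helper, the run_optimal name, and the delays are illustrative assumptions (the delays are shortened so the run is quick); the point is that each task is wrapped in its own coroutine that awaits only its direct dependencies:

```python
import asyncio


async def step(name: str, delay: float, log: list[str]) -> str:
    # Illustrative stand-in for real async work (not part of async-dag).
    log.append(f"{name} started")
    await asyncio.sleep(delay)
    log.append(f"{name} done")
    return name


async def run_optimal(log: list[str]) -> None:
    # 1. Start the three root tasks at once.
    fast_a = asyncio.create_task(step("FastTask_A", 0.01, log))
    slow_a = asyncio.create_task(step("SlowTask_A", 0.2, log))
    fast_b = asyncio.create_task(step("FastTask_B", 0.01, log))

    async def slow_b_chain() -> str:
        # 2. SlowTask_B waits only for FastTask_A.
        await fast_a
        return await step("SlowTask_B", 0.3, log)

    async def fast_c_chain() -> str:
        # 3. FastTask_C waits for both SlowTask_A and FastTask_B.
        await asyncio.gather(slow_a, fast_b)
        return await step("FastTask_C", 0.01, log)

    # 4. EndTask waits for both SlowTask_B and FastTask_C.
    await asyncio.gather(slow_b_chain(), fast_c_chain())
    await step("EndTask", 0.01, log)


if __name__ == "__main__":
    log: list[str] = []
    asyncio.run(run_optimal(log))
    print("\n".join(log))
```

Every extra dependency means another wrapper coroutine and another gather, which is the bookkeeping the library is meant to remove.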

Creating this flow in code isn't trivial and requires managing tasks manually. In my experience, most people miss the performance benefit of starting SlowTask_B as soon as possible, because it's just easier to write gather(FastTask_A, SlowTask_A, FastTask_B) and wait for all three to finish.
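The cost of the gather shortcut can be estimated with simple arithmetic. The sketch below is a model, not a benchmark: it assumes the delays used in the code example further down (0.1 s for the fast tasks, 0.5 s for SlowTask_A, 1 s for SlowTask_B), ignores scheduling overhead, and compares starting each task as soon as its dependencies finish against gathering the graph level by level:

```python
# Model assumptions: durations (seconds) match the delays in the code example.
DURATIONS = {
    "FastTask_A": 0.1, "SlowTask_A": 0.5, "FastTask_B": 0.1,
    "SlowTask_B": 1.0, "FastTask_C": 0.1, "EndTask": 0.1,
}
DEPS = {
    "FastTask_A": [], "SlowTask_A": [], "FastTask_B": [],
    "SlowTask_B": ["FastTask_A"],
    "FastTask_C": ["SlowTask_A", "FastTask_B"],
    "EndTask": ["SlowTask_B", "FastTask_C"],
}
# What the gather shortcut does: one gather per layer of the graph.
LEVELS = [["FastTask_A", "SlowTask_A", "FastTask_B"], ["SlowTask_B", "FastTask_C"], ["EndTask"]]


def makespan_eager(durations: dict[str, float], deps: dict[str, list[str]]) -> float:
    """Each task starts the moment its last dependency finishes."""
    finish: dict[str, float] = {}

    def finish_time(task: str) -> float:
        if task not in finish:
            start = max((finish_time(d) for d in deps[task]), default=0.0)
            finish[task] = start + durations[task]
        return finish[task]

    return max(finish_time(task) for task in deps)


def makespan_by_level(durations: dict[str, float], levels: list[list[str]]) -> float:
    """Each level is gathered, so it lasts as long as its slowest task."""
    return sum(max(durations[task] for task in level) for level in levels)


print(round(makespan_eager(DURATIONS, DEPS), 2))       # 1.2
print(round(makespan_by_level(DURATIONS, LEVELS), 2))  # 1.6
```

In this model the eager schedule finishes in about 1.2 s because SlowTask_B overlaps SlowTask_A, while the level-by-level version needs about 1.6 s, since SlowTask_B cannot start until SlowTask_A, the slowest task of the first gather, is done.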

This library provides a simple interface for building DAGs of async tasks and running them along the optimal execution path, starting each task as soon as its dependencies have finished.
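A generic version of this scheduling rule fits in a few dozen lines. The sketch below is not async-dag's implementation; it only illustrates the idea with the standard library: graphlib.TopologicalSorter tracks which nodes are ready, every ready node is started immediately as an asyncio task, and asyncio.wait(..., return_when=FIRST_COMPLETED) reports finished nodes so their dependents can be unlocked. The run_dag function and its nodes/deps dictionaries are hypothetical:

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Any, Awaitable, Callable

# Hypothetical node type: an async function that receives its dependencies' results.
NodeFn = Callable[..., Awaitable[Any]]


async def run_dag(nodes: dict[str, NodeFn], deps: dict[str, list[str]]) -> dict[str, Any]:
    """Start every node as soon as all of its dependencies have finished.

    Each node function receives its dependencies' results positionally,
    in the order they are listed in deps[name].
    """
    sorter = TopologicalSorter({name: deps.get(name, []) for name in nodes})
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    results: dict[str, Any] = {}
    running: dict[asyncio.Task[Any], str] = {}

    while sorter.is_active():
        # Start every node whose dependencies are all satisfied.
        for name in sorter.get_ready():
            args = [results[dep] for dep in deps.get(name, [])]
            running[asyncio.create_task(nodes[name](*args))] = name
        # Wait until at least one node finishes, then unlock its dependents.
        finished, _ = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            name = running.pop(task)
            results[name] = task.result()  # re-raises if the node failed
            sorter.done(name)
    return results


async def one() -> int:
    await asyncio.sleep(0.01)
    return 1


async def add(a: int, b: int) -> int:
    return a + b


if __name__ == "__main__":
    out = asyncio.run(run_dag({"a": one, "b": one, "c": add}, {"c": ["a", "b"]}))
    print(out["c"])  # 2
```

Unlike a real library, this sketch has no error handling: if a node raises, the exception propagates out of run_dag and the tasks that are still running are not cancelled by run_dag itself.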

Code example

import asyncio

from async_dag import build_dag


async def inc_task(name: str, delay: float, n: int) -> int:
    print(f"{name} task started...")
    await asyncio.sleep(delay)
    print(f"{name} task done!")

    return n + 1


async def add_task(name: str, delay: float, a: int, b: int) -> int:
    print(f"{name} task started...")
    await asyncio.sleep(delay)
    print(f"{name} task done!")

    return a + b


# Define the DAG
with build_dag(int) as tm:
    # tm.add_immediate_node(...) defines a graph node that resolves immediately to the given value; this is useful for passing constants to tasks
    # if the last parameter is omitted and its type matches the type passed to `build_dag`, the invoke parameter (the value passed to `tm.invoke`) is passed in automatically
    fast_task_a = tm.add_node(
        inc_task, tm.add_immediate_node("fast_task_a"), tm.add_immediate_node(0.1)
    )

    # here we pass the result of fast_task_a as the n parameter of inc_task
    slow_task_b = tm.add_node(
        inc_task,
        tm.add_immediate_node("slow_task_b"),
        tm.add_immediate_node(1),
        fast_task_a,
    )

    slow_task_a = tm.add_node(
        inc_task, tm.add_immediate_node("slow_task_a"), tm.add_immediate_node(0.5)
    )
    fast_task_b = tm.add_node(
        inc_task, tm.add_immediate_node("fast_task_b"), tm.add_immediate_node(0.1)
    )
    fast_task_c = tm.add_node(
        add_task,
        tm.add_immediate_node("fast_task_c"),
        tm.add_immediate_node(0.1),
        slow_task_a,
        fast_task_b,
    )

    end_task = tm.add_node(
        add_task,
        tm.add_immediate_node("end_task"),
        tm.add_immediate_node(0.1),
        fast_task_c,
        slow_task_b,
    )


# Invoke the DAG
async def main():
    # prints:
    # fast_task_a task started...
    # slow_task_a task started...
    # fast_task_b task started...
    # fast_task_a task done!
    # fast_task_b task done!
    # slow_task_b task started...
    # slow_task_a task done!
    # fast_task_c task started...
    # fast_task_c task done!
    # slow_task_b task done!
    # end_task task started...
    # end_task task done!
    execution_result = await tm.invoke(0)

    # we can extract each node's return value
    print(fast_task_a.extract_result(execution_result))  # 1
    print(end_task.extract_result(execution_result))  # 4


if __name__ == "__main__":
    asyncio.run(main())
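To see where the 1 and 4 in the comments come from, the same data flow can be replayed synchronously. This plain-Python trace assumes, as the comment in the DAG definition states, that fast_task_a, slow_task_a, and fast_task_b receive the invoke parameter (0) as their n argument:

```python
# Assumption: the invoke parameter fills n for the three root inc_task nodes.
invoke_param = 0

fast_task_a = invoke_param + 1           # inc_task(n=0)  -> 1
slow_task_b = fast_task_a + 1            # inc_task(n=1)  -> 2
slow_task_a = invoke_param + 1           # inc_task(n=0)  -> 1
fast_task_b = invoke_param + 1           # inc_task(n=0)  -> 1
fast_task_c = slow_task_a + fast_task_b  # add_task(1, 1) -> 2
end_task = fast_task_c + slow_task_b     # add_task(2, 2) -> 4

print(fast_task_a, end_task)  # 1 4
```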

Another code example

import asyncio
from dataclasses import dataclass
from datetime import datetime

from async_dag import build_dag


@dataclass
class Event:
    timestamp: datetime
    location: str


class DatabaseClient:
    async def insert(self, event: Event) -> bool:
        # simulate async access to the database
        await asyncio.sleep(0.5)

        return True


class HttpClient:
    async def fetch(self, url: str) -> Event:
        # simulate async http request
        await asyncio.sleep(0.5)

        return Event(timestamp=datetime.now(), location=url)

    async def publish_logs(self, results: list[bool]) -> None:
        # simulate async http request
        await asyncio.sleep(0.5)


@dataclass
class Parameters:
    http_client: HttpClient
    db_client: DatabaseClient
    allowed_locations: str


async def fetch_event(url: str, params: Parameters) -> Event:
    # NOTE: the invoke parameter (the value passed to `tm.invoke`) is available here, e.g. for the HTTP client
    return await params.http_client.fetch(url)


async def insert_to_db(event: Event, params: Parameters) -> bool:
    if event.location != params.allowed_locations:
        return False

    return await params.db_client.insert(event)


async def publish_results(result_1: bool, result_2: bool, params: Parameters) -> None:
    await params.http_client.publish_logs([result_1, result_2])


# NOTE: a node doesn't have to receive the Parameters argument, and it can depend on nodes from any earlier stage of the DAG, not only the most recent one
async def logger(
    event_1: Event, result_1: bool, event_2: Event, result_2: bool
) -> None:
    print(event_1, result_1, event_2, result_2)


with build_dag(Parameters) as tm:
    moon_url = tm.add_immediate_node("moon")
    moon_event = tm.add_node(fetch_event, moon_url)
    moon_insert = tm.add_node(insert_to_db, moon_event)

    sun_url = tm.add_immediate_node("sun")
    sun_event = tm.add_node(fetch_event, sun_url)
    sun_insert = tm.add_node(insert_to_db, sun_event)

    tm.add_node(publish_results, moon_insert, sun_insert)

    tm.add_node(logger, moon_event, moon_insert, sun_event, sun_insert)


async def main():
    http_client = HttpClient()
    db_client = DatabaseClient()

    # prints due to logger
    # Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498349), location='moon') True Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498361), location='sun') False
    first_result = await tm.invoke(Parameters(http_client, db_client, "moon"))

    # Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498349), location='moon')
    # NOTE: we can extract the result of each node from the ExecutionResult object
    print(moon_event.extract_result(first_result))
    # True
    print(moon_insert.extract_result(first_result))

    # prints due to logger
    # Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48707), location='moon') False Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48717), location='sun') True
    # NOTE: the same DAG can be invoked many times; there is no need to rebuild it
    second_result = await tm.invoke(Parameters(http_client, db_client, "sun"))

    # Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48707), location='moon')
    print(moon_event.extract_result(second_result))
    # False
    print(moon_insert.extract_result(second_result))


if __name__ == "__main__":
    asyncio.run(main())


Download files


Source Distribution

async_dag-0.1.4.tar.gz (14.3 kB)

Uploaded Source

Built Distribution


async_dag-0.1.4-py3-none-any.whl (7.9 kB)

Uploaded Python 3

File details

Details for the file async_dag-0.1.4.tar.gz.

File metadata

  • Download URL: async_dag-0.1.4.tar.gz
  • Upload date:
  • Size: 14.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.6.9

File hashes

Hashes for async_dag-0.1.4.tar.gz
Algorithm Hash digest
SHA256 ace9c13e8f73c57fd69e7100f08db6bcc07e1454cad5196c0246775197dd5441
MD5 30e98747bef3fc937236177574cdc263
BLAKE2b-256 bd87242bf2e5dbd0508c14d8abff8fcc55d7b57ec94d4d0ab1f653cef445582e
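To check a downloaded archive against the SHA256 digest above, hashing the file with Python's hashlib is enough. A minimal sketch; the local file path is an assumption about where the archive was saved. pip's hash-checking mode (--require-hashes) performs the same comparison at install time.

```python
import hashlib
from pathlib import Path

# SHA256 digest published above for async_dag-0.1.4.tar.gz.
EXPECTED_SHA256 = "ace9c13e8f73c57fd69e7100f08db6bcc07e1454cad5196c0246775197dd5441"


def sha256_of(path: Path, chunk_size: int = 1 << 16) -> str:
    # Hash the file in chunks so large downloads need not fit in memory.
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


if __name__ == "__main__":
    archive = Path("async_dag-0.1.4.tar.gz")  # assumed download location
    if archive.exists():
        print(sha256_of(archive) == EXPECTED_SHA256)
```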


File details

Details for the file async_dag-0.1.4-py3-none-any.whl.

File metadata

  • Download URL: async_dag-0.1.4-py3-none-any.whl
  • Upload date:
  • Size: 7.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.6.9

File hashes

Hashes for async_dag-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 66a30b8c51b97f281bb78eebfd5bfb6b4895cad8104ab2abcf5e5a2d86f927e5
MD5 68790f0dcdd56fba633da65a7e3158b1
BLAKE2b-256 8668998cb79bb0adfb166dafa92a1a88905ff6a9e438a2ff900d86e13c8e3f4f

