Skip to main content

A simple library for running complex DAG of async tasks

Project description

async-dag

A simple library for running complex DAG of async tasks

example

import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Awaitable, Callable

from async_dag import build_dag


@dataclass
class Event:
    timestamp: datetime
    location: str


class DatabaseClient:
    async def insert(self, event: Event) -> bool:
        # simulate async access to the database
        await asyncio.sleep(0.5)

        return True


class HttpClient:
    async def fetch(self, url: str) -> Event:
        # simulate async http request
        await asyncio.sleep(0.5)

        return Event(timestamp=datetime.now(), location=url)

    async def publish_logs(self, results: list[bool]) -> None:
        # simulate async http request
        await asyncio.sleep(0.5)


@dataclass
class Parameters:
    http_client: HttpClient
    db_client: DatabaseClient
    allowed_locations: str


async def fetch_event(url: str, params: Parameters) -> Event:
    # NOTE: we have access to the invoke params, http client for example
    return await params.http_client.fetch(url)


async def insert_to_db(event: Event, params: Parameters) -> bool:
    if event.location != params.allowed_locations:
        return False

    return await params.db_client.insert(event)


async def publish_results(result_1: bool, result_2: bool, params: Parameters) -> None:
    await params.http_client.publish_logs([result_1, result_2])


# NOTE: we don't have to request receive the Parameters argument, we can also request nodes that are not in the last batch
async def logger(
    event_1: Event, result_1: bool, event_2: Event, result_2: bool
) -> None:
    print(event_1, result_1, event_2, result_2)


def url_immediate(url: str) -> Callable[[], Awaitable[str]]:
    async def _inner() -> str:
        return url

    return _inner


with build_dag(Parameters) as tm:
    moon_url = tm.add_node(url_immediate("moon"))
    moon_event = tm.add_node(fetch_event, moon_url)
    moon_insert = tm.add_node(insert_to_db, moon_event)

    sun_url = tm.add_node(url_immediate("sun"))
    sun_event = tm.add_node(fetch_event, sun_url)
    sun_insert = tm.add_node(insert_to_db, sun_event)

    tm.add_node(publish_results, moon_insert, sun_insert)

    tm.add_node(logger, moon_event, moon_insert, sun_event, sun_insert)


async def main():
    http_client = HttpClient()
    db_client = DatabaseClient()

    # prints due to logger
    # Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498349), location='moon') True Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498361), location='sun') False
    first_result = await tm.invoke(Parameters(http_client, db_client, "moon"))

    # Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498349), location='moon')
    # NOTE: the result of each node using the ExecutionResult object
    print(moon_event.extract_result(first_result))
    # True
    print(moon_insert.extract_result(first_result))

    # prints due to logger
    # Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48707), location='moon') False Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48717), location='sun') True
    # NOTE: we can use the same TaskGroup many times, there is no need to rebuild the DAG
    second_result = await tm.invoke(Parameters(http_client, db_client, "sun"))

    # Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48707), location='moon')
    print(moon_event.extract_result(second_result))  # prints:
    # False
    print(moon_insert.extract_result(second_result))  # prints: sun


if __name__ == "__main__":
    asyncio.run(main())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_dag-0.1.1.tar.gz (16.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

async_dag-0.1.1-py3-none-any.whl (10.0 kB view details)

Uploaded Python 3

File details

Details for the file async_dag-0.1.1.tar.gz.

File metadata

  • Download URL: async_dag-0.1.1.tar.gz
  • Upload date:
  • Size: 16.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.6.9

File hashes

Hashes for async_dag-0.1.1.tar.gz
Algorithm Hash digest
SHA256 b1240c1ef96240659ec5894fc28922605004235071842b56c77f91b19fcee4ed
MD5 7b36a5ffda519118c5adca0f3754b780
BLAKE2b-256 770047c569ab53d32934c39f1c3a3d4ff358d1d582c97f990dc934ae2a3e948b

See more details on using hashes here.

File details

Details for the file async_dag-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: async_dag-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 10.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.6.9

File hashes

Hashes for async_dag-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 2d889fe2de413a9d3f65f9576416a8ae3a11a4c37b5ba094cde1cda11423cf3b
MD5 d4fa7ef387767e7a74550245e1e1cabb
BLAKE2b-256 c5a224e3306f360dd0572bae64b675782bc83efb20adee0eb362158296042686

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page