A simple library for running complex DAGs of async tasks
Project description
async-dag
A simple library for running complex DAGs (directed acyclic graphs) of async tasks
example
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Awaitable, Callable
from async_dag import build_dag
@dataclass
class Event:
    """Record passed between the nodes of the example DAG."""

    # Moment the event object was created.
    timestamp: datetime
    # Where the event came from; the example stores the fetched URL here.
    location: str
class DatabaseClient:
    """Stand-in for a real database client used by the example."""

    async def insert(self, event: Event) -> bool:
        """Pretend to store *event* and report success.

        Returns:
            Always ``True`` once the simulated round trip has finished.
        """
        # Simulate async access to the database.
        simulated_latency = 0.5
        await asyncio.sleep(simulated_latency)
        return True
class HttpClient:
    """Stand-in for a real HTTP client used by the example."""

    async def fetch(self, url: str) -> Event:
        """Pretend to request *url* and build an ``Event`` from it.

        Returns:
            An ``Event`` stamped with the current local time whose
            ``location`` is the requested *url*.
        """
        # Simulate an async HTTP request.
        simulated_latency = 0.5
        await asyncio.sleep(simulated_latency)
        created_at = datetime.now()
        return Event(timestamp=created_at, location=url)

    async def publish_logs(self, results: list[bool]) -> None:
        """Pretend to upload *results*; the values themselves are not used."""
        # Simulate an async HTTP request.
        simulated_latency = 0.5
        await asyncio.sleep(simulated_latency)
@dataclass
class Parameters:
    """Per-invocation inputs handed to the DAG nodes that ask for them."""

    # Client used for the (simulated) HTTP calls.
    http_client: HttpClient
    # Client used for the (simulated) database inserts.
    db_client: DatabaseClient
    # The single location whose events may be inserted; insert_to_db
    # compares it against Event.location for equality.
    allowed_locations: str
async def fetch_event(url: str, params: Parameters) -> Event:
    """Fetch the event for *url* through the invocation's HTTP client."""
    # NOTE: nodes have access to the invoke params, e.g. the HTTP client.
    client = params.http_client
    event = await client.fetch(url)
    return event
async def insert_to_db(event: Event, params: Parameters) -> bool:
    """Insert *event* only when it comes from the allowed location.

    Returns:
        The database client's insert result when ``event.location`` equals
        ``params.allowed_locations``; ``False`` (without touching the
        database) otherwise.
    """
    location_allowed = event.location == params.allowed_locations
    if location_allowed:
        return await params.db_client.insert(event)
    return False
async def publish_results(result_1: bool, result_2: bool, params: Parameters) -> None:
    """Publish both insert outcomes through the HTTP client in one call."""
    outcomes = [result_1, result_2]
    await params.http_client.publish_logs(outcomes)
# NOTE: a node does not have to receive the Parameters argument, and it can
# also depend on nodes that are not in the last batch.
async def logger(
    event_1: Event,
    result_1: bool,
    event_2: Event,
    result_2: bool,
) -> None:
    """Print both events together with their insert results on one line."""
    print(event_1, result_1, event_2, result_2)
def url_immediate(url: str) -> Callable[[], Awaitable[str]]:
    """Wrap a constant *url* in a zero-argument coroutine function.

    The example uses the returned callable as a source node of the DAG:
    awaiting a call to it simply yields *url* unchanged.
    """

    async def _inner() -> str:
        # The value is already known; nothing to wait for.
        return url

    return _inner
# Describe the DAG once; each add_node call registers a coroutine function
# together with the nodes whose results it consumes.
with build_dag(Parameters) as tm:
    # "moon" branch: constant URL -> fetch -> insert.
    moon_url = tm.add_node(url_immediate("moon"))
    moon_event = tm.add_node(fetch_event, moon_url)
    moon_insert = tm.add_node(insert_to_db, moon_event)

    # "sun" branch: same shape as the "moon" branch.
    sun_url = tm.add_node(url_immediate("sun"))
    sun_event = tm.add_node(fetch_event, sun_url)
    sun_insert = tm.add_node(insert_to_db, sun_event)

    # Nodes that consume results from both branches.
    tm.add_node(publish_results, moon_insert, sun_insert)
    tm.add_node(logger, moon_event, moon_insert, sun_event, sun_insert)
async def main() -> None:
    """Invoke the prepared DAG twice with different allowed locations."""
    http_client = HttpClient()
    db_client = DatabaseClient()

    # First run: only "moon" events may be inserted.
    # The logger node prints:
    # Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498349), location='moon') True Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498361), location='sun') False
    moon_params = Parameters(
        http_client=http_client,
        db_client=db_client,
        allowed_locations="moon",
    )
    first_result = await tm.invoke(moon_params)

    # NOTE: the result of each node can be extracted from the ExecutionResult
    # object returned by invoke.
    # prints: Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498349), location='moon')
    print(moon_event.extract_result(first_result))
    # prints: True
    print(moon_insert.extract_result(first_result))

    # Second run: only "sun" events may be inserted.
    # NOTE: we can use the same TaskGroup many times, there is no need to
    # rebuild the DAG.
    # The logger node prints:
    # Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48707), location='moon') False Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48717), location='sun') True
    sun_params = Parameters(
        http_client=http_client,
        db_client=db_client,
        allowed_locations="sun",
    )
    second_result = await tm.invoke(sun_params)

    # prints: Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48707), location='moon')
    print(moon_event.extract_result(second_result))
    # prints: False (the "moon" event is not allowed when the allowed location is "sun")
    print(moon_insert.extract_result(second_result))
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
async_dag-0.1.1.tar.gz
(16.2 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
async_dag-0.1.1-py3-none-any.whl
(10.0 kB
view details)
File details
Details for the file async_dag-0.1.1.tar.gz.
File metadata
- Download URL: async_dag-0.1.1.tar.gz
- Upload date:
- Size: 16.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b1240c1ef96240659ec5894fc28922605004235071842b56c77f91b19fcee4ed |
| MD5 | 7b36a5ffda519118c5adca0f3754b780 |
| BLAKE2b-256 | 770047c569ab53d32934c39f1c3a3d4ff358d1d582c97f990dc934ae2a3e948b |
File details
Details for the file async_dag-0.1.1-py3-none-any.whl.
File metadata
- Download URL: async_dag-0.1.1-py3-none-any.whl
- Upload date:
- Size: 10.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2d889fe2de413a9d3f65f9576416a8ae3a11a4c37b5ba094cde1cda11423cf3b |
| MD5 | d4fa7ef387767e7a74550245e1e1cabb |
| BLAKE2b-256 | c5a224e3306f360dd0572bae64b675782bc83efb20adee0eb362158296042686 |