A simple library for running complex DAG of async tasks
Project description
async-dag
A simple library for running complex DAG of async tasks
example
import asyncio
from dataclasses import dataclass
from datetime import datetime
from async_dag import build_dag
@dataclass
class Event:
timestamp: datetime
location: str
class DatabaseClient:
async def insert(self, event: Event) -> bool:
# simulate async access to the database
await asyncio.sleep(0.5)
return True
class HttpClient:
async def fetch(self, url: str) -> Event:
# simulate async http request
await asyncio.sleep(0.5)
return Event(timestamp=datetime.now(), location=url)
async def publish_logs(self, results: list[bool]) -> None:
# simulate async http request
await asyncio.sleep(0.5)
@dataclass
class Parameters:
http_client: HttpClient
db_client: DatabaseClient
allowed_locations: str
async def fetch_event(url: str, params: Parameters) -> Event:
# NOTE: we have access to the invoke params, http client for example
return await params.http_client.fetch(url)
async def insert_to_db(event: Event, params: Parameters) -> bool:
if event.location != params.allowed_locations:
return False
return await params.db_client.insert(event)
async def publish_results(result_1: bool, result_2: bool, params: Parameters) -> None:
await params.http_client.publish_logs([result_1, result_2])
# NOTE: we don't have to request receive the Parameters argument, we can also request nodes that are not in the last batch
async def logger(
event_1: Event, result_1: bool, event_2: Event, result_2: bool
) -> None:
print(event_1, result_1, event_2, result_2)
with build_dag(Parameters) as tm:
moon_url = tm.add_immediate_node("moon")
moon_event = tm.add_node(fetch_event, moon_url)
moon_insert = tm.add_node(insert_to_db, moon_event)
sun_url = tm.add_immediate_node("sun")
sun_event = tm.add_node(fetch_event, sun_url)
sun_insert = tm.add_node(insert_to_db, sun_event)
tm.add_node(publish_results, moon_insert, sun_insert)
tm.add_node(logger, moon_event, moon_insert, sun_event, sun_insert)
async def main():
http_client = HttpClient()
db_client = DatabaseClient()
# prints due to logger
# Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498349), location='moon') True Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498361), location='sun') False
first_result = await tm.invoke(Parameters(http_client, db_client, "moon"))
# Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 55, 498349), location='moon')
# NOTE: the result of each node using the ExecutionResult object
print(moon_event.extract_result(first_result))
# True
print(moon_insert.extract_result(first_result))
# prints due to logger
# Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48707), location='moon') False Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48717), location='sun') True
# NOTE: we can use the same TaskGroup many times, there is no need to rebuild the DAG
second_result = await tm.invoke(Parameters(http_client, db_client, "sun"))
# Event(timestamp=datetime.datetime(2025, 3, 23, 16, 13, 57, 48707), location='moon')
print(moon_event.extract_result(second_result)) # prints:
# False
print(moon_insert.extract_result(second_result)) # prints: sun
if __name__ == "__main__":
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
async_dag-0.1.2.tar.gz
(16.3 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
async_dag-0.1.2-py3-none-any.whl
(10.0 kB
view details)
File details
Details for the file async_dag-0.1.2.tar.gz.
File metadata
- Download URL: async_dag-0.1.2.tar.gz
- Upload date:
- Size: 16.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2f592230d7601ae23929ee21c5d1a51a203dcd8c460e650a59d73a7d5ab8d71b
|
|
| MD5 |
b2056d860560cb9ad1d2f7f8d709af58
|
|
| BLAKE2b-256 |
ce5d59ee066dadc291ce7405f29ed0cc1737ff5dcae4f5492d3a25edf7271108
|
File details
Details for the file async_dag-0.1.2-py3-none-any.whl.
File metadata
- Download URL: async_dag-0.1.2-py3-none-any.whl
- Upload date:
- Size: 10.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
93b4af61b352bb93d814609adb03d46fca1c58d5ccc137f36e60a9ed84da5010
|
|
| MD5 |
33caaa78490ebf6446e8838acf3c11c4
|
|
| BLAKE2b-256 |
b46c009417d376e6b45512989c6bd58c5a87974f4b665a1ab103e1e43ecc0780
|