⚗️ Daglib - Lightweight DAG composition framework
Daglib is a lightweight, embeddable parallel task execution library used for turning pure Python functions into executable task graphs.
Installation
Core
pip install daglib
With visualizations enabled
pip install 'daglib[graphviz]' # static visualizations
# or
pip install 'daglib[ipycytoscape]' # interactive visualizations
Create your first DAG
import daglib

dag = daglib.Dag()

@dag.task()
def task_1a():
    return "Hello"

@dag.task()
def task_1b():
    return "world!"

@dag.task()
def task_2(task_1a, task_1b):
    return f"{task_1a}, {task_1b}"

dag.run()
'Hello, world!'
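In the example above, `task_2` receives the results of `task_1a` and `task_1b` because its parameter names match those task names. As a rough illustration of how name-based wiring like this can work, here is a minimal sketch built only on the standard library (`inspect` and `graphlib`, Python 3.9+). The `MiniDag` class is a hypothetical stand-in written for this README, not Daglib's actual implementation, and returning the result of the last task in topological order is an assumption made to mirror the output shown above.

```python
import inspect
from graphlib import TopologicalSorter


class MiniDag:
    """Hypothetical stand-in for illustration; not Daglib's implementation."""

    def __init__(self):
        self.tasks = {}  # task name -> function

    def task(self):
        def register(fn):
            self.tasks[fn.__name__] = fn
            return fn
        return register

    def run(self):
        # Assumption: a task depends on every task named by one of its parameters.
        graph = {
            name: set(inspect.signature(fn).parameters)
            for name, fn in self.tasks.items()
        }
        results = {}
        # static_order() yields dependencies before the tasks that need them.
        order = list(TopologicalSorter(graph).static_order())
        for name in order:
            kwargs = {dep: results[dep] for dep in graph[name]}
            results[name] = self.tasks[name](**kwargs)
        # Assumption: the value of the final task is the result of the run.
        return results[order[-1]]


dag = MiniDag()

@dag.task()
def task_1a():
    return "Hello"

@dag.task()
def task_1b():
    return "world!"

@dag.task()
def task_2(task_1a, task_1b):
    return f"{task_1a}, {task_1b}"

result = dag.run()
print(result)
```

The sketch runs tasks one at a time and does no validation; a parameter that does not name a registered task would raise a `KeyError`.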
Beyond the "Hello, world!" example
For a more involved example, we will create a small pipeline that takes data from four source tables and creates a single reporting table. The data is driver-level information from the current 2022 Formula 1 season. The output will be a pivot table for team-level metrics.
Source Tables
- Team - The team each driver races for
- Points - Each driver's current total of Drivers' World Championship points for the season
- Wins - Each driver's current number of wins for the season
- Podiums - The number of times each driver has finished in the top 3 this season
import pandas as pd
import daglib

# Ignore. Used to render the DataFrame correctly in the README
pd.set_option("display.notebook_repr_html", False)

dag = daglib.Dag()

@dag.task()
def team():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        team=["Red Bull", "Ferrari", "Mercedes", "Red Bull", "Ferrari", "Mercedes"],
    )).set_index("driver")

@dag.task()
def points():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        points=[258, 178, 146, 173, 156, 158]
    )).set_index("driver")

@dag.task()
def wins():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        wins=[8, 3, 0, 1, 1, 0]
    )).set_index("driver")

@dag.task()
def podiums():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        podiums=[10, 5, 6, 6, 6, 5]
    )).set_index("driver")

@dag.task()
def driver_metrics(team, points, wins, podiums):
    return team.join(points).join(wins).join(podiums)

@dag.task()
def team_metrics(driver_metrics):
    return driver_metrics.groupby("team").sum().sort_values("points", ascending=False)

dag.run()
          points  wins  podiums
team
Red Bull     431     9       16
Ferrari      334     4       11
Mercedes     304     0       11
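The four source tasks in this pipeline do not depend on one another, so a scheduler is free to run them at the same time. The following is a minimal sketch of that idea using the standard library's `graphlib.TopologicalSorter` and a thread pool. The `run_parallel` helper is hypothetical and is not Daglib's API or scheduler; the sample data is a small plain-dict subset of the Red Bull figures above so that the block runs without pandas.

```python
import inspect
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_parallel(tasks, max_workers=4):
    """Run a {name: function} mapping, submitting ready tasks concurrently.

    Hypothetical helper for illustration; not part of Daglib.
    """
    # Assumption: parameter names identify the tasks a function depends on.
    deps = {name: set(inspect.signature(fn).parameters) for name, fn in tasks.items()}
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    results = {}
    running = {}  # future -> task name
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Submit every task whose dependencies have all finished.
            for name in sorter.get_ready():
                kwargs = {dep: results[dep] for dep in deps[name]}
                running[pool.submit(tasks[name], **kwargs)] = name
            # Block until at least one running task completes.
            finished, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in finished:
                name = running.pop(future)
                results[name] = future.result()
                sorter.done(name)
    return results


def points():
    return {"Max": 258, "Sergio": 173}

def wins():
    return {"Max": 8, "Sergio": 1}

def red_bull_totals(points, wins):
    return {"points": sum(points.values()), "wins": sum(wins.values())}

results = run_parallel({f.__name__: f for f in (points, wins, red_bull_totals)})
print(results["red_bull_totals"])  # {'points': 431, 'wins': 9}
```

Threads are used here only to keep the sketch short; CPU-bound tasks would need processes or another executor to gain real parallelism.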
Task Graph Visualization
The DAG we created above produces a task graph in which the four source tasks (team, points, wins, and podiums) feed into driver_metrics, which in turn feeds into team_metrics.
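The edges of this graph follow directly from the task signatures in the pipeline above. As a minimal sketch, the helper below derives those edges with `inspect` and emits Graphviz DOT text that any DOT renderer can draw. The `to_dot` function is hypothetical and is not Daglib's visualization API; the task bodies are stubbed out because only the signatures matter here.

```python
import inspect


def to_dot(tasks):
    """Render a {name: function} mapping as Graphviz DOT text.

    Hypothetical helper for illustration; not part of Daglib.
    """
    lines = ["digraph dag {"]
    for name, fn in tasks.items():
        lines.append(f'    "{name}";')
        # Assumption: each parameter names an upstream task.
        for dep in inspect.signature(fn).parameters:
            lines.append(f'    "{dep}" -> "{name}";')
    lines.append("}")
    return "\n".join(lines)


# Stubs with the same names and signatures as the pipeline tasks.
def team(): ...
def points(): ...
def wins(): ...
def podiums(): ...
def driver_metrics(team, points, wins, podiums): ...
def team_metrics(driver_metrics): ...

funcs = (team, points, wins, podiums, driver_metrics, team_metrics)
dot = to_dot({f.__name__: f for f in funcs})
print(dot)
```

Saving the printed text to a file and passing it to the Graphviz `dot` command is one way to turn it into an image.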