A standard for authoring profiler-compatible DAG tasks
dagprofiler
A standard framework for authoring profiler-compatible DAG (Directed Acyclic Graph) tasks with automatic compute and communication profiling.
Read the full specification (PDF)
Overview
dagprofiler is a standalone, pip-installable Python package that defines the DAG Task Standard — a contract between application developers, the profiler, and schedulers. It provides:
- A standard for authoring profiler-compatible DAG tasks (the "DAG Task Standard")
- Per-task compute cost profiling (instruction estimates)
- Per-edge communication volume measurement (bytes/bits transferred)
- Reproducible profile storage (JSON format with metadata)
Any application developer who follows the DAG Task Standard can automatically get profiling data that feeds into schedulers like SAGA — for any network type, not just 5G.
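As a rough illustration of per-edge communication volume, the sketch below measures how many bytes one edge payload occupies once serialized. It uses the standard-library `pickle` module as a stand-in; the serializer dagprofiler actually uses is not stated here, and `payload_size_bytes` and the sample data are hypothetical names introduced only for this example. It also assumes that only the fields the downstream task declares as inputs travel over the edge.

```python
import pickle


def payload_size_bytes(payload):
    """Return the size in bytes of the pickled payload (one possible edge weight)."""
    return len(pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL))


# Hypothetical output of an upstream task: 1000 floats and 1000 integer labels.
upstream_output = {
    "data": [float(i) for i in range(1000)],
    "labels": [i % 3 for i in range(1000)],
}

# Assumption: only fields declared as inputs by the downstream task are sent.
downstream_inputs = ["data"]
edge_payload = {name: upstream_output[name] for name in downstream_inputs}

edge_bytes = payload_size_bytes(edge_payload)
print(f"edge weight: {edge_bytes} bytes ({edge_bytes * 8} bits)")
```

A different serializer would give different absolute numbers, so profiles are only comparable when the serialization method is held fixed.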
Installation
pip install dagprofiler
Or install from source:
git clone https://github.com/ANRGUSC/dagprofiler.git
cd dagprofiler
pip install -e ".[dev]"
Quick Start
Define Tasks
import numpy as np
from dagprofiler import Task, Config, DAG


class CollectData(Task):
    inputs = []
    outputs = ["data", "labels"]
    config_deps = ["N"]

    def compute(self, cfg=None):
        N = cfg.N
        return {
            "data": np.random.randn(N, 10),
            "labels": np.random.randint(0, 3, N),
        }

    @staticmethod
    def estimate_instructions(cfg):
        return cfg.N * 10


class ProcessData(Task):
    inputs = ["data"]
    outputs = ["features"]
    config_deps = ["N"]

    def compute(self, data, cfg=None):
        return {"features": data ** 2}

    @staticmethod
    def estimate_instructions(cfg):
        return cfg.N * 100


class Classify(Task):
    inputs = ["features", "labels"]
    outputs = ["predictions"]

    def compute(self, features, labels, cfg=None):
        return {"predictions": labels}  # Placeholder
Build and Run a DAG
dag = DAG(
    tasks={
        "collect": CollectData(),
        "process": ProcessData(),
        "classify": Classify(),
    },
    edges=[
        ("collect", "process"),   # Auto-routes matching outputs->inputs
        ("collect", "classify"),
        ("process", "classify"),
    ],
    config=Config(N=10000),
)

profile = dag.run(seed=42)
profile.save("profile.json")

print(f"Total time: {profile.metrics.total_time_ms:.2f} ms")
print(f"Node weights: {profile.metrics.node_weights}")
print(f"Edge weights: {profile.metrics.edge_weights}")
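The "auto-routes matching outputs->inputs" behaviour can be pictured as a name match between what the source task declares as outputs and what the target declares as inputs. The sketch below is one plausible way to compute that match, not dagprofiler's actual routing code; `route_fields` and the `schemas` dictionary are hypothetical, with the I/O declarations copied from the Quick Start tasks.

```python
def route_fields(source_outputs, target_inputs):
    """Return the output fields of the source that the target declares as inputs."""
    wanted = set(target_inputs)
    # Preserve the order in which the source declares its outputs.
    return [name for name in source_outputs if name in wanted]


# I/O declarations copied from the Quick Start tasks.
schemas = {
    "collect": {"inputs": [], "outputs": ["data", "labels"]},
    "process": {"inputs": ["data"], "outputs": ["features"]},
    "classify": {"inputs": ["features", "labels"], "outputs": ["predictions"]},
}
edges = [("collect", "process"), ("collect", "classify"), ("process", "classify")]

routing = {
    (src, dst): route_fields(schemas[src]["outputs"], schemas[dst]["inputs"])
    for src, dst in edges
}
for (src, dst), fields in routing.items():
    print(f"{src} -> {dst}: {fields}")
```

Under this reading, the edge from `collect` to `classify` carries only `labels`, because `classify` receives `features` from `process` instead.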
Using the Decorator API
For simpler tasks, use the @task decorator:
import numpy as np
from dagprofiler import Config, task
from dagprofiler.decorators import dag


@task(inputs=[], outputs=["data"])
def source(cfg):
    return {"data": np.random.randn(cfg.N)}


@task(inputs=["data"], outputs=["result"], estimate_fn=lambda cfg: cfg.N * 10)
def process(data, cfg):
    return {"result": data * 2}


my_dag = dag(tasks=[source, process], config=Config(N=100))
profile = my_dag.run()
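For readers curious how a decorator can carry an I/O schema, the following is a minimal standalone sketch of the general technique. It is not dagprofiler's implementation: `sketch_task` is a hypothetical name, the config is a plain dict rather than a `Config` object, and the output check is an assumption about what such a wrapper might enforce.

```python
import functools


def sketch_task(inputs, outputs, estimate_fn=None):
    """Hypothetical decorator that attaches an I/O schema to a plain function."""
    def decorate(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            result = fn(*args, **kwargs)
            # Assumed check: every declared output must appear in the result.
            missing = set(outputs) - set(result)
            if missing:
                raise ValueError(f"{fn.__name__} did not produce {sorted(missing)}")
            return result

        # Store the schema on the wrapper so an executor could inspect it.
        wrapper.inputs = list(inputs)
        wrapper.outputs = list(outputs)
        wrapper.estimate_fn = estimate_fn
        return wrapper
    return decorate


@sketch_task(inputs=["data"], outputs=["result"], estimate_fn=lambda cfg: cfg["N"] * 10)
def double(data, cfg):
    return {"result": [x * 2 for x in data]}


cfg = {"N": 3}  # plain dict standing in for Config
print(double.inputs, double.outputs, double.estimate_fn(cfg))
print(double([1, 2, 3], cfg))
```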
The DAG Task Standard
Any task following the standard must satisfy three requirements:
1. Declare I/O Schema
class MyTask(Task):
    inputs = ["data_in"]          # Expected input field names
    outputs = ["data_out"]        # Produced output field names
    config_deps = ["N", "scale"]  # Config parameters used in estimation
2. Implement the Compute Interface
def compute(self, data_in, cfg=None):
    result = process(data_in)
    return {"data_out": result}
3. Optionally Provide a Cost Model
@staticmethod
def estimate_instructions(cfg):
    return cfg.N * 100  # O(N) cost model
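The three requirements can be checked mechanically. The sketch below runs a task once and reports any mismatch between its declarations and its behaviour. It is an illustration under stated assumptions: `ScaleTask` is a stand-in that does not inherit from `dagprofiler.Task`, the config is a plain dict rather than a `Config` object, and `check_contract` is a hypothetical helper, not part of the package.

```python
class ScaleTask:
    """Stand-in for a task class; it does not inherit from dagprofiler.Task."""
    inputs = ["data_in"]
    outputs = ["data_out"]
    config_deps = ["N", "scale"]

    def compute(self, data_in, cfg=None):
        return {"data_out": [x * cfg["scale"] for x in data_in]}

    @staticmethod
    def estimate_instructions(cfg):
        return cfg["N"] * 100


def check_contract(task, inputs, cfg):
    """Run the task once and return a list of contract violations (empty if none)."""
    problems = []
    missing_inputs = set(task.inputs) - set(inputs)
    if missing_inputs:
        # Cannot call compute() without its declared inputs, so stop here.
        return [f"missing inputs: {sorted(missing_inputs)}"]
    result = task.compute(**{name: inputs[name] for name in task.inputs}, cfg=cfg)
    if not isinstance(result, dict):
        return ["compute() must return a dict"]
    if set(result) != set(task.outputs):
        problems.append(f"outputs {sorted(result)} != declared {sorted(task.outputs)}")
    missing_cfg = [key for key in task.config_deps if key not in cfg]
    if missing_cfg:
        problems.append(f"config is missing {missing_cfg}")
    return problems


cfg = {"N": 3, "scale": 2}  # plain dict standing in for Config
problems = check_contract(ScaleTask(), {"data_in": [1, 2, 3]}, cfg)
print(problems or "contract satisfied")
```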
Profile JSON Format
The output is a scheduler-agnostic JSON file:
{
  "metadata": {
    "timestamp": "2025-01-30T17:45:00Z",
    "dagprofiler_version": "0.1.0",
    "seed": 42,
    "hostname": "worker-01"
  },
  "configuration": {
    "N": 10000
  },
  "dag_structure": {
    "nodes": ["collect", "process", "classify"],
    "edges": [
      {"source": "collect", "target": "process"},
      {"source": "collect", "target": "classify"},
      {"source": "process", "target": "classify"}
    ],
    "execution_order": ["collect", "process", "classify"]
  },
  "node_weights": {
    "collect": 100000,
    "process": 1000000,
    "classify": 0
  },
  "edge_weights": {
    "collect->process": 6400000,
    "collect->classify": 640080,
    "process->classify": 6400000
  },
  "execution_metrics": [
    {
      "task": "collect",
      "compute_time_ms": 5.627,
      "serialize_time_ms": 0.129,
      "deserialize_time_ms": 0.0,
      "bytes_in": 0,
      "bytes_out": 600395
    }
  ],
  "total_time_ms": 45.2
}
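Because the profile is plain JSON, a scheduler or analysis script can read it with the standard library alone. The sketch below parses a trimmed copy of the example above, derives a valid execution order from `dag_structure`, and totals the incoming edge weight per node. It assumes edge keys always have the form `source->target`, as in the example, and uses `graphlib`, which requires Python 3.9 or later.

```python
import json
from graphlib import TopologicalSorter

# Trimmed copy of the example profile shown above.
profile_text = """
{
  "dag_structure": {
    "nodes": ["collect", "process", "classify"],
    "edges": [
      {"source": "collect", "target": "process"},
      {"source": "collect", "target": "classify"},
      {"source": "process", "target": "classify"}
    ]
  },
  "node_weights": {"collect": 100000, "process": 1000000, "classify": 0},
  "edge_weights": {
    "collect->process": 6400000,
    "collect->classify": 640080,
    "process->classify": 6400000
  }
}
"""
profile = json.loads(profile_text)

# Map each node to the set of nodes it depends on.
predecessors = {node: set() for node in profile["dag_structure"]["nodes"]}
for edge in profile["dag_structure"]["edges"]:
    predecessors[edge["target"]].add(edge["source"])

order = list(TopologicalSorter(predecessors).static_order())

# Sum the weight of all incoming edges per node (keys assumed "source->target").
incoming = {node: 0 for node in predecessors}
for key, weight in profile["edge_weights"].items():
    _, target = key.split("->")
    incoming[target] += weight

print("valid order:", order)
print("incoming edge weight per node:", incoming)
```

For this three-node DAG the topological order is unique, so it coincides with the `execution_order` field in the full profile.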
API Reference
Core Classes
| Class | Description |
|---|---|
| Task | Abstract base class for all DAG tasks |
| Config | Dynamic configuration container |
| DAG | DAG executor with automatic profiling |
| Profile | Profile storage, loading, and comparison |
| TaskMetrics | Per-task execution metrics |
| EdgeMetrics | Per-edge data transfer metrics |
Config
cfg = Config(N_UE=10000, N_CELL=7, N_SLICE=5)
cfg.N_UE # Attribute access
cfg["N_CELL"] # Dictionary access
cfg.get("N_SLICE", 5) # With default
cfg.to_dict() # Export as dict
cfg.copy(N_UE=20000) # Create modified copy
Profile
profile.save("output.json") # Save to JSON
loaded = Profile.load("output.json") # Load from JSON
diff = profile.compare(other) # Compare two profiles
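What `profile.compare` returns is not described here. As a rough idea of what a comparison between two runs might surface, the sketch below diffs two `node_weights` dictionaries directly. `diff_weights` is a hypothetical helper, and the second dictionary is made-up illustrative data, not a measured result.

```python
def diff_weights(old, new):
    """Return {name: (old_value, new_value)} for every entry that changed."""
    changed = {}
    for name in sorted(set(old) | set(new)):
        before, after = old.get(name), new.get(name)
        if before != after:
            changed[name] = (before, after)
    return changed


# Node weights from the example profile, and an illustrative second run.
baseline = {"collect": 100000, "process": 1000000, "classify": 0}
doubled = {"collect": 200000, "process": 2000000, "classify": 0}

print(diff_weights(baseline, doubled))
```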
Testing
pip install -e ".[dev]"
pytest tests/ -v
Design Principles
- Declarative over Procedural — I/O declarations eliminate manual routing logic
- Generic not Domain-Specific — Works for any DAG, any network, any domain
- Automatic Profiling — Metrics collected without modifying application code
- Reproducible — Profiles include seed, config, and timestamp for exact replay
- Scheduler-Agnostic — Output format consumable by any scheduler
- Extensible — Custom serialization, cost models, and task types supported
- Testable — Comprehensive test suite included
Contributed by Yoonjae Hwang, Bhaskar Krishnamachari (USC), February 12, 2026
License
MIT License. See LICENSE for details.