
High-level HTCondor DAG generation library


ezdag




Installation

With pip:

pip install ezdag

With conda:

conda install -c conda-forge ezdag

Features

This library provides a high-level API on top of htcondor.dags. Specifically, it adds two features:

  1. A programmatic way to generate command-line arguments
  2. Automatic tracking of job dependencies through file inputs and outputs

With (1), you can define job arguments and tracked files on a per-job basis within a layer, and these are automatically formatted into the correct submit description when the DAG is generated.
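To make the formatting rules concrete, here is a minimal sketch of the rendering behaviour implied by the quickstart's inline comments: positional arguments render as their values joined by spaces, while options render as --name value, repeated once per value, or as a bare --name when no value is given. PositionalArg, FlagArg and render_command are hypothetical stand-ins written for illustration; they are not ezdag's Argument and Option classes.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Sequence, Union

Value = Union[str, int, float]


def _as_list(value: Value | Sequence[Value]) -> list[Value]:
    """Treat a single value as a one-element list; strings are not split."""
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


@dataclass
class PositionalArg:
    """Hypothetical stand-in: renders only its value(s), space-separated."""
    name: str
    value: Value | Sequence[Value]

    def render(self) -> str:
        return " ".join(str(v) for v in _as_list(self.value))


@dataclass
class FlagArg:
    """Hypothetical stand-in: renders --name value, once per value."""
    name: str
    value: Value | Sequence[Value] | None = None

    def render(self) -> str:
        if self.value is None:
            return f"--{self.name}"  # bare flag, e.g. --verbose
        return " ".join(f"--{self.name} {v}" for v in _as_list(self.value))


def render_command(arguments: list) -> str:
    """Join rendered arguments; plain strings and numbers pass through unchanged."""
    return " ".join(
        a.render() if isinstance(a, (PositionalArg, FlagArg)) else str(a)
        for a in arguments
    )
```

For example, render_command([PositionalArg("job-index", 0), "--num-jobs", 1, FlagArg("bins", [0, 3, 6])]) produces the same command-line fragment that the quickstart's first process_bins job receives.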

With (2), job dependencies are inferred from the files each job reads and writes, so all parent/child relationships are determined automatically rather than specified explicitly. This is similar in spirit to Makefiles, where node connections are based purely on the data each job needs and provides.
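The dependency rule can be sketched in plain Python, assuming each job is described only by the files it reads and writes: every file is mapped to the job that produces it, and a job becomes a child of the producer of each of its inputs. The infer_edges helper and its input format are hypothetical and illustrate the idea rather than ezdag's internals.

```python
from __future__ import annotations


def infer_edges(jobs: dict[str, tuple[list[str], list[str]]]) -> set[tuple[str, str]]:
    """Return (parent, child) pairs given {job: (input_files, output_files)}."""
    # Map each file to the single job that produces it.
    producer: dict[str, str] = {}
    for job, (_, outputs) in jobs.items():
        for path in outputs:
            if path in producer:
                raise ValueError(f"{path} is produced by both {producer[path]} and {job}")
            producer[path] = job

    # A job depends on whichever job produces each of its inputs.
    edges: set[tuple[str, str]] = set()
    for job, (inputs, _) in jobs.items():
        for path in inputs:
            if path in producer:  # files with no producer (e.g. data.txt) are external inputs
                edges.add((producer[path], job))
    return edges


# Usage with the quickstart's layout: three producers feeding one combiner.
jobs = {f"process_bins:{i:05d}": (["data.txt"], [f"output_{i}.txt"]) for i in range(3)}
jobs["combine_bins:00000"] = ([f"output_{i}.txt" for i in range(3)], ["combined.txt"])
print(sorted(infer_edges(jobs)))
```

Files that no job produces, such as data.txt, create no edge, which is why the three process_bins jobs end up with no parents.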

Both features are demonstrated in the quickstart below.

Quickstart

The example below creates a simple DAG with two layers: one layer with 3 jobs that each produce output, and a second layer with a single job that combines the output of the other jobs:

from ezdag import Argument, DAG, Option, Layer, Node

# create DAG
dag = DAG("my_dag")

# define job options
# this can be a dictionary or an htcondor.Submit object
options = {
    "environment": {
        "OMP_NUM_THREADS": 1,
    },
    "requirements": [
        "HAS_CVMFS_oasis_opensciencegrid_org=TRUE",
    ],
    "request_cpus": 1,
    "request_memory": "2GB"
}

# create processing layer, add nodes
process_layer = Layer("process_bins", submit_description=options)
output_files = []
for i in range(3):
    output_file = f"output_{i}.txt"
    process_layer += Node(
        arguments = [
            Argument("job-index", i),                       # {i}
            "--num-jobs", 1,                                # --num-jobs 1
            Option("bins", [3 * j + i for j in range(3)]),  # --bins {i} --bins {3 + i} --bins {6 + i}
        ],
        inputs = Option("input", "data.txt"),               # --input data.txt
        outputs = Argument("output", output_file)           # output_{i}.txt
    )
    output_files.append(output_file)

# add layer to DAG
dag.attach(process_layer)

# create combine layer, add node
combine_layer = Layer("combine_bins", submit_description=options)
combine_layer += Node(
    arguments = Option("verbose"),                          # --verbose
    inputs = Argument("input", output_files),               # output_0.txt output_1.txt output_2.txt
    outputs = Argument("output", "combined.txt")            # combined.txt
)

# add layer to DAG
dag.attach(combine_layer)

# write DAG to disk
dag.write()

This generates 3 files: a DAG file (my_dag.dag) and one submit file for each of the 2 layers:

my_dag.dag:

# BEGIN META
# END META
# BEGIN NODES AND EDGES
JOB process_bins:00000 process_bins.sub
VARS process_bins:00000 nodename="process_bins:00000" log_dir="logs" job_index="0" bins="--bins 0 --bins 3 --bins 6" input_="--input data.txt" input_input_="data.txt" output_="output_0.txt" output_output_="output_0.txt" output_output__remap=""
RETRY process_bins:00000 3
JOB process_bins:00001 process_bins.sub
VARS process_bins:00001 nodename="process_bins:00001" log_dir="logs" job_index="1" bins="--bins 1 --bins 4 --bins 7" input_="--input data.txt" input_input_="data.txt" output_="output_1.txt" output_output_="output_1.txt" output_output__remap=""
RETRY process_bins:00001 3
JOB process_bins:00002 process_bins.sub
VARS process_bins:00002 nodename="process_bins:00002" log_dir="logs" job_index="2" bins="--bins 2 --bins 5 --bins 8" input_="--input data.txt" input_input_="data.txt" output_="output_2.txt" output_output_="output_2.txt" output_output__remap=""
RETRY process_bins:00002 3
PARENT process_bins:00000 CHILD combine_bins:00000
PARENT process_bins:00001 CHILD combine_bins:00000
PARENT process_bins:00002 CHILD combine_bins:00000
JOB combine_bins:00000 combine_bins.sub
VARS combine_bins:00000 nodename="combine_bins:00000" log_dir="logs" verbose="--verbose" input_="output_0.txt output_1.txt output_2.txt" input_input_="output_0.txt,output_1.txt,output_2.txt" output_="combined.txt" output_output_="combined.txt" output_output__remap=""
RETRY combine_bins:00000 3
# END NODES AND EDGES
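A quick way to check which dependencies were inferred is to read the JOB and PARENT/CHILD lines back from the generated DAG file. The sketch below assumes only the syntax visible in the listing above (JOB name file, PARENT p1 ... CHILD c1 ...); read_dag is a hypothetical helper, not part of ezdag or the htcondor bindings.

```python
from __future__ import annotations


def read_dag(dag_text: str) -> tuple[list[str], set[tuple[str, str]]]:
    """Collect JOB node names and (parent, child) edges from DAG file text."""
    nodes: list[str] = []
    edges: set[tuple[str, str]] = set()
    for line in dag_text.splitlines():
        tokens = line.split()
        if not tokens or tokens[0].startswith("#"):
            continue  # skip blank lines and comments such as "# BEGIN META"
        if tokens[0] == "JOB":
            nodes.append(tokens[1])
        elif tokens[0] == "PARENT":
            split = tokens.index("CHILD")
            parents, children = tokens[1:split], tokens[split + 1:]
            # a single PARENT line may list several parents and several children
            edges.update((p, c) for p in parents for c in children)
    return nodes, edges
```

Calling read_dag(open("my_dag.dag").read()) on the file above should report four nodes and three edges, all pointing into combine_bins:00000.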

process_bins.sub:

universe = vanilla
executable = /path/to/process_bins
arguments = $(job_index) --num-jobs 1 $(bins) $(input_) $(output_)
environment = "OMP_NUM_THREADS='1'"
requirements = (HAS_CVMFS_oasis_opensciencegrid_org=TRUE)
request_cpus = 1
request_memory = 2GB
should_transfer_files = YES
when_to_transfer_output = ON_SUCCESS
success_exit_code = 0
preserve_relative_paths = True
transfer_input_files = $(input_input_)
transfer_output_files = $(output_output_)
transfer_output_remaps = "$(output_output__remap)"
output = $(log_dir)/$(nodename)-$(cluster)-$(process).out
error = $(log_dir)/$(nodename)-$(cluster)-$(process).err
notification = never

queue

combine_bins.sub:

universe = vanilla
executable = /path/to/combine_bins
arguments = $(verbose) $(input_) $(output_)
environment = "OMP_NUM_THREADS='1'"
requirements = (HAS_CVMFS_oasis_opensciencegrid_org=TRUE)
request_cpus = 1
request_memory = 2GB
should_transfer_files = YES
when_to_transfer_output = ON_SUCCESS
success_exit_code = 0
preserve_relative_paths = True
transfer_input_files = $(input_input_)
transfer_output_files = $(output_output_)
transfer_output_remaps = "$(output_output__remap)"
output = $(log_dir)/$(nodename)-$(cluster)-$(process).out
error = $(log_dir)/$(nodename)-$(cluster)-$(process).err
notification = never

queue
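Each submit file is shared by every node in its layer; the per-node values live in the VARS lines of my_dag.dag and are referenced in the submit file as $(name) macros. The sketch below approximates that substitution for one node so you can see the command line the job receives. It is a simplification that assumes VARS values are plain double-quoted strings and ignores HTCondor's full macro-expansion rules; parse_vars and expand are hypothetical helpers.

```python
from __future__ import annotations

import re
import shlex


def parse_vars(line: str) -> tuple[str, dict[str, str]]:
    """Split a 'VARS <node> key="value" ...' line into the node name and its macros."""
    tokens = shlex.split(line)  # POSIX-mode shlex strips the double quotes around values
    if len(tokens) < 2 or tokens[0] != "VARS":
        raise ValueError(f"not a VARS line: {line!r}")
    macros = dict(token.split("=", 1) for token in tokens[2:])
    return tokens[1], macros


def expand(template: str, macros: dict[str, str]) -> str:
    """Replace $(name) with its macro value; unknown names such as $(cluster) are kept."""
    return re.sub(r"\$\((\w+)\)", lambda m: macros.get(m.group(1), m.group(0)), template)


# A shortened copy of the first VARS line from my_dag.dag.
line = ('VARS process_bins:00000 nodename="process_bins:00000" log_dir="logs" '
        'job_index="0" bins="--bins 0 --bins 3 --bins 6" input_="--input data.txt" '
        'output_="output_0.txt"')
node, macros = parse_vars(line)
print(expand("$(job_index) --num-jobs 1 $(bins) $(input_) $(output_)", macros))
# 0 --num-jobs 1 --bins 0 --bins 3 --bins 6 --input data.txt output_0.txt
```

Macros such as $(cluster) and $(process) are left untouched here because HTCondor only fills them in at submit time.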



Download files


Source Distribution

ezdag-0.5.0.tar.gz (32.4 kB)

Uploaded Source

Built Distribution


ezdag-0.5.0-py3-none-any.whl (42.1 kB)

Uploaded Python 3

File details

Details for the file ezdag-0.5.0.tar.gz.

File metadata

  • Download URL: ezdag-0.5.0.tar.gz
  • Upload date:
  • Size: 32.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Hatch/1.16.4 cpython/3.13.12 HTTPX/0.28.1

File hashes

Hashes for ezdag-0.5.0.tar.gz
Algorithm Hash digest
SHA256 8981d5deea4cfaed5e6e458de1a6252b953286c39f44560bbadce2c5011734a9
MD5 9dd0368245715eb61d93c798306748da
BLAKE2b-256 a0b30c12568c7bd17f9bfa5e9c93e2539892bc8384ab613ab41f4d846a1c1059


File details

Details for the file ezdag-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: ezdag-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 42.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Hatch/1.16.4 cpython/3.13.12 HTTPX/0.28.1

File hashes

Hashes for ezdag-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a4e32299879c576a75dac5adaecaf24531798b0f5eb4162c49f7bcd6533ea2a6
MD5 695d4568633879a309445a935fe47f26
BLAKE2b-256 7ae9980c2625f9e7dddd44db5da69b5d7c0e713ec13ea47b69da1539aef13e3c

