
ezdag

High-level HTCondor DAG generation library




Installation

With pip:

pip install ezdag

With conda:

conda install -c conda-forge ezdag
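
Either way, the installation can be verified by querying the installed package metadata (a quick sanity check using only the standard library):

python -c "from importlib.metadata import version; print(version('ezdag'))"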

Features

This library provides a high-level API on top of htcondor.dags. Specifically, it adds two features:

  1. A programmatic way of generating command-line arguments
  2. Automatic tracking of job dependencies through file inputs and outputs

Feature (1) lets you define job arguments and tracked files per job within a given layer; these are automatically formatted into the right submit description when the DAG is generated. A short sketch of how argument objects render is shown below.
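
For instance, the following minimal sketch shows how argument objects map to command-line fragments; the values mirror the quickstart below, and the rendered forms in the comments follow its annotations:

from ezdag import Argument, Option

Option("verbose")              # renders as: --verbose
Option("bins", [0, 3, 6])      # renders as: --bins 0 --bins 3 --bins 6
Argument("input", "data.txt")  # renders positionally as: data.txt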

With feature (2), job dependencies are tracked through the file inputs and outputs of specific jobs, so all parent/child relationships are determined automatically rather than specified explicitly. This is similar in spirit to Makefiles, where connections between nodes are based purely on what data each job needs and provides; a conceptual sketch of that inference follows.
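
In plain Python (a conceptual sketch only, not ezdag's internal implementation), the inference amounts to connecting any two jobs whose outputs and inputs overlap:

# conceptual sketch: not ezdag's internals
jobs = {
    "process_0": {"inputs": {"data.txt"}, "outputs": {"output_0.txt"}},
    "combine": {"inputs": {"output_0.txt"}, "outputs": {"combined.txt"}},
}
# a parent/child edge exists wherever a parent's outputs intersect a child's inputs
edges = [
    (parent, child)
    for parent, p in jobs.items()
    for child, c in jobs.items()
    if parent != child and p["outputs"] & c["inputs"]
]
print(edges)  # [('process_0', 'combine')]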

An example of both these features can be seen in the quickstart below.

Quickstart

The example below creates a simple DAG with two layers: one with 3 jobs that each produce output, and a second with a single job that combines the output from the first:

from ezdag import Argument, DAG, Option, Layer, Node

# create DAG
dag = DAG("my_dag")

# define job options
# this can be a dictionary or an htcondor.Submit object
options = {
    "environment": {
        "OMP_NUM_THREADS": 1,
    },
    "requirements": [
        "HAS_CVMFS_oasis_opensciencegrid_org=TRUE",
    ],
    "request_cpus": 1,
    "request_memory": 2000
}

# create processing layer, add nodes
process_layer = Layer("process_bins", submit_description=options)
output_files = []
for i in range(3):
    output_file = f"output_{i}.txt"
    process_layer += Node(
        arguments = [
            Argument("job-index", i),                       # {i}
            Option("verbose"),                              # --verbose
            Option("bins", [3 * j + i for j in range(3)]),  # --bins {i} --bins {3 + i} --bins {6 + i}
        ],
        inputs = Option("input", "data.txt"),               # --input data.txt
        outputs = Argument("output", output_file)           # output_{i}.txt
    )
    output_files.append(output_file)

# add layer to DAG
dag.attach(process_layer)

# create combine layer, add node
combine_layer = Layer("combine_bins", submit_description=options)
combine_layer += Node(
    arguments = Option("verbose"),                          # --verbose
    inputs = Argument("input", output_files),               # output_0.txt output_1.txt output_2.txt
    outputs = Argument("output", "combined.txt")            # combined.txt
)

# add layer to DAG
dag.attach(combine_layer)

# write DAG to disk
dag.write()

This generates 3 files: the DAG file (my_dag.dag) plus a submit file for each of the two layers. Note that the PARENT/CHILD lines below were derived automatically from the output_*.txt files shared between the layers; no dependencies were specified explicitly.

my_dag.dag:

# BEGIN META
# END META
# BEGIN NODES AND EDGES
JOB process_bins:00000 process_bins.sub
VARS process_bins:00000 nodename="process_bins:00000" log_dir="logs" job_index="0" verbose="--verbose" bins="--bins 0 --bins 3 --bins 6" input_="--input data.txt" input_input_="data.txt" output_="output_0.txt" output_output_="output_0.txt" output_output__remap=""
RETRY process_bins:00000 3
JOB process_bins:00001 process_bins.sub
VARS process_bins:00001 nodename="process_bins:00001" log_dir="logs" job_index="1" verbose="--verbose" bins="--bins 1 --bins 4 --bins 7" input_="--input data.txt" input_input_="data.txt" output_="output_1.txt" output_output_="output_1.txt" output_output__remap=""
RETRY process_bins:00001 3
JOB process_bins:00002 process_bins.sub
VARS process_bins:00002 nodename="process_bins:00002" log_dir="logs" job_index="2" verbose="--verbose" bins="--bins 2 --bins 5 --bins 8" input_="--input data.txt" input_input_="data.txt" output_="output_2.txt" output_output_="output_2.txt" output_output__remap=""
RETRY process_bins:00002 3
PARENT process_bins:00000 CHILD combine_bins:00000
PARENT process_bins:00001 CHILD combine_bins:00000
PARENT process_bins:00002 CHILD combine_bins:00000
JOB combine_bins:00000 combine_bins.sub
VARS combine_bins:00000 nodename="combine_bins:00000" log_dir="logs" verbose="--verbose" input_="output_0.txt output_1.txt output_2.txt" input_input_="output_0.txt,output_1.txt,output_2.txt" output_="combined.txt" output_output_="combined.txt" output_output__remap=""
RETRY combine_bins:00000 3
# END NODES AND EDGES

process_bins.sub:

universe = vanilla
executable = /path/to/process_bins
arguments = $(job_index) $(verbose) $(bins) $(input_) $(output_)
environment = "OMP_NUM_THREADS='1'"
requirements = (HAS_CVMFS_oasis_opensciencegrid_org=TRUE)
request_cpus = 1
request_memory = 2000
should_transfer_files = YES
when_to_transfer_output = ON_SUCCESS
success_exit_code = 0
preserve_relative_paths = True
transfer_input_files = $(input_input_)
transfer_output_files = $(output_output_)
transfer_output_remaps = "$(output_output__remap)"
output = $(log_dir)/$(nodename)-$(cluster)-$(process).out
error = $(log_dir)/$(nodename)-$(cluster)-$(process).err
notification = never

queue

combine_bins.sub:

universe = vanilla
executable = /path/to/combine_bins
arguments = $(verbose) $(input_) $(output_)
environment = "OMP_NUM_THREADS='1'"
requirements = (HAS_CVMFS_oasis_opensciencegrid_org=TRUE)
request_cpus = 1
request_memory = 2000
should_transfer_files = YES
when_to_transfer_output = ON_SUCCESS
success_exit_code = 0
preserve_relative_paths = True
transfer_input_files = $(input_input_)
transfer_output_files = $(output_output_)
transfer_output_remaps = "$(output_output__remap)"
output = $(log_dir)/$(nodename)-$(cluster)-$(process).out
error = $(log_dir)/$(nodename)-$(cluster)-$(process).err
notification = never

queue
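
Once written, the DAG can be submitted to HTCondor with the standard DAGMan tooling:

condor_submit_dag my_dag.dag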
