
The cmd_queue module for a DAG of bash commands


Command Queue - cmd_queue


Read the docs

https://cmd_queue.readthedocs.io

Github

https://github.com/Erotemic/cmd_queue

Pypi

https://pypi.org/project/cmd_queue

Slides

https://docs.google.com/presentation/d/1BjJkjMx6bxu1uek-hAGpwj760u9rraVn7st8J5OsZME

This is a simple module for “generating” a bash script that schedules multiple jobs (in parallel if possible) on a single machine. There are three backends with increasing levels of complexity: serial, tmux, and slurm.

In serial mode, a single bash script gets written that executes your jobs in sequence. There are no external dependencies.

In tmux mode, multiple tmux sessions get opened, and each one executes an independent part of your job graph. Dependencies are handled.

In slurm mode, a real heavyweight scheduler is used. In this mode we simply convert your jobs to slurm commands and submit them.

Under the hood we build a DAG based on your specified dependencies and use this to appropriately order jobs.
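This kind of dependency-ordering can be sketched with Python's standard-library graphlib (a minimal illustration, not cmd_queue's actual internals):

```python
from graphlib import TopologicalSorter

# Map each job to the jobs it depends on
# (this mirrors the depends=[...] arguments passed to submit).
dag = {
    'job2': {'job1'},   # job2 runs after job1
    'job5': {'job1'},
    'job9': {'job8'},
}

# static_order yields every node so that each job
# appears after all of its dependencies.
order = list(TopologicalSorter(dag).static_order())
```

Any execution order consistent with `order` respects the declared dependencies.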

By default, the bash scripts that would execute your jobs are printed to the console. This gives the user fine-grained control: they can manually run only a subset of a pipeline. But if asked to run, cmd_queue will execute the bash jobs itself.
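The idea that pure bash can track the success or failure of each job and gate its dependents can be sketched as follows (a hand-rolled illustration, not the script cmd_queue actually generates):

```shell
#!/bin/bash
# Record each job's status in a file; only run a dependent job if its
# prerequisite recorded success.
STATUS_DIR=$(mktemp -d)

run_job() {  # usage: run_job <name> <command...>
    local name=$1; shift
    if "$@"; then
        echo ok > "$STATUS_DIR/$name"
    else
        echo fail > "$STATUS_DIR/$name"
    fi
}

passed() {  # true only if the named job recorded success
    [ "$(cat "$STATUS_DIR/$1" 2>/dev/null)" = ok ]
}

run_job job0 echo hello
passed job0 && run_job job1 echo world    # runs: job0 succeeded
run_job job6 false
passed job6 && run_job job7 echo never    # skipped: job6 failed
passed job6 || echo "job7 skipped because job6 failed"
```

Because the state lives in plain files, the generated script stays independently executable with no Python process supervising it.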

Motivation

Recently, I needed to run 4 jobs across 2 GPUs and then execute a script after all of them were done. What I should have done was use slurm or some other proper queuing system to schedule the jobs, but instead I wrote my own hacky scheduler using tmux. I opened N tmux sessions (one per parallel worker) and ran independent jobs in each session.

This worked unreasonably well for my use cases, and it was nice to be able to effectively schedule jobs without heavyweight software like slurm on my machine.

Eventually I did get slurm on my machine, and I abstracted the API of my tmux_queue to be a general “command queue” that can use 1 of 3 backends: serial, tmux, or slurm.

Niche

There are many DAG schedulers out there:

  • airflow

  • luigi

  • submitit

  • rq_scheduler

The niche for this module is when you have large pipelines of bash commands that depend on each other and you want to template out those commands with logic that you define in Python.
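For instance, the Python-side templating might look like the following (the train/eval commands and file names here are hypothetical placeholders, not part of cmd_queue):

```python
# Build a grid of bash commands in Python; each string would then be passed
# to queue.submit(...), with depends=[...] wiring each eval after its train.
train_cmds = []
eval_cmds = []
for lr in [0.01, 0.1]:
    for bs in [32, 64]:
        model = f'model_lr{lr}_bs{bs}.pt'
        train_cmds.append(f'python train.py --lr={lr} --batch_size={bs} --out={model}')
        eval_cmds.append(f'python eval.py --model={model}')
```

The loop structure, naming scheme, and grid live in Python, while the commands themselves stay plain bash.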

We plan on adding an airflow backend.

Examples

All of the dependency checking and bookkeeping logic is handled in bash itself. Write (or better yet template) your bash scripts in Python, and then use cmd_queue to “transpile” these sequences of commands to pure bash.

import cmd_queue
self = cmd_queue.Queue.create(name='demo_queue', backend='serial')
job1 = self.submit('echo hello && sleep 0.5')
job2 = self.submit('echo world && sleep 0.5', depends=[job1])
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])

# Display the "user-friendly" pure bash
self.rprint()

# Display the real bash that gets executed under the hood
# that is independently executable, tracks the success / failure of each job,
# and manages dependencies.
self.rprint(1, 1)

# Blocking will display a job monitor while it waits for everything to
# complete
self.run(block=True)

This prints the bash commands in an appropriate order to resolve dependencies.

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_cc9d551e/demo_queue_2022-04-08_cc9d551e.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 10 - demo_queue-job-0
echo hello && sleep 0.5
#
### Command 2 / 10 - demo_queue-job-1
echo world && sleep 0.5
#
### Command 3 / 10 - demo_queue-job-2
echo foo && sleep 0.5
#
### Command 4 / 10 - demo_queue-job-3
echo bar && sleep 0.5
#
### Command 5 / 10 - demo_queue-job-4
echo spam && sleep 0.5
#
### Command 6 / 10 - demo_queue-job-5
echo spam && sleep 0.5
#
### Command 7 / 10 - demo_queue-job-6
echo err && false
#
### Command 8 / 10 - demo_queue-job-7
echo spam && sleep 0.5
#
### Command 9 / 10 - demo_queue-job-8
echo eggs && sleep 0.5
#
### Command 10 / 10 - demo_queue-job-9
echo bazbiz && sleep 0.5
# Need to tell the tmux queue how many processes can run at the same time
import cmd_queue
self = cmd_queue.Queue.create(size=4, name='demo_queue', backend='tmux')
job1 = self.submit('echo hello && sleep 0.5')
job2 = self.submit('echo world && sleep 0.5', depends=[job1])
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])

# Display the "user-friendly" pure bash
self.rprint()

# Display the real bash that gets executed under the hood
# that is independently executable, tracks the success / failure of each job,
# and manages dependencies.
self.rprint(1, 1)

# Blocking will display a job monitor while it waits for everything to
# complete
self.run(block=True)

This prints the sequence of bash commands that will be executed in each tmux session.

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_0_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 3 - demo_queue-job-7
echo spam && sleep 0.5
#
### Command 2 / 3 - demo_queue-job-8
echo eggs && sleep 0.5
#
### Command 3 / 3 - demo_queue-job-9
echo bazbiz && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_1_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 2 - demo_queue-job-2
echo foo && sleep 0.5
#
### Command 2 / 2 - demo_queue-job-6
echo err && false

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_2_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 2 - demo_queue-job-0
echo hello && sleep 0.5
#
### Command 2 / 2 - demo_queue-job-5
echo spam && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_3_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-3
echo bar && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_4_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-4
echo spam && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_5_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-1
echo world && sleep 0.5

Slurm mode is the real deal, but you need slurm installed on your machine to use it. The tmux backend is a much lighter-weight alternative. We can specify slurm options here:

import cmd_queue
self = cmd_queue.Queue.create(name='demo_queue', backend='slurm')
job1 = self.submit('echo hello && sleep 0.5', cpus=4, mem='8GB')
job2 = self.submit('echo world && sleep 0.5', depends=[job1], partition='default')
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])

# Display the "user-friendly" pure bash
self.rprint()

# Display the real bash that gets executed under the hood
# that is independently executable, tracks the success / failure of each job,
# and manages dependencies.
self.rprint(1, 1)

# Blocking will display a job monitor while it waits for everything to
# complete
self.run(block=True)

This prints the very simple slurm submission script:

# --- /home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/demo_queue-20220408T170615-a9e238b5.sh

mkdir -p "$HOME/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs"
JOB_000=$(sbatch --job-name="J0000-demo_queue-20220408T170615-a9e238b5" --cpus-per-task=4 --mem=8000 --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0000-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo hello && sleep 0.5' --parsable)
JOB_001=$(sbatch --job-name="J0002-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0002-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo foo && sleep 0.5' --parsable)
JOB_002=$(sbatch --job-name="J0003-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0003-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo bar && sleep 0.5' --parsable)
JOB_003=$(sbatch --job-name="J0005-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0005-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' --parsable)
JOB_004=$(sbatch --job-name="J0006-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0006-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo err && false' --parsable)
JOB_005=$(sbatch --job-name="J0007-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0007-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' --parsable)
JOB_006=$(sbatch --job-name="J0001-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0001-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo world && sleep 0.5' "--dependency=afterok:${JOB_000}" --parsable)
JOB_007=$(sbatch --job-name="J0004-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0004-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' "--dependency=afterok:${JOB_000}" --parsable)
JOB_008=$(sbatch --job-name="J0008-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0008-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo eggs && sleep 0.5' "--dependency=afterok:${JOB_005}" --parsable)
JOB_009=$(sbatch --job-name="J0009-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0009-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo bazbiz && sleep 0.5' "--dependency=afterok:${JOB_008}" --parsable)
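The chaining pattern above, capturing each job id with `--parsable` and referencing it in `--dependency=afterok:...`, can be sketched in plain Python (a simplified illustration; `sbatch_line` is a hypothetical helper, not part of cmd_queue):

```python
def sbatch_line(var, name, command, depends=()):
    """Build an sbatch invocation that stores the job id in a shell variable
    and chains it after previously submitted jobs by their ids."""
    parts = [f'{var}=$(sbatch', f'--job-name="{name}"', f"--wrap '{command}'"]
    for dep_var in depends:
        # afterok: the dependent job starts only if the prerequisite succeeds
        parts.append(f'"--dependency=afterok:${{{dep_var}}}"')
    parts.append('--parsable)')   # --parsable makes sbatch print just the job id
    return ' '.join(parts)

line = sbatch_line('JOB_001', 'world', 'echo world', depends=['JOB_000'])
```

Because slurm resolves `afterok` dependencies itself, the generated submission script can fire off every `sbatch` immediately and exit.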

Installation

This will be on pypi once it is cleaned up, but for now:

python -m pip install git+https://gitlab.kitware.com/computer-vision/cmd_queue.git@main

