A job pool for distributed compute clusters inspired by python's multiprocessing.Pool.

A python package for job pools (as in multiprocessing.Pool()) which makes use of workload managers on distributed compute clusters.

pypoolparty provides a Pool() with a map() function that aims to be a drop-in replacement for multiprocessing.Pool()'s map(). This way you can always fall back to the builtin pools and map functions when a distributed compute cluster is not available.
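
Such a fallback could look like the following (a minimal sketch; detecting slurm via shutil.which("sbatch") is an assumption of this sketch, not pypoolparty API):

import multiprocessing
import shutil

import pypoolparty

if __name__ == "__main__":
    # Illustrative fallback: use the slurm pool when sbatch is available,
    # otherwise the builtin multiprocessing pool.
    if shutil.which("sbatch"):
        pool = pypoolparty.slurm.array.Pool()
    else:
        pool = multiprocessing.Pool()
    results = pool.map(sum, [[1, 2], [2, 3], [4, 5]])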

This package respects the concept of 'fair share', which is common in scientific environments but not in commercial ones. Here, fair share means that compute resources are only requested when they are needed; they are not requested just to idle while waiting for jobs to be submitted.

A consequence of this fair sharing is that this package expects your jobs to randomly die in conflicts over resources with jobs submitted by other users, such as conflicts over limited disk space on temporary drives. If your jobs run into error states, they will be resubmitted until a predefined limit is reached.

Installing

pip install pypoolparty

Basic Usage

import pypoolparty

pool = pypoolparty.slurm.array.Pool()
results = pool.map(sum, [[1, 2], [2, 3], [4, 5]])

The pool implements two functions: map() and starmap().

import operator

results = pool.starmap(operator.eq, zip([1, 2, 3], [1, "nope", 3]))
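
Here, results would be [True, False, True], just as with multiprocessing.Pool.starmap().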

For more details, see the Pool()'s docs, e.g. pypoolparty.slurm.array.Pool?. Options for the Pool()s are defined in their constructors, e.g.

pool = pypoolparty.slurm.array.Pool(
    num_simultaneously_running_tasks=200,
    python_path="/path/to/python/interpreter/to/be/called/on/the/worker/nodes",
    polling_interval=5.0,
    work_dir="/path/to/the/pools/work_dir/where/the/map/and/reduce/happens",
    keep_work_dir=True,  # e.g for debugging
    verbose=True,  # Talk to me!
    slurm_call_timeout=60.0,
    max_num_resubmissions=3,
)

Pools

pypoolparty.slurm.array.Pool()

Uses slurm’s --array option. It will call sbatch --array, squeue and scancel.

pypoolparty.sun_grid_engine.Pool()

It will call qsub, qstat and qdel.

pypoolparty.slurm.Pool()

It will call sbatch, squeue and scancel. Uses the same inner workings as pypoolparty.sun_grid_engine.Pool().

Testing

pypoolparty comes with its own dummies for slurm and the sun grid engine. This allows testing the full chain without the actual workload managers installed.

pytest -s pypoolparty

Workload managers

We tested:

  • SLURM, version 22.05.6

  • Sun Grid Engine (SGE), version 8.1.9

Alternatives

When you do not share resources with other users, do not need to respect fair share, and have some administrative power, you might want to use one of these:

  • Dask has a job_queue which also supports other workload managers such as PBS and SLURM.

  • pyABC.sge has a pool.map() very much like the one in this package.

  • ipyparallel

Inner Workings

The mapping and reducing take place in a work_dir in the filesystem. The work_dir can be set manually and must be reachable by all compute nodes.

slurm.array.Pool

  • Makes a work_dir where it creates a zip-file named tasks.zip in which it dumps all tasks using pickle.

  • Starts a logger which logs into a file named log.jsonl in the work_dir.

  • Makes a script which will execute the tasks on the compute nodes and dumps the script named script.py into the work_dir. The script contains the path to the work_dir and queries the environment variable SLURM_ARRAY_TASK_ID to determine which task it shall process. It writes its result, its stdout and stderr, and potentially a report of raised exceptions into the work_dir (see the sketch after this list).

  • Calls sbatch --array.

  • After the initial call of sbatch, the pool waits for the jobs to return (to write their results) or to get stuck in an error state. With a polling interval of 5 s (can be adjusted), the work_dir is searched for results and squeue is queried for jobs in error states. When results are found in the work_dir, they are read and appended to the four zip-files named tasks.results.zip, tasks.stdout.zip, tasks.stderr.zip, and tasks.exceptions.zip. Once the individual files written by a job have been appended to the zip-files, they are removed to keep the number of files low.

  • If the poll of squeue indicates tasks with error-like flags, these specific tasks are removed from the queue by calling scancel and then submitted again by calling sbatch --array, until a predefined limit of resubmissions is reached.

  • Finally, either all tasks have returned results or they are stuck in errors and exceptions. The results are read into memory from work_dir/tasks.results.zip and returned by the map() function. If there was non-zero stderr or an exception, the work_dir is not removed after the call of map() but remains for potential debugging.
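
The worker script could conceptually look like the following (a minimal sketch; the in-zip file naming and the tuple layout of a pickled task are assumptions of this sketch, only work_dir, tasks.zip, and SLURM_ARRAY_TASK_ID are taken from the description above):

# script.py (sketch), executed on a worker node
import os
import pickle
import zipfile

work_dir = "/path/to/work_dir"  # embedded by the pool at submission time
task_id = int(os.environ["SLURM_ARRAY_TASK_ID"])

# read this task from tasks.zip (the in-zip naming is an assumption)
with zipfile.ZipFile(os.path.join(work_dir, "tasks.zip")) as zf:
    with zf.open("{:09d}.pkl".format(task_id)) as f:
        func, task = pickle.load(f)

result = func(task)

# write the result as an individual file; the pool later appends it
# to tasks.results.zip and removes the individual file
with open(os.path.join(work_dir, "{:09d}.pkl.out".format(task_id)), "wb") as f:
    pickle.dump(result, f)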

sun_grid_engine.Pool and slurm.Pool

  • map() makes a work_dir because the mapping and reducing take place in the filesystem. You can set work_dir manually to make sure both the worker nodes and the process node can reach it.

  • map() serializes your tasks using pickle into separate files in work_dir/{ichunk:09d}.pkl.

  • map() reads all environment variables in its process.

  • map() creates the worker-node script in work_dir/worker_node_script.py. It embeds the process's environment variables and exports them into the batch job's context. It reads the chunk of tasks from work_dir/{ichunk:09d}.pkl, imports and runs your func(task), and finally writes the result back to work_dir/{ichunk:09d}.pkl.out.

  • map() submits queue jobs. The stdout and stderr of the tasks are written to work_dir/{ichunk:09d}.pkl.o and work_dir/{ichunk:09d}.pkl.e respectively. By default, shutil.which("python") is used to process the worker-node-script.

  • When all queue jobs are submitted, map() monitors their progress. In case a queue-job runs into an error-state, the job will be deleted and resubmitted until a maximum number of resubmissions is reached.

  • When no more queue jobs are running or pending, map() reduces the results from work_dir/{ichunk:09d}.pkl.out (a sketch follows this list).

  • In case of non-zero stderr in any task, a missing result, or on the user's request, the work_dir is kept for inspection. Otherwise it is removed.
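
To illustrate the {ichunk:09d} naming, the reduce step could be sketched like this (assuming each .pkl.out file holds the pickled list of its chunk's results; a sketch, not pypoolparty's actual code):

import glob
import os
import pickle

def reduce_results(work_dir):
    # collect results in chunk order; the zero-padded {ichunk:09d} names
    # make lexicographic sorting equal to numeric sorting
    results = []
    for path in sorted(glob.glob(os.path.join(work_dir, "*.pkl.out"))):
        with open(path, "rb") as f:
            results.extend(pickle.load(f))
    return results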

Environment Variables

All the user’s environment variables in the process where map() is called will be exported in the queue job’s context.

The worker-node script sets the environment variables explicitly. This package does not rely on the batch system's (slurm/sge) ability to do so.
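
Conceptually, the generated worker-node script restores a snapshot of the environment like this (a sketch; the values shown are placeholders):

import os

# snapshot embedded into the worker-node script at generation time
captured_environ = {
    "PATH": "/usr/bin",        # placeholder value
    "PYTHONPATH": "/my/libs",  # placeholder value
}

# restored explicitly on the worker node
os.environ.update(captured_environ)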

Wording

  • task is a valid input to func. The tasks are the actual payload to be processed.

  • iterable is an iterable (list) of tasks. It is the naming adopted from multiprocessing.Pool.map.

  • itask is the index of a task in iterable.

  • chunk is a chunk of tasks which is processed on a worker-node in serial.

  • ichunk is the index of a chunk. It is used to create the chunk's filenames such as work_dir/{ichunk:09d}.pkl.

  • queue-job is what we submit into the queue. Each queue-job processes the tasks in a single chunk in series.

  • jobname or job["name"] is assigned to a queue job by our map(). It is composed of our map()'s session-id (a timestamp) and ichunk, following the pattern q%Y-%m-%dT%H:%M:%S#{ichunk:09d}.
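
For illustration, the jobname pattern can be reproduced like this (make_jobname is a hypothetical helper for this sketch, not part of the package):

import datetime

def make_jobname(session_timestamp, ichunk):
    # hypothetical helper illustrating the jobname pattern above
    session_id = session_timestamp.strftime("%Y-%m-%dT%H:%M:%S")
    return "q" + session_id + "#" + "{:09d}".format(ichunk)

make_jobname(datetime.datetime(2024, 1, 1, 12, 30, 0), 7)
# -> 'q2024-01-01T12:30:00#000000007'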

Testing for developers

The tests have an option --debug_dir which makes the otherwise temporary output and working directories remain after the tests have run.

pytest -s --debug_dir path/to/do/debugging pypoolparty

dummy queue

To test our map(), we provide a dummy qsub, qstat, and qdel for the sun-grid-engine, and a dummy sbatch, squeue, and scancel for slurm. These are individual python scripts which all act on a common state file named queue_state.json in order to imitate the workload managers.

  • qsub/sbatch only append pending jobs to the list of jobs in queue_state.json.

  • qdel/scancel only remove jobs from the list of jobs in queue_state.json.

  • qstat/squeue changes the state of jobs from pending to running and triggers the actual processing of the jobs. Each time qstat.py is called, it performs a single action on queue_state.json, so it must be called multiple times to process all jobs. It can intentionally bring jobs into error states when this is set accordingly in queue_state.json.
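
In spirit, a single qstat/squeue action on the state file looks roughly like this (a sketch; the actual schema of queue_state.json may differ):

import json

def qstat_single_action(path="queue_state.json"):
    # perform one state transition per call, imitating one qstat/squeue call
    with open(path, "rt") as f:
        state = json.load(f)
    for job in state["jobs"]:
        if job["state"] == "pending":
            job["state"] = "running"  # promote one pending job, then stop
            break
    with open(path, "wt") as f:
        json.dump(state, f)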
