
Map and reduce for batch-jobs in distributed computing.



Queues for batch-jobs distribute your compute-tasks over multiple machines in parallel. This pool maps your tasks onto a queue and reduces the results.

import queue_map_reduce as qmr

pool = qmr.Pool()
results = pool.map(sum, [[1, 2], [2, 3], [4, 5], ])

A drop-in replacement for the builtin map() and for multiprocessing.Pool's map().
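
For comparison, the same tasks with the builtin map(); the builtin returns an iterator, hence the list():

# Equivalent with the builtin map(); queue_map_reduce distributes the
# same work over the queue instead of the local process.
results = list(map(sum, [[1, 2], [2, 3], [4, 5]]))
# results == [3, 5, 9]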

Requirements

  • Programs qsub, qstat, and qdel are required to submit, monitor, and delete queue-jobs.

  • Your func(task) must be part of an importable python module (see the sketch after this list).

  • Your tasks and their results must be serializable with pickle.

  • Both the worker-nodes and the process-node must be able to read from and write to a common path in the file-system.
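
A minimal sketch of such an importable module; the module- and function-names are just examples:

# my_tasks.py -- example module; must be importable on the worker-nodes.
def my_func(task):
    # A task is the payload, here e.g. a list of numbers.
    return sum(task)

Because the worker-node-script imports your func by name, a lambda or a function defined only in __main__ will not work.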

Queue flavor

Tested flavors are:

  • Sun Grid Engine (SGE) 8.1.9

Features

  • Respects fair-share, i.e. slots are only occupied while there is work to be done.

  • No spawning of additional threads. Neither on the process-node, nor on the worker-nodes.

  • No need for databases or web-servers.

  • Queue-jobs with error-state 'E' can be deleted and resubmitted until your predefined upper limit is reached.

  • Can bundle tasks on worker-nodes to avoid start-up-overhead with many small tasks (see the configuration sketch after this list).
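
A sketch of how these limits might be configured. The keyword-names below (work_dir, num_chunks, max_num_resubmissions) are assumptions for illustration; consult help(qmr.Pool) for the actual signature.

import queue_map_reduce as qmr

# Keyword-names are assumptions, not the confirmed API.
pool = qmr.Pool(
    work_dir="/shared/fs/qmr",      # common path, see Requirements
    num_chunks=100,                 # bundle 1000 small tasks into 100 chunks
    max_num_resubmissions=5,        # resubmit 'E'-state jobs at most 5 times
)
results = pool.map(sum, [[i, i + 1] for i in range(1000)])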

Alternatives

When you do not share resources with other users, and you have some administrative power, you might want to use one of these:

  • Dask has a job_queue which also supports other flavors such as PBS, SLURM.

  • pyABC.sge has a sge.map() very much like ours.

  • ipyparallel

Inner workings

  • map() makes a work_dir because the mapping and reducing take place in the file-system. You can set work_dir manually to make sure both worker-nodes and process-node can reach it.

  • map() serializes your tasks using pickle into separate files in work_dir/{ichunk:09d}.pkl.

  • map() reads all environment-variables in its process.

  • map() creates the worker-node-script in work_dir/worker_node_script.py. It embeds the process' environment-variables and exports them into the batch-job's context. It reads the chunk of tasks in work_dir/{ichunk:09d}.pkl, imports and runs your func(task), and finally writes the result back to work_dir/{ichunk:09d}.pkl.out (a conceptual sketch follows after this list).

  • map() submits queue-jobs. The stdout and stderr of the tasks are written to work_dir/{ichunk:09d}.pkl.o and work_dir/{ichunk:09d}.pkl.e respectively. By default, shutil.which("python") is used to process the worker-node-script.

  • When all queue-jobs are submitted, map() monitors their progress. In case a queue-job runs into an error-state ('E' by default) the job will be deleted and resubmitted until a maximum number of resubmissions is reached.

  • When no more queue-jobs are running or pending, map() will reduce the results from work_dir/{ichunk:09d}.pkl.out.

  • In case of non-zero stderr in any task, a missing result, or on the user's request, the work_dir is kept for inspection. Otherwise it is removed.
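
A conceptual sketch of what the generated worker-node-script does for a single chunk. This is not the actual generated code; my_tasks is the example module from the Requirements section.

import pickle

import my_tasks  # the importable module providing func(task)

chunk_path = "work_dir/000000000.pkl"

# Read the chunk of tasks.
with open(chunk_path, "rb") as fin:
    tasks = pickle.load(fin)

# Run func(task) on each task in the chunk, in series.
results = [my_tasks.my_func(task) for task in tasks]

# Write the results back for map() to reduce.
with open(chunk_path + ".out", "wb") as fout:
    pickle.dump(results, fout)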

Wording

  • task is a valid input to func. The tasks are the actual payload to be processed.

  • iterable is an iterable (list) of tasks. The naming is adopted from multiprocessing.Pool.map.

  • itask is the index of a task in iterable.

  • chunk is a chunk of tasks which is processed on a worker-node in serial.

  • ichunk is the index of a chunk. It is used to create the chunk's filenames such as work_dir/{ichunk:09d}.pkl.

  • queue-job is what we submit into the queue. Each queue-job processes the tasks in a single chunk in series.

  • JB_job_number is assigned to a queue-job by the queue-system for its own book-keeping.

  • JB_name is assigned to a queue-job by our map(). It is composed of our map()'s session-id and ichunk, e.g. q%Y-%m-%dT%H:%M:%S#{ichunk:09d} (see the sketch after this list).
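
A sketch of how such a JB_name can be composed in python, following the pattern above:

import datetime

# Compose the JB_name from the session-id (the time of map()'s call)
# and the chunk-index ichunk.
session_id = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
ichunk = 42
jb_name = "q" + session_id + "#" + "{:09d}".format(ichunk)
# e.g. 'q2021-03-02T14:05:09#000000042'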

Environment Variables

All the user’s environment-variables in the process where map() is called will be exported in the queue-job’s context.

The worker-node-script sets the environment-variables. We do not use qsub’s argument -V because on some clusters this will not set all variables. Apparently some administrators fear security issues when using qsub -V to set LD_LIBRARY_PATH.
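
A minimal sketch of how environment-variables can be baked into a generated script instead of relying on qsub -V; the actual worker-node-script generated by map() may differ in detail:

import os

# Emit python-statements which restore the submitting process'
# environment inside the worker-node-script.
lines = ["import os"]
for key, value in os.environ.items():
    lines.append("os.environ[{!r}] = {!r}".format(key, value))
env_block = "\n".join(lines)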

Testing

py.test -s .

dummy queue

To test our map() we provide a dummy qsub, qstat, and qdel. These are individual python-scripts which all act on a common state-file in tests/resources/dummy_queue_state.json in order to fake the sun-grid-engine’s queue.

  • dummy_qsub.py only appends queue-jobs to the list of pending jobs in the state-file.

  • dummy_qdel.py only removes queue-jobs from the state-file.

  • dummy_qstat.py moves the queue-jobs from the pending to the running list, and triggers the actual processing of the jobs. Each time dummy_qstat.py is called it performs a single action on the state-file, so it must be called multiple times to process all jobs. It can intentionally bring jobs into the error-state when this is set in the state-file.

Before running the dummy-queue, its state-file must be initialized:

from queue_map_reduce import dummy_queue

dummy_queue.init_queue_state(
    path="tests/resources/dummy_queue_state.json"
)

When testing our map() you set its arguments qsub_path, qdel_path, and qstat_path to point to the dummy-queue.
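
For example, assuming the dummy-scripts live in tests/resources/ and the arguments are passed to Pool() (both are assumptions; see the test below for the actual usage):

import queue_map_reduce as qmr

# Paths and the location of the keyword-arguments are assumptions.
pool = qmr.Pool(
    qsub_path="tests/resources/dummy_qsub.py",
    qstat_path="tests/resources/dummy_qstat.py",
    qdel_path="tests/resources/dummy_qdel.py",
)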

See tests/test_full_chain_with_dummy_qsub.py.

Because of the global state-file, only one instance of the dummy-queue may run at a time.
