
Implementation of the futures and multiprocessing Pool interfaces based on MPI.

Project description


About

mpipool offers MPI-based parallel execution of tasks through implementations of Python's standard library interfaces such as multiprocessing and concurrent.futures. multiprocessing is a slightly older library and its interface is not adapted to how asynchronous programming has evolved in recent years. We therefore advise using the MPIExecutor pool from the more recent concurrent.futures interface.

MPIExecutor

Executors are pool objects that return Futures when tasks are submitted. The MPIExecutor runs each task on an MPI (worker) process and listens for its reply on a thread that controls the Future object that was returned to the user. The user's program can continue asynchronously and inspect the Future or block on its result.
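
As a minimal sketch, assuming the returned objects are standard concurrent.futures.Future instances (the worker control flow used here is explained in the next section):

from mpipool import MPIExecutor

with MPIExecutor() as pool:
  pool.workers_exit()  # see "Control flow with pools" below
  future = pool.submit(sum, [1, 2, 3])
  print("Finished yet?", future.done())  # non-blocking check
  print("Result:", future.result())      # block until the worker replies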

Control flow with pools

Pools usually have a single master script that spawns and controls additional workers, which wait idly for instructions. When using static process management under MPI this is not the case: n identical copies of the script are started and execute the same code. Once they reach the pool constructor, their control flow diverges:

  • The workers enter the constructor and stay inside it in a loop, idling until they receive instructions or the shutdown signal.
  • The master enters the pool constructor and immediately exits it; code execution continues on the master process, which reads the pool instructions and dispatches them to the workers.

Once the master process encounters the shutdown command (or when Python shuts down), it sends the shutdown signal to the workers. The workers then exit their work loop inside the constructor and leave it; if control flow is not well managed, they will also attempt, and fail, to execute the pool instructions intended for the master.

To prevent this, code that uses pool objects usually follows this idiom:

from mpipool import MPIExecutor
from mpi4py.MPI import COMM_WORLD

pool = MPIExecutor()
if pool.is_main():
  try:
    pool.map(len, ([], []))
  finally:
    pool.shutdown()

# Wait for all the workers to finish and continue together
COMM_WORLD.Barrier()
print("All processes continue code execution")

We recommend using a context manager, which guarantees a proper shutdown and makes things slightly neater. The workers_exit call makes the workers skip the instructions inside the with block:

from mpipool import MPIExecutor
from mpi4py.MPI import COMM_WORLD

with MPIExecutor() as pool:
  pool.workers_exit()
  pool.map(len, ([], []))

# Wait for all the workers to finish and continue together
COMM_WORLD.Barrier()
print("All processes continue code execution"

Note: mpipool currently does not support dynamic process management, but mpi4py does. If your MPI implementation supports MPI_Spawn you can use dynamic process management instead.
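
For illustration only, here is a minimal sketch of mpi4py's own dynamically spawning executor, mpi4py.futures.MPIPoolExecutor, which uses MPI_Spawn under the hood and is not part of mpipool:

# Sketch: dynamic process management via mpi4py.futures, not mpipool
from mpi4py.futures import MPIPoolExecutor

if __name__ == "__main__":
  # Worker processes are spawned on demand through MPI_Spawn
  with MPIPoolExecutor(max_workers=4) as pool:
    print(list(pool.map(abs, [-3, -2, -1])))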

Job submission

To submit a job to the pool, use future = pool.submit(fn, *args, **kwargs); call future.result() to block and wait for its result.

from mpipool import MPIExecutor
from mpi4py import MPI

def menial_task(x):
  return x ** MPI.COMM_WORLD.Get_rank()

with MPIExecutor() as pool:
  pool.workers_exit()
  # Submit all the tasks and store their `future`s
  fs = [pool.submit(menial_task, i) for i in range(100)]
  # Wait for all of the `future` results and print them
  print([f.result() for f in fs])

Batch submission

To launch a batch of jobs, use the submit_batch or map function. Both require the arguments to be given as iterables and lazily consume the iterators as previous jobs finish and the next set of arguments is required.

Using map is a shorter notation for dispatching the same function with different arguments and waiting for all the results:

  results = pool.map(menial_task, range(100))
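
Because the iterables are consumed lazily, a generator expression works just as well, for example (reusing menial_task from above):

  results = pool.map(menial_task, (i * i for i in range(100)))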

Note: submit_batch and the Batch objects are not part of the standard library interfaces.

MPIPool

Pools execute tasks using worker processes. Use apply or map to block for task results or apply_async and map_async to obtain an AsyncResult that you can check or wait for asynchronously.

Example usage

from mpipool import MPIPool
from mpi4py import MPI

def menial_task(x):
  return x ** MPI.COMM_WORLD.Get_rank()

with MPIPool() as pool:
  pool.workers_exit()

  # Sync
  results = pool.map(menial_task, range(100))

  # Async
  map_obj = pool.map_async(menial_task, range(100))
  print("Done already?", map_obj.ready())
  results = map_obj.get()
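  # Single tasks: a sketch assuming the standard multiprocessing.Pool
  # signatures, with the positional arguments given as a tuple
  single = pool.apply(menial_task, (3,))              # blocks for the result
  async_single = pool.apply_async(menial_task, (3,))  # returns an AsyncResult
  print(single, async_single.get())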

Note: Launch the script with an MPI launcher such as mpirun, mpiexec or SLURM's srun:

$ mpirun -n 4 python example.py

Download files


Source Distribution

mpipool-2.2.1.tar.gz (11.1 kB)

Uploaded Source

Built Distribution

mpipool-2.2.1-py3-none-any.whl (10.8 kB)

Uploaded Python 3

File details

Details for the file mpipool-2.2.1.tar.gz.

File metadata

  • Download URL: mpipool-2.2.1.tar.gz
  • Upload date:
  • Size: 11.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.12.1

File hashes

Hashes for mpipool-2.2.1.tar.gz:

  • SHA256: dc735b994349ae3e06fce7c3601523ba062125ffa6dd4c6c51a94c168c9ff92c
  • MD5: 112803adf64cfa346698609e6380cf17
  • BLAKE2b-256: 2ca72c26e993d48bfcfbeeef9e1f727d11edf843e18186a72083da8dcde3a43c


File details

Details for the file mpipool-2.2.1-py3-none-any.whl.

File metadata

  • Download URL: mpipool-2.2.1-py3-none-any.whl
  • Upload date:
  • Size: 10.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.12.1

File hashes

Hashes for mpipool-2.2.1-py3-none-any.whl:

  • SHA256: 4fb912ddab2fcac2706c45c77a659513cc27ebaebbdbddcee7d81307d77a84ba
  • MD5: 48a39c1468fd2ca94f0720f77f4c3368
  • BLAKE2b-256: 8b5ff22a36ccd59486857a24af7bc458b8d333854a9469cce99ef22298367706

