Implementation of the futures and multiprocessing Pool interfaces based on MPI.

Project description

About

mpipool offers MPI-based parallel execution of tasks through implementations of Python's standard library interfaces such as multiprocessing and concurrent.futures. multiprocessing is the older of the two libraries and its interface has not kept up with how asynchronous programming has evolved in recent years; we advise using the MPIExecutor pool from the more recent concurrent.futures interface instead.

MPIExecutor

Executors are pool objects that return Futures when tasks are submitted. The MPIExecutor runs each task on an MPI (worker) process and listens for its reply on a thread that controls the Future object returned to the user. The user's program can continue asynchronously and inspect or block for the Future's result.
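
For example, a small sketch of this flow, assuming the standard concurrent.futures Future methods done() and result() (the workers_exit idiom is explained in the next section):

from mpipool import MPIExecutor

def work(x):
  return x * 2

with MPIExecutor() as pool:
  pool.workers_exit()  # see "Control flow with pools" below
  future = pool.submit(work, 21)
  # The master process continues asynchronously here...
  print("Finished yet?", future.done())  # inspect without blocking
  print("Result:", future.result())      # block until the worker replies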

Control flow with pools

Pools usually have a single master script that spawns and controls additional workers, which wait idly for instructions. When using static process management under MPI this is not the case: n identical copies of the script are started and execute the same code. Once they reach the pool constructor, their flow diverges:

  • The workers enter the constructor and stay inside it, looping while they idle for instructions or the shutdown signal.
  • The master enters the pool constructor and immediately exits it; code execution continues on the master process, which reads the pool instructions and dispatches them to the workers.

Once the master process encounters the shutdown command (or when Python shuts down), it sends the shutdown signal to the workers. The workers then exit their work loop inside the constructor and leave the constructor; if control flow is not managed carefully, they will also attempt, and fail, to execute the pool instructions themselves.

To prevent this, code that uses pool objects usually follows this idiom:

from mpipool import MPIExecutor
from mpi4py.MPI import COMM_WORLD

pool = MPIExecutor()
# Only the master exits the constructor immediately; guard the pool
# instructions so that workers skip them once they are shut down.
if pool.is_main():
  try:
    pool.map(len, ([], []))
  finally:
    # Always send the shutdown signal, even if a task raised
    pool.shutdown()

# Wait for all the workers to finish and continue together
COMM_WORLD.Barrier()
print("All processes continue code execution")

We recommend using a context manager, which guarantees proper shutdown and makes things slightly neater. The workers_exit call makes the workers skip the instructions inside of the with block:

from mpipool import MPIExecutor
from mpi4py.MPI import COMM_WORLD

with MPIExecutor() as pool:
  pool.workers_exit()
  pool.map(len, ([], []))

# Wait for all the workers to finish and continue together
COMM_WORLD.Barrier()
print("All processes continue code execution"

Note: mpipool currently does not support dynamic process management, but mpi4py does. If your MPI implementation supports MPI_Spawn you can use dynamic process management instead.
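
For comparison, a brief sketch of what dynamic process management could look like with mpi4py's own futures module; this uses mpi4py.futures rather than mpipool and assumes your MPI implementation supports spawning:

from mpi4py.futures import MPIPoolExecutor

def menial_task(x):
  return x ** 2

if __name__ == "__main__":
  # Worker processes are spawned dynamically instead of being started by mpirun
  with MPIPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(menial_task, range(100)))
  print(results)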

Job submission

To submit a job to the pool use future = pool.submit(fn, *args, **kwargs) and future.result() to block and wait for its result.

from mpipool import MPIExecutor
from mpi4py import MPI

def menial_task(x):
  return x ** MPI.COMM_WORLD.Get_rank()

with MPIExecutor() as pool:
  pool.workers_exit()
  # Submit all the tasks and store their `future`s
  fs = [pool.submit(menial_task, i) for i in range(100)]
  # Wait for all of the `future` results and print them
  print([f.result() for f in fs])

Batch submission

To launch a batch of jobs use the submit_batch or map function. Both require the arguments to be given as iterables and consume them lazily: the next set of arguments is pulled only as previous jobs finish.

Using map is a shorter notation to dispatch the same function with different args and to wait for all results:

  results = pool.map(menial_task, range(100))
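
As a short sketch of this lazy consumption, assuming any iterable (such as a generator) can be passed where the examples above use range:

from mpipool import MPIExecutor

def menial_task(x):
  return x * x

def argument_stream():
  # Arguments are produced lazily; the pool pulls the next value only
  # when a worker is free to take the next job
  for i in range(1000):
    yield i

with MPIExecutor() as pool:
  pool.workers_exit()
  # Collect all results (map may return a lazy iterator)
  results = list(pool.map(menial_task, argument_stream()))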

Note: submit_batch and the Batch objects are not part of the standard library interfaces.

MPIPool

Pools execute tasks using worker processes. Use apply or map to block for task results or apply_async and map_async to obtain an AsyncResult that you can check or wait for asynchronously.

Example usage

from mpipool import MPIPool
from mpi4py import MPI

def menial_task(x):
  return x ** MPI.COMM_WORLD.Get_rank()

with MPIPool() as pool:
  pool.workers_exit()

  # Sync
  results = pool.map(menial_task, range(100))

  # Async
  map_obj = pool.map_async(menial_task, range(100))
  print("Done already?", map_obj.ready())
  results = map_obj.get()
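
For completeness, a small sketch of single-task submission with apply and apply_async, assuming the standard multiprocessing.Pool call signatures:

from mpipool import MPIPool

def menial_task(x):
  return x * x

with MPIPool() as pool:
  pool.workers_exit()

  # Blocking: run a single task and wait for its result
  value = pool.apply(menial_task, (10,))

  # Non-blocking: obtain an AsyncResult to poll or wait on
  async_result = pool.apply_async(menial_task, (10,))
  async_result.wait()
  print(value, async_result.get())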

Note: Launch your script with MPI helpers such as mpirun, mpiexec or SLURM's srun:

$ mpirun -n 4 python example.py
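
Under SLURM, the equivalent would typically be:

$ srun -n 4 python example.py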

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mpipool-2.0.0.tar.gz (10.8 kB)

Uploaded Source

Built Distribution

mpipool-2.0.0-py3-none-any.whl (10.5 kB)

Uploaded Python 3

File details

Details for the file mpipool-2.0.0.tar.gz.

File metadata

  • Download URL: mpipool-2.0.0.tar.gz
  • Upload date:
  • Size: 10.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.1

File hashes

Hashes for mpipool-2.0.0.tar.gz

  • SHA256: 1411e54b9e97ac09996721917e0d4f50f022461c353a4717a79ad39268b61d56
  • MD5: 7f5c850c5b8df731ee01e12b9f361cad
  • BLAKE2b-256: 01ca7f470e3aef3e85388cd1628a1cf0737cbc98deb577541c9af4c9c47c3b5a


File details

Details for the file mpipool-2.0.0-py3-none-any.whl.

File metadata

  • Download URL: mpipool-2.0.0-py3-none-any.whl
  • Upload date:
  • Size: 10.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.1

File hashes

Hashes for mpipool-2.0.0-py3-none-any.whl

  • SHA256: a6106af1e4d4e130b55e82972f4e72325205f3f46c6ec09b0c73d2feb1f0ab9d
  • MD5: b9909c19f3ddb7672b3a30ae79601a48
  • BLAKE2b-256: c3748749bfa19483968466c4bb72cf72bfb46740d4bcbad48e306b7576817fb4

