MPI-based task scheduling for both `multiprocessing` and `concurrent.futures` interfaces.

About

mpipool offers MPI-based parallel execution of tasks through implementations of Python's standard library interfaces, such as multiprocessing and concurrent.futures.
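Because mpipool follows the standard library interfaces, code written against the abstract `concurrent.futures.Executor` API should not need to know which executor it receives. The minimal sketch below assumes MPIExecutor behaves as a drop-in `Executor`; to stay runnable without MPI it uses `ThreadPoolExecutor` instead, and the helpers `square` and `sum_of_squares` are hypothetical names used only for illustration.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Iterable


def square(x: int) -> int:
    return x * x


def sum_of_squares(executor: Executor, values: Iterable[int]) -> int:
    # Relies only on the abstract Executor interface (submit + Future.result),
    # so any conforming executor can be passed in.
    futures = [executor.submit(square, v) for v in values]
    return sum(f.result() for f in futures)


if __name__ == "__main__":
    # ThreadPoolExecutor stands in for MPIExecutor so this runs without MPI.
    with ThreadPoolExecutor(max_workers=4) as executor:
        print(sum_of_squares(executor, range(10)))  # prints 285
```

Under the drop-in assumption, the same `sum_of_squares` function could be handed an MPIExecutor when the script is launched with an MPI launcher.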

MPIExecutor

Executors are objects that return Futures when tasks are submitted. The MPIExecutor runs each task on an MPI worker process and waits for the worker's reply on a background thread, which then completes the Future that was returned to the user.
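To make the Future-plus-listener-thread design concrete, here is a minimal single-process sketch. It is not mpipool's implementation: the class `ToyReplyExecutor` is hypothetical, a `queue.Queue` and a thread stand in for the MPI worker and its messages, and only `submit` is provided.

```python
import queue
import threading
from concurrent.futures import Future


class ToyReplyExecutor:
    """Hypothetical illustration: tasks go to a 'worker' through a queue, and a
    listener thread completes the matching Future when the reply arrives."""

    def __init__(self):
        self._tasks = queue.Queue()    # stands in for messages sent to a worker
        self._replies = queue.Queue()  # stands in for replies from the worker
        self._futures = {}
        self._next_id = 0
        self._lock = threading.Lock()
        threading.Thread(target=self._work, daemon=True).start()
        threading.Thread(target=self._listen, daemon=True).start()

    def submit(self, fn, *args):
        fut = Future()
        with self._lock:
            task_id = self._next_id
            self._next_id += 1
            # Register the Future before sending the task so the listener finds it.
            self._futures[task_id] = fut
        self._tasks.put((task_id, fn, args))
        return fut

    def _work(self):
        # Plays the role of a remote worker: run each task and send back a reply.
        while True:
            task_id, fn, args = self._tasks.get()
            try:
                self._replies.put((task_id, True, fn(*args)))
            except Exception as exc:
                self._replies.put((task_id, False, exc))

    def _listen(self):
        # Listener thread: receive replies and resolve the user's Future.
        while True:
            task_id, ok, value = self._replies.get()
            with self._lock:
                fut = self._futures.pop(task_id)
            if ok:
                fut.set_result(value)
            else:
                fut.set_exception(value)


if __name__ == "__main__":
    ex = ToyReplyExecutor()
    futures = [ex.submit(pow, i, 2) for i in range(5)]
    print([f.result() for f in futures])  # prints [0, 1, 4, 9, 16]
```

Exceptions raised by a task are carried back in the reply and re-raised by `Future.result()`, which is the behaviour users of `concurrent.futures` expect.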

Example usage

from mpipool import MPIExecutor
from mpi4py import MPI

def menial_task(x):
  return x ** MPI.COMM_WORLD.Get_rank()

with MPIExecutor() as pool:
  pool.workers_exit()
  print("Only the master executes this code.")

  # Submit some tasks to the pool
  fs = [pool.submit(menial_task, i) for i in range(100)]

  # Wait for all of the results and print them
  print([f.result() for f in fs])

  # A shorter notation to dispatch the same function with different args
  # is the `.map` method; it returns an iterator, so wrap it in list() to
  # wait for all of the results:
  results = list(pool.map(menial_task, range(100)))

print("All processes join again here.")

You'll see that each result has been raised to the power 1, 2, ..., n, depending on which worker (MPI rank) it was sent to. It's also important to prevent your workers from running the master code by calling pool.workers_exit(). As a fail-safe, any attribute access on the pool object made from a worker will cause that worker to exit anyway.
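The fail-safe can be pictured as a guard in `__getattribute__`. This is a hypothetical sketch, not mpipool's code: `GuardedPool` and its `rank`/`master` arguments are invented for illustration, and the rank is passed in rather than read from MPI. On any rank other than the master, the first attribute lookup ends the process with `sys.exit`.

```python
import sys


class GuardedPool:
    """Hypothetical sketch: any attribute lookup made on a non-master rank
    ends that process, so workers can never fall through into master code."""

    def __init__(self, rank, master=0):
        # Assignment goes through __setattr__, not __getattribute__,
        # so the guard below is not triggered while constructing.
        self._rank = rank
        self._master = master

    def __getattribute__(self, name):
        rank = object.__getattribute__(self, "_rank")
        master = object.__getattribute__(self, "_master")
        if rank != master:
            # In this sketch a worker simply exits on first access.
            sys.exit(0)
        return object.__getattribute__(self, name)

    def workers_exit(self):
        # Only reached on the master; workers already exited in __getattribute__.
        return None

    def greet(self):
        return "master code"


if __name__ == "__main__":
    pool = GuardedPool(rank=0)
    pool.workers_exit()
    print(pool.greet())  # prints "master code"
```

Because the check lives in `__getattribute__`, even a forgotten `workers_exit()` call cannot let a worker execute a method on the pool.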

Note: Launch your script with an MPI launcher such as mpirun, mpiexec or SLURM's srun:

$ mpirun -n 4 python example.py

MPIPool

Pools execute tasks using worker processes. Use apply or map to block until the task results are available, or apply_async and map_async to obtain an AsyncResult that you can poll or wait on.
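Since MPIPool follows the `multiprocessing` pool interface, the blocking and asynchronous calls can be tried out with the standard library's `multiprocessing.pool.ThreadPool`, which shares that API. This is a stand-in, not MPIPool itself; `slow_square` is a hypothetical task with an artificial delay so that `ready()` is likely to report False at first.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(x):
    time.sleep(0.05)  # artificial delay so the async call is still running
    return x * x


if __name__ == "__main__":
    # ThreadPool shares the multiprocessing.Pool API; it stands in for MPIPool.
    with ThreadPool(processes=4) as pool:
        # Blocking calls: they return once the results are available.
        print(pool.apply(slow_square, (3,)))    # prints 9
        print(pool.map(slow_square, range(5)))  # prints [0, 1, 4, 9, 16]

        # Non-blocking call: returns an AsyncResult immediately.
        async_result = pool.map_async(slow_square, range(5))
        print("Done already?", async_result.ready())
        async_result.wait(timeout=5)        # block until finished or timed out
        print(async_result.get(timeout=5))  # prints [0, 1, 4, 9, 16]
```

`get()` re-raises any exception from a task, while `successful()` can be checked once `ready()` is True.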

Example usage

from mpipool import MPIPool
from mpi4py import MPI

def menial_task(x):
  return x ** MPI.COMM_WORLD.Get_rank()

with MPIPool() as pool:
  pool.workers_exit()
  print("Only the master executes this code.")

  # Block for results
  results = pool.map(menial_task, range(100))

  # Async
  result = pool.map_async(menial_task, range(100))
  print("Done already?", result.ready())

print("All processes join again here.")

Built Distribution

mpipool-1.0.0a1-py3-none-any.whl (7.7 kB, Python 3)
