Implementation of the futures and multiprocessing Pool interfaces based on MPI.
About
mpipool offers MPI-based parallel execution of tasks through implementations of
Python's standard library interfaces such as multiprocessing and concurrent.futures.
multiprocessing is a slightly older library and its interface is not adapted to how
asynchronous programming has evolved in recent years. We advise using the MPIExecutor
pool, from the more recent concurrent.futures interface, instead.
MPIExecutor
Executors are pool objects that return Futures when tasks are submitted. The MPIExecutor runs
each task on an MPI (worker) process and listens for its reply on a thread that controls the Future
object that was returned to the user. The user's program can continue asynchronously and inspect or
block for the Future's result.
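The Future behaviour described above can be tried without an MPI installation. The following is a minimal sketch that uses the standard library's ThreadPoolExecutor as a stand-in, on the assumption that MPIExecutor behaves the same way for the parts of the concurrent.futures interface shown here. The names slow_square and gate are hypothetical and exist only for illustration.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def slow_square(x, gate):
    # Hypothetical task: wait until the caller opens the gate, then compute.
    gate.wait()
    return x * x


gate = threading.Event()
finished = []

# ThreadPoolExecutor is a stand-in for MPIExecutor in this sketch.
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 7, gate)
    is_future = isinstance(future, Future)

    # The caller keeps running while the task is outstanding and can inspect it.
    done_before = future.done()

    # A callback fires once the result has been set on the Future.
    future.add_done_callback(lambda f: finished.append(f.result()))

    gate.set()               # let the task complete
    value = future.result()  # block until the result is available

print(is_future, done_before, value)
```

Under MPI, the same calls would be made on the pool returned by MPIExecutor(), on the master process only.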
Control flow with pools
Pools usually have a single master script that spawns and controls additional workers who wait idly
for instructions. When using static process management under MPI this is not the case: n identical
scripts are started and execute the same code. Once they reach the pool constructor their flow diverges:
- The workers enter the constructor and stay in a loop inside it, where they idle until they receive instructions or the shutdown signal.
- The master enters the pool constructor and immediately exits it; code execution continues on the master process, which reads the pool instructions and dispatches them to the workers.
Once the master process encounters the shutdown command (or when Python shuts down) it sends the shutdown signal to the workers. The workers then exit their work loop inside the constructor, exit the constructor and, if control flow is not well managed, also attempt (and fail) to dispatch the pool instructions.
To prevent this, code that uses pool objects will usually follow this idiom:
from mpipool import MPIExecutor
from mpi4py.MPI import COMM_WORLD

pool = MPIExecutor()

# Only the master dispatches work; workers skip this block after shutdown
if pool.is_main():
    try:
        pool.map(len, ([], []))
    finally:
        pool.shutdown()

# Wait for all the workers to finish and continue together
COMM_WORLD.Barrier()
print("All processes continue code execution")
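The divergence inside the constructor can be modelled without MPI. The sketch below is a toy model only, not mpipool's implementation: threads stand in for MPI ranks, queues stand in for MPI messages, and ToyPool, script, run and SHUTDOWN are hypothetical names. It assumes at least two ranks, so that there is at least one worker.

```python
import queue
import threading

SHUTDOWN = object()  # sentinel standing in for the shutdown signal


class ToyPool:
    """Toy model of a statically managed pool (illustration only)."""

    def __init__(self, rank, inboxes, results):
        self.rank = rank
        self._inboxes = inboxes
        self._results = results
        if rank != 0:
            # Workers stay inside the constructor until shutdown arrives.
            while True:
                task = inboxes[rank].get()
                if task is SHUTDOWN:
                    break
                index, fn, arg = task
                results.put((index, fn(arg)))
        # The master (rank 0) falls straight through the constructor.

    def is_main(self):
        return self.rank == 0

    def map(self, fn, args):
        # Hand out tasks round-robin, then collect results by task index.
        worker_ranks = sorted(self._inboxes)
        count = 0
        for index, arg in enumerate(args):
            target = worker_ranks[index % len(worker_ranks)]
            self._inboxes[target].put((index, fn, arg))
            count += 1
        collected = dict(self._results.get() for _ in range(count))
        return [collected[i] for i in range(count)]

    def shutdown(self):
        for inbox in self._inboxes.values():
            inbox.put(SHUTDOWN)


def script(rank, inboxes, results, log):
    """The identical 'script' that every simulated rank executes."""
    pool = ToyPool(rank, inboxes, results)
    if pool.is_main():
        try:
            log.append(("master", pool.map(len, ["a", "bb", "ccc"])))
        finally:
            pool.shutdown()
    else:
        # Workers arrive here only after shutdown and skip the dispatch code.
        log.append(("worker", rank))


def run(n):
    inboxes = {rank: queue.Queue() for rank in range(1, n)}
    results = queue.Queue()
    log = []
    threads = [
        threading.Thread(target=script, args=(rank, inboxes, results, log))
        for rank in range(n)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return log


log = run(3)
print(log)
```

Without the is_main() guard in script, the simulated workers would call map themselves after leaving the constructor, which is the failure mode the idiom prevents.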
We recommend using a context manager, which guarantees proper shutdown and makes things
slightly neater. The workers_exit call makes the workers skip the instructions inside of
the with block:
from mpipool import MPIExecutor
from mpi4py.MPI import COMM_WORLD

with MPIExecutor() as pool:
    # Workers leave the with block here; only the master continues inside it
    pool.workers_exit()
    pool.map(len, ([], []))

# Wait for all the workers to finish and continue together
COMM_WORLD.Barrier()
print("All processes continue code execution")
Note: mpipool currently does not support dynamic process management, but mpi4py does. If
your MPI implementation supports MPI_Comm_spawn you can use mpi4py's dynamic process management instead.
Job submission
To submit a job to the pool use future = pool.submit(fn, *args, **kwargs)
and future.result() to block and wait for its result.
from mpipool import MPIExecutor
from mpi4py import MPI

def menial_task(x):
    # Raise x to the power of the rank of the process that runs the task
    return x ** MPI.COMM_WORLD.Get_rank()

with MPIExecutor() as pool:
    pool.workers_exit()
    # Submit all the tasks and store their `future`s
    fs = [pool.submit(menial_task, i) for i in range(100)]
    # Wait for all of the `future` results and print them
    print([f.result() for f in fs])
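The submit signature forwards positional and keyword arguments to the task. The sketch below shows this with the standard library's ThreadPoolExecutor as a stand-in, so it runs without MPI. The functions scale and fail are hypothetical. Re-raising a task's exception from future.result() is the contract of the standard concurrent.futures interface; that MPIExecutor propagates worker exceptions in the same way is an assumption here.

```python
from concurrent.futures import ThreadPoolExecutor


def scale(x, factor=1, offset=0):
    # Hypothetical task that takes positional and keyword arguments.
    return x * factor + offset


def fail(x):
    # Hypothetical task that always raises.
    raise ValueError(f"cannot process {x}")


# ThreadPoolExecutor is a stand-in for MPIExecutor in this sketch.
with ThreadPoolExecutor(max_workers=2) as pool:
    # Arguments after the function are passed through to the task.
    future = pool.submit(scale, 3, factor=2, offset=1)
    scaled = future.result()

    # An exception raised in the task is stored on the Future...
    failing = pool.submit(fail, 3)
    error = failing.exception()

    # ...and raised again in the caller when the result is requested.
    try:
        failing.result()
    except ValueError as exc:
        message = str(exc)

print(scaled, type(error).__name__, message)
```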
Batch submission
To launch a batch of jobs use the submit_batch or map function.
Both require the arguments to be given as iterables and will lazily
consume the iterators as previous jobs finish and the next set of arguments
is required.
Using map is a shorter notation to dispatch the same function with different args
and to wait for all results:
results = pool.map(menial_task, range(100))
Note: submit_batch and the Batch objects are not part of the
standard library interfaces.
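Lazy consumption means that arguments are pulled from the iterator only when a slot frees up, rather than all at once. The sketch below shows one way such behaviour can be built on top of any concurrent.futures executor. It is not mpipool's code: lazy_map, its window parameter and the numbers generator are hypothetical, and ThreadPoolExecutor stands in for the MPI pool.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def lazy_map(executor, fn, iterable, window):
    """Yield fn(x) in input order, keeping at most `window` tasks in flight."""
    args = iter(iterable)
    # Prime the pool with the first `window` arguments only.
    pending = deque(executor.submit(fn, x) for x in islice(args, window))
    while pending:
        result = pending.popleft().result()
        # A slot is free: pull at most one more argument from the iterator.
        for x in islice(args, 1):
            pending.append(executor.submit(fn, x))
        yield result


consumed = []


def numbers():
    # Record each value at the moment the iterator hands it out.
    for i in range(6):
        consumed.append(i)
        yield i


with ThreadPoolExecutor(max_workers=2) as executor:
    results = lazy_map(executor, lambda x: x * x, numbers(), window=2)
    first = next(results)
    consumed_after_first = list(consumed)  # only part of the input was read
    rest = list(results)

print(first, consumed_after_first, rest)
```

Keeping the number of in-flight tasks bounded is what allows very long, or even unbounded, argument iterators to be used as input.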
MPIPool
Pools execute tasks using worker processes. Use apply or map to block for task results,
or apply_async and map_async to obtain an AsyncResult that you can check or wait for
asynchronously.
Example usage
from mpipool import MPIPool
from mpi4py import MPI

def menial_task(x):
    # Raise x to the power of the rank of the process that runs the task
    return x ** MPI.COMM_WORLD.Get_rank()

with MPIPool() as pool:
    pool.workers_exit()

    # Sync
    results = pool.map(menial_task, range(100))

    # Async
    map_obj = pool.map_async(menial_task, range(100))
    print("Done already?", map_obj.ready())
    results = map_obj.get()
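The four calls named above can also be tried without MPI. The following sketch uses the standard library's multiprocessing.pool.ThreadPool, which exposes the same interface as multiprocessing.Pool, as a stand-in for MPIPool. The function cube is hypothetical, and that MPIPool accepts the same call forms is assumed from its stated goal of implementing the multiprocessing Pool interface.

```python
from multiprocessing.pool import ThreadPool


def cube(x):
    # Hypothetical task used only for illustration.
    return x ** 3


# ThreadPool is a stand-in for MPIPool in this sketch.
with ThreadPool(2) as pool:
    # Blocking calls return the results directly.
    applied = pool.apply(cube, (2,))
    mapped = pool.map(cube, range(4))

    # Asynchronous calls return an AsyncResult straight away.
    async_one = pool.apply_async(cube, (3,))
    async_many = pool.map_async(cube, range(4))

    async_many.wait()           # block until the whole batch is finished
    ready = async_many.ready()  # check completion without blocking
    one = async_one.get()       # block for, and fetch, the single result
    many = async_many.get()

print(applied, mapped, ready, one, many)
```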
Note: Launch the script with an MPI helper such as mpirun, mpiexec or SLURM's srun:
$ mpirun -n 4 python example.py