Skip to main content

A simple interface to multiprocessing

Project description

PyPI version Anaconda-Server Badge DOI Build Status codecov

Mantichora

A simple interface to multiprocessing


Mantichora provides a simple interface to multiprocessing.

from mantichora import mantichora

with mantichora() as mcore:
    mcore.run(func1)
    mcore.run(func2)
    mcore.run(func3)
    mcore.run(func4)
    results = mcore.returns()
 100.00% :::::::::::::::::::::::::::::::::::::::: |    12559 /    12559 |:  func1
  71.27% ::::::::::::::::::::::::::::             |    28094 /    39421 |:  func2
  30.34% ::::::::::::                             |    28084 /    92558 |:  func3
  35.26% ::::::::::::::                           |    27282 /    77375 |:  func4

You can simply give Mantichora as many functions as you need to run. Mantichora will run them concurrently in background processes by using multiprocessing and give you the return values of the functions. The return values are sorted in the order of the functions you have originally given to Mantichora. Progress bars from atpbar can be used in the functions.

The code in this package was originally developed in the sub-package concurrently of alphatwirl.

The examples in this file can be also run on Jupyter Notebook.
Binder



Requirement

  • Python 2.7, 3.6, or 3.7

Install

You can install with conda from conda-forge:

conda install -c conda-forge mantichora

or with pip:

pip install -U mantichora

User guide

Quick start

I will show here how to use Mantichora by simple examples.

Import libraries

We are going use two python standard libraries time and random in an example task function. In the example task function, we are also going to use atpbar for progress bars. Import these packages and mantichora.

import time, random
from atpbar import atpbar
from mantichora import mantichora

Define a task function

Let us define a simple task function.

def task_loop(name, ret=None):
    n = random.randint(1000, 10000)
    for i in atpbar(range(n), name=name):
        time.sleep(0.0001)
    return ret

The task in this function is to sleep for 0.0001 seconds as many times as the number randomly selected from between 1000 and 10000. atpbar is used to show a progress bar. The function takes two arguments: name, the label on the progress bar, and ret, the return value of the function.

Note: Mantichora uses multiprocessing to run task functions in background processes. As a result, task functions, their arguments, and their return values need to be picklable.

You can just try running this function without using Mantichora.

result = task_loop('task1', 'result1')

This doesn't return immediately. It waits for the function to finish. You will see a progress bar.

 100.00% :::::::::::::::::::::::::::::::::::::::: |    58117 /    58117 |:  task1

The return value is stored in result.

print(result)
 'result1'

Run tasks concurrently with Mantichora

Now, we run multiple tasks concurrently with Mantichora.

with mantichora(nworkers=3) as mcore:
    mcore.run(task_loop, 'task', ret='result1')
    mcore.run(task_loop, 'another task', ret='result2')
    mcore.run(task_loop, 'still another task', ret='result3')
    mcore.run(task_loop, 'yet another task', ret='result4')
    mcore.run(task_loop, 'task again', ret='result5')
    mcore.run(task_loop, 'more task', ret='result6')
    results = mcore.returns()

In the example code above, mantichora is initialized with an optional argument nworkers. The nworkers specifies the number of the workers. It is 3 in the above example. The default is 4. At most as many tasks as nworkers can run concurrently.

The with statement is used in the example. This ensures that mantichora properly ends the workers.

You can give task functions and their arguments to mcore.run(). You can call mcore.run() as many times as you need. In the above example, mcore.run() is called with the same task function with different arguments. You can also use a different function each time. mcore.run() returns immediately; it doesn't wait for the task to finish or even to start. In each call, mcore.run() only puts a task in a queue. The workers in background processes pick up tasks from the queue and run them.

The mcore.returns() waits until all tasks finish and returns their return values, which are sorted in the order of the tasks you have originally given to mcore.run().

Progress bars will be shown by atpbar.

 100.00% :::::::::::::::::::::::::::::::::::::::: |     1415 /     1415 |:  still another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     7770 /     7770 |:  task again
 100.00% :::::::::::::::::::::::::::::::::::::::: |    18431 /    18431 |:  yet another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |    25641 /    25641 |:  more task
 100.00% :::::::::::::::::::::::::::::::::::::::: |    74669 /    74669 |:  task
 100.00% :::::::::::::::::::::::::::::::::::::::: |    87688 /    87688 |:  another task

The results are sorted in the original order regardless of the order in which the tasks have finished.

print(results)
['result1', 'result2', 'result3', 'result4', 'result5', 'result6']

Features

Without the with statement

end()

If you don't use the with statement, you need to call end().

mcore = mantichora()

mcore.run(task_loop, 'task', ret='result1')
mcore.run(task_loop, 'another task', ret='result2')
mcore.run(task_loop, 'still another task', ret='result3')
mcore.run(task_loop, 'yet another task', ret='result4')
mcore.run(task_loop, 'task again', ret='result5')
mcore.run(task_loop, 'more task', ret='result6')

results = mcore.returns()

mcore.end()
print(results)
 100.00% :::::::::::::::::::::::::::::::::::::::: |     4695 /     4695 |:  yet another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     7535 /     7535 |:  still another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9303 /     9303 |:  another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9380 /     9380 |:  task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     5812 /     5812 |:  more task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9437 /     9437 |:  task again
['result1', 'result2', 'result3', 'result4', 'result5', 'result6']
terminate()

mantichora can be terminated with terminate(). After terminate() is called, end() still needs to be called. In the example below, terminate() is called after 0.5 seconds of sleep while some tasks are still running.

mcore = mantichora()

mcore.run(task_loop, 'task', ret='result1')
mcore.run(task_loop, 'another task', ret='result2')
mcore.run(task_loop, 'still another task', ret='result3')
mcore.run(task_loop, 'yet another task', ret='result4')
mcore.run(task_loop, 'task again', ret='result5')
mcore.run(task_loop, 'more task', ret='result6')

time.sleep(0.5)

mcore.terminate()
mcore.end()

The progress bars stop when the tasks are terminated.

 100.00% :::::::::::::::::::::::::::::::::::::::: |     2402 /     2402 |:  still another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     3066 /     3066 |:  another task
  59.28% :::::::::::::::::::::::                  |     2901 /     4894 |:  task
  69.24% :::::::::::::::::::::::::::              |     2919 /     4216 |:  yet another task
   0.00%                                          |        0 /     9552 |:  task again
   0.00%                                          |        0 /     4898 |:  more task

Receive results as tasks finish

Instead of waiting for all tasks to finish beofre receiving the reulsts, you can get results as tasks finish with the method receive_one() or receive_receive().

receive_one()

The method receive_one() returns a pair of the run ID and the return value of a task function. If no task has finished, receive_one() waits until one task finishes. receive_one() returns None if no tasks are outstanding. The method run() returns the run ID for the task.

with mantichora() as mcore:
    runids = [ ]
    runids.append(mcore.run(task_loop, 'task1', ret='result1'))
    runids.append(mcore.run(task_loop, 'task2', ret='result2'))
    runids.append(mcore.run(task_loop, 'task3', ret='result3'))
    runids.append(mcore.run(task_loop, 'task4', ret='result4'))
    runids.append(mcore.run(task_loop, 'task5', ret='result5'))
    runids.append(mcore.run(task_loop, 'task6', ret='result6'))
    #
    pairs = [ ]
    for i in range(len(runids)):
        pairs.append(mcore.receive_one())
 100.00% :::::::::::::::::::::::::::::::::::::::: |     1748 /     1748 |:  task3
 100.00% :::::::::::::::::::::::::::::::::::::::: |     4061 /     4061 |:  task1
 100.00% :::::::::::::::::::::::::::::::::::::::: |     2501 /     2501 |:  task5
 100.00% :::::::::::::::::::::::::::::::::::::::: |     2028 /     2028 |:  task6
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8206 /     8206 |:  task4
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9157 /     9157 |:  task2

The runid is the list of the run IDs in the order of the tasks that have been given to run().

print(runids)
[0, 1, 2, 3, 4, 5]

The pairs are in the order in which the tasks have finished.

print(pairs)
[(2, 'result3'), (0, 'result1'), (4, 'result5'), (5, 'result6'), (3, 'result4'), (1, 'result2')]
receive_finished()

The method receive_finished() returns a list of pairs of the run ID and the return value of finished task functions. The method receive_finished() doesn't wait for a task to finish. It returns an empty list if no task has finished.

with mantichora() as mcore:
    runids = [ ]
    runids.append(mcore.run(task_loop, 'task1', ret='result1'))
    runids.append(mcore.run(task_loop, 'task2', ret='result2'))
    runids.append(mcore.run(task_loop, 'task3', ret='result3'))
    runids.append(mcore.run(task_loop, 'task4', ret='result4'))
    runids.append(mcore.run(task_loop, 'task5', ret='result5'))
    runids.append(mcore.run(task_loop, 'task6', ret='result6'))
    #
    pairs = [ ]
    while len(pairs) < len(runids):
        pairs.extend(mcore.receive_finished())
 100.00% :::::::::::::::::::::::::::::::::::::::: |     3979 /     3979 |:  task3
 100.00% :::::::::::::::::::::::::::::::::::::::: |     6243 /     6243 |:  task2
 100.00% :::::::::::::::::::::::::::::::::::::::: |     6640 /     6640 |:  task1
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8632 /     8632 |:  task4
 100.00% :::::::::::::::::::::::::::::::::::::::: |     6235 /     6235 |:  task5
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8325 /     8325 |:  task6

The runid is again the list of the run IDs in the order of the tasks that have been given to run().

print(runids)
[0, 1, 2, 3, 4, 5]

The pairs are also again in the order in which the tasks have finished.

print(pairs)
[(2, 'result3'), (1, 'result2'), (0, 'result1'), (3, 'result4'), (4, 'result5'), (5, 'result6')]

Logging

Logging in background processes is propagated to the main process in the way described in a section of Logging Cookbook.

Here is a simple example task function that uses logging. The task function does logging just before returning.

import logging

def task_log(name, ret=None):
    n = random.randint(1000, 10000)
    for i in atpbar(range(n), name=name):
        time.sleep(0.0001)
    logging.info('finishing "{}"'.format(name))
    return ret

Set the logging stream to a string stream so that we can later retrieve the logging as a string.

import io
stream = io.StringIO()
logging.basicConfig(level=logging.INFO, stream=stream)

Run the tasks.

with mantichora() as mcore:
    mcore.run(task_log, 'task1', ret='result1')
    mcore.run(task_log, 'task2', ret='result2')
    mcore.run(task_log, 'task3', ret='result3')
    mcore.run(task_log, 'task4', ret='result4')
    results = mcore.returns()
 100.00% :::::::::::::::::::::::::::::::::::::::: |     4217 /     4217 |:  task2
 100.00% :::::::::::::::::::::::::::::::::::::::: |     7691 /     7691 |:  task3
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8140 /     8140 |:  task1
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9814 /     9814 |:  task4

Logging made in the task function in background processes is sent to the main process and written in the string stream.

print(stream.getvalue())
INFO:root:finishing "task2"
INFO:root:finishing "task3"
INFO:root:finishing "task1"
INFO:root:finishing "task4"

License

  • mantichora is licensed under the BSD license.

Contact

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mantichora-0.9.5.tar.gz (30.0 kB view details)

Uploaded Source

Built Distribution

mantichora-0.9.5-py2.py3-none-any.whl (11.1 kB view details)

Uploaded Python 2 Python 3

File details

Details for the file mantichora-0.9.5.tar.gz.

File metadata

  • Download URL: mantichora-0.9.5.tar.gz
  • Upload date:
  • Size: 30.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.6.8

File hashes

Hashes for mantichora-0.9.5.tar.gz
Algorithm Hash digest
SHA256 ffd02503e06d23b1a700bf4350d9b0894cefa4556bd5b6f952abb25b79a89012
MD5 c2030ed29c01c4acf1d04a30e73ae431
BLAKE2b-256 01fde7e1e0582686222386d403ffd847f0cd0e0644af4fc8492ff41e2db17024

See more details on using hashes here.

File details

Details for the file mantichora-0.9.5-py2.py3-none-any.whl.

File metadata

  • Download URL: mantichora-0.9.5-py2.py3-none-any.whl
  • Upload date:
  • Size: 11.1 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.6.8

File hashes

Hashes for mantichora-0.9.5-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 b89a1337c7a344ac7588f5a55e25103863b6a324e80b65f78498dd768b9190a8
MD5 47e5d3997bf8e0fdd96042199464d987
BLAKE2b-256 c218642ff4a8cffa5504ce77c180f53e7e8b34f46e92ee5de29b1d96d5903de5

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page