Distributed/parallel computing in modern Python based on the multiprocessing.Pool API (map, imap, imap_unordered).
achilles
Distributed/parallel computing in modern Python based on the multiprocessing.Pool API (map, imap, imap_unordered).
What/why is it?
The purpose of achilles is to make distributed/parallel computing as easy as possible by limiting the required configuration, hiding the details (server/node/controller architecture) and exposing a simple interface based on the popular multiprocessing.Pool API.
achilles provides developers with entry-level capabilities for concurrency across a network of machines using a server/node/controller architecture (see PEP 371 on the intent behind adding multiprocessing to the standard library: https://www.python.org/dev/peps/pep-0371/).
The achilles_server, achilles_node and achilles_controller are designed to run cross-platform/cross-architecture. The server/node/controller may be hosted on a single machine (for development) or deployed across heterogeneous resources.
achilles is comparable to excellent Python packages like pathos/pyina, Parallel Python and SCOOP, but different in certain ways:
- Designed for developers familiar with the multiprocessing module in the standard library, with simplicity and ease of use in mind.
- In addition to the blocking map API, which requires that developers wait for all computation to finish before accessing results (common in such packages), imap/imap_unordered allow developers to process results as they are returned to the achilles_controller by the achilles_server.
- achilles allows for composable scalability and novel design patterns, as:
  - Iterables including lists, lists of lists and generator functions (passed as first-class objects; generator expressions will not work because generators cannot be serialized by pickle/dill) are accepted as arguments.
    - TIP: Use generator functions together with imap or imap_unordered to perform distributed computation on arbitrarily large data.
  - The dill serializer is used to transfer data between the server/node/controller, and multiprocess (a fork of multiprocessing that uses the dill serializer instead of pickle) is used to perform Pool.map on the achilles_nodes, so developers are freed from some of the constraints of the pickle serializer.
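The restriction on generator expressions can be checked with the standard library alone. The following is a minimal sketch that makes no assumptions about achilles internals: it shows that a generator object is rejected by pickle, and that a generator function produces values lazily. The names read_numbers and is_picklable are hypothetical and exist only for this illustration.

```python
import itertools
import pickle


def read_numbers(limit):
    """Hypothetical generator function: yields one argument at a time."""
    for n in range(limit):
        yield n


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (TypeError, AttributeError, pickle.PicklingError):
        return False
    return True


# A generator expression creates a generator object, which cannot be pickled.
squares = (n * n for n in range(10))
print(is_picklable(squares))  # False

# Calling a generator function also returns a generator object, so it is the
# function itself (not the object it returns) that has to be handed over.
print(is_picklable(read_numbers(10)))  # False

# Values are produced lazily: only the first three are ever computed here.
first_three = list(itertools.islice(read_numbers(10**12), 3))
print(first_three)  # [0, 1, 2]
```

Because values are only produced on demand, a generator function can describe far more data than would fit in memory at once, which is what makes it a natural partner for imap and imap_unordered.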
Install
pip install achilles
Quick Start
Start an achilles_server listening for connections from achilles_nodes at a certain endpoint specified as arguments or in an .env file in the achilles package's directory.

Then simply import map, imap, and/or imap_unordered from achilles_main and use them dynamically in your own code (under the hood they create and close achilles_controllers).

map, imap and imap_unordered will distribute your function to each achilles_node connected to the achilles_server. Then, the achilles_server will distribute arguments to each achilles_node (load balanced and made into a list of arguments if the arguments' type is not already a list), which will then perform your function on the arguments using multiprocess.Pool.map.

Each achilles_node finishes its work, returns the results to the achilles_server and waits to receive another argument. This process is repeated until all of the arguments have been exhausted.
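The distribution step can be pictured with a short sketch. This is not achilles' actual implementation: the helper name load_balance and the fixed cpu_count of 8 are assumptions chosen for illustration. It splits an iterable into lists of cpu_count * chunksize arguments, the packet size mentioned in the Caveats section.

```python
from itertools import islice


def load_balance(args, cpu_count, chunksize=1):
    """Yield lists of at most cpu_count * chunksize arguments (illustrative only)."""
    size = cpu_count * chunksize
    iterator = iter(args)
    while True:
        packet = list(islice(iterator, size))
        if not packet:
            return
        yield packet


# Assumed node with 8 cores and the default chunksize of 1.
packets = list(load_balance(range(1, 11), cpu_count=8, chunksize=1))
print(packets)  # [[1, 2, 3, 4, 5, 6, 7, 8], [9, 10]]
```

With ten arguments this gives one packet of eight and one of two, which would be consistent with the grouping of results shown in the examples if the node had eight cores.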
- runAchillesServer(host=None, port=None, username=None, secret_key=None) -> run on your local machine or on another machine connected to your network

  in:

      from achilles.lineReceiver.achilles_server import runAchillesServer

      # host = IP address of the achilles_server
      # port = port to listen on for connections from achilles_nodes (must be an int)
      # username, secret_key used for authentication with achilles_controller
      runAchillesServer(host='127.0.0.1', port=9999, username='foo', secret_key='bar')

      # OR generate an .env file with a default configuration so that
      # arguments are no longer required to runAchillesServer()
      # use genConfig() to overwrite
      from achilles.lineReceiver.achilles_server import runAchillesServer, genConfig

      genConfig(host='127.0.0.1', port=9999, username='foo', secret_key='bar')
      runAchillesServer()

  out:

      ALERT: achilles_server initiated at 127.0.0.1:9999
      Listening for connections...
- runAchillesNode(host=None, port=None) -> run on your local machine or on another machine connected to your network

  in:

      from achilles.lineReceiver.achilles_node import runAchillesNode

      # genConfig() is also available in achilles_node, but only expects host and port arguments
      runAchillesNode(host='127.0.0.1', port=9999)

  out:

      GREETING: Welcome! There are currently 1 open connections.
      Connected to achilles_server running at 127.0.0.1:9999
      CLIENT_ID: 0
- Examples of how to use the 3 most commonly used multiprocessing.Pool methods in achilles:

  Note: map, imap and imap_unordered currently accept iterables including, but not limited to, lists, lists of lists, and generator functions as achilles_args.

  Also note: if there isn't already a .env configuration file in the achilles package directory, you must call genConfig(host, port, username, secret_key) first, or include host, port, username and secret_key as arguments when using map, imap or imap_unordered.

  - map(func, args, callback=None, chunksize=1, host=None, port=None, username=None, secret_key=None)

    in:

        from achilles.lineReceiver.achilles_main import map

        def achilles_function(arg):
            return arg ** 2

        def achilles_callback(result):
            return result ** 2

        if __name__ == "__main__":
            results = map(achilles_function, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], achilles_callback, chunksize=1)
            print(results)

    out:

        ALERT: Connection to achilles_server at 127.0.0.1:9999 and authentication successful.
        [[1, 16, 81, 256, 625, 1296, 2401, 4096], [6561, 10000]]

  - imap(func, args, callback=None, chunksize=1, host=None, port=None, username=None, secret_key=None)

    in:

        from achilles.lineReceiver.achilles_main import imap

        def achilles_function(arg):
            return arg ** 2

        def achilles_callback(result):
            return result ** 2

        if __name__ == "__main__":
            for result in imap(achilles_function, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], achilles_callback, chunksize=1):
                print(result)

    out:

        ALERT: Connection to achilles_server at 127.0.0.1:9999 and authentication successful.
        {'ARGS_COUNTER': 0, 'RESULT': [1, 16, 81, 256, 625, 1296, 2401, 4096]}
        {'ARGS_COUNTER': 8, 'RESULT': [6561, 10000]}

  - imap_unordered(func, args, callback=None, chunksize=1, host=None, port=None, username=None, secret_key=None)

    in:

        from achilles.lineReceiver.achilles_main import imap_unordered

        def achilles_function(arg):
            return arg ** 2

        def achilles_callback(result):
            return result ** 2

        if __name__ == "__main__":
            for result in imap_unordered(achilles_function, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], achilles_callback, chunksize=1):
                print(result)

    out:

        ALERT: Connection to achilles_server at 127.0.0.1:9999 and authentication successful.
        {'ARGS_COUNTER': 8, 'RESULT': [6561, 10000]}
        {'ARGS_COUNTER': 0, 'RESULT': [1, 16, 81, 256, 625, 1296, 2401, 4096]}
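Because results arrive as load balanced lists rather than one flat list, a little post-processing is often useful. The following is a minimal sketch using only the standard library, with the sample outputs from the examples pasted in as literal data. The helper names flatten and in_order are hypothetical and are not part of achilles.

```python
from itertools import chain


def flatten(list_of_lists):
    """Flatten the list of lists returned by map into one flat list."""
    return list(chain.from_iterable(list_of_lists))


def in_order(packets):
    """Sort result packets by ARGS_COUNTER and flatten their RESULT lists."""
    ordered = sorted(packets, key=lambda packet: packet["ARGS_COUNTER"])
    return flatten(packet["RESULT"] for packet in ordered)


# Sample output of map, copied from the example.
map_results = [[1, 16, 81, 256, 625, 1296, 2401, 4096], [6561, 10000]]
print(flatten(map_results))

# Sample packets from imap_unordered, in the order they were received.
unordered_packets = [
    {"ARGS_COUNTER": 8, "RESULT": [6561, 10000]},
    {"ARGS_COUNTER": 0, "RESULT": [1, 16, 81, 256, 625, 1296, 2401, 4096]},
]
print(in_order(unordered_packets))
```

Both calls print the same flat list of ten values in argument order. Sorting after the fact only makes sense once all packets have been collected; when results must be consumed in order as they stream in, imap is the better fit.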
How achilles works
Under the hood
Twisted
- An event-driven networking engine written in Python and MIT licensed.
dill
- dill extends Python’s pickle module for serializing and de-serializing Python objects to the majority of the built-in Python types.
multiprocess
- multiprocess is a fork of multiprocessing that uses dill instead of pickle for serialization. multiprocessing is a package for the Python language which supports the spawning of processes using the API of the standard library’s threading module.
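One constraint of the standard pickle serializer is easy to see with a lambda. The following is a minimal, standard-library-only sketch; the helper name can_pickle is hypothetical. It does not call dill, so it only demonstrates the limitation on the pickle side.

```python
import pickle


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False if it is refused."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# Built-in functions are pickled by reference to their importable name.
print(can_pickle(len))  # True

# A lambda has no importable name, so pickle refuses to serialize it.
print(can_pickle(lambda x: x * 2))  # False
```

Since dill extends pickle to more object types, functions of this kind are among those it is meant to handle.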
Examples
See the examples directory for tutorials on various use cases, including:
- Square numbers/run multiple jobs sequentially
- Word count (TO DO)
How to kill cluster
from achilles.lineReceiver.achilles_main import killCluster
# simply use the killCluster() command and verify your intent at the prompt
# killCluster() will search for an .env configuration file in the achilles package's directory
# if it does not exist, specify host, port, username and secret_key as arguments
# a command is sent to all connected achilles_nodes to stop the Twisted reactor and exit() the process
# optionally, you can pass command_verified=True to proceed directly with killing the cluster
killCluster(command_verified=True)
Caveats/Things to know
- achilles_nodes use all of the CPU cores available on the host machine to perform multiprocess.Pool.map (pool = multiprocess.Pool(multiprocess.cpu_count())).
- achilles leaves it up to the developer to ensure that the correct packages are installed on achilles_nodes to perform the function distributed by the achilles_server on behalf of the achilles_controller. The current recommended solution is to SSH into each machine and install the packages listed in a requirements.txt file (pip install -r requirements.txt).
  - All import statements required by the developer's function, arguments and callback must be included in the definition of the function.
- The achilles_server is currently designed to handle one job at a time. For more complicated projects, I highly recommend checking out Dask (especially dask.distributed) and learning more about directed acyclic graphs (DAGs).
- Fault tolerance: if some achilles_node disconnects before returning expected results, the argument will be distributed to another achilles_node for computation instead of being lost.
- The callback_error argument has yet to be implemented, so detailed information regarding errors can only be gleaned from the interpreter used to launch the achilles_server, achilles_node or achilles_controller. Deploying the server/node/controller on a single machine is recommended for development.
- achilles performs load balancing at runtime and assigns achilles_nodes arguments by cpu_count * chunksize.
  - Default chunksize is 1.
  - Increasing the chunksize is an easy way to speed up computation and reduce the amount of time spent transferring data between the server/node/controller.
- If your arguments are already lists, the chunksize argument is not used.
  - Instead, one argument/list will be distributed to the connected achilles_nodes at a time.
- If your arguments are load balanced, the results returned are contained in lists of length achilles_node's cpu_count * chunksize.
- map:
  - Final result of map is an ordered list of load balanced lists (the final result is not flattened).
- imap:
  - Results are returned as computation is finished in dictionaries that include the following keys:
    - RESULT: load balanced list of results.
    - ARGS_COUNTER: index of first argument (0-indexed).
  - Results are ordered.
    - The first result will correspond to the next result after the last result in the preceding results packet's list of results.
    - Likely to be slower than imap_unordered due to achilles_controller yielding ordered results. imap_unordered (see below) yields results as they are received, while imap yields results as they are received only if the argument's ARGS_COUNTER is expected based on the length of the RESULT list in the preceding results packet. Otherwise, a result_buffer is checked for the results packet with the expected ARGS_COUNTER and the current results packet is added to the result_buffer. If it is not found, achilles_controller will not yield results until a results packet with the expected ARGS_COUNTER is received.
- imap_unordered:
  - Results are returned as computation is finished in dictionaries that include the following keys:
    - RESULT: load balanced list of results.
    - ARGS_COUNTER: index of first argument (0-indexed).
  - Results are not ordered.
    - Results packets are yielded as they are received (after any achilles_callback has been performed on it).
    - Fastest way of consuming results received from the achilles_server.
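The ordering behaviour described for imap can be sketched as a small reordering buffer. This is an illustration of the idea rather than the achilles_controller source: yield_in_order is a hypothetical name, and packets are assumed to be dictionaries with the ARGS_COUNTER and RESULT keys, each RESULT list holding one result per argument.

```python
def yield_in_order(packets):
    """Yield result packets in argument order, buffering early arrivals."""
    result_buffer = {}  # ARGS_COUNTER -> packet that arrived too early
    expected = 0        # ARGS_COUNTER of the next packet to yield
    for packet in packets:
        result_buffer[packet["ARGS_COUNTER"]] = packet
        # Release every buffered packet that is now next in line.
        while expected in result_buffer:
            ready = result_buffer.pop(expected)
            yield ready
            # The next packet starts right after this packet's last result.
            expected += len(ready["RESULT"])


# Packets in the order they might arrive from the server (sample data).
received = [
    {"ARGS_COUNTER": 8, "RESULT": [6561, 10000]},
    {"ARGS_COUNTER": 0, "RESULT": [1, 16, 81, 256, 625, 1296, 2401, 4096]},
]
for packet in yield_in_order(received):
    print(packet)
```

The packet starting at index 8 is held back until the packet starting at index 0 has been yielded, which is why ordered iteration can lag behind imap_unordered when packets finish out of order.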
achilles is in the early stages of active development and your suggestions/contributions are kindly welcomed.

achilles is written and maintained by Alejandro Peña. Email me at adpena at gmail dot com.