spychronous
A simple synchronous job runner for parallel processing tasks.
spychronous in action
TL;DR Here's a quick example:
from spychronous import SynchronousJob as Job

def get_plus_one(num):
    return num + 1

nums = [1, 2, 3]
plus_one_job = Job(func=get_plus_one, items=nums)
print plus_one_job.run_multi_processed()
# OUTPUT
# [2, 3, 4]
Working with spychronous
Say you've written a function that will be repeatedly called to transform a list:
>>> def get_plus_one(item):
...     return item + 1
...
>>> items = [1, 2, 3]
>>> for item in items:
...     get_plus_one(item)
...
2
3
4
Using a spychronous Job
Instead, you can parallel-process the list transformation (repeatedly applying your function to each item in your list):
>>> from spychronous import SynchronousJob as Job
>>> plus_one_job = Job(func=get_plus_one, items=items)
>>> plus_one_job.run_multi_processed()
[2, 3, 4]
Now imagine your function changes to require more arguments than simply an item from your list, like a multiplier:
>>> from time import sleep
>>> from random import random
>>>
>>> def get_number(item, multiplier):
...     sleep(random()*5) # your function executes at variable speed
...     return item * multiplier
...
Notice that the first parameter holds a single item from Job's items, and additional parameters (like multiplier) are listed in the signature afterwards. Job accounts for these extra arguments with args:
>>> multiplier = 2
>>> additional_function_args = [multiplier]
>>> numbers_job = Job(func=get_number, items=items, args=additional_function_args)
>>> numbers_job.run_multi_processed()
[4, 2, 6]
Notice the output order isn't guaranteed to match the input order. See Coupling output to input below for tips on working with this behavior.
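Similarly, if your function takes several extra parameters, they can be passed the same way (a sketch, assuming args maps positionally onto the parameters after the item; the offset parameter here is purely illustrative):
>>> def get_number(item, multiplier, offset):
...     return item * multiplier + offset
...
>>> numbers_job = Job(func=get_number, items=items, args=[2, 1])
>>> numbers_job.run_multi_processed()
[5, 3, 7]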
Important Configurable Features
If you need to set process pool size, set worker timeouts, handle Ctrl-C, or run your job single-processed (to debug, for instance), the following features come in handy.
Process Pool Size
Job(..., processes=20) # default is 4
Worker Timeouts
minutes = 60 # seconds per minute
Job(..., timeout=5*minutes)
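Putting these together, a complete call might look like this (a sketch reusing get_plus_one and nums from the TL;DR, and assuming these keyword arguments combine freely):
job = Job(func=get_plus_one, items=nums, processes=20, timeout=5*minutes)
print job.run_multi_processed() # e.g. [3, 2, 4]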
Non-Daemonic Process Pool
The default multiprocessing.pool.Pool disallows spawning processes within processes. This can be properly circumvented with the no_daemon attribute:
Job(..., no_daemon=True)
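For example, here's a minimal sketch of the nested-job pattern this enables (assuming no_daemon=True on the outer Job is the only change needed; double and double_batch are illustrative helpers):
from spychronous import SynchronousJob as Job

def double(num):
    return num * 2

def double_batch(batch): # each worker spawns its own inner Job
    return Job(func=double, items=batch).run_multi_processed()

batches = [[1, 2], [3, 4]]
outer_job = Job(func=double_batch, items=batches, no_daemon=True)
print outer_job.run_multi_processed() # e.g. [[6, 8], [2, 4]]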
Child-Exception Suppression
If you don't want a Job's children (i.e. workers) to die when another raises an exception, i.e. you want the remaining items in your list to be processed, you can suppress those exceptions and log them instead:
Job(..., suppress_worker_exceptions=True)
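For instance (a sketch; get_reciprocal is illustrative, and exactly how a failed item shows up in the results is up to spychronous's logging):
from spychronous import SynchronousJob as Job

def get_reciprocal(num):
    return 1.0 / num # raises ZeroDivisionError when num == 0

nums = [1, 0, 2]
job = Job(func=get_reciprocal, items=nums, suppress_worker_exceptions=True)
job.run_multi_processed() # 0's ZeroDivisionError is logged, not raised;
                          # 1 and 2 are still processed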
Predictable Ctrl-C Handling
Job also handles SIGINT gracefully by intentionally killing its workers ASAP, and then killing itself afterwards:
>>> from time import sleep, strftime, gmtime
>>> from random import random
>>> def print_item(item):
...     sleep(random()*50)
...     print strftime("%H:%M:%S", gmtime()), item
...
>>> items = [1, 2, 3]
>>> printing_job = Job(func=print_item, items=items)
>>> # GOAL: print the following: 1) each item and its timestamp 2) Ctrl-C's timestamp
>>> try:
...     printing_job.run_multi_processed()
... finally:
...     print '', strftime("%H:%M:%S", gmtime()), 'user issued Ctrl-C'
...
20:39:50 3
20:39:52 1
^C 20:41:53 user issued Ctrl-C
Traceback (most recent call last):
... Your typical stacktrace here ...
KeyboardInterrupt
Run Job with a Single Process (for developing, debugging, etc.)
When you plan to parallelize a job, it's helpful to develop with single-processed job execution first (and debug likewise), then switch to multi-processed execution when you're ready. The spychronous Job facilitates this with the run_single_processed instance method.
Utilizing run_single_processed: A Use Case
The following example illustrates this development, debugging, and deployment workflow with run_single_processed, using three iterations of the same program:
# 1st iteration: Development
from spychronous import SynchronousJob as Job
def get_plus_one(num):
    return num + 1/0 # oops! a bug
nums = [1, 2, 3]
job = Job(func=get_plus_one, items=nums)
job.run_single_processed()
# OUTPUT
# Traceback (most recent call last):
# File "<stdin>", line 1, in <module>
# File "spychronous.py", line 33, in run_single_processed
# raise e
# ZeroDivisionError: integer division or modulo by zero
# 2nd iteration: Debugging
from spychronous import SynchronousJob as Job
def get_plus_one(num):
    import pdb; pdb.set_trace() # break to inspect the buggy line
    return num + 1/0
nums = [1, 2, 3]
job = Job(func=get_plus_one, items=nums)
job.run_single_processed()
# OUTPUT
# <stdin>(5)get_plus_one()
# (Pdb) print num
# 1
# 3rd iteration: Multiprocessing the Job
from spychronous import SynchronousJob as Job
def get_plus_one(num):
    return num + 1 # bug fixed
nums = [1, 2, 3]
job = Job(func=get_plus_one, items=nums)
job.run_multi_processed() # notice method name changed from 'single' to 'multi'
# OUTPUT
# [2, 4, 3]
Working with instance methods
In order to call an instance method on a list of objects, simply wrap the instance method call in a trivial function (handy because, on Python 2, bound methods can't be pickled for multiprocessing):
>>> class Cat(object):
...     def __init__(self, name):
...         self.name = name
...     def meow(self):
...         print 'meow, my name is', self.name
...
>>> def make_cat_meow(cat):
...     cat.meow()
...
>>> dave = Cat('Dave')
>>> meow_job = Job(func=make_cat_meow, items=[dave])
>>> meow_job.run_multi_processed()
meow, my name is Dave
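The same wrapper works unchanged across a whole list of objects (output order, as always, not guaranteed):
>>> cats = [Cat('Ada'), Cat('Bee')]
>>> many_meows_job = Job(func=make_cat_meow, items=cats)
>>> many_meows_job.run_multi_processed()
meow, my name is Bee
meow, my name is Ada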
Coupling output to input
In order to preserve a relationship between input and output, simply wrap your function in one that couples the IO:
>>> def get_num_times_2(num):
...     return num * 2
...
>>> def double_num(num): # this is the coupling function
...     return (num, get_num_times_2(num))
...
>>> doubling_job = Job(func=double_num, items=[5])
>>> doubling_job.run_multi_processed()
[(5, 10)]
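Because each result is an (input, output) pair, the mapping survives however the workers are scheduled, e.g.:
>>> dict(doubling_job.run_multi_processed())
{5: 10}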
TODO: AsynchronousJob, Thread Pools
The next step for spychronous is an asynchronous job runner, which is being developed in the develop branch. After that, thread pools will be implemented as an alternative to process pools.
Why spychronous?
I made spychronous because I wanted a clean out-of-the-box solution to quickly replace loops that I wanted to parallel-process. I wanted to hide burdensome configuration and process management from the user. I wanted a solution that would gracefully handle SIGINT.
Lastly, it was a fun programming exercise for me.