Polymer
Project description
Summary
Polymer is a simple framework for running tasks in parallel. It is similar to multiprocessing.Pool, with a few enhancements. For example, multiprocessing.Pool dispatches functions, not objects. You can wrap a function around an object, but it is nicer to work with the task objects directly.
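For comparison, the function-wrapping workaround mentioned above might look like the following with multiprocessing.Pool. This is a minimal sketch using only the standard library; EchoTask and run_task are hypothetical names for illustration and are not part of Polymer.

```python
import multiprocessing


class EchoTask(object):
    """Hypothetical task object (an assumption for illustration, not Polymer API)."""

    def __init__(self, text):
        self.text = text

    def run(self):
        return self.text.upper()


def run_task(task):
    """Module-level wrapper: Pool.map() needs a function, so call task.run() here."""
    return task.run()


def main():
    tasks = [EchoTask("task {0}".format(ii)) for ii in range(3)]
    pool = multiprocessing.Pool(processes=2)
    try:
        # Each task object is pickled, sent to a worker, and run via the wrapper
        results = pool.map(run_task, tasks)
    finally:
        pool.close()
        pool.join()
    print(results)


if __name__ == "__main__":
    main()
```

The wrapper works, but the pool only ever sees a function and its argument, so any bookkeeping about which task object a worker is handling has to be added by hand.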
Polymer is mostly useful for its worker error logging and run-time statistics. It also restarts crashed multiprocessing workers automatically (not true with multiprocessing.Pool). When a worker crashes, Polymer knows which task the worker was running and resubmits that task as well. This is not foolproof, but it is a helpful feature.
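The general idea of tracking what each worker is doing, so that a crashed worker's task can be resubmitted, can be sketched with the standard library alone. This is not Polymer's implementation: supervise, crash_once, and the max_retries parameter are hypothetical, and the sketch assumes the POSIX-only "fork" start method.

```python
import multiprocessing as mp
import os
import tempfile
import time

# Assumption: the "fork" start method (POSIX only) keeps this sketch simple.
CTX = mp.get_context("fork")


def crash_once(marker_path):
    """Hypothetical task: die abruptly on the first attempt, succeed on the retry."""
    if not os.path.exists(marker_path):
        open(marker_path, "w").close()
        os._exit(1)  # simulate a hard crash: no exception, no cleanup


def supervise(tasks, worker_count=2, max_retries=1):
    """Run (func, args) tasks in child processes and resubmit any that crash."""
    pending = [(func, args, 0) for func, args in tasks]
    running = {}  # Process -> (func, args, attempts): what each worker is doing
    done, failed = [], []
    while pending or running:
        # Start new workers while there is spare capacity
        while pending and len(running) < worker_count:
            task = pending.pop(0)
            proc = CTX.Process(target=task[0], args=task[1])
            proc.start()
            running[proc] = task
        # Reap finished workers and decide what to do with their tasks
        for proc in list(running):
            if proc.is_alive():
                continue
            proc.join()
            func, args, attempts = running.pop(proc)
            if proc.exitcode == 0:
                done.append(args)
            elif attempts < max_retries:
                pending.append((func, args, attempts + 1))  # resubmit the task
            else:
                failed.append(args)
        time.sleep(0.01)
    return done, failed


if __name__ == "__main__":
    marker = os.path.join(tempfile.mkdtemp(), "first-attempt")
    print(supervise([(crash_once, (marker,))]))
```

Because the supervisor records the task alongside each process, a non-zero exit code is enough to know which task to queue again. A retry limit keeps a task that always crashes from looping forever.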
Usage
import time

from polymer.Polymer import ControllerQueue, TaskMgr
from polymer.abc_task import BaseTask


class SimpleTask(BaseTask):
    def __init__(self, text="", wait=0.0):
        super(SimpleTask, self).__init__()
        self.text = text
        self.wait = wait

    def run(self):
        """run() is where all the work is done; this is called by TaskMgr()"""
        ## WARNING... using try / except in run() could squash Polymer's
        ## internal error logging...
        ## Divide by 10.0 so integer waits are not truncated on Python 2
        time.sleep(self.wait / 10.0)
        print(self.text)

    def __eq__(self, other):
        """Define how tasks are uniquely identified"""
        return other.text == self.text

    def __repr__(self):
        return """<{0}, wait: {1}>""".format(self.text, self.wait)


def Controller():
    """Controller() builds a list of tasks, and queues them to the TaskMgr

    There is nothing special about the name Controller()... it's just some
    code to build a list of SimpleTask() instances."""
    tasks = list()

    ## Build ten tasks... do *not* depend on execution order...
    num_tasks = 10
    for ii in range(0, num_tasks):
        tasks.append(SimpleTask(text="Task {0}".format(ii), wait=ii))

    targs = {
        'work_todo': tasks,          # a list of SimpleTask() instances
        'hot_loop': False,           # If True, continuously loop over the tasks
        'log_level': 0,              # Logging off (debugging=3)
        'worker_count': 3,           # Number of workers (default: 5)
        'resubmit_on_error': False,  # Do not retry errored jobs...
        'queue': ControllerQueue(),
        'worker_cycle_sleep': 0.001, # Worker sleep time after a task
        'log_stdout': False,         # Don't log to stdout (default: True)
        'log_path': "taskmgr.log",   # Log file name
        'log_interval': 10,          # Statistics logging interval
    }

    ## task_mgr reads and executes the queued tasks
    task_mgr = TaskMgr(**targs)

    ## a set() of completed task objects is returned after supervise()
    results = task_mgr.supervise()
    return results


if __name__ == '__main__':
    Controller()
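The example above identifies tasks through __eq__() and notes that completed tasks come back in a set(). As a general Python point (not a statement about Polymer's internals), a Python 3 class that defines __eq__() without __hash__() is unhashable and cannot be stored in a set. A minimal sketch with a hypothetical TextTask class shows one way to keep equality and hashing consistent:

```python
class TextTask(object):
    """Hypothetical stand-in for a task class (an assumption, not Polymer API)."""

    def __init__(self, text=""):
        self.text = text

    def __eq__(self, other):
        return isinstance(other, TextTask) and other.text == self.text

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly
        return not self == other

    def __hash__(self):
        # Hash on the same field used by __eq__ so equal tasks collapse in a set
        return hash(self.text)


unique = set([TextTask("a"), TextTask("a"), TextTask("b")])
```

Here the two tasks with text "a" compare equal and hash alike, so the set keeps only one of them.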
License
GPLv3
Hashes for polymer-0.0.27-py2.py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 20553525c1605a72e4f87b5177be628095d6abc19c78581cc3abfa99fc550893
MD5 | dc1754e2a777039b1921183a699d6832
BLAKE2b-256 | 30305fb289a6b72bfa55fac468e5e2f6cac6aaf28da05b0337f7167a886ef318