A task scheduler for cyclic and acyclic graphs
pypushflow
Install
pip install pypushflow[mx]
Use the mx option for installation at MX beamlines.
Run tests
pip install pypushflow[test]
pytest
Getting started
import logging

from pypushflow.Workflow import Workflow
from pypushflow.StopActor import StopActor
from pypushflow.StartActor import StartActor
from pypushflow.PythonActor import PythonActor
from pypushflow.ThreadCounter import ThreadCounter


class MyWorkflow(Workflow):
    def __init__(self, name):
        super().__init__(name, level=logging.DEBUG)
        # All actors in this workflow share one thread counter
        ctr = ThreadCounter(parent=self)
        self.startActor = StartActor(parent=self, thread_counter=ctr)
        self.pythonActor = PythonActor(
            parent=self,
            script="pypushflow.tests.tasks.pythonActorTest.py",
            name="Python Actor Test",
            thread_counter=ctr,
        )
        self.stopActor = StopActor(parent=self, thread_counter=ctr)
        # Wire the graph: start -> python actor -> stop
        self.startActor.connect(self.pythonActor)
        self.pythonActor.connect(self.stopActor)


testMyWorkflow = MyWorkflow("Test workflow")
inData = {"name": "World"}
# Run the workflow in a process pool, with a timeout
outData = testMyWorkflow.run(inData, timeout=15, pool_type="process")
assert outData["reply"] == "Hello World!"
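The workflow above passes inData to the script named in the PythonActor and expects a reply key in the output. The contents of that script are not shown here, so the following is only a minimal sketch of what such a task script might look like. It assumes that the script exposes a run function, that the actor calls it with the input dictionary entries as keyword arguments, and that the returned dictionary is merged into the output data. The merge step in the __main__ section is a local simulation, not pypushflow code.

```python
# Hypothetical task script of the kind referenced by `script=`.
# Assumption: the actor calls run(**inData) and merges the returned
# dictionary into the workflow's output data.


def run(name, **kwargs):
    """Build a greeting from the "name" input; extra inputs are ignored."""
    return {"reply": f"Hello {name}!"}


if __name__ == "__main__":
    # Simulate the call locally, without pypushflow
    in_data = {"name": "World"}
    out_data = {**in_data, **run(**in_data)}
    print(out_data["reply"])
```

Accepting **kwargs keeps the task tolerant of additional keys that upstream actors may have added to the data.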
Documentation