pypushflow

A task scheduler for cyclic and acyclic graphs

Install

python -m pip install pypushflow[mx]

The mx extra is meant for installations at MX beamlines; elsewhere, pypushflow can be installed without it.

Run tests

python -m pip install pypushflow[test]
pytest

Getting started

import logging
from pypushflow.Workflow import Workflow
from pypushflow.StopActor import StopActor
from pypushflow.StartActor import StartActor
from pypushflow.PythonActor import PythonActor
from pypushflow.ThreadCounter import ThreadCounter


class MyWorkflow(Workflow):
    """
    Workflow containing one start actor,
    one python actor and one stop actor.
    """

    def __init__(self, name):
        # Log at DEBUG level
        super().__init__(name, level=logging.DEBUG)
        # A single thread counter is shared by all actors of this workflow
        ctr = ThreadCounter(parent=self)
        self.startActor = StartActor(parent=self, thread_counter=ctr)
        self.pythonActor = PythonActor(
            parent=self,
            script="pypushflow.test.pythonActorTest.py",
            name="Python Actor Test",
            thread_counter=ctr,
        )
        self.stopActor = StopActor(parent=self, thread_counter=ctr)
        # Wire the graph: start actor -> python actor -> stop actor
        self.startActor.connect(self.pythonActor)
        self.pythonActor.connect(self.stopActor)


# Run the workflow on a small input and check the reply
testMyWorkflow = MyWorkflow("Test workflow 1")
inData = {"name": "Jerry"}
outData = testMyWorkflow.run(inData, timeout=15, shared_pool=True)
assert outData["reply"] == "Hello Jerry!"
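
The example above does not show the script that the Python actor executes. Below is a minimal sketch of what such a script module could look like. It assumes that PythonActor calls a module-level run function with the input dictionary unpacked as keyword arguments, and that the returned dictionary ends up in the workflow output. Neither assumption is confirmed by this page, and the function body is illustrative rather than the packaged test script.

```python
# Hypothetical stand-in for the module named by the `script` argument.
# Assumption: the actor calls a module-level `run` function with the
# input dictionary unpacked as keyword arguments and expects a dict back.


def run(name=None, **kwargs):
    """Build a greeting from the "name" entry of the input data."""
    reply = None
    if name is not None:
        reply = "Hello " + name + "!"
    # Extra keyword arguments are accepted and ignored.
    return {"reply": reply}


# Calling the function directly, without pypushflow:
in_data = {"name": "Jerry"}
out_data = run(**in_data)
```

Called this way, out_data["reply"] holds the greeting that the assertion in the workflow example checks for.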
