Skip to main content

Produce a plan that dispatches calls based on a graph of functions, satisfying data dependencies.

Project description

About schedula

schedula is a dynamic flow-based programming environment for python, that handles automatically the control flow of the program. The control flow generally is represented by a Directed Acyclic Graph (DAG), where nodes are the operations/functions to be executed and edges are the dependencies between them.

The algorithm of schedula dates back to 2014, when a colleague asked for a method to automatically populate the missing data of a database. The imputation method chosen to complete the database was a system of interdependent physical formulas - i.e., the inputs of a formula are the outputs of other formulas. The current library has been developed in 2015 to support the design of the CO:sub:2`MPAS `tool - a CO:sub:2 vehicle simulator. During the developing phase, the physical formulas (more than 700) were known on the contrary of the software inputs and outputs.

Why schedula?

The design of flow-based programs begins with the definition of the control flow graph, and implicitly of its inputs and outputs. If the program accepts multiple combinations of inputs and outputs, you have to design and code all control flow graphs. With normal schedulers, it can be very demanding.

While with schedula, giving whatever set of inputs, it automatically calculates any of the desired computable outputs, choosing the most appropriate DAG from the dataflow execution model.

Note: The DAG is determined at runtime and it is extracted using the

shortest path from the provided inputs. The path is calculated based on a weighted directed graph (dataflow execution model) with a modified Dijkstra algorithm.

schedula makes the code easy to debug, to optimize, and to present it to a non-IT audience through its interactive graphs and charts. It provides the option to run a model asynchronously or in parallel managing automatically the Global Interpreter Lock (GIL), and to convert a model into a web API service.

Installation

To install it use (with root privileges):

$ pip install schedula

or download the last git version and use (with root privileges):

$ python setup.py install

Install extras

Some additional functionality is enabled installing the following extras:

  • io: enables to read/write functions.

  • plot: enables the plot of the Dispatcher model and workflow (see plot()).

  • web: enables to build a dispatcher Flask app (see web()).

  • sphinx: enables the sphinx extension directives (i.e., autosummary and dispatcher).

  • parallel: enables the parallel execution of Dispatcher model.

To install schedula and all extras, do:

$ pip install 'schedula[all]'
Note: plot extra requires Graphviz. Make sure that the directory

containing the dot executable is on your systems’ path. If you have not you can install it from its download page.

Tutorial

Let’s assume that we want develop a tool to automatically manage the symmetric cryptography. The base idea is to open a file, read its content, encrypt or decrypt the data and then write them out to a new file. This tutorial shows how to:

  1. define and execute a dataflow execution model,

  2. extract a sub-model, and

  3. deploy a web API service.

Note: You can find more examples, on how to use the schedula library,

into the folder examples.

Model definition

First of all we start defining an empty Dispatcher named symmetric_cryptography that defines the dataflow execution model:

>>> import schedula as sh
>>> dsp = sh.Dispatcher(name='symmetric_cryptography')

There are two main ways to get a key, we can either generate a new one or use one that has previously been generated. Hence, we can define three functions to simply generate, save, and load the key. To automatically populate the model inheriting the arguments names, we can use the decorator add_function() as follow:

>>> import os.path as osp
>>> from cryptography.fernet import Fernet
>>> @sh.add_function(dsp, outputs=['key'], weight=2)
... def generate_key():
...     return Fernet.generate_key().decode()
>>> @sh.add_function(dsp)
... def write_key(key_fpath, key):
...     with open(key_fpath, 'w') as f:
...         f.write(key)
>>> @sh.add_function(dsp, outputs=['key'], input_domain=osp.isfile)
... def read_key(key_fpath):
...     with open(key_fpath) as f:
...         return f.read()
Note: Since Python does not come with anything that can encrypt/decrypt

files, in this tutorial, we use a third party module named cryptography. To install it execute pip install cryptography.

To encrypt/decrypt a message, you will need a key as previously defined and your data encrypted or decrypted. Therefore, we can define two functions and add them, as before, to the model:

>>> @sh.add_function(dsp, outputs=['encrypted'])
... def encrypt_message(key, decrypted):
...     return Fernet(key.encode()).encrypt(decrypted.encode()).decode()
>>> @sh.add_function(dsp, outputs=['decrypted'])
... def decrypt_message(key, encrypted):
...     return Fernet(key.encode()).decrypt(encrypted.encode()).decode()

Finally, to read and write the encrypted or decrypted message, according to the functional programming philosophy, we can reuse the previously defined functions read_key and write_key changing the model mapping (i.e., function_id, inputs, and outputs). To add to the model, we can simply use the add_function method as follow:

>>> dsp.add_function(
...     function_id='read_decrypted',
...     function=read_key,
...     inputs=['decrypted_fpath'],
...     outputs=['decrypted']
... )
'read_decrypted'
>>> dsp.add_function(
...     'read_encrypted', read_key, ['encrypted_fpath'], ['encrypted'],
...     input_domain=osp.isfile
... )
'read_encrypted'
>>> dsp.add_function(
...     'write_decrypted', write_key, ['decrypted_fpath', 'decrypted'],
...     input_domain=osp.isfile
... )
'write_decrypted'
>>> dsp.add_function(
...     'write_encrypted', write_key, ['encrypted_fpath', 'encrypted']
... )
'write_encrypted'
Note: For more details on how to create a Dispatcher see:

add_data(), add_func(), add_function(), add_dispatcher(), SubDispatch, MapDispatch, SubDispatchFunction, SubDispatchPipe, and DispatchPipe.

To inspect and visualize the dataflow execution model, you can simply plot the graph as follow:

>>> dsp.plot()

[graph]

Tip: You can explore the diagram by clicking on it.

Dispatching

To see the dataflow execution model in action and its workflow to generate a key, to encrypt a message, and to write the encrypt data, you can simply invoke dispatch() or __call__() methods of the dsp:

>>> import tempfile
>>> tempdir = tempfile.mkdtemp()
>>> message = "secret message"
>>> sol = dsp(inputs=dict(
...     decrypted=message,
...     encrypted_fpath=osp.join(tempdir, 'data.secret'),
...     key_fpath=osp.join(tempdir,'key.key')
... ))
>>> sol.plot(index=True)

[graph]

Note: As you can see from the workflow graph (orange nodes), when some

function’s inputs does not respect its domain, the Dispatcher automatically finds an alternative path to estimate all computable outputs. The same logic applies when there is a function failure.

Now to decrypt the data and verify the message without saving the decrypted message, you just need to execute again the dsp changing the inputs and setting the desired outputs. In this way, the dispatcher automatically selects and executes only a sub-part of the dataflow execution model.

>>> dsp(
...     inputs=sh.selector(('encrypted_fpath', 'key_fpath'), sol),
...     outputs=['decrypted']
... )['decrypted'] == message
True

If you want to visualize the latest workflow of the dispatcher, you can use the plot() method with the keyword workflow=True:

>>> dsp.plot(workflow=True, index=True)

[graph]

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

schedula-1.5.31.tar.gz (5.7 MB view details)

Uploaded Source

Built Distribution

schedula-1.5.31-py2.py3-none-any.whl (5.9 MB view details)

Uploaded Python 2 Python 3

File details

Details for the file schedula-1.5.31.tar.gz.

File metadata

  • Download URL: schedula-1.5.31.tar.gz
  • Upload date:
  • Size: 5.7 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.0

File hashes

Hashes for schedula-1.5.31.tar.gz
Algorithm Hash digest
SHA256 9939a72133e9343d37c67dc6d347a1265e799a3b0558c87588d85ae4f501a72d
MD5 beb29cbfa7e40577f005e24a51715f89
BLAKE2b-256 ae32bc6beb0a4627868ebd4c02b03c6c8aec18f8800cddf40f9b6d4c8f2d0301

See more details on using hashes here.

File details

Details for the file schedula-1.5.31-py2.py3-none-any.whl.

File metadata

  • Download URL: schedula-1.5.31-py2.py3-none-any.whl
  • Upload date:
  • Size: 5.9 MB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.0

File hashes

Hashes for schedula-1.5.31-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 54fc118ba4cd5e1f53c7fbfc299bdcfa5eb399a2089a4650a0d884a11465c3d0
MD5 9913eaaeb9edbc89397bfc220aa6682b
BLAKE2b-256 f0173eaee9dfb5670f6b2bdc296ff54833a25d1271cc32e7abc994480351b6e5

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page