pacer
About
pacer is a lightweight Python package for implementing distributed data processing workflows. Instead of defining a DAG that models the data flow from sources to a final result, pacer uses a pull model, which is very similar to nesting function calls. Running such a workflow starts at the result node and recursively delegates work to its inputs.
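The pull model can be illustrated in plain Python. The following is a hypothetical sketch (the `Node` class is not part of pacer): pulling the value of the result node recursively pulls the values of its inputs, exactly like nested function calls.

```python
class Node:
    """Hypothetical sketch of a pull-model computation node."""

    def __init__(self, func, *inputs):
        self.func = func
        self.inputs = inputs

    def compute(self):
        # evaluation starts at the result node and recursively
        # delegates work to the input nodes
        values = [inp.compute() if isinstance(inp, Node) else inp
                  for inp in self.inputs]
        return self.func(*values)

# declaring the chain does not compute anything yet:
length = Node(len, "abc")
doubled = Node(lambda n: 2 * n, length)

# pulling the result triggers the whole chain:
assert doubled.compute() == 6
```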
Originally we developed pacer for running analysis pipelines in emzed, a framework for analyzing LCMS data.
How does pacer work?
Under the hood pacer has two core components:

1. An engine for managing distributed execution of chained computations.

Processing steps in pacer are just Python functions with some additional annotations. pacer computes as many processing steps as possible in parallel, either because such a function has to be applied to different data sets, or because it has more than one input and those inputs can be computed concurrently.

2. A distributed cache which is persisted on the file system.

In case of partial modifications of the inputs, a pacer workflow does not determine the needed update computations itself but uses a distributed cache that maps the input values of each processing step to its result. A repeated run of the workflow with unchanged inputs therefore runs the full workflow, with all processing steps returning their already known results immediately. Running the workflow with unknown or modified inputs only executes the needed computations and updates the cache.

These two components are independent and can be used separately.
Examples
We provide some simple examples which show how easy it is to use pacer. You can find these examples, extended to print more logging information, in the examples/ folder of the git repository.
In a real-world LCMS workflow we would not use functions as simple as those below, but longer-running computation steps such as an LCMS peak picker and a subsequent peak aligner.
How to declare a pipeline
In this case our input sources are a list of Python strings ["a", "bc", "def"] and a tuple of numbers (1, 2). The example workflow computes the length of each string and multiplies it by every number from the tuple. This very simple example could be implemented in pure Python as follows:
```python
import itertools

def main():
    def length(what):
        return len(what)

    def multiply(a, b):
        return a * b

    words = ["a", "bc", "def"]
    multipliers = (1, 2)
    result = [multiply(length(w), v)
              for (w, v) in itertools.product(words, multipliers)]
    assert result == [1, 2, 2, 4, 3, 6]

if __name__ == "__main__":
    main()
```
In order to transform this computation into a smart parallel processing pipeline, we use the apply and join function decorators from pacer and declare the dependencies among the single steps using function calls.
```python
from pacer import apply, join, Engine

def main():
    @apply
    def length(what):
        return len(what)

    @join
    def multiply(a, b):
        return a * b

    words = ["a", "bc", "def"]
    multipliers = (1, 2)

    # now we DECLARE the workflow (no execution at this time):
    workflow = multiply(length(words), multipliers)

if __name__ == "__main__":
    main()
```
Running this workflow on three CPU cores is now easy. In this case the computation steps run in parallel:
```python
Engine.set_number_of_processes(3)
workflow.start_computations()
result = workflow.get_all_in_order()
assert result == [1, 2, 2, 4, 3, 6]
```
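The parallel schedule that pacer derives from this declaration can be mimicked with the standard library alone. The following is a hypothetical sketch, not pacer's engine, and it uses a thread pool for brevity where pacer distributes work across processes: the length step is applied to all words concurrently, then each length is combined with every multiplier.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product

def length(what):
    return len(what)

def multiply(pair):
    a, b = pair
    return a * b

def run_parallel(words, multipliers, workers=3):
    # hypothetical sketch: apply `length` to every word concurrently,
    # then combine each computed length with every multiplier
    with ThreadPoolExecutor(max_workers=workers) as pool:
        lengths = list(pool.map(length, words))
        return list(pool.map(multiply, product(lengths, multipliers)))

assert run_parallel(["a", "bc", "def"], (1, 2)) == [1, 2, 2, 4, 3, 6]
```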
pacer's approach to computing needed updates in case of modified input data
As already stated above, pacer does not determine the needed update computations in case of modified input data but uses a distributed cache instead. Running a workflow a second time will thus fetch the already known results of computations not affected by the changes, and only start computations whose input arguments are unknown.
We use decorators again. Adapting the example above only needs a few adjustments:
```python
from pacer import apply, join, Engine, CacheBuilder

cache = CacheBuilder("/tmp/cache_000")

@apply
@cache
def length(what):
    return len(what)

@join
@cache
def multiply(a, b):
    return a * b

# inputs to the workflow
words = ["a", "bc", "def"]
multipliers = (1, 2)

workflow = multiply(length(words), multipliers)

# run the workflow
Engine.set_number_of_processes(3)
workflow.start_computations()
result = workflow.get_all_in_order()
assert result == [1, 2, 2, 4, 3, 6]
```
If you run these examples from the command line, you will see logging output showing the parallel execution of single steps and the cache hits avoiding recomputations.