
pacer

About

pacer is a lightweight Python package for implementing distributed data processing workflows. Instead of defining a DAG that models the data flow from sources to a final result, pacer uses a pull model, which is very similar to nesting function calls. Running such a workflow starts at the result node and recursively delegates work to the inputs.
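
To make the pull model concrete, here is a minimal, pacer-free sketch: asking the result node for its value recursively asks its input nodes for theirs. The Node class below is purely illustrative and is not part of pacer's API.

class Node:

    def __init__(self, func, *inputs):
        self.func = func
        self.inputs = inputs

    def compute(self):
        # pull: evaluate all inputs first, then apply this node's function
        values = [node.compute() for node in self.inputs]
        return self.func(*values)

source = Node(lambda: 3)                 # an input node
doubled = Node(lambda x: 2 * x, source)  # depends on source
result = Node(lambda x: x + 1, doubled)  # the final result node

assert result.compute() == 7             # starts at the result node and pulls from the inputs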

Originally we developed pacer for running analysis pipelines in emzed, a framework for analyzing LCMS data.

How does pacer work?

Under the hood pacer has two core components:

  • one for managing the distributed execution of chained computations:

    Processing steps in pacer are just Python functions with some additional annotations. pacer tries to compute as many processing steps as possible in parallel, either because a function has to be applied to several data sets, or because it has more than one input and those inputs can be computed concurrently. A minimal, pacer-free sketch of this idea follows after this list.

  • a distributed cache which is persisted on the file system:

    In case of partial modifications of the inputs, a pacer workflow does not work out which update computations are needed; instead it uses a distributed cache that maps the input values of each processing step to its result. A repeated run of the workflow with unchanged inputs therefore runs the full workflow, with every processing step returning its already known result immediately. Running the workflow with unknown or modified inputs only executes the computations that are actually needed and updates the cache. (A simplified standalone sketch of such a cache is given below.)
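
The following sketch illustrates the first of these components with nothing but the standard library. It is not how pacer is implemented, but it shows the kind of fan-out pacer automates when the same processing step has to be applied to several independent data sets.

# Conceptual illustration only -- pacer automates this kind of scheduling.
from concurrent.futures import ProcessPoolExecutor

def step(data_set):
    # stands in for an expensive processing step
    return sum(data_set)

if __name__ == "__main__":
    data_sets = [[1, 2], [3, 4, 5], [6]]

    # the same processing step applied to independent data sets in parallel
    with ProcessPoolExecutor(max_workers=3) as pool:
        results = list(pool.map(step, data_sets))

    assert results == [3, 12, 6]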

These two components are independent and can be used separately.
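
As an illustration of the second component on its own, a much simplified input-to-result cache on the file system could look like the sketch below. The file_cache decorator, its file layout and its hashing are assumptions made for this sketch; they are not pacer's actual CacheBuilder, which is shown later in the examples.

# Simplified illustration of a file-system cache keyed by input values.
# This is NOT pacer's CacheBuilder, just the underlying idea.
import hashlib
import os
import pickle

def file_cache(folder):
    os.makedirs(folder, exist_ok=True)

    def decorator(func):
        def wrapper(*args):
            # key the cache entry by the function name and its input values
            key = hashlib.sha1(pickle.dumps((func.__name__, args))).hexdigest()
            path = os.path.join(folder, key + ".pkl")
            if os.path.exists(path):          # cache hit: reuse the known result
                with open(path, "rb") as fh:
                    return pickle.load(fh)
            result = func(*args)              # cache miss: compute and store
            with open(path, "wb") as fh:
                pickle.dump(result, fh)
            return result
        return wrapper
    return decorator

@file_cache("/tmp/demo_cache")
def length(what):
    return len(what)

assert length("abc") == 3   # computed on the first call
assert length("abc") == 3   # served from the file system afterwards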

Examples

We provide some simple examples which show how easy it is to use pacer. You can find these examples, extended to print more logging information, in the examples/ folder of the git repository.

In a real-world LCMS workflow we would not use functions as simple as the ones below, but longer-running computation steps such as an LCMS peak picker followed by a peak aligner.

How to declare a pipeline

In this case our input sources are a list of Python strings ["a", "bc", "def"] and a tuple of numbers (1, 2). The example workflow computes the length of each string and multiplies it by every number from the tuple. This very simple example could be implemented in pure Python as follows:

import itertools

def main():

    def length(what):
        return len(what)

    def multiply(a, b):
        return a * b

    words = ["a", "bc", "def"]
    multipliers = (1, 2)

    result = [multiply(length(w), v) for (w, v) in itertools.product(words, multipliers)]
    assert result == [1, 2, 2, 4, 3, 6]

if __name__ == "__main__":
    main()

In order to transform this computation into a smart parallel processing pipeline, we use the apply, join and output function decorators from pacer and declare the dependencies between the individual steps using function calls.

from pacer import apply, output, join, Engine

def main():

    @apply
    def length(what):
        return len(what)

    @output
    @join
    def multiply(a, b):
        return a * b

    words = ["a", "bc", "def"]
    multipliers = (1, 2)

    # now we DECLARE the workflow (no execution at that time):
    workflow = multiply(length(words), multipliers)

if __name__ == "__main__":
    main()

Running this workflow on three CPU cores is now easy; in this case the computation steps run in parallel:

Engine.set_number_of_processes(3)
workflow.start_computations()
result = workflow.get_all_in_order()

assert result == [1, 2, 2, 4, 3, 6]

pacer's approach to computing needed updates in case of modified input data

As already stated above, pacer does not determine which update computations are needed in case of modified input data, but uses a distributed cache instead. Running a workflow a second time will therefore fetch the already known results of computations not affected by the changes and only start computations with unknown input arguments.

We use decorators again. Adapting the example above only needs a few adjustments:

from pacer import apply, join, output, Engine, CacheBuilder

cache = CacheBuilder("/tmp/cache_000")

@apply
@cache
def length(what):
    return len(what)

@output
@join
@cache
def multiply(a, b):
    return a * b

# inputs to workflow
words = ["a", "bc", "def"]
multipliers = (1, 2)

workflow = multiply(length(words), multipliers)

# run workflow
Engine.set_number_of_processes(3)
workflow.start_computations()
result = workflow.get_all_in_order()

assert result == [1, 2, 2, 4, 3, 6]

If you run these examples from the command line, you will see log output showing the parallel execution of individual steps and the cache hits that avoid recomputation.
