
A lightweight, general-purpose pipeline framework.



A simple, general-purpose pipeline framework that can easily be integrated into existing (analysis) chains and workflows.

Installation

thepipe can be installed via pip:

pip install thepipe

Features

  • Easy-to-use interface and straightforward integration into existing workflows

  • Modules can be either subclasses of Module or plain Python functions

  • Data is passed via a simple Python dictionary from module to module (wrapped in a class called Blob which adds some visual candy and error reporting)

  • Integrated hierarchical logging system

  • Colour-coded log and print messages (self.log and self.cprint() in Modules)

  • Performance statistics for the whole pipeline and each module individually

  • Clean exit when interrupting the pipeline with CTRL+C
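To make the dictionary-passing idea above concrete, here is a minimal sketch that does not use thepipe itself. PrettyBlob, add_index and double_index are hypothetical names invented for this illustration; the string format only imitates the "Blob (1 entries)" style shown in the example output further down, and thepipe's actual Blob class may be implemented differently.

```python
class PrettyBlob(dict):
    """Hypothetical stand-in for a Blob: a plain dict with a readable str()."""

    def __str__(self):
        # Header line followed by one "key => value" line per entry.
        lines = ["Blob ({} entries):".format(len(self))]
        for key, value in self.items():
            lines.append(" {!r} => {!r}".format(key, value))
        return "\n".join(lines)


def add_index(blob):
    # A function-based module: takes the blob, adds a key, returns the blob.
    blob["index"] = 3
    return blob


def double_index(blob):
    # A later module can read what an earlier module stored.
    blob["doubled"] = blob["index"] * 2
    return blob


# Pass one blob through the modules in order, as a pipeline cycle would.
blob = PrettyBlob()
for module in (add_index, double_index):
    blob = module(blob)

print(blob)
```

Because each module receives and returns the same dictionary-like object, modules stay decoupled: they only need to agree on the keys they read and write.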

The Pipeline

Here is a basic example of how to create a pipeline, attach some modules to it, pass parameters to them and drain the pipeline.

Note that pipeline modules can be either plain Python functions that take a single argument (the blob) or classes derived from thepipe.Module.

import thepipe as tp


class AModule(tp.Module):
    def configure(self):
        self.cprint("Configuring AModule")
        self.max_count = self.get("max_count", default=23)
        self.index = 0

    def process(self, blob):
        self.cprint("This is cycle #%d" % self.index)
        blob['index'] = self.index
        self.index += 1

        if self.index > self.max_count:
            self.log.critical("That's enough...")
            raise StopIteration
        return blob

    def finish(self):
        self.cprint("I'm done!")


def a_function_based_module(blob):
    print("Here is the blob:")
    print(blob)
    return blob


pipe = tp.Pipeline()
pipe.attach(AModule, max_count=5)  # pass any parameters to the module
pipe.attach(a_function_based_module)
pipe.drain()  # without arguments it will drain until a StopIteration is raised

This will produce the following output:

++ AModule: Configuring AModule
Pipeline and module initialisation took 0.000s (CPU 0.000s).
++ AModule: This is cycle #0
Here is the blob:
Blob (1 entries):
 'index' => 0
++ AModule: This is cycle #1
Here is the blob:
Blob (1 entries):
 'index' => 1
++ AModule: This is cycle #2
Here is the blob:
Blob (1 entries):
 'index' => 2
++ AModule: This is cycle #3
Here is the blob:
Blob (1 entries):
 'index' => 3
++ AModule: This is cycle #4
Here is the blob:
Blob (1 entries):
 'index' => 4
++ AModule: This is cycle #5
CRITICAL ++ AModule: That's enough...
++ AModule: I'm done!
============================================================
5 cycles drained in 0.000793s (CPU 0.000793s). Memory peak: 20.56 MB
  wall  mean: 0.000063s  medi: 0.000057s  min: 0.000045s  max: 0.000106s  std: 0.000022s
  CPU   mean: 0.000065s  medi: 0.000057s  min: 0.000046s  max: 0.000112s  std: 0.000024s
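The drain-until-StopIteration behaviour and the per-cycle statistics in the summary above can be modelled with a few lines of standard-library Python. This is only a simplified sketch, not thepipe's implementation: MiniPipeline and Counter are hypothetical names, only wall-clock time is measured, and the use of the population standard deviation is an assumption.

```python
import statistics
import time


class MiniPipeline:
    """Hypothetical, simplified model of a drain loop (not thepipe's code)."""

    def __init__(self):
        self.modules = []
        self.cycle_times = []

    def attach(self, module):
        self.modules.append(module)

    def drain(self):
        # Run cycles until any module raises StopIteration.
        while True:
            start = time.perf_counter()
            blob = {}
            try:
                for module in self.modules:
                    blob = module(blob)
            except StopIteration:
                break  # the interrupted cycle is not counted
            self.cycle_times.append(time.perf_counter() - start)
        return self.summary()

    def summary(self):
        times = self.cycle_times
        if not times:
            return {"cycles": 0}
        return {
            "cycles": len(times),
            "mean": statistics.mean(times),
            "median": statistics.median(times),
            "min": min(times),
            "max": max(times),
            # Assumption: population standard deviation.
            "std": statistics.pstdev(times),
        }


class Counter:
    """Callable module mirroring the counting logic of AModule."""

    def __init__(self, max_count):
        self.max_count = max_count
        self.index = 0

    def __call__(self, blob):
        blob["index"] = self.index
        self.index += 1
        if self.index > self.max_count:
            raise StopIteration
        return blob


pipe = MiniPipeline()
pipe.attach(Counter(max_count=5))
stats = pipe.drain()
print("{} cycles drained".format(stats["cycles"]))
```

As in the real example, the cycle in which StopIteration is raised is not counted, so max_count=5 yields five completed cycles.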


