Simple data processing pipelines

Project description

A Python package for creating simple data pipelines with source/sink stages that can process or drop elements. Each pipeline stage receives a dictionary of metadata and data for the element, and it can add or remove fields, or terminate processing of the element.

An example pipeline would be a generator (the source) that reads image files from a directory, followed by a pipe that resizes each image, followed by a pipe that saves each image.

Stages

Each stage can contain a source, a sink, or both. Sources generate elements, while sinks process and optionally drop elements from the pipeline.

Source Stage

A source stage is defined by creating a subclass of the 'Pipeline' class with, at a minimum, a 'source' function, as in the example below.

class ArraySource(pipeline.Pipeline):
    def __init__(self, sink, arr):
        self.arr = arr
        super(ArraySource, self).__init__(sink)

    def source(self):
        # Yield one element per word, tagging each with its index.
        for i, word in enumerate(self.arr):
            element = {
                "word_id": i,
                "word": word,
            }
            yield element

Sink Stage

A sink stage is defined by creating a subclass of the 'Pipeline' class with, at a minimum, a 'sink' function, as in the example below.

class DropSmallWord(pipeline.Pipeline):
    def __init__(self, sink, min_len):
        self.min_len = min_len
        super(DropSmallWord, self).__init__(sink)

    def sink(self, element):
        # Returning None drops the element from the pipeline;
        # returning the element passes it on to the next stage.
        if len(element['word']) < self.min_len:
            return None
        return element
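
The pipeline built below also uses a PrintWord stage that is not listed in this description. A minimal sketch of such a stage, assuming the same sink convention as above (feeder.py contains the package's own word-printing stage, which may differ):

class PrintWord(pipeline.Pipeline):
    def __init__(self, sink):
        super(PrintWord, self).__init__(sink)

    def sink(self, element):
        # Print the word and pass the element along unchanged.
        print(element['word'])
        return element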

Elements

Elements are the units of data passed through the processing pipeline. An element is a dictionary that can contain any number of fields; both the data itself and metadata about the data unit can be carried in the element.
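
For example, an element emitted by the ArraySource above pairs the payload with bookkeeping metadata, and a later stage is free to attach further fields before passing the element on (the word_length field below is hypothetical, added only for illustration):

element = {
    "word_id": 3,        # metadata: position in the source array
    "word": "pipeline",  # data: the payload itself
}

# A stage may enrich the element before returning it:
element["word_length"] = len(element["word"])  # hypothetical added field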

Creating the Pipeline

To create a pipeline, the stages are instantiated and linked to each other, starting from the final stage and working back to the source, as follows:

    pw = PrintWord(None)       # Final stage: prints each word
    ds = DropSmallWord(pw, 5)  # Drop words shorter than 5 characters
    a = ArraySource(ds, words)

As can be seen, the final stage is initialized with None as its sink, while every other stage receives its subsequent stage as its sink.
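
The Pipeline base class itself is not shown in this description, but the wiring above suggests the mechanics: the source stage yields elements, and each element is handed down the chain of sinks until a stage returns None or the chain ends. The following is an illustrative sketch only, not procpipe's actual implementation; the run method and attribute names are assumptions:

class Pipeline(object):
    """Illustrative sketch only, not procpipe's actual implementation."""

    def __init__(self, sink):
        self._sink = sink  # next stage, or None for the final stage

    def run(self):  # hypothetical driver method
        # Pull elements from this stage's source and push each one
        # down the chain of sinks until it is dropped or fully processed.
        for element in self.source():
            stage = self._sink
            while stage is not None and element is not None:
                element = stage.sink(element)  # None drops the element
                stage = stage._sink

Under those assumptions, the pipeline above would be started from its source stage, e.g. a.run().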

Examples

See the examples subdirectory for these examples:

  • feeder.py: Feeds an array of words into a filter stage that drops small words, followed by a stage that prints the remaining words
  • resizer.py: Reads image files named on the command line, passes them through a resizer stage, followed by an image-save stage.

Project details


Download files

Download the file for your platform.

Source Distribution

procpipe-2020.10.tar.gz (2.8 kB)

Uploaded Source

Built Distribution

procpipe-2020.10-py3-none-any.whl (3.7 kB)

Uploaded Python 3

File details

Details for the file procpipe-2020.10.tar.gz.

File metadata

  • Download URL: procpipe-2020.10.tar.gz
  • Upload date:
  • Size: 2.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.2.1 requests-toolbelt/0.9.1 tqdm/4.50.2 CPython/3.9.0

File hashes

Hashes for procpipe-2020.10.tar.gz

Algorithm    Hash digest
SHA256       919b34a33b421bbcdac196fb3049d40564ca9a4d9ec4c9be299ad1d8d3939c2d
MD5          cc419c3dbbb6392db9b1f27e82345fb4
BLAKE2b-256  f86a3c35dab82830eafefedd2739957de2c140ff1b2a3c6709044b9c35bc4d49

File details

Details for the file procpipe-2020.10-py3-none-any.whl.

File metadata

  • Download URL: procpipe-2020.10-py3-none-any.whl
  • Upload date:
  • Size: 3.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.2.1 requests-toolbelt/0.9.1 tqdm/4.50.2 CPython/3.9.0

File hashes

Hashes for procpipe-2020.10-py3-none-any.whl

Algorithm    Hash digest
SHA256       761d2e12332f4293365d8ac383fe41a34794eb712e81dda1071dcc7dd8d3f3ca
MD5          d0c0f283cfa51cdb3ed9db98c2703c63
BLAKE2b-256  d644f6193d15fcb01d10bae9cc45a1bc4df7ee5aec65ca13a3bcae5587823e7e
