
StreamPipe is a straightforward multi-worker pipeline for streaming data in Python.


StreamPipe

StreamPipe is a simple Python library (a single file!) for running multi-worker pipelines on infinite data streams. Its key feature is that the order of data is preserved throughout the pipeline, so you can chain order-independent workers (e.g. image processing functions) and order-dependent workers (e.g. a video object tracker) in the same StreamPipe!
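StreamPipe's internals are not described here, so the following is only a minimal sketch of one common way to run a stage on several workers while keeping the output in input order: tag each item with its index and hold results that finish early in a reorder buffer. It uses standard-library threads and queues. `ordered_stage`, `running_total` and `slow_square` are hypothetical names, not StreamPipe's API, and treating `maxsize` as the bound of the stage's input queue is an assumption.

```python
import queue
import random
import threading
import time

_DONE = object()  # sentinel marking the end of the stream


def ordered_stage(func, items, n_workers=4, maxsize=4):
    """Apply func to items on n_workers threads, yielding results in input order.

    Hypothetical helper; maxsize is assumed to bound the input queue.
    """
    in_q = queue.Queue(maxsize=maxsize)  # bounded: the feeder blocks when workers lag
    out_q = queue.Queue()

    def feed():
        # Tag every item with its position so results can be reordered later.
        for idx, item in enumerate(items):
            in_q.put((idx, item))
        for _ in range(n_workers):
            in_q.put(_DONE)  # one sentinel per worker

    def work():
        while True:
            job = in_q.get()
            if job is _DONE:
                out_q.put(_DONE)  # tell the consumer this worker has finished
                return
            idx, item = job
            out_q.put((idx, func(item)))

    for target in [feed] + [work] * n_workers:
        threading.Thread(target=target, daemon=True).start()

    # Reorder buffer: hold early results until every earlier index has been yielded.
    pending, next_idx, finished = {}, 0, 0
    while finished < n_workers:
        msg = out_q.get()
        if msg is _DONE:
            finished += 1
            continue
        idx, result = msg
        pending[idx] = result
        while next_idx in pending:
            yield pending.pop(next_idx)
            next_idx += 1


def running_total(values):
    """Order-dependent stage: each output depends on every earlier input."""
    total = 0
    for v in values:
        total += v
        yield total


def slow_square(x):
    time.sleep(random.random() * 0.01)  # random delay scrambles completion order
    return x * x


# A multi-worker, order-independent stage feeding an order-dependent one.
print(list(running_total(ordered_stage(slow_square, range(10)))))
# [0, 1, 5, 14, 30, 55, 91, 140, 204, 285]
```

Because order is restored before results leave `ordered_stage`, an order-dependent stage such as `running_total` (standing in for a tracker) can safely follow a multi-worker stage. The sketch has no error handling: if `func` raises, that worker thread dies and the generator blocks.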

Installation

pip3 install streampipe

Install from source:

git clone https://github.com/highvight/streampipe
cd streampipe
pip3 install .
pip3 install ".[testing]"  # extras for testing and benchmarks (quotes needed in some shells, e.g. zsh)

Usage

Here is a starter example. Let's check whether a multi-worker pipeline can speed up downsampling and standardizing images.

import timeit
import numpy as np
from streampipe import StreamPipe

IMG = np.random.rand(1000, 1000, 3) * 255

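def _reshape(img):
    # Downsample by keeping every second pixel along height and width
    return img[::2, ::2]

def _standardize(img):
    # Scale each channel to zero mean and unit variance
    mean = np.mean(img, axis=(0, 1), keepdims=True)
    std = np.std(img, axis=(0, 1), keepdims=True)
    return (img - mean) / std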

pipe = StreamPipe()
pipe.add(_reshape, n_workers=4, maxsize=4)
pipe.add(_standardize, n_workers=4, maxsize=4)

def measure_pipe():
    # Starts the workers
    with pipe:
        for _ in range(100):
            pipe.put(IMG)
    # All workers stopped, all items done

def measure_normal():
    for _ in range(100):
        x = _reshape(IMG)
        x = _standardize(x)

print(timeit.timeit(measure_pipe, number=1))  # 1.214164252000046 s in one run (machine-dependent)
print(timeit.timeit(measure_normal, number=1))  # 3.083142001007218 s in one run (machine-dependent)

License

MIT


Version: 0.1

Download files

Source distribution: streampipe-0.1.tar.gz (6.9 kB)

Built distribution: streampipe-0.1-py3-none-any.whl (6.0 kB, Python 3)
