Skip to main content

StreamPipe is a straightforward multi-worker pipeline for streaming data in Python.

Project description

StreamPipe

StreamPipe is a simple Python library (a single file!) for running multi-worker pipelines on infinite data streams. The best part about StreamPipe is that the order of data is preserved throughout the pipeline. You can chain order-independent workers (think of image processing functions) and order-dependent workers (think of an object video tracker) on the same StreamPipe!

Installation

pip3 install streampipe

Install from source:

git clone https://github.com/highvight/streampipe
cd streampipe
pip3 install .
pip3 install .[testing]  # for testing and benchmarks

Usage

Here is a starter. Let's check if you can speed up a reshaping and standardization pipeline for images.

import timeit
import numpy as np
from streampipe import StreamPipe

IMG = np.random.rand(1000, 1000, 3) * 255

def _reshape(img):
    return img[::2, ::2]

def _standardize(img):
    mean = np.mean(img, axis=(0, 1), keepdims=True)
    std = np.std(img, axis=(0, 1), keepdims=True)
    return (img - mean) / std

pipe = StreamPipe()
pipe.add(_reshape, n_workers=4, maxsize=4)
pipe.add(_standardize, n_workers=4, maxsize=4)

def measure_pipe():
    # Starts the workers
    with pipe:
        for _ in range(100):
            pipe.put(IMG)
    # All workers stopped, all items done

def measure_normal():
    for _ in range(100):
        x = _reshape(IMG)
        x = _standardize(x)

print(timeit.timeit(measure_pipe, number=1))  # 1.214164252000046
print(timeit.timeit(measure_normal, number=1))  # 3.083142001007218

License

MIT

Project details


Release history Release notifications | RSS feed

This version

0.1

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streampipe-0.1.tar.gz (6.9 kB view details)

Uploaded Source

Built Distribution

streampipe-0.1-py3-none-any.whl (6.0 kB view details)

Uploaded Python 3

File details

Details for the file streampipe-0.1.tar.gz.

File metadata

  • Download URL: streampipe-0.1.tar.gz
  • Upload date:
  • Size: 6.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.18

File hashes

Hashes for streampipe-0.1.tar.gz
Algorithm Hash digest
SHA256 be493d7f50ac33874d7b4e42ced152f7198773f39f2038d5a3b29eebfbfa15fb
MD5 b00e6abb9e097e2a48dd0069a15ad991
BLAKE2b-256 a50ab32cc52d5ef7482ea68d471c5eb7ddbdbcfb34b5d662da652954593c8c53

See more details on using hashes here.

File details

Details for the file streampipe-0.1-py3-none-any.whl.

File metadata

  • Download URL: streampipe-0.1-py3-none-any.whl
  • Upload date:
  • Size: 6.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.18

File hashes

Hashes for streampipe-0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 129dd135034a59797e1ba0d8150d0b2b1e1776f78a00e1eec9f38b83012c1ec0
MD5 d5a34ee4abe4bef310ddfbdb243f7584
BLAKE2b-256 902eac6e62a9be125e400d4c41ca22b67bd5c315c2930dd9d8d11564275c01b7

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page