RxPipes

A thin wrapper around RxPy for data-flow programming.

Documentation

Install

# install from git
git clone https://github.com/shirecoding/RxPipes.git
cd RxPipes
pip3 install ./

# install from pypi
pip install rxpipes

Example: Static Data

from rxpipes import Pipeline

# create pipeline
class Multiply(Pipeline):

    def setup(self, mul):
        self.mul = mul  # store the multiplier passed to the constructor

    def transform(self):
        from rx import operators as ops
        return ops.map(lambda x: x * self.mul)

# execute a pipeline
Multiply(2)(2) # -> 4
Multiply(2)([1,2,3]) # -> [2, 4, 6]

# compose larger pipelines
mul2 = Multiply(2)
mul8 = mul2.pipe(
    mul2,
    mul2
)

mul8(2) # -> 16

# alternatively
mul8 = Pipeline.pipe(
    Multiply(2),
    Multiply(2),
    Multiply(2),
)

mul8(2) # -> 16

# create pipeline from lambda
mul2 = Pipeline.map(lambda x: 2*x)

mul2(2) # -> 4
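
The composition shown above can be pictured as plain function chaining. The following is a dependency-free sketch of that idea only: `make_multiplier` and `compose` are hypothetical helpers introduced for illustration, not part of RxPipes or RxPy, and this is not a description of how the library is implemented.

```python
from functools import reduce

def make_multiplier(mul):
    """Return a stage that multiplies its input by `mul` (hypothetical helper)."""
    return lambda x: x * mul

def compose(*stages):
    """Chain stages left to right, feeding each output into the next stage."""
    return lambda x: reduce(lambda acc, stage: stage(acc), stages, x)

mul2 = make_multiplier(2)
mul8 = compose(mul2, mul2, mul2)  # three doublings multiply by 8 overall

result_scalar = mul8(2)                      # 2 * 2 * 2 * 2
result_list = [mul8(v) for v in [1, 2, 3]]   # apply the chain to each element
print(result_scalar, result_list)
```

Chaining three doubling stages multiplies by eight, which is why the composed pipeline is named `mul8`.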

Example: Reactive Data Streams

import rx
from rxpipes import Pipeline

mul2 = Pipeline.map(lambda x: 2*x)
x = mul2(rx.interval(1), subscribe=lambda x: print(x)) # -> 0, 2, 4, 6, ....

x.dispose() # unsubscribe from the observable

Example: Parallel Processing

import multiprocessing
import time
from rx.scheduler import ThreadPoolScheduler
from rxpipes.concurrency import Parallel

# Multiply is the pipeline class defined in "Example: Static Data"

optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

def intense_calculation(value):
    time.sleep(1)
    return value

Multiply(1).pipe(
    Parallel(intense_calculation, pool_scheduler)
)([1,2,3,4,5,6])

# -> [[1,2,3,4,5,6]]
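
To see why a thread pool helps with blocking work such as `time.sleep`, here is a standard-library analogue using `concurrent.futures.ThreadPoolExecutor`. It is a swapped-in technique for illustration, not what `Parallel` does internally, and the shortened sleep duration is an assumption chosen to keep the example quick.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def intense_calculation(value):
    time.sleep(0.2)  # stand-in for slow, blocking work (shortened for the demo)
    return value

values = [1, 2, 3, 4, 5, 6]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(values)) as pool:
    # Executor.map returns results in input order even though calls overlap
    results = list(pool.map(intense_calculation, values))
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")
```

Run one after another, the six calls would take about 1.2 seconds in total. With one worker per item, the sleeps overlap and the elapsed time stays close to a single call.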

Example: Image Processing Pipeline

from rxpipes import Pipeline
import numpy as np

class Normalize(Pipeline):

    def setup(self, low, high):
        self.low = low
        self.high = high

    def transform(self):
        from rx import operators as ops

        def _f(x):
            _max = x.max()
            _min = x.min()
            factor = ((self.high - self.low) + 1e-12) / ((_max - _min) + 1e-12)
            return (x - _min) * factor + self.low

        return ops.map(_f)

class Rescale(Pipeline):

    def setup(self, shape):
        self.shape = shape

    def transform(self):
        import cv2
        from rx import operators as ops

        def _f(x):
            return cv2.resize(x.astype('float32'), self.shape)

        return ops.map(_f)

p = Pipeline.pipe(
    Normalize(0,1),
    Rescale((3,3))
)
im = np.arange(5*5).reshape((5,5))

p(im)

# array([[0.08333334, 0.15277778, 0.22222222],
#        [0.43055555, 0.5       , 0.5694444 ],
#        [0.7777778 , 0.8472222 , 0.9166667 ]], dtype=float32)
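
The min-max formula used in `Normalize` can be checked in isolation. The sketch below reimplements it on plain Python lists so that it runs without NumPy or OpenCV. The `normalize` function and its `eps` parameter are hypothetical names for illustration, with `eps` standing in for the `1e-12` constant above.

```python
def normalize(values, low, high, eps=1e-12):
    """Rescale `values` linearly so min maps to `low` and max maps to about `high`."""
    v_min, v_max = min(values), max(values)
    # eps keeps the division defined when every value is identical
    factor = ((high - low) + eps) / ((v_max - v_min) + eps)
    return [(v - v_min) * factor + low for v in values]

flat = list(range(25))                  # same values as np.arange(5*5), flattened
scaled = normalize(flat, 0, 1)
constant = normalize([7, 7, 7], 0, 1)   # zero range: would divide by zero without eps

print(scaled[0], scaled[-1])
print(constant)
```

The smallest input maps to `low`, the largest lands within floating-point tolerance of `high`, and a constant input collapses to `low` instead of raising a division error.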
