Skip to main content

No project description provided

Project description

RxPipes

A thin wrapper around RxPy for data-flow programming.

Install

# install from git
git clone https://github.com/shirecoding/RxPipes.git
cd RxPipes
pip3 install ./

# install from pypi
pip install rxpipes

Example: Static Data

from rxpipes import Pipeline

# create pipeline
class Multiply(Pipeline):
    
    def setup(self, mul):
        self.mul = 2
    
    def transform(self):
        from rx import operators as ops
        return ops.map(lambda x: x * self.mul)

# execute a pipeline
Multiply(2)(2) # -> 4
Multiply(2)([1,2,3]) # -> [2, 4, 6]

# compose larger pipelines
mul2 = Multiply(2)
mul8 = mul2.pipe(
    mul2,
    mul2
)

mul8(2) # -> 16

# alternatively
mul8 = Pipeline.pipe(
    Multiply(2),
    Multiply(2),
    Multiply(2),
)(2)

mul8(2) # -> 16

# create pipeline from lamba
mul2 = Pipeline.from_lambda(lambda x: 2*x)

mul2(2) # -> 4

Example: Reactive Data Streams

import rx
from rxpipes import Pipeline

mul2 = Pipeline.from_lambda(lambda x: 2*x)
x = mul2(rx.interval(1), subscribe=lambda x: print(x)) # -> 0, 2, 4, 6, ....

x.dispose() # unsubscribe to observable

Example: Parallel Processling

import multiprocessing
from rx.scheduler import ThreadPoolScheduler
from rxpipes.concurrency import Parallel

optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

def intense_calculation(value):
    time.sleep(1)
    return value

Multiply(1).pipe(
    Parallel(intense_calculation, pool_scheduler)
)([1,2,3,4,5,6])

# -> [[1,2,3,4,5,6]]

Example: Image Processing Pipeline

from rxpipes import Pipeline
import numpy as np

class Normalize(Pipeline):

    def setup(self, low, high):
        self.low = low
        self.high = high

    def transform(self):
        from rx import operators as ops

        def _f(x):
            _max = x.max()
            _min = x.min()
            factor =  ((self.high - self.low) + 1e-12)/ ((_max - _min) + 1e-12)
            return (x - _min) * factor + self.low

        return ops.map(_f)

class Rescale(Pipeline):

    def setup(self, shape):
        self.shape = shape

    def transform(self):
        import cv2
        from rx import operators as ops
        
        def _f(x):
            return cv2.resize(x.astype('float32'), self.shape)

        return ops.map(_f)

p = Pipeline.pipe(
    Normalize(0,1),
    Rescale((3,3))
)
im = np.arange(5*5).reshape((5,5))

p(im)

# array([[0.08333334, 0.15277778, 0.22222222],
#        [0.43055555, 0.5       , 0.5694444 ],
#        [0.7777778 , 0.8472222 , 0.9166667 ]], dtype=float32)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rxpipes-0.0.2.tar.gz (4.0 kB view details)

Uploaded Source

File details

Details for the file rxpipes-0.0.2.tar.gz.

File metadata

  • Download URL: rxpipes-0.0.2.tar.gz
  • Upload date:
  • Size: 4.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.3.0 pkginfo/1.7.0 requests/2.25.1 setuptools/47.1.0 requests-toolbelt/0.9.1 tqdm/4.56.0 CPython/3.7.9

File hashes

Hashes for rxpipes-0.0.2.tar.gz
Algorithm Hash digest
SHA256 3265913664fa87ca20e15b34e0af5a7ace052c0c05007a3b8b3859852e012662
MD5 1316cb48a1a8d39eadecad200d0d1635
BLAKE2b-256 8580708f0cc65058dc546432dc1b4bd97bcda7bde379a02408d2be6edf85d711

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page