RxPipes

A thin wrapper around RxPy for data-flow programming.

Install

# install from git
git clone https://github.com/shirecoding/RxPipes.git
cd RxPipes
pip3 install ./

Example: Static Data

from rxpipes import Pipeline

# create pipeline
class Multiply(Pipeline):
    
    def setup(self, mul):
        self.mul = mul
    
    def operation(self, x):
        return x * self.mul

# execute a pipeline
Multiply(2)(2) # -> 4
Multiply(2)([1,2,3]) # -> [2, 4, 6]

# compose larger pipelines
mul2 = Multiply(2)
mul8 = mul2.pipe(
    mul2,
    mul2
)

mul8(2) # -> 16

# alternatively
mul8 = Pipeline.pipe(
    Multiply(2),
    Multiply(2),
    Multiply(2),
)

mul8(2) # -> 16

# create a pipeline from a lambda
mul2 = Pipeline.from_lambda(lambda x: 2*x)

mul2(2) # -> 4
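
The composition idea above can be sketched without RxPipes or RxPy at all. The snippet below is only an illustration of chaining stages left to right; `compose` and `multiply` are hypothetical helpers written for this example, not part of the RxPipes API, and the library's real implementation (built on RxPy observables) may differ.

```python
from functools import reduce
from typing import Any, Callable

# A stage is any callable that takes one value and returns one value.
Stage = Callable[[Any], Any]


def compose(*stages: Stage) -> Stage:
    """Hypothetical helper: chain stages left to right into one callable."""
    def run(value: Any) -> Any:
        # Feed the output of each stage into the next one.
        return reduce(lambda acc, stage: stage(acc), stages, value)
    return run


def multiply(factor: float) -> Stage:
    """Hypothetical stand-in for the Multiply pipeline above."""
    return lambda x: x * factor


mul2 = multiply(2)
mul8 = compose(mul2, mul2, mul2)

result = mul8(2)
print(result)  # 16
```

Because the composed object is itself a stage, it can be passed to `compose` again, which mirrors how `mul8` is built from `mul2` in the example above.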

Example: Reactive Data Streams

import rx

mul2 = Pipeline.from_lambda(lambda x: 2*x)
x = mul2(rx.interval(1), subscribe=lambda x: print(x)) # -> 0, 2, 4, 6, ....

x.dispose() # unsubscribe from the observable
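
The subscribe/dispose lifecycle can be illustrated with a small, synchronous sketch in plain Python. This is not how RxPy schedules `rx.interval`; `Subscription` and `subscribe_mapped` are hypothetical names invented here, and an unbounded counter stands in for the timer-driven stream.

```python
import itertools
from typing import Any, Callable, Iterable, List


class Subscription:
    """Hypothetical handle: dispose() stops further deliveries."""

    def __init__(self) -> None:
        self.disposed = False

    def dispose(self) -> None:
        self.disposed = True


def subscribe_mapped(
    source: Iterable[Any],
    fn: Callable[[Any], Any],
    on_next: Callable[[Any, Subscription], None],
) -> Subscription:
    """Push fn(item) to on_next for each item until the handle is disposed."""
    sub = Subscription()
    for item in source:
        if sub.disposed:
            break
        on_next(fn(item), sub)
    return sub


received: List[int] = []


def collect(value: int, sub: Subscription) -> None:
    received.append(value)
    if value >= 6:
        sub.dispose()  # stop the otherwise endless stream


# itertools.count() yields 0, 1, 2, ... without end, like an interval source.
subscribe_mapped(itertools.count(), lambda x: 2 * x, collect)
print(received)  # [0, 2, 4, 6]
```

The point of the sketch is that an endless source only stops when the subscriber disposes of its handle, which is why the example above keeps the return value `x` and calls `x.dispose()`.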

Example: Parallel Processing

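import multiprocessing
import time  # needed for time.sleep below

from rx.scheduler import ThreadPoolScheduler
# Parallel is provided by rxpipes; its import path is not shown in the original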

optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

def intense_calculation(value):
    time.sleep(1)
    return value

Multiply(1).pipe(
    Parallel(intense_calculation, pool_scheduler)
)([1,2,3,4,5,6])

# -> [[1,2,3,4,5,6]]
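
For comparison, the same fan-out over a thread pool can be sketched with the standard library alone. This swaps in `concurrent.futures.ThreadPoolExecutor` for the RxPy scheduler and is not the implementation of `Parallel`; the shortened sleep is only there to keep the example fast.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def intense_calculation(value):
    # Stand-in for slow, blocking work (shortened from the 1 s above).
    time.sleep(0.05)
    return value


values = [1, 2, 3, 4, 5, 6]
worker_count = os.cpu_count() or 1

# Executor.map runs the calls concurrently but returns results in input order.
with ThreadPoolExecutor(max_workers=worker_count) as pool:
    results = list(pool.map(intense_calculation, values))

print(results)  # [1, 2, 3, 4, 5, 6]
```

Threads help here because the work is a blocking sleep; CPU-bound pure-Python functions would still contend for the interpreter lock.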

Example: Image Processing Pipeline

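import numpy as np

# Rescale2D, Normalize, and model are assumed to be defined or imported elsewhere
preprocessing = Pipeline.pipe(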
    Rescale2D(50, 50),
    Normalize(0, 1),
    Pipeline.from_(lambda x: np.expand_dims(x, axis=-1))
)

postprocessing = Pipeline.pipe(
    Pipeline.from_(lambda x: np.argmax(x, axis=-1))
)

predict = Pipeline.pipe(
    preprocessing,
    Pipeline.from_(lambda x: model.predict(x)),
    postprocessing
)
