No project description provided
Project description
RxPipes
A thin wrapper around RxPy for data-flow programming.
Install
# install from git
git clone https://github.com/shirecoding/RxPipes.git
cd RxPipes
pip3 install ./
# install from pypi
pip install rxpipes
Example: Static Data
from rxpipes import Pipeline
# create pipeline
class Multiply(Pipeline):
    """Pipeline that multiplies every value passing through it by a factor."""

    def setup(self, mul):
        """Store the multiplication factor.

        Args:
            mul: factor applied to each incoming value.
        """
        # Honor the argument so Multiply(n) scales by n rather than a fixed 2.
        self.mul = mul

    def transform(self):
        """Return the Rx operator that scales each item by ``self.mul``."""
        from rx import operators as ops
        return ops.map(lambda x: x * self.mul)
# execute a pipeline
Multiply(2)(2) # -> 4
Multiply(2)([1,2,3]) # -> [2, 4, 6]
# compose larger pipelines
mul2 = Multiply(2)
mul8 = mul2.pipe(
    mul2,
    mul2
)
mul8(2) # -> 16
# alternatively
# (leave the composed pipeline uncalled here so that mul8 stays callable)
mul8 = Pipeline.pipe(
    Multiply(2),
    Multiply(2),
    Multiply(2),
)
mul8(2) # -> 16
# create pipeline from lambda
mul2 = Pipeline.from_lambda(lambda x: 2*x)
mul2(2) # -> 4
Example: Reactive Data Streams
import rx
from rxpipes import Pipeline
# pipeline that doubles each value it receives
mul2 = Pipeline.from_lambda(lambda value: 2 * value)
# feed it an interval observable and print every result as it arrives
x = mul2(rx.interval(1), subscribe=print) # -> 0, 2, 4, 6, ....
x.dispose() # unsubscribe from the observable
Example: Parallel Processing
import multiprocessing
from rx.scheduler import ThreadPoolScheduler
from rxpipes.concurrency import Parallel
# use the CPU count reported by multiprocessing as the thread pool size
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
def intense_calculation(value, delay=1):
    """Simulate an expensive computation by sleeping, then return the input.

    Args:
        value: item to pass through unchanged.
        delay: seconds to sleep before returning; defaults to 1.

    Returns:
        ``value``, unmodified.
    """
    # Imported locally because this example never imports `time` itself.
    import time
    time.sleep(delay)
    return value
# pipe the list through Parallel, which is handed the function to apply and
# the scheduler to use (presumably it runs the calls on the pool's threads)
Multiply(1).pipe(
Parallel(intense_calculation, pool_scheduler)
)([1,2,3,4,5,6])
# -> [[1,2,3,4,5,6]]
Example: Image Processing Pipeline
from rxpipes import Pipeline
import numpy as np
class Normalize(Pipeline):
    """Pipeline that linearly rescales each array into ``[low, high]``."""

    def setup(self, low, high):
        """Record the lower and upper bounds of the output range."""
        self.low, self.high = low, high

    def transform(self):
        """Return the Rx operator that applies the min-max rescaling."""
        from rx import operators as ops

        # small constant added to both spans so a constant-valued input
        # does not cause a division by zero
        eps = 1e-12

        def _rescale(arr):
            hi = arr.max()
            lo = arr.min()
            out_span = (self.high - self.low) + eps
            in_span = (hi - lo) + eps
            scale = out_span / in_span
            return (arr - lo) * scale + self.low

        return ops.map(_rescale)
class Rescale(Pipeline):
    """Pipeline that resizes each array to a fixed size with OpenCV."""

    def setup(self, shape):
        """Remember the target size that is handed to ``cv2.resize``."""
        self.shape = shape

    def transform(self):
        """Return the Rx operator that casts to float32 and resizes."""
        import cv2
        from rx import operators as ops

        def _resize(arr):
            as_float = arr.astype('float32')
            return cv2.resize(as_float, self.shape)

        return ops.map(_resize)
# chain normalization into resizing
p = Pipeline.pipe(Normalize(0, 1), Rescale((3, 3)))
# 5x5 sample input holding the integers 0..24
im = np.arange(25).reshape(5, 5)
p(im)
# array([[0.08333334, 0.15277778, 0.22222222],
# [0.43055555, 0.5 , 0.5694444 ],
# [0.7777778 , 0.8472222 , 0.9166667 ]], dtype=float32)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
rxpipes-0.0.2.tar.gz
(4.0 kB
view details)
File details
Details for the file rxpipes-0.0.2.tar.gz.
File metadata
- Download URL: rxpipes-0.0.2.tar.gz
- Upload date:
- Size: 4.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.3.0 pkginfo/1.7.0 requests/2.25.1 setuptools/47.1.0 requests-toolbelt/0.9.1 tqdm/4.56.0 CPython/3.7.9
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | 3265913664fa87ca20e15b34e0af5a7ace052c0c05007a3b8b3859852e012662 |
MD5 | 1316cb48a1a8d39eadecad200d0d1635 |
BLAKE2b-256 | 8580708f0cc65058dc546432dc1b4bd97bcda7bde379a02408d2be6edf85d711 |