Project description
RxPipes
A thin wrapper around RxPy for data-flow programming.
Install
# install from git
git clone https://github.com/shirecoding/RxPipes.git
cd RxPipes
pip3 install ./
Example: Static Data
from rxpipes import Pipeline
# create pipeline
class Multiply(Pipeline):
    def setup(self, mul):
        self.mul = mul  # store the factor passed to the constructor
    def operation(self, x):
        return x * self.mul
# execute a pipeline
Multiply(2)(2) # -> 4
Multiply(2)([1,2,3]) # -> [2, 4, 6]
# compose larger pipelines
mul2 = Multiply(2)
mul8 = mul2.pipe(
    mul2,
    mul2
)
mul8(2) # -> 16
# alternatively
mul8 = Pipeline.pipe(
    Multiply(2),
    Multiply(2),
    Multiply(2),
)
mul8(2) # -> 16
# create pipeline from lambda
mul2 = Pipeline.from_lambda(lambda x: 2*x)
mul2(2) # -> 4
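The calling and composition behaviour shown above can be imitated in plain Python. The following is a minimal sketch only: `SketchPipeline` and `SketchMultiply` are hypothetical names, and the code does not use RxPy and is not how RxPipes is implemented. It only assumes the semantics visible in the examples (a scalar is transformed once, a list is mapped element-wise, and `pipe` chains stages left to right).

```python
from functools import reduce


class SketchPipeline:
    """Hypothetical stand-in for rxpipes.Pipeline; not the library's code."""

    def __init__(self, *args, **kwargs):
        self.setup(*args, **kwargs)

    def setup(self, *args, **kwargs):
        """Store configuration; subclasses override this."""

    def operation(self, x):
        """Transform one item; subclasses override this."""
        return x

    def __call__(self, data):
        # Lists are mapped element-wise; anything else is treated as one item.
        if isinstance(data, list):
            return [self.operation(item) for item in data]
        return self.operation(data)

    def pipe(self, *stages):
        # Build a new pipeline that runs self first, then each stage in order.
        chain = (self, *stages)
        composed = SketchPipeline()
        composed.operation = lambda x: reduce(
            lambda acc, stage: stage.operation(acc), chain, x
        )
        return composed


class SketchMultiply(SketchPipeline):
    def setup(self, mul):
        self.mul = mul

    def operation(self, x):
        return x * self.mul


mul2 = SketchMultiply(2)
mul8 = mul2.pipe(mul2, mul2)
print(mul2(2))          # 4
print(mul2([1, 2, 3]))  # [2, 4, 6]
print(mul8(2))          # 16
```

Because a composed pipeline is itself a `SketchPipeline`, it can be passed to `pipe` again, which is the property the larger examples rely on.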
Example: Reactive Data Streams
import rx
mul2 = Pipeline.from_lambda(lambda x: 2*x)
x = mul2(rx.interval(1), subscribe=lambda x: print(x)) # -> 0, 2, 4, 6, ....
x.dispose() # unsubscribe from the observable
Example: Parallel Processing
import multiprocessing
import time
from rx.scheduler import ThreadPoolScheduler
# Parallel is assumed to be provided by rxpipes; the original does not show its import
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
def intense_calculation(value):
    time.sleep(1)  # simulate slow, blocking work
    return value
Multiply(1).pipe(
    Parallel(intense_calculation, pool_scheduler)
)([1,2,3,4,5,6])
# -> [[1,2,3,4,5,6]]
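For comparison, the same idea (running a blocking function over several inputs on a pool of threads) can be sketched with the standard library alone. This is an illustration under assumptions, not RxPipes' `Parallel`: `run_in_parallel` is a hypothetical helper, the sleep is shortened to 0.1 s, and the result is a flat list rather than the nested list shown above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def intense_calculation(value):
    time.sleep(0.1)  # stand-in for slow, blocking work
    return value


def run_in_parallel(func, items, max_workers=4):
    # executor.map preserves input order even when calls finish out of order.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(func, items))


start = time.perf_counter()
results = run_in_parallel(intense_calculation, [1, 2, 3, 4, 5, 6], max_workers=6)
elapsed = time.perf_counter() - start

print(results)  # [1, 2, 3, 4, 5, 6]
print(f"elapsed: {elapsed:.2f}s")
```

With six workers the calls overlap, so the elapsed time should be close to one sleep rather than six. Threads help here because the work is a blocking wait; CPU-bound Python code would generally need processes instead.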
Example: Image Processing Pipeline
import numpy as np
# Rescale2D, Normalize and model are not defined in this README; they are assumed to exist
preprocessing = Pipeline.pipe(
    Rescale2D(50, 50),
    Normalize(0, 1),
    Pipeline.from_(lambda x: np.expand_dims(x, axis=-1))
)
postprocessing = Pipeline.pipe(
    Pipeline.from_(lambda x: np.argmax(x, axis=-1))
)
predict = Pipeline.pipe(
    preprocessing,
    Pipeline.from_(lambda x: model.predict(x)),
    postprocessing
)
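The nesting used above, where already-composed stages become steps of a larger pipeline, can be illustrated without NumPy or a trained model. In this sketch every name (`compose`, `normalize`, `fake_model`, `argmax`) is a hypothetical stand-in chosen for illustration; in particular `fake_model` just produces two made-up scores, and `normalize` is only one guess at what a `Normalize(0, 1)` stage might do.

```python
from functools import reduce


def compose(*stages):
    """Return a function that applies the stages left to right."""
    return lambda x: reduce(lambda acc, stage: stage(acc), stages, x)


def normalize(values):
    # Min-max scale to the range [0, 1]; assumes the values are not all equal.
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]


def fake_model(values):
    # Hypothetical stand-in for model.predict: two made-up class scores.
    return [sum(values), len(values) - sum(values)]


def argmax(scores):
    # Index of the largest score.
    return max(range(len(scores)), key=scores.__getitem__)


preprocessing = compose(normalize)
postprocessing = compose(argmax)
predict = compose(preprocessing, fake_model, postprocessing)

print(predict([10, 20, 40]))  # 1
```

Since `compose` returns an ordinary function, `preprocessing` and `postprocessing` can be reused in other pipelines or tested on their own.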
Project details
Download files
Source Distribution
rxpipes-0.0.1.tar.gz (3.5 kB)
File details
Details for the file rxpipes-0.0.1.tar.gz.
File metadata
- Download URL: rxpipes-0.0.1.tar.gz
- Upload date:
- Size: 3.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.23.0 setuptools/45.2.0 requests-toolbelt/0.9.1 tqdm/4.40.0 CPython/3.6.4
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | 7ccacd974b5814a2be511527c91bf40e42167d9f293dca5ed20411e644befc39 |
MD5 | 75f5bc8e0fc432411624fe257765e3b3 |
BLAKE2b-256 | 9213c2149d995e3e336992d3fb2405db15ba2b6a8180321e3e3b96c377f68207 |