Pypeline is a data pipeline builder library.
Project description
Pypeline
This is a package for creating iterative data processing pipelines. Note that this is NOT a general purpose stream processing library. It is only designed as being a low overhead and simple-to-setup stream processing library. So for large scale production applications, use something like kafka instead.
Warning
This library is still at an ALPHA stage. So things may not work as intended and the api is not final!
Trivial Example
from pypeline import build_action, Pypeline, ForkingPypelineExecutor, wrap
import asyncio
async def step1():
results = []
for i in range(1000):
results.append(wrap(i))
return results
async def step2(i):
return i * 10
async def step3(i):
return i + 1
async def run_pipeline():
pypeline = Pypeline()
# Adding actions to the pipeline
pypeline.add_action(build_action("Step1", step1)) \
.add_action(build_action("Step2", step2)) \
.add_action(build_action("Step3", step3, serialize_dir="./example")) # Serialize results so future runs will skip this step entirely
results = await pypeline.run(executor=ForkingPypelineExecutor()) # Custom executor that avoids the GIL
# Results are wrapped in a utility namedtuple, so let's flatten it.
results = [r.args[0] for r in results]
return results
results = asyncio.get_event_loop().run_until_complete(run_pipeline())
for result in results:
print(result)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
data-pypeline-0.1.1.tar.gz
(12.1 kB
view hashes)
Built Distribution
Close
Hashes for data_pypeline-0.1.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 48eea7ea476080a09c044d2893c601bcc246fa500df8f0ecb611dba096645144 |
|
MD5 | a35e988810c24ace8757ec9b61856ab0 |
|
BLAKE2b-256 | 520af76b211ccee06c6ca2c0950051a8d24999ecb39c02993ebd1bfc7065065a |