Function pipelines
Project description
py-pipelines
pipeline in python3
Install via pip
$ pip3 install py-pipelines
Clone this repository and local install
$ pip3 install --upgrade .
Run test
$ python3 -m unittest -v tests/main.py
pipelines
All functions accept the return of the previous one as their first parameter.
Examples
Simple pipeline
create a pipeline with functions
def pipeline(input:Any=None, pipe:Union[List[Callable], List[Union[List, Callable]]]=[]) -> Any:
Example
from pypipelines import pipeline
def one(input):
return input + " ONE "
def two(input):
return input + " TWO "
def three(input, *args):
return input + " THREE " + "".join(args)
def four(input, a, b, c):
return input + " FOUR " + f"{a}" + f"{b}" + f"{c}"
if __name__ == "__main__":
result = pipeline(
input="TEST ",
pipe=[
one,
two,
[three, " DD ", " EE ", " FF "], # function with paramiters
[four, " XX ", " YY ", " ZZ "] # function with paramiters
]
)
print(result)
$ python3 main.py
TEST ONE TWO THREE DD EE FF FOUR XX YY ZZ
Simple async pipeline
Create a pipeline with async functions
async def asyncpipeline(input:Any=None, pipe:Union[List[Callable], List[Union[List, Callable]]]=[]) -> Any:
Example
from pypipelines import asyncpipeline
import asyncio
async def asyncone(input):
return input + " ONE "
async def asynctwo(input):
return input + " TWO "
async def asyncthree(input, *args):
return input + " THREE " + "".join(args)
async def asyncfour(input, a, b, c):
return input + " FOUR " + f"{a}" + f"{b}" + f"{c}"
if __name__ == "__main__":
result = asyncio.run(asyncpipeline(
input="TEST ",
pipe=[
asyncone,
asynctwo,
[asyncthree, " DD ", " EE ", " FF "],
[asyncfour, " XX ", " YY ", " ZZ "]
]
))
print(result)
$ python3 main.py
TEST ONE TWO THREE DD EE FF FOUR XX YY ZZ
Simple event loop pipeline
Create a event loop pipeline with functions and run a callback when pipeline finish
def eventlooppipeline(input:Any=None, pipe:Union[List[Callable], List[Union[List, Callable]]]=[], callback:Union[Callable, None] = None) -> None:
Example
from pypipelines import eventlooppipeline
from time import sleep
def one(input):
return input + " ONE "
def two(input):
return input + " TWO "
def three(input, *args):
return input + " THREE " + "".join(args)
def four(input, a, b, c):
return input + " FOUR " + f"{a}" + f"{b}" + f"{c}"
def twoSleep(input):
sleep(2)
return input + " BB (sleep 2) "
def callback(input):
print("CALLBACK", input)
if __name__ == "__main__":
eventlooppipeline(
input="ciao",
pipe=[
one,
two,
twoSleep,
[three, " DD ", " EE ", " FF "],
[four, " XX ", " YY ", " ZZ "]
],
callback=callback)
print("Event loop running...")
$ python3 main.py
Event loop running...
CALLBACK ciao ONE TWO BB (sleep 2) THREE DD EE FF FOUR XX YY ZZ
Simple event loop async pipeline
Create a event loop pipeline with async functions and run a callback when pipeline finish
def asynceventlooppipeline(input:Any=None, pipe:Union[List[Callable], List[Union[List, Callable]]]=[], callback:Union[Callable, None] = None) -> None:
Example
from pypipelines import asynceventlooppipeline
from time import sleep
async def asyncone(input):
return input + " ONE "
async def asynctwo(input):
return input + " TWO "
async def asyncthree(input, *args):
return input + " THREE " + "".join(args)
async def asyncfour(input, a, b, c):
return input + " FOUR " + f"{a}" + f"{b}" + f"{c}"
async def asynctwoSleep(input):
sleep(2)
return input + " BB (async sleep 1) "
def callback(input):
print("CALLBACK", input)
if __name__ == "__main__":
asynceventlooppipeline(
input="TEST ",
pipe=[
asyncone,
asynctwo,
asynctwoSleep,
[asyncthree, " DD ", " EE ", " FF "],
[asyncfour, " XX ", " YY ", " ZZ "]
],
callback=callback
)
print("Event loop running...")
$ python3 main.py
Event loop running...
CALLBACK TEST ONE TWO BB (async sleep 1) THREE DD EE FF FOUR XX YY ZZ
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release.See tutorial on generating distribution archives.
Built Distribution
Close
Hashes for py_pipelines-1.0.3-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | eddf0b09d6f781e63b61893ddb75eb2f91f11d0df43a469669aa5bb0ed689014 |
|
MD5 | f64fc3960c4373281d3147c2ce4eaf07 |
|
BLAKE2b-256 | 9beacb2eb364e565ad712a7afbe96ae042a33ece2e4bdbd85f4ca23da9af6d45 |