Concurrent Python made simple
Pyper is a flexible framework for concurrent and parallel data processing, based on functional programming patterns. Used for 🔀 ETL systems, ⚙️ data microservices, and 🌐 data collection.
See the Documentation
Key features:
- 💡 Intuitive API: Easy to learn, easy to think about. Implements clean abstractions to seamlessly unify threaded, multiprocessed, and asynchronous work.
- 🚀 Functional Paradigm: Python functions are the building blocks of data pipelines. Lets you write clean, reusable code naturally.
- 🛡️ Safety: Hides the heavy lifting of underlying task execution and resource clean-up. No more worrying about race conditions, memory leaks, or thread-level error handling.
- ⚡ Efficiency: Designed from the ground up for lazy execution, using queues, workers, and generators.
- ✨ Pure Python: Lightweight, with zero sub-dependencies.
Installation
Install the latest version using pip:
$ pip install python-pyper
Note that `python-pyper` is the package name registered on PyPI; in code, the library is imported as `pyper`.
Usage
In Pyper, the task decorator is used to transform functions into composable pipelines.
Let's simulate a pipeline that performs a series of transformations on some data.
```python
import asyncio
import time

from pyper import task


def get_data(limit: int):
    # With branch=True, each yielded value is passed to the next task individually
    for i in range(limit):
        yield i


async def step1(data: int):
    # Asynchronous IO-bound work
    await asyncio.sleep(1)
    print("Finished async wait", data)
    return data


def step2(data: int):
    # Synchronous IO-bound work
    time.sleep(1)
    print("Finished sync wait", data)
    return data


def step3(data: int):
    # Synchronous CPU-bound work
    for i in range(10_000_000):
        _ = i*i
    print("Finished heavy computation", data)
    return data


async def main():
    # Define a pipeline of tasks using `pyper.task`
    pipeline = task(get_data, branch=True) \
        | task(step1, workers=20) \
        | task(step2, workers=20) \
        | task(step3, workers=20, multiprocess=True)

    # Call the pipeline
    total = 0
    async for output in pipeline(limit=20):
        total += output
    print("Total:", total)


if __name__ == "__main__":
    asyncio.run(main())
```
Pyper provides an elegant abstraction of the execution of each task, allowing you to focus on building out the logical functions of your program. In the main function:
- `pipeline` defines a function; this takes the parameters of its first task (`get_data`) and yields each output from its last task (`step3`).
- Tasks are piped together using the `|` operator (motivated by Unix's pipe operator) as a syntactic representation of passing inputs/outputs between tasks.
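To make the composition idea concrete, here is a minimal, single-threaded sketch of how a `|` operator can chain functions into a callable that takes the first function's parameters and yields the last function's outputs. The `Stage` class, its `branch` flag, `_apply`, and `double` are hypothetical names written for illustration; this is not Pyper's implementation, which additionally runs each task with concurrent workers.

```python
import copy
from typing import Any, Callable, Iterable, Iterator


class Stage:
    """Hypothetical pipeline stage wrapping one function (not Pyper's API)."""

    def __init__(self, func: Callable[..., Any], branch: bool = False):
        # Each stage keeps an ordered list of (function, branch) steps.
        self.steps = [(func, branch)]

    def __or__(self, other: "Stage") -> "Stage":
        # `a | b` returns a new stage whose steps run a's steps, then b's.
        combined = copy.copy(self)
        combined.steps = self.steps + other.steps
        return combined

    def __call__(self, *args: Any, **kwargs: Any) -> Iterator[Any]:
        # The first step receives the pipeline's own arguments...
        func, branch = self.steps[0]
        result = func(*args, **kwargs)
        items: Iterable[Any] = result if branch else [result]
        # ...and each later step lazily consumes the previous step's outputs.
        for func, branch in self.steps[1:]:
            items = _apply(func, branch, items)
        yield from items


def _apply(func: Callable[[Any], Any], branch: bool, items: Iterable[Any]) -> Iterator[Any]:
    for item in items:
        if branch:
            yield from func(item)  # branching: each yielded value moves on separately
        else:
            yield func(item)


def get_data(limit: int):
    for i in range(limit):
        yield i


def double(x: int) -> int:
    return x * 2


pipeline = Stage(get_data, branch=True) | Stage(double)
print(list(pipeline(limit=5)))  # [0, 2, 4, 6, 8]
```

Because `__call__` is a generator, nothing runs until the caller starts iterating, which mirrors the lazy behaviour described for Pyper pipelines.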
In the pipeline, we are executing three different types of work:
- `task(step1, workers=20)` spins up 20 `asyncio.Task`s to handle asynchronous IO-bound work
- `task(step2, workers=20)` spins up 20 threads to handle synchronous IO-bound work
- `task(step3, workers=20, multiprocess=True)` spins up 20 processes to handle synchronous CPU-bound work
task acts as one intuitive API for unifying the execution of each different type of function.
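As a rough illustration of how a single entry point can route different kinds of functions, the hypothetical helper below classifies a function the way the list above describes: coroutine functions map to `asyncio.Task`s, `multiprocess=True` maps to processes, and everything else to threads. `describe_execution` is an invented name, and this is only one plausible way to make the decision, not Pyper's actual dispatch code.

```python
import inspect
from typing import Any, Callable


def describe_execution(func: Callable[..., Any], multiprocess: bool = False) -> str:
    """Hypothetical helper: pick an execution strategy from the function's type."""
    if inspect.iscoroutinefunction(func):
        # `async def` functions can be scheduled as asyncio.Tasks on the event loop.
        return "asyncio.Task"
    if multiprocess:
        # CPU-bound sync functions sidestep the GIL by running in separate processes.
        return "process"
    # Blocking (IO-bound) sync functions run in threads.
    return "thread"


async def step1(data: int) -> int:
    return data


def step2(data: int) -> int:
    return data


print(describe_execution(step1))                     # asyncio.Task
print(describe_execution(step2))                     # thread
print(describe_execution(step2, multiprocess=True))  # process
```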
Each task has workers that submit outputs to the next task within the pipeline via queue-based data structures; this is the mechanism underpinning how concurrency and parallelism are achieved. See the docs for a breakdown of what a pipeline looks like under the hood.
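The queue-and-worker mechanism can be sketched with the standard library alone. The `run_stage` function below is a hypothetical, thread-only illustration: a feeder thread fills an input queue, `workers` threads pull items and push results onto an output queue, and a sentinel per worker signals completion. It omits error propagation, cancellation, and back-pressure, which a real framework has to handle.

```python
import queue
import threading
from typing import Any, Callable, Iterable, Iterator

_DONE = object()  # sentinel marking the end of a stream


def run_stage(func: Callable[[Any], Any], inputs: Iterable[Any], workers: int) -> Iterator[Any]:
    """Hypothetical sketch: apply `func` to `inputs` with `workers` threads."""
    in_q: queue.Queue = queue.Queue()
    out_q: queue.Queue = queue.Queue()

    def feeder() -> None:
        # Push every input, then one sentinel per worker so each one can stop.
        for item in inputs:
            in_q.put(item)
        for _ in range(workers):
            in_q.put(_DONE)

    def worker() -> None:
        # Pull from the input queue and push results to the output queue.
        try:
            while (item := in_q.get()) is not _DONE:
                out_q.put(func(item))
        finally:
            out_q.put(_DONE)  # tell the consumer this worker has finished

    threads = [threading.Thread(target=feeder)]
    threads += [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()

    finished = 0
    while finished < workers:
        result = out_q.get()
        if result is _DONE:
            finished += 1
        else:
            yield result  # results arrive in completion order, not input order
    for t in threads:
        t.join()


def square(x: int) -> int:
    return x * x


# A second stage consumes the first stage's output stream, as in a pipeline.
stage1 = run_stage(square, range(5), workers=3)
stage2 = run_stage(lambda x: x + 1, stage1, workers=2)
print(sorted(stage2))  # [1, 2, 5, 10, 17]
```

The output is sorted before printing because, with several workers, results are not guaranteed to come back in input order.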
Non-async example
Pyper pipelines are by default non-async, as long as their tasks are defined as synchronous functions. For example:
```python
import time

from pyper import task


def get_data(limit: int):
    # With branch=True, each yielded value is passed to the next task individually
    for i in range(limit):
        yield i


def step1(data: int):
    # Synchronous IO-bound work
    time.sleep(1)
    print("Finished sync wait", data)
    return data


def step2(data: int):
    # Synchronous CPU-bound work
    for i in range(10_000_000):
        _ = i*i
    print("Finished heavy computation", data)
    return data


def main():
    pipeline = task(get_data, branch=True) \
        | task(step1, workers=20) \
        | task(step2, workers=20, multiprocess=True)

    # A pipeline of synchronous tasks is iterated with a plain `for` loop
    total = 0
    for output in pipeline(limit=20):
        total += output
    print("Total:", total)


if __name__ == "__main__":
    main()
```
A pipeline containing at least one asynchronous function becomes an `AsyncPipeline`, which exposes the same usage API, with `async` and `await` syntax in the obvious places (for example, iterating over outputs with `async for` instead of `for`). This makes it effortless to combine synchronously and asynchronously defined functions where needed.
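A rough, stdlib-only analogue of what mixing the two kinds of function involves: coroutines are awaited on the event loop, while blocking synchronous functions are offloaded to threads with `asyncio.to_thread` (Python 3.9+) so they don't stall the loop. The `process` and `main` helpers are hypothetical; this illustrates the idea and is not how `AsyncPipeline` is implemented.

```python
import asyncio
import time


async def step1(data: int) -> int:
    await asyncio.sleep(0.1)  # non-blocking wait on the event loop
    return data


def step2(data: int) -> int:
    time.sleep(0.1)  # blocking call: must not run on the event loop thread
    return data


async def process(data: int) -> int:
    # Await the coroutine directly; push the blocking function onto a worker thread.
    data = await step1(data)
    return await asyncio.to_thread(step2, data)


async def main() -> int:
    # Run all items concurrently; gather returns results in input order.
    results = await asyncio.gather(*(process(i) for i in range(20)))
    return sum(results)


if __name__ == "__main__":
    print("Total:", asyncio.run(main()))  # Total: 190
```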
Examples
To explore more of Pyper's features, see some further examples.
Dependencies
Pyper is implemented in pure Python, with no sub-dependencies. It is built on top of well-established modules from the Python standard library:
- threading for thread-based concurrency
- multiprocessing for parallelism
- asyncio for async-based concurrency
- concurrent.futures for unifying threads, processes, and async code
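To illustrate why `concurrent.futures` works well as a unifying layer: an executor's `submit` returns a `concurrent.futures.Future` (for both `ThreadPoolExecutor` and `ProcessPoolExecutor`), and the same kind of object can be waited on from synchronous code with `.result()` or awaited from async code via `asyncio.wrap_future`. The helper functions below are hypothetical and not taken from Pyper's source; only threads are used to keep the sketch portable.

```python
import asyncio
import time
from concurrent.futures import Future, ThreadPoolExecutor


def blocking_work(x: int) -> int:
    time.sleep(0.05)  # stands in for blocking IO
    return x * 10


def sync_caller(pool: ThreadPoolExecutor) -> int:
    # Synchronous code blocks on the Future until the result is ready.
    fut: Future = pool.submit(blocking_work, 1)
    return fut.result()


async def async_caller(pool: ThreadPoolExecutor) -> int:
    # Async code wraps the same kind of Future so the event loop can await it.
    fut = pool.submit(blocking_work, 2)
    return await asyncio.wrap_future(fut)


with ThreadPoolExecutor(max_workers=2) as pool:
    print(sync_caller(pool))                # 10
    print(asyncio.run(async_caller(pool)))  # 20
```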
License
This project is licensed under the terms of the MIT license.
Download files
File details
Details for the file python_pyper-0.4.4.tar.gz.
File metadata
- Download URL: python_pyper-0.4.4.tar.gz
- Upload date:
- Size: 349.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.0.1 CPython/3.12.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | fbf9f4f5f95f8263a608770424f11558a07896a3d1b1a2699e4ba295b5100f7b |
| MD5 | 2436d3f2f0b0c535e6d46aa874fc2cba |
| BLAKE2b-256 | 18bc7e67eff590248554dfb4e325a979ccfa188c80259923cd3d255c4ac291c5 |
Provenance
The following attestation bundles were made for python_pyper-0.4.4.tar.gz:
- Publisher: publish.yml on pyper-dev/pyper
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: python_pyper-0.4.4.tar.gz
- Subject digest: fbf9f4f5f95f8263a608770424f11558a07896a3d1b1a2699e4ba295b5100f7b
- Sigstore transparency entry: 163719705
- Sigstore integration time:
- Permalink: pyper-dev/pyper@1e0cd0410fc285426af913e9b30705a02320ca24
- Branch / Tag: refs/tags/v0.4.4
- Owner: https://github.com/pyper-dev
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@1e0cd0410fc285426af913e9b30705a02320ca24
- Trigger Event: release
File details
Details for the file python_pyper-0.4.4-py3-none-any.whl.
File metadata
- Download URL: python_pyper-0.4.4-py3-none-any.whl
- Upload date:
- Size: 18.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.0.1 CPython/3.12.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f324cb2bdd0ee4153a0fb7d6011d54c12a9ea2dd3b74fd01652bcc067774ffe8 |
| MD5 | e3cb2e121a0404722b38ca3a7d3d0fe8 |
| BLAKE2b-256 | 9bd2f8ebc2d7e1cd48403a8e9aa0855739c97e0fdd007448582c0dc2ca7a3915 |
Provenance
The following attestation bundles were made for python_pyper-0.4.4-py3-none-any.whl:
- Publisher: publish.yml on pyper-dev/pyper
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: python_pyper-0.4.4-py3-none-any.whl
- Subject digest: f324cb2bdd0ee4153a0fb7d6011d54c12a9ea2dd3b74fd01652bcc067774ffe8
- Sigstore transparency entry: 163719707
- Sigstore integration time:
- Permalink: pyper-dev/pyper@1e0cd0410fc285426af913e9b30705a02320ca24
- Branch / Tag: refs/tags/v0.4.4
- Owner: https://github.com/pyper-dev
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@1e0cd0410fc285426af913e9b30705a02320ca24
- Trigger Event: release