pypes
(pronounced "pipes")
Scalable async pipelines in Python, made easy.
🧐 About
This package makes it easy to build asynchronous streams that balance and scale automatically. Built on pure Python -- no dependencies -- the framework suits variable, decoupled, task-based workloads like web scraping, database management operations, and more. It scales out of the box, with minimal hardware and code, to process 10k+ items/s on production workloads.
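Under the hood, this is the classic asyncio producer/consumer pattern. As a point of reference, here is a minimal pure-asyncio sketch of the kind of decoupled, queue-fed workload pypes manages for you (the queue, worker pool, and sentinel handling are illustrative, not pypes internals):

```python
import asyncio

async def producer(queue, n):
    for i in range(n):
        await queue.put(i)       # emit work items into the stream
    await queue.put(None)        # sentinel: no more work

async def worker(queue, results):
    while True:
        item = await queue.get()
        if item is None:         # pass the sentinel on and stop
            await queue.put(None)
            break
        results.append(item * 2)  # stand-in for real processing

async def main():
    queue = asyncio.Queue(maxsize=10)  # bounded queue gives backpressure
    results = []
    # two decoupled workers consume the same stream concurrently
    await asyncio.gather(
        producer(queue, 5),
        worker(queue, results),
        worker(queue, results),
    )
    return sorted(results)

print(asyncio.run(main()))  # → [0, 2, 4, 6, 8]
```

pypes layers scheduling, balancing, and automatic scaling on top of this idea so you only write the task bodies.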
Simple pipelines
from datetime import datetime

import pypes

app = pypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':
    pipeline = every_second \
        .map(task1)
    pipeline.run()
To scaled pipelines
import aiohttp

import pypes

app = pypes.App()

@app.task(interval=0.1)
async def every_second():
    return "http://www.google.com"

@app.task(scaler=pypes.scale.TanhTaskScaler())  # this scales workers automatically to consume incoming requests
async def task1(stream):
    async with aiohttp.ClientSession() as session:
        async for s in stream:
            yield await session.get(s)

@app.task()
async def task2(stream):
    async for s in stream:
        if s.status != 200:
            print(f"failed request: {s}")
        yield

if __name__ == '__main__':
    pipeline = every_second \
        .map(task1) \
        .reduce(task2)
    pipeline.run()
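The scaler decides how many workers consume the stream. The internals of TanhTaskScaler aren't shown here, but a tanh curve is a natural shape for the job: it ramps worker count up quickly under a small backlog and saturates at a ceiling. A hypothetical sketch of that idea (function name and parameters are illustrative, not the library's API):

```python
import math

def tanh_workers(backlog, max_workers=10, sensitivity=0.05):
    """Map a queue backlog onto a bounded worker count in [1, max_workers]."""
    # tanh rises steeply near zero, then flattens toward 1.0, so the
    # worker count grows fast for small backlogs and caps at max_workers
    return max(1, round(max_workers * math.tanh(sensitivity * backlog)))

for backlog in (0, 10, 50, 200):
    print(backlog, tanh_workers(backlog))
```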
🏁 Getting Started
Start with a simple pipeline, and build out from there!
from datetime import datetime

import pypes

app = pypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':
    pipeline = every_second \
        .map(task1)
    pipeline.run()
For more, see readthedocs
Prerequisites
pypes is based on pure Python (3.5+) and requires no other dependencies.
Installing
pip install pypes
🔧 Running the tests
To be created!
🎈 Usage
Import the library
import pypes
Create an App object
app = pypes.App()
Create a trigger task
@app.task(interval=1.0)
async def every_second():
    return 1
Create downstream tasks
@app.task()
async def task_n(stream):
    async for s in stream:
        # process "s" here
        yield
Create + configure the pipeline
pipeline = every_second \
    .map(task_n)
Run the pipeline
pipeline.run()
This will run continuously until interrupted.
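In plain asyncio terms, "runs until interrupted" can be sketched as a loop guarded by a stop signal; pipeline.run() presumably manages something similar internally. This sketch is illustrative and uses no pypes APIs:

```python
import asyncio

async def tick(stop):
    """A continuous task that loops until the stop event is set."""
    count = 0
    while not stop.is_set():
        count += 1
        await asyncio.sleep(0.01)   # stand-in for one unit of pipeline work
    return count

async def main():
    stop = asyncio.Event()
    task = asyncio.create_task(tick(stop))
    await asyncio.sleep(0.05)   # stand-in for "until interrupted"
    stop.set()                  # a KeyboardInterrupt handler would do this
    return await task

print(asyncio.run(main()) > 0)  # → True
```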
✔️ TODO
- Extend to multithreading
- Extend to multiprocessing
- Build visualization server
- Add pipeline pipe functions (join, head, ...)
✍️ Authors
- @mroumanos
- Contributors: you?
Project details
File details
Details for the file asyncio-pypes-0.0.4.tar.gz.
File metadata
- Download URL: asyncio-pypes-0.0.4.tar.gz
- Upload date:
- Size: 12.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 87dc6d9228b6ea2090dbe455d2c9ecf9e5d421b567f6ef1a187e508520a76f79
MD5 | cfb382d5859fd1c10b1ae9d81db60a53
BLAKE2b-256 | e3f7f065d4ad8eb6e38fbdb7727b0d5d02f1e6481b0f103510768c4884c9271e
File details
Details for the file asyncio_pypes-0.0.4-py3-none-any.whl.
File metadata
- Download URL: asyncio_pypes-0.0.4-py3-none-any.whl
- Upload date:
- Size: 14.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 42dc8b55ec15c5f57b667aac92e28cf2ed376cfcf867bf64d56f1497ce3b783d
MD5 | c2cb0a9346b5c576fc0ccb87f8717839
BLAKE2b-256 | d6d19884e65304074217107c778cc50c1e9fd71902565f22801bef74026fc11b