pypes
(pronounced "pipes")

Scalable async pipelines in Python, made easy.
🧐 About
This package makes it easy to build asynchronous streams that balance and scale automatically. Written in pure Python with no dependencies, the framework suits variable, decoupled, task-based workloads such as web scraping, database management operations, and more. Out of the box, with minimal hardware and code, it can process 10k+ items/s on production loads.
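To see the underlying model, here is a sketch in plain asyncio (no pypes): a pipeline is a chain of async generators, where each stage consumes the previous stage's stream as items arrive. The names here are illustrative, not part of the pypes API.

```python
import asyncio

async def source():
    # stand-in for a trigger task: emit three items, one per "tick"
    for i in range(3):
        yield i
        await asyncio.sleep(0)  # yield control, stands in for interval=...

async def double(stream):
    # downstream task: transform each item as it streams in
    async for item in stream:
        yield item * 2

async def main():
    return [x async for x in double(source())]

print(asyncio.run(main()))  # [0, 2, 4]
```

pypes wraps this pattern in tasks and adds scheduling and scaling on top of it.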
Simple pipelines
import pypes
from datetime import datetime

app = pypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':
    pipeline = every_second \
        .map(task1)
    pipeline.run()
Scaled pipelines
import pypes
import aiohttp

app = pypes.App()

@app.task(interval=0.1)
async def every_tenth_second():
    return "http://www.google.com"

# this scales workers automatically to consume incoming requests
@app.task(scaler=pypes.scale.TanhTaskScaler())
async def task1(stream):
    async with aiohttp.ClientSession() as session:
        async for s in stream:
            yield await session.get(s)

@app.task()
async def task2(stream):
    async for s in stream:
        if s.status != 200:
            print(f"failed request: {s}")
        yield

if __name__ == '__main__':
    pipeline = every_tenth_second \
        .map(task1) \
        .reduce(task2)
    pipeline.run()
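The name TanhTaskScaler suggests a worker count that follows a tanh curve of the backlog: it ramps up smoothly as items queue and saturates at a ceiling. A minimal sketch of such a policy (the formula, constants, and function name are assumptions for illustration, not pypes internals):

```python
import math

def tanh_workers(queue_depth, max_workers=32, ramp=100):
    # Hypothetical policy: grow worker count smoothly with backlog,
    # saturating at max_workers as queue_depth >> ramp.
    return max(1, math.ceil(max_workers * math.tanh(queue_depth / ramp)))

print(tanh_workers(0))       # 1  (keep one worker when idle)
print(tanh_workers(100))     # 25 (tanh(1) ~ 0.76 of max)
print(tanh_workers(10_000))  # 32 (saturated)
```

The appeal of a tanh curve is that it reacts quickly to small backlogs without overshooting on large ones.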
🏁 Getting Started
Start with a simple pipeline, and build out from there!
import pypes
from datetime import datetime

app = pypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':
    pipeline = every_second \
        .map(task1)
    pipeline.run()
For more, see the documentation on readthedocs.
Prerequisites
pypes is pure Python (3.5+) and requires no other dependencies.
Installing
pip install pypes
🔧 Running the tests
To be created!
Break down into end to end tests
To be created!
And coding style tests
To be created!
🎈 Usage
Import the library
import pypes
Create an App object
app = pypes.App()
Create a trigger task
@app.task(interval=1.0)
async def every_second():
    return 1
Create downstream tasks
@app.task()
async def task_n(stream):
    async for s in stream:
        # process "s" here
        yield
Create + configure the pipeline
pipeline = every_second \
    .map(task_n)
Run the pipeline
pipeline.run()
This will run continuously until interrupted.
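In plain asyncio terms, the trigger-and-map pattern above is roughly equivalent to the following sketch (bounded so it terminates; pypes itself owns the loop and runs until interrupted). All names here are stand-ins, not pypes API.

```python
import asyncio

async def every_tick(interval, count):
    # stand-in for @app.task(interval=...): emit a value each interval
    for i in range(count):
        await asyncio.sleep(interval)
        yield i

async def task_n(stream):
    # stand-in for a mapped downstream task: process each "s"
    async for s in stream:
        yield s + 1

async def run_pipeline():
    return [x async for x in task_n(every_tick(0.01, 3))]

print(asyncio.run(run_pipeline()))  # [1, 2, 3]
```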
⛏️ Built Using
✔️ TODO
- Extend to multithreading
- Extend to multiprocessing
- Build visualization server
- Add pipeline pipe functions (join, head, ...)
✍️ Authors
- @mroumanos
- Contributors: you?
🎉 Acknowledgements