aiopypes
(pronounced "a-i-o-pipes")

Scalable asyncio pipelines in Python, made easy.
🧐 About
This package makes it easy to build asynchronous streams that balance and scale automatically. Written in pure Python with no dependencies, the framework suits variable, decoupled, task-based workloads such as web scraping, database management operations, and more. Out of the box, with minimal hardware and code, it can scale to process 10k+ messages per second on production loads.
Simple pipelines

```python
from datetime import datetime

import aiopypes

app = aiopypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':
    pipeline = every_second \
        .map(task1)
    pipeline.run()
```
To scaled pipelines

```python
import aiopypes
import aiohttp

app = aiopypes.App()

@app.task(interval=0.1)
async def every_second():
    return "http://www.google.com"

# the scaler spawns workers automatically to consume incoming requests
@app.task(scaler=aiopypes.scale.TanhTaskScaler())
async def task1(stream):
    async with aiohttp.ClientSession() as session:
        async for s in stream:
            yield await session.get(s)

@app.task()
async def task2(stream):
    async for s in stream:
        if s.status != 200:
            print(f"failed request: {s}")
        yield

if __name__ == '__main__':
    pipeline = every_second \
        .map(task1) \
        .reduce(task2)
    pipeline.run()
```
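The name `TanhTaskScaler` suggests that worker counts track load along a tanh curve: near-linear growth for small backlogs, saturating at a ceiling instead of growing without bound. The sketch below illustrates that idea only; the function name and parameters are hypothetical, not the library's actual implementation.

```python
import math

def tanh_workers(backlog: int, max_workers: int = 64, sensitivity: float = 100.0) -> int:
    """Map a queue backlog to a worker count along a tanh curve.

    Small backlogs get a roughly proportional response; large backlogs
    saturate at max_workers rather than spawning unbounded workers.
    """
    return max(1, round(max_workers * math.tanh(backlog / sensitivity)))
```

With these defaults, an idle queue keeps one worker warm, a backlog of ~100 items runs about three quarters of the ceiling, and any very large backlog pins the pool at `max_workers`.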
🏁 Getting Started
Start with a simple pipeline, and build out from there!
```python
from datetime import datetime

import aiopypes

app = aiopypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':
    pipeline = every_second \
        .map(task1)
    pipeline.run()
```
For more, see the documentation on readthedocs.
Prerequisites
aiopypes is pure Python (3.5+) and requires no other dependencies.
Installing
Available on PyPI, install with pip:

```shell
pip install aiopypes
```
🔧 Running the tests
To be created!
🎈 Usage
Import the library:

```python
import aiopypes
```

Create an App object:

```python
app = aiopypes.App()
```

Create a trigger task:

```python
@app.task(interval=1.0)
async def every_second():
    return 1
```

Create downstream tasks:

```python
@app.task()
async def task_n(stream):
    async for s in stream:
        # process "s" here
        yield
```

Create and configure the pipeline:

```python
pipeline = every_second \
    .map(task_n)
```

Run the pipeline:

```python
pipeline.run()
```
This will run continuously until interrupted.
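Conceptually, the trigger/map pattern above is async generators chained together: the trigger emits items on an interval, and each downstream task consumes its upstream's stream and yields transformed items. Here is a dependency-free sketch of that shape using only the standard library (the names mirror the examples above, but this is plain asyncio, not aiopypes code, and it stops after a few items instead of running forever):

```python
import asyncio
from datetime import datetime

async def every_second(n: int = 3, interval: float = 0.01):
    # trigger stage: emit a timestamp n times
    # (interval shortened here so the demo finishes quickly)
    for _ in range(n):
        yield datetime.utcnow()
        await asyncio.sleep(interval)

async def task1(stream):
    # map stage: transform each item from the upstream stream
    async for s in stream:
        yield f"streaming from task1: {s}"

async def main():
    # chaining generators is the moral equivalent of .map(task1)
    return [item async for item in task1(every_second())]

results = asyncio.run(main())
```

aiopypes adds the scheduling, back-pressure, and worker scaling that this bare-bones chain lacks.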
⛏️ Built Using
✔️ TODO
- Extend to multithreading
- Extend to multiprocessing
- Build visualization server
- Add pipeline pipe functions (join, head, ...)
✍️ Authors
- @mroumanos
- Contributors: you?
🎉 Acknowledgements