aiopypes
(pronounced "a-i-o-pipes")

Scalable asyncio pipelines in Python, made easy.
🧐 About
This package makes it easy to build asynchronous streams that balance and scale automatically. Built on pure Python -- no dependencies -- this framework can be used for variable, decoupled, task-based workloads like web scraping, database management operations, and more. Out of the box, with minimal hardware and code, it can scale to process 10k+ tasks per second on production workloads.
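The core idea, tasks connected by streams where each task consumes from the one before it, can be sketched in plain asyncio (hypothetical names; aiopypes wraps and automates this pattern):

```python
import asyncio

async def produce(queue: asyncio.Queue) -> None:
    # Trigger task: emit values into the stream at a fixed interval.
    for i in range(3):
        await queue.put(i)
        await asyncio.sleep(0.01)
    await queue.put(None)  # sentinel: end of stream

async def consume(queue: asyncio.Queue, results: list) -> None:
    # Downstream task: pull items off the stream as they arrive.
    while (item := await queue.get()) is not None:
        results.append(item * 2)

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    # Producer and consumer run concurrently, decoupled by the queue.
    await asyncio.gather(produce(queue), consume(queue, results))
    return results

print(asyncio.run(main()))  # → [0, 2, 4]
```

aiopypes adds the missing pieces on top of this pattern: wiring tasks together with `.map()`/`.reduce()`, and scaling the number of consumers to match the backlog.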
Simple pipelines

```python
from datetime import datetime

import aiopypes

app = aiopypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':
    pipeline = every_second \
                .map(task1)
    pipeline.run()
```
To scaled pipelines

```python
import aiohttp

import aiopypes

app = aiopypes.App()

@app.task(interval=0.1)
async def every_tenth_second():
    return "http://www.google.com"

@app.task(scaler=aiopypes.scale.TanhTaskScaler())  # scales workers automatically to consume incoming requests
async def task1(stream):
    async with aiohttp.ClientSession() as session:
        async for s in stream:
            yield await session.get(s)

@app.task()
async def task2(stream):
    async for s in stream:
        if s.status != 200:
            print(f"failed request: {s}")
        yield

if __name__ == '__main__':
    pipeline = every_tenth_second \
                .map(task1) \
                .reduce(task2)
    pipeline.run()
```
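The intuition behind a tanh-based scaler can be sketched as a function mapping queue backlog to a worker count (an illustration only, not `TanhTaskScaler`'s actual implementation):

```python
import math

def tanh_workers(backlog: int, max_workers: int = 100) -> int:
    # Map queue backlog to a worker count along a tanh curve:
    # small backlogs get a near-proportional response, while large
    # backlogs saturate at max_workers instead of growing unbounded.
    scale = math.tanh(backlog / max_workers)
    return max(1, round(scale * max_workers))

print(tanh_workers(0))       # → 1 (always keep one worker alive)
print(tanh_workers(50))      # → 46 (near-linear region)
print(tanh_workers(10_000))  # → 100 (saturated at the ceiling)
```

The saturating shape is the point: a sudden flood of requests spins up more workers quickly, but the count is capped so a spike can't exhaust the host.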
🏁 Getting Started
Start with a simple pipeline, and build out from there!
```python
from datetime import datetime

import aiopypes

app = aiopypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':
    pipeline = every_second \
                .map(task1)
    pipeline.run()
```
For more, see the documentation on readthedocs.
Prerequisites
aiopypes is built on pure Python (3.5+) and requires no other dependencies.
Installing
Available on PyPI, installed with pip:

```shell
pip install aiopypes
```
🔧 Running the tests
To be created!
Break down into end to end tests
To be created!
And coding style tests
To be created!
🎈 Usage

Import the library:

```python
import aiopypes
```

Create an App object:

```python
app = aiopypes.App()
```

Create a trigger task:

```python
@app.task(interval=1.0)
async def every_second():
    return 1
```

Create downstream tasks:

```python
@app.task()
async def task_n(stream):
    async for s in stream:
        # process "s" here
        yield s
```

Create and configure the pipeline:

```python
pipeline = every_second \
            .map(task_n)
```

Run the pipeline:

```python
pipeline.run()
```
This will run continuously until interrupted.
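What "until interrupted" means can be sketched in plain asyncio (hypothetical names; this mirrors what Ctrl+C does to `pipeline.run()`, it is not aiopypes' own shutdown code):

```python
import asyncio

async def trigger(interval: float, queue: asyncio.Queue) -> None:
    # Emits forever, like an aiopypes trigger task.
    while True:
        await queue.put("tick")
        await asyncio.sleep(interval)

async def run_for(seconds: float) -> int:
    # Run the loop for a fixed time, then cancel it cleanly.
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(trigger(0.01, queue))
    await asyncio.sleep(seconds)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # cancellation is the expected shutdown path
    return queue.qsize()

count = asyncio.run(run_for(0.05))
print(count > 0)  # → True: ticks were emitted before cancellation
```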
⛏️ Built Using
✔️ TODO
- Extend to multithreads
- Extend to multiprocess
- Build visualization server
- Add pipeline pipe functions (join, head, ...)
✍️ Authors
- @mroumanos
- Contributors: you?
🎉 Acknowledgements