Skip to main content

Scalable async pipelines in Python, made easy

Project description

a-i-o-p-y-p-e-s

(pronounced "a-i-o-pipes")

Status GitHub Issues GitHub Pull Requests License


Scalable asyncio pipelines in Python, made easy.

📝 Table of Contents

🧐 About

This package is designed to make building asynchronous streams that balance and scale automatically easy. Built on pure Python -- no dependencies -- this framework can be used for variable, decoupled, task-based workloads like web scraping, database management operations, and more. Scale this out-of-the-box, with minimal hardware and coding, to process 10k+/s on production loads.

Simple pipelines

import aiopypes

app = aiopypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield obj

if __name__ == '__main__':

    pipeline = every_second \
               .map(task1)

    pipeline.run()

To scaled pipelines

import aiopypes
import aiohttp

app = aiopypes.App()

@app.task(interval=0.1)
async def every_second():
    return "http://www.google.com"

@app.task(scaler=aiopypes.scale.TanhTaskScaler()) #  this scales workers automatically to consume incoming requests
async def task1(stream):
    async for s in stream:
        yield await aiohttp.get(s)

async def task2(stream):
    async for s in stream:
        if s.response_code != 200:
            print("failed request: {s}")
        yield

if __name__ == '__main__':

    pipeline = every_second \
               .map(task1)
               .reduce(task2)

    pipeline.run()

🏁 Getting Started

Start with a simple pipeline, and build out from there!

import aiopypes

app = aiopypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield obj

if __name__ == '__main__':

    pipeline = every_second \
               .map(task1)

    pipeline.run()

For more, see readthedocs

Prerequisites

aiopypes is based on pure Python (3.5+) and does not require other dependencies.

Installing

Available on PyPi here, installed with pip:

pip install aiopypes

🔧 Running the tests

To be created!

Break down into end to end tests

To be created!

And coding style tests

To be created!

🎈 Usage

Import the library

import aiopypes

Create an App object

app = aiopypes.App()

Create a trigger task

@app.task(interval=1.0)
async def every_second():
    return 1

Create downtream tasks

@app.task()
async def task_n(stream):
    async for s in stream:
        # process "s" here
        yield

Create + configure the pipeline

pipeline = every_second \
            .map(task_n)

Run the pipeline

pipeline.run()

This will run continuously until interrupted.

⛏️ Built Using

✔️ TODO

  • Extend to multithreads
  • Extend to multiprocess
  • Build visualization server
  • Add pipeline pipe functions (join, head, ...)

✍️ Authors

🎉 Acknowledgements

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiopypes-1.1.0.tar.gz (14.9 kB view details)

Uploaded Source

Built Distribution

aiopypes-1.1.0-py3-none-any.whl (14.3 kB view details)

Uploaded Python 3

File details

Details for the file aiopypes-1.1.0.tar.gz.

File metadata

  • Download URL: aiopypes-1.1.0.tar.gz
  • Upload date:
  • Size: 14.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.7

File hashes

Hashes for aiopypes-1.1.0.tar.gz
Algorithm Hash digest
SHA256 62eaad0417842b18d169e12732c34caac034612c7f6969e4845e93107cde7e73
MD5 ec9aeb7a3f608cc039ce79c7b8ff5359
BLAKE2b-256 e89af85df50e58d88520ee7e47b9db8e661ed2543beddfc406a6e6646d2d9d2e

See more details on using hashes here.

File details

Details for the file aiopypes-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: aiopypes-1.1.0-py3-none-any.whl
  • Upload date:
  • Size: 14.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.7

File hashes

Hashes for aiopypes-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 202ec76a0f869df80cd78c42b9e838453e07eab39eb248743ef5c634471c9074
MD5 d92ef500513afd2945b0485008971a2f
BLAKE2b-256 1f323f57c171bff60709a6d7ea329ceb6d772cd66ec5076d389a36e48d2d1391

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page