
Scalable async pipelines in Python, made easy


p-y-p-e-s

(pronounced "pipes")





🧐 About

This package makes it easy to build asynchronous streams that balance and scale automatically. Written in pure Python with no dependencies, it suits variable, decoupled, task-based workloads such as web scraping, database management operations, and more. Out of the box, with minimal hardware and code, it can scale to process 10k+/s on production loads.
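To make "streams that balance automatically" concrete, here is a minimal plain-asyncio sketch of the underlying pattern. It is not pypes's implementation: several workers pull from one shared `asyncio.Queue`, so whichever worker is free takes the next item. The names `worker` and `run_pool` are hypothetical.

```python
import asyncio


async def worker(name, queue, results):
    # Each worker pulls the next available item, so load balances naturally:
    # a worker that finishes early simply takes more items.
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)  # stand-in for real async I/O (hypothetical)
            results.append((name, item * 2))
        finally:
            queue.task_done()


async def run_pool(items, n_workers=3):
    queue = asyncio.Queue()
    results = []
    for item in items:
        queue.put_nowait(item)
    workers = [asyncio.create_task(worker(f"w{i}", queue, results))
               for i in range(n_workers)]
    await queue.join()  # wait until every item has been processed
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    out = asyncio.run(run_pool(range(10)))
    print(sorted(value for _, value in out))
```

Adding workers raises throughput without changing the per-item code, which is the knob a scaler would turn.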

From simple pipelines

import pypes
from datetime import datetime

app = pypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':

    pipeline = every_second \
               .map(task1)

    pipeline.run()
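Conceptually, `every_second.map(task1)` feeds the output of one stage into the next as an async stream. The plain-asyncio sketch below shows that chaining with async generators. It is an illustration under our own assumptions, not how pypes wires stages internally; `source`, `double`, `to_text`, `chain` and `collect` are hypothetical names, and the source is finite so the example terminates.

```python
import asyncio


async def source(n):
    # A finite stand-in for a trigger task: emits 0..n-1.
    for i in range(n):
        yield i


async def double(stream):
    # A downstream stage: consumes one async stream, yields another.
    async for s in stream:
        yield s * 2


async def to_text(stream):
    async for s in stream:
        yield f"streaming: {s}"


def chain(stream, *stages):
    # Wrap each stage around the previous stream, like repeated .map() calls.
    for stage in stages:
        stream = stage(stream)
    return stream


async def collect(stream):
    # Drain an async stream into a list.
    return [item async for item in stream]


if __name__ == "__main__":
    print(asyncio.run(collect(chain(source(3), double, to_text))))
```

Running it prints `['streaming: 0', 'streaming: 2', 'streaming: 4']`.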

To scaled pipelines

import pypes
import aiohttp

app = pypes.App()

@app.task(interval=0.1)
async def every_tenth_second():
    return "http://www.google.com"

@app.task(scaler=pypes.scale.TanhTaskScaler())  # scales workers automatically to consume incoming requests
async def task1(stream):
    # aiohttp issues requests through a ClientSession
    async with aiohttp.ClientSession() as session:
        async for url in stream:
            async with session.get(url) as response:
                status = response.status
            yield url, status

@app.task()
async def task2(stream):
    async for url, status in stream:
        if status != 200:
            print(f"failed request: {url} ({status})")
        yield

if __name__ == '__main__':

    pipeline = every_tenth_second \
               .map(task1) \
               .reduce(task2)

    pipeline.run()
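The README does not document how `TanhTaskScaler` decides how many workers to run. Purely as a hypothetical illustration of tanh-based scaling, one natural rule maps the queue backlog through `tanh`, so the worker count grows quickly at first and then saturates at a ceiling. `tanh_worker_count`, `min_workers`, `max_workers` and `backlog_scale` are assumed names, not pypes options.

```python
import math


def tanh_worker_count(backlog, min_workers=1, max_workers=16, backlog_scale=100.0):
    """Hypothetical scaling rule: workers rise smoothly with backlog and saturate.

    tanh(0) == 0, so an empty queue gives min_workers; as the backlog grows,
    tanh approaches 1 and the count approaches max_workers.
    """
    if backlog < 0:
        raise ValueError("backlog must be non-negative")
    fraction = math.tanh(backlog / backlog_scale)
    return min_workers + round((max_workers - min_workers) * fraction)


if __name__ == "__main__":
    for backlog in (0, 10, 50, 100, 500, 10_000):
        print(backlog, tanh_worker_count(backlog))
```

A bounded curve like `tanh` is a convenient choice because a traffic spike can never demand more than `max_workers`.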

🏁 Getting Started

Start with a simple pipeline, and build out from there!

import pypes
from datetime import datetime

app = pypes.App()

@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':

    pipeline = every_second \
               .map(task1)

    pipeline.run()

For more, see the documentation on Read the Docs.

Prerequisites

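pypes is written in pure Python (3.5+) and does not require other dependencies. The examples in this README use async generators and f-strings, which need Python 3.6 or newer, and the scaled example additionally uses aiohttp.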

Installing

pip install asyncio-pypes

The package is published as asyncio-pypes and imported as pypes.

🔧 Running the tests

To be created!

Break down into end to end tests

To be created!

And coding style tests

To be created!

🎈 Usage

Import the library

import pypes

Create an App object

app = pypes.App()

Create a trigger task

@app.task(interval=1.0)
async def every_second():
    return 1
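A trigger task such as `every_second` is called on a fixed `interval`. Outside of pypes, that behaviour can be sketched as an async generator that awaits a coroutine function, yields its result, and waits for the next tick. `interval_source`, its `limit` parameter and `every_tick` are hypothetical, and pypes may handle timing drift differently.

```python
import asyncio


async def interval_source(func, interval, limit=None):
    """Hypothetical sketch: call the coroutine function `func` every `interval` seconds."""
    loop = asyncio.get_running_loop()
    next_tick = loop.time()
    count = 0
    while limit is None or count < limit:
        # Sleep until the next scheduled tick; ticks follow a fixed clock,
        # so a slow call does not push every later call back.
        await asyncio.sleep(max(0.0, next_tick - loop.time()))
        yield await func()
        count += 1
        next_tick += interval


async def every_tick():
    return 1


async def main():
    return [value async for value in interval_source(every_tick, 0.01, limit=3)]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With `limit=None` the generator runs forever, matching a trigger that fires until the pipeline is interrupted.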

Create downstream tasks

@app.task()
async def task_n(stream):
    async for s in stream:
        # process "s" here
        yield
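Because a downstream task like `task_n` is just an async generator over an incoming stream, its logic can be exercised without building a pipeline by feeding it a hand-made async iterable. The sketch below assumes an undecorated copy of the task function; `fake_stream`, `keep_even` and `run_task` are hypothetical names.

```python
import asyncio


async def fake_stream(items):
    # Hand-made stand-in for the stream a pipeline would pass to a task.
    for item in items:
        yield item


async def keep_even(stream):
    # Example downstream task body: forward only even values.
    async for s in stream:
        if s % 2 == 0:
            yield s


async def run_task(task, items):
    # Drive one task over a fixed input and collect everything it yields.
    return [out async for out in task(fake_stream(items))]


if __name__ == "__main__":
    print(asyncio.run(run_task(keep_even, [1, 2, 3, 4])))
```

Running it prints `[2, 4]`.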

Create + configure the pipeline

pipeline = every_second \
            .map(task_n)

Run the pipeline

pipeline.run()

This will run continuously until interrupted.

⛏️ Built Using

✔️ TODO

  • Extend to multithreads
  • Extend to multiprocess
  • Build visualization server
  • Add pipeline pipe functions (join, head, ...)

✍️ Authors

🎉 Acknowledgements

Download files

Source distribution: asyncio-pypes-0.0.4.tar.gz (12.5 kB)

Built distribution: asyncio_pypes-0.0.4-py3-none-any.whl (14.3 kB, Python 3)
