Scalable async pipelines in Python, made easy

p-y-p-e-s

(pronounced "pipes")


🧐 About

This package makes it easy to build asynchronous streams that balance and scale automatically. Built on pure Python with no dependencies, the framework suits variable, decoupled, task-based workloads such as web scraping and database management operations. It scales out of the box, with minimal hardware and code, to process 10k+/s on production loads.

Simple pipelines

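from datetime import datetime

import pypes

app = pypes.App()

# trigger task: runs once per second and feeds the pipeline
@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

# downstream task: consumes the trigger's output as an async stream
@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':

    pipeline = every_second \
               .map(task1)

    pipeline.run()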
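
To make the data flow concrete, here is a minimal sketch of the same trigger-and-map shape written with plain asyncio async generators. It does not use pypes and says nothing about how pypes is implemented; the names `ticker`, `label`, `collect`, and `run_demo` are hypothetical, and the run is bounded to a few items so that it terminates. It assumes Python 3.7+ for `asyncio.run`.

```python
import asyncio
from datetime import datetime, timezone


async def ticker(interval, count):
    """Emit a timestamp every `interval` seconds, `count` times (hypothetical trigger)."""
    for _ in range(count):
        yield datetime.now(timezone.utc)
        await asyncio.sleep(interval)


async def label(stream):
    """Downstream stage: consume a stream and yield one transformed item per input."""
    async for item in stream:
        yield f"streaming from label: {item.isoformat()}"


async def collect(stream):
    """Drain a stream into a list so the result can be inspected."""
    return [item async for item in stream]


def run_demo(count=3, interval=0.01):
    # Chain the stages by passing one async generator into the next.
    return asyncio.run(collect(label(ticker(interval, count))))


if __name__ == "__main__":
    for line in run_demo():
        print(line)
```

Each stage only sees the stream handed to it, which is the property that lets stages be swapped or chained independently.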

Scaled pipelines

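import aiohttp
import pypes

app = pypes.App()

# trigger task: despite its name, interval=0.1 makes this fire every 0.1 seconds
@app.task(interval=0.1)
async def every_second():
    return "http://www.google.com"

# the scaler adjusts the number of workers automatically to keep up with incoming requests
@app.task(scaler=pypes.scale.TanhTaskScaler())
async def task1(stream):
    # reuse one HTTP session for every request this worker handles
    async with aiohttp.ClientSession() as session:
        async for url in stream:
            async with session.get(url) as response:
                # yield plain data rather than the response, which is released
                # when the `async with` block exits
                yield url, response.status

@app.task()
async def task2(stream):
    async for url, status in stream:
        if status != 200:
            print(f"failed request: {url} (status {status})")
        yield

if __name__ == '__main__':

    pipeline = (
        every_second
        .map(task1)
        .reduce(task2)
    )

    pipeline.run()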
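
This README does not describe how `TanhTaskScaler` decides on a worker count. As an illustration only, the sketch below shows one way a tanh-shaped rule could map a queue backlog to a number of workers: it grows quickly for small backlogs and flattens out at a ceiling. The function name, its parameters, and the formula are all assumptions, not the pypes implementation.

```python
import math


def desired_workers(backlog, max_workers=10, softness=50.0):
    """Hypothetical tanh-shaped scaling rule (not taken from pypes).

    backlog     -- number of items waiting in the queue
    max_workers -- ceiling on the number of concurrent workers
    softness    -- backlog size at which roughly 76% of the extra workers are in use
    """
    if backlog < 0:
        raise ValueError("backlog must be non-negative")
    # tanh maps [0, inf) onto [0, 1): near-linear at first, then flattening out.
    fraction = math.tanh(backlog / softness)
    # Always keep one worker alive; scale the rest with the backlog.
    return 1 + round((max_workers - 1) * fraction)


if __name__ == "__main__":
    for backlog in (0, 10, 50, 200, 1000):
        print(backlog, desired_workers(backlog))
```

A saturating curve like this avoids spawning an unbounded number of workers during a burst while still reacting quickly to small backlogs.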

🏁 Getting Started

Start with a simple pipeline, and build out from there!

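from datetime import datetime

import pypes

app = pypes.App()

# trigger task: runs once per second and feeds the pipeline
@app.task(interval=1.0)
async def every_second():
    return datetime.utcnow()

# downstream task: consumes the trigger's output as an async stream
@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s

if __name__ == '__main__':

    pipeline = every_second \
               .map(task1)

    pipeline.run()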

For more, see the documentation on Read the Docs.

Prerequisites

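pypes is written in pure Python (3.5+) and has no third-party dependencies. The examples in this README use f-strings and async generators, which need Python 3.6 or later; the scaled example additionally uses aiohttp.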

Installing

pip install asyncio-pypes

🔧 Running the tests

To be created!

Break down into end to end tests

To be created!

And coding style tests

To be created!

🎈 Usage

Import the library

import pypes

Create an App object

app = pypes.App()

Create a trigger task

@app.task(interval=1.0)
async def every_second():
    return 1

Create downtream tasks

@app.task()
async def task_n(stream):
    async for s in stream:
        # process "s" here
        yield

Create and configure the pipeline

pipeline = every_second \
            .map(task_n)

Run the pipeline

pipeline.run()

This will run continuously until interrupted.
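
An endless loop like this is normally stopped by cancelling it or by a keyboard interrupt. The sketch below shows that pattern with plain asyncio; it does not use pypes, and `run_forever` and `run_for` are hypothetical names. Cancellation after a fixed delay stands in for the interrupt so that the example terminates on its own. It assumes Python 3.7+ for `asyncio.run`.

```python
import asyncio


async def run_forever(interval, ticks):
    """Hypothetical stand-in for a pipeline: record a tick every `interval` seconds, forever."""
    n = 0
    while True:
        ticks.append(n)
        n += 1
        await asyncio.sleep(interval)


async def run_for(seconds, interval=0.01):
    """Run the endless loop, then cancel it after `seconds` (simulating an interrupt)."""
    ticks = []
    task = asyncio.ensure_future(run_forever(interval, ticks))
    await asyncio.sleep(seconds)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the loop only stops because it was cancelled
    return ticks


if __name__ == "__main__":
    try:
        print(len(asyncio.run(run_for(0.1))), "ticks before cancellation")
    except KeyboardInterrupt:
        print("interrupted")
```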

✔️ TODO

  • Extend to multiple threads
  • Extend to multiple processes
  • Build visualization server
  • Add pipeline pipe functions (join, head, ...)
