Skip to main content

Concurrent Python made simple

Project description

Pyper

Concurrent Python made simple

Test Coverage Package version Supported Python versions


Pyper is a flexible framework for concurrent and parallel data-processing, based on functional programming patterns. Used for 🔀 ETL Systems, ⚙️ Data Microservices, and 🌐 Data Collection

See the Documentation

Key features:

  • 💡Intuitive API: Easy to learn, easy to think about. Implements clean abstractions to seamlessly unify threaded, multiprocessed, and asynchronous work.
  • 🚀 Functional Paradigm: Python functions are the building blocks of data pipelines. Let's you write clean, reusable code naturally.
  • 🛡️ Safety: Hides the heavy lifting of underlying task execution and resource clean-up. No more worrying about race conditions, memory leaks, or thread-level error handling.
  • Efficiency: Designed from the ground up for lazy execution, using queues, workers, and generators.
  • Pure Python: Lightweight, with zero sub-dependencies.

Installation

Install the latest version using pip:

$ pip install python-pyper

Note that python-pyper is the pypi registered package.

Usage

In Pyper, the task decorator is used to transform functions into composable pipelines.

Let's simulate a pipeline that performs a series of transformations on some data.

import asyncio
import time

from pyper import task


def get_data(limit: int):
    for i in range(limit):
        yield i


async def step1(data: int):
    await asyncio.sleep(1)
    print("Finished async wait", data)
    return data


def step2(data: int):
    time.sleep(1)
    print("Finished sync wait", data)
    return data


def step3(data: int):
    for i in range(10_000_000):
        _ = i*i
    print("Finished heavy computation", data)
    return data


async def main():
    # Define a pipeline of tasks using `pyper.task`
    pipeline = task(get_data, branch=True) \
        | task(step1, workers=20) \
        | task(step2, workers=20) \
        | task(step3, workers=20, multiprocess=True)

    # Call the pipeline
    total = 0
    async for output in pipeline(limit=20):
        total += output
    print("Total:", total)


if __name__ == "__main__":
    asyncio.run(main())

Pyper provides an elegant abstraction of the execution of each task, allowing you to focus on building out the logical functions of your program. In the main function:

  • pipeline defines a function; this takes the parameters of its first task (get_data) and yields each output from its last task (step3)
  • Tasks are piped together using the | operator (motivated by Unix's pipe operator) as a syntactic representation of passing inputs/outputs between tasks.

In the pipeline, we are executing three different types of work:

  • task(step1, workers=20) spins up 20 asyncio.Tasks to handle asynchronous IO-bound work

  • task(step2, workers=20) spins up 20 threads to handle synchronous IO-bound work

  • task(step3, workers=20, multiprocess=True) spins up 20 processes to handle synchronous CPU-bound work

task acts as one intuitive API for unifying the execution of each different type of function.

Each task has workers that submit outputs to the next task within the pipeline via queue-based data structures; this is the mechanism underpinning how concurrency and parallelism are achieved. See the docs for a breakdown of what a pipeline looks like under the hood.


See a non-async example

Pyper pipelines are by default non-async, as long as their tasks are defined as synchronous functions. For example:

import time

from pyper import task


def get_data(limit: int):
    for i in range(limit):
        yield i

def step1(data: int):
    time.sleep(1)
    print("Finished sync wait", data)
    return data

def step2(data: int):
    for i in range(10_000_000):
        _ = i*i
    print("Finished heavy computation", data)
    return data


def main():
    pipeline = task(get_data, branch=True) \
        | task(step1, workers=20) \
        | task(step2, workers=20, multiprocess=True)
    total = 0
    for output in pipeline(limit=20):
        total += output
    print("Total:", total)


if __name__ == "__main__":
    main()

A pipeline consisting of at least one asynchronous function becomes an AsyncPipeline, which exposes the same usage API, provided async and await syntax in the obvious places. This makes it effortless to combine synchronously defined and asynchronously defined functions where need be.

Examples

To explore more of Pyper's features, see some further examples

Dependencies

Pyper is implemented in pure Python, with no sub-dependencies. It is built on top of the well-established built-in Python modules:

License

This project is licensed under the terms of the MIT license.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

python_pyper-0.4.4.tar.gz (349.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

python_pyper-0.4.4-py3-none-any.whl (18.3 kB view details)

Uploaded Python 3

File details

Details for the file python_pyper-0.4.4.tar.gz.

File metadata

  • Download URL: python_pyper-0.4.4.tar.gz
  • Upload date:
  • Size: 349.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.0.1 CPython/3.12.8

File hashes

Hashes for python_pyper-0.4.4.tar.gz
Algorithm Hash digest
SHA256 fbf9f4f5f95f8263a608770424f11558a07896a3d1b1a2699e4ba295b5100f7b
MD5 2436d3f2f0b0c535e6d46aa874fc2cba
BLAKE2b-256 18bc7e67eff590248554dfb4e325a979ccfa188c80259923cd3d255c4ac291c5

See more details on using hashes here.

Provenance

The following attestation bundles were made for python_pyper-0.4.4.tar.gz:

Publisher: publish.yml on pyper-dev/pyper

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file python_pyper-0.4.4-py3-none-any.whl.

File metadata

  • Download URL: python_pyper-0.4.4-py3-none-any.whl
  • Upload date:
  • Size: 18.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.0.1 CPython/3.12.8

File hashes

Hashes for python_pyper-0.4.4-py3-none-any.whl
Algorithm Hash digest
SHA256 f324cb2bdd0ee4153a0fb7d6011d54c12a9ea2dd3b74fd01652bcc067774ffe8
MD5 e3cb2e121a0404722b38ca3a7d3d0fe8
BLAKE2b-256 9bd2f8ebc2d7e1cd48403a8e9aa0855739c97e0fdd007448582c0dc2ca7a3915

See more details on using hashes here.

Provenance

The following attestation bundles were made for python_pyper-0.4.4-py3-none-any.whl:

Publisher: publish.yml on pyper-dev/pyper

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page