
Pypeln

Pypeln (pronounced as "pypeline") is a simple yet powerful Python library for creating concurrent data pipelines.

  • Simplicity: Pypeln was designed to solve medium data tasks that require parallelism and concurrency but where using frameworks like Spark or Dask feels exaggerated or unnatural.
  • Easy-to-use: Pypeln exposes a familiar functional API compatible with regular Python code.
  • Flexible: Pypeln enables you to build pipelines using Processes, Threads and asyncio.Tasks via the exact same API.
  • Fine-grained Control: Pypeln allows you to have control over the memory and CPU resources used at each stage of your pipeline.

Installation

Install Pypeln using pip:

pip install pypeln

Basic Usage

With Pypeln you can easily create multi-stage data pipelines using three types of workers:

Processes

You can create a pipeline based on multiprocessing.Process workers by using the process module:

import pypeln as pl
import time
from random import random

def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.process.map(slow_add1, data, workers = 3, maxsize = 4)
stage = pl.process.filter(slow_gt3, stage, workers = 2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

At each stage you can specify the number of workers. The maxsize parameter limits the maximum number of elements that the stage can hold simultaneously.

Threads

You can create a pipeline based on threading.Thread workers by using the thread module:

import pypeln as pl
import time
from random import random

def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.thread.map(slow_add1, data, workers = 3, maxsize = 4)
stage = pl.thread.filter(slow_gt3, stage, workers = 2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

Here we have the exact same situation as in the previous case except that the workers are Threads.

Tasks

You can create a pipeline based on asyncio.Task workers by using the task module:

import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.task.map(slow_add1, data, workers = 3, maxsize = 4)
stage = pl.task.filter(slow_gt3, stage, workers = 2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

This case is conceptually similar, but everything runs in a single thread and Task workers are created dynamically.

Mixed Pipelines

You can create pipelines that mix worker types, choosing the type best suited to each task, so you can get the maximum performance out of your code:

data = get_iterable()
data = pl.task.map(f1, data, workers = 100)
data = pl.thread.flat_map(f2, data, workers = 10)
data = filter(f3, data)
data = pl.process.map(f4, data, workers = 5, maxsize = 200)

Notice that here we even used the regular Python filter: since stages are iterables, Pypeln integrates smoothly with any Python code. Just be aware of how each stage behaves.

Pipe Operator

In the spirit of being a true pipeline library, Pypeln also lets you create your pipelines using the pipe | operator:

data = (
    range(10)
    | pl.process.map(slow_add1, workers = 3, maxsize = 4)
    | pl.process.filter(slow_gt3, workers = 2)
    | list
)

Benchmarks

Related Stuff

Contributors

License

MIT

Download files


Source Distribution

pypeln-0.2.0.tar.gz (33.0 kB)


Built Distribution


pypeln-0.2.0-py3-none-any.whl (44.0 kB)


File details

Details for the file pypeln-0.2.0.tar.gz.

File metadata

  • Download URL: pypeln-0.2.0.tar.gz
  • Upload date:
  • Size: 33.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.0.3 CPython/3.7.6 Linux/5.3.0-7625-generic

File hashes

Hashes for pypeln-0.2.0.tar.gz

  • SHA256: 55719f8b9bb2d92b6978a562b91e23abbe2b512f7ab420a5b2258dc1ea1dbbf4
  • MD5: eae9f67d5d6cbaeda4a313284f06751c
  • BLAKE2b-256: 31b2177c53d15367a51aea8bf0383555e429642b18d82b1ef1b5fc337f9dbe22


File details

Details for the file pypeln-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: pypeln-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 44.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.0.3 CPython/3.7.6 Linux/5.3.0-7625-generic

File hashes

Hashes for pypeln-0.2.0-py3-none-any.whl

  • SHA256: 12b21a0d49dc0ebe0737c06105dc40524d9e337a340b5d0c69b3f40b55d97b4d
  • MD5: 4db74daedaf59777d248f702cd3fa9ea
  • BLAKE2b-256: ff0195bb9be5987ae6fde94b097b0355ecd7f65843bf09deed9b0a78a9668cf9

