Readable, lazy data-transformation pipelines for Python — no more nested function calls. pip install streamchain

pipeflow

Readable, lazy data-transformation pipelines for Python — no more nested function calls.

# Before pipeflow
result = list(filter(lambda x: x > 5, map(lambda x: x ** 2, range(10))))

# With pipeflow
from pipeflow import Stream

result = (
    Stream(range(10))
    .map(lambda x: x ** 2)
    .filter(lambda x: x > 5)
    .to(list)
)

Installation

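pip install streamchain

The package is published on PyPI under the name streamchain; the examples in this README import it as pipeflow.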

Quick Start

from pipeflow import Stream, pipe, to

# Method chaining style
result = (
    Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x ** 2)
    .take(3)
    .to(list)
)
# [4, 16, 36]

# Pipe operator style (works on any plain iterable, including lists)
result = (
    range(10)
    | pipe(filter, lambda x: x % 2 == 0)
    | pipe(map, lambda x: x ** 2)
    | to(list)
)
# [0, 4, 16, 36, 64]

Why pipeflow?

Python lacks a native pipe operator (|>). The workaround — nesting function calls — is hard to read:

# Hard to follow — you read inside-out
list(filter(pred, map(func, filter(other_pred, data))))

# pipeflow — you read top-to-bottom, left-to-right
Stream(data).filter(other_pred).map(func).filter(pred).to(list)

Zero dependencies. pipeflow uses only the Python standard library.
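Lazy by default. Elements are computed on demand with no intermediate lists; only operations that need the whole input, such as .sort() and .reverse(), force evaluation.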
Fully typed. Every method ships with precise type annotations.
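
To illustrate the laziness point above, here is a minimal sketch of how a lazy pipeline can be built on top of iterators. The Stream class below is a hypothetical stand-in written for this example, not pipeflow's actual implementation; it only shows that elements are pulled on demand, even from an infinite source.

```python
from itertools import count, islice


class Stream:
    """Hypothetical minimal stand-in, not pipeflow's real class."""

    def __init__(self, iterable):
        self._it = iter(iterable)

    def map(self, func):
        # map() returns a lazy iterator; nothing is computed yet.
        return Stream(map(func, self._it))

    def filter(self, func):
        return Stream(filter(func, self._it))

    def take(self, n):
        # islice stops pulling from upstream after n elements.
        return Stream(islice(self._it, n))

    def to(self, collector):
        # The collector drives the whole pipeline.
        return collector(self._it)


calls = []


def square(x):
    calls.append(x)  # record every element that is actually computed
    return x ** 2


# count(1) is infinite, so this only terminates because evaluation is lazy.
result = (
    Stream(count(1))
    .map(square)
    .filter(lambda x: x % 2 == 0)
    .take(3)
    .to(list)
)
print(result)  # [4, 16, 36]
print(calls)   # [1, 2, 3, 4, 5, 6]
```

Only six source elements are squared, because take(3) stops the pipeline as soon as three even squares have been produced.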


API Reference

Stream(iterable)

Wrap any iterable in a lazy pipeline.

Transformations

Method                          Description
.map(func)                      Apply func to every element
.filter(func)                   Keep elements where func is True
.flatmap(func)                  Map then flatten one level
.flatten()                      Flatten one level of nesting
.take(n)                        Keep the first n elements
.skip(n)                        Drop the first n elements
.take_while(func)               Take while func is True
.drop_while(func)               Drop while func is True
.enumerate(start=0)             Pair elements with their index
.zip(*others)                   Zip with other iterables
.chain(*others)                 Append other iterables
.batch(n)                       Yield lists of n elements
.unique(key=None)               Deduplicate (order-preserving)
.sort(key=None, reverse=False)  Sort (forces evaluation)
.reverse()                      Reverse (forces evaluation)
.tap(func)                      Side-effects without changing elements
.starmap(func)                  Map with tuple unpacking
.group_by(key)                  Group consecutive elements
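
The semantics of a few of these transformations can be sketched with the standard library. The helper functions below (unique, flatmap) are hypothetical illustrations of the behavior described in the table, assuming pipeflow follows the usual itertools conventions; they are not taken from pipeflow's source.

```python
from itertools import chain, dropwhile, takewhile


def unique(iterable, key=None):
    """Order-preserving deduplication; assumes items (or keys) are hashable."""
    seen = set()
    for item in iterable:
        marker = item if key is None else key(item)
        if marker not in seen:
            seen.add(marker)
            yield item


def flatmap(func, iterable):
    """Map, then flatten exactly one level of nesting."""
    return chain.from_iterable(map(func, iterable))


data = [1, 2, 2, 5, 3, 1, 6]

uniq = list(unique(data))
# take_while stops at the first failing element, unlike filter.
taken = list(takewhile(lambda x: x < 5, data))
# drop_while skips the leading run, then yields everything after it.
dropped = list(dropwhile(lambda x: x < 5, data))
flat = list(flatmap(lambda x: [x, x * 10], [1, 2]))
by_key = list(unique(["a", "A", "b"], key=str.lower))

print(uniq)     # [1, 2, 5, 3, 6]
print(taken)    # [1, 2, 2]
print(dropped)  # [5, 3, 1, 6]
print(flat)     # [1, 10, 2, 20]
print(by_key)   # ['a', 'b']
```

Note that take_while and drop_while depend on element order, so they are not interchangeable with filter.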

Terminals

Method                       Description
.to(collector)               Collect into list, set, dict, tuple…
.reduce(func, initial=None)  Fold to a single value
.fold(func, initial)         Fold with explicit initial value
.count()                     Count elements
.first(default=None)         First element
.last(default=None)          Last element
.any(func=None)              Any element satisfies func
.all(func=None)              All elements satisfy func
.sum()                       Sum of elements
.min(key=None)               Minimum element
.max(key=None)               Maximum element
.for_each(func)              Call func for side-effects
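
The difference between folding with and without an initial value is easiest to see on an empty input. The sketch below uses functools.reduce from the standard library; the fold, first, and last helpers are hypothetical stand-ins, and how pipeflow's own .reduce() treats initial=None on an empty stream is not stated here.

```python
import operator
from functools import reduce


def fold(func, iterable, initial):
    """Fold with an explicit initial value; safe on empty input."""
    return reduce(func, iterable, initial)


def first(iterable, default=None):
    """First element, or default when the iterable is empty."""
    return next(iter(iterable), default)


def last(iterable, default=None):
    """Last element, or default when the iterable is empty."""
    result = default
    for result in iterable:
        pass
    return result


total = fold(operator.add, [1, 2, 3, 4], 0)
empty_total = fold(operator.add, [], 0)

# Without an initial value, the standard-library reduce rejects empty input.
try:
    reduce(operator.add, [])
    empty_reduce_failed = False
except TypeError:
    empty_reduce_failed = True

print(total)                # 10
print(empty_total)          # 0
print(empty_reduce_failed)  # True
print(first([]), first([7, 8]), last(iter([7, 8, 9])))  # None 7 9
```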

pipe(func, *args) and to(collector)

Use the | operator on plain lists or generators:

from pipeflow import pipe, to

result = [1, 2, 3, 4, 5] | pipe(filter, lambda x: x % 2 == 0) | to(list)
# [2, 4]
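
Because built-in lists and generators do not define the | operator themselves, helpers like these can hook into it through Python's reflected-or method, __ror__. The classes below are a minimal sketch under that assumption, reusing the names pipe and to for readability; they are not pipeflow's actual code. The sketch assumes the piped value is passed as the last positional argument, which matches the filter and map examples above.

```python
class pipe:
    """Hypothetical sketch: wraps func so that `value | pipe(func, *args)` works."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def __ror__(self, iterable):
        # Called for `iterable | self` when the left operand cannot handle `|`.
        return self.func(*self.args, iterable)


class to:
    """Hypothetical sketch: terminal step that collects the piped iterable."""

    def __init__(self, collector):
        self.collector = collector

    def __ror__(self, iterable):
        return self.collector(iterable)


result = (
    [1, 2, 3, 4, 5]
    | pipe(filter, lambda x: x % 2 == 0)
    | pipe(map, lambda x: x ** 2)
    | to(list)
)
print(result)  # [4, 16]
```

Each intermediate value is a lazy filter or map object, so nothing is computed until to(list) consumes the chain.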

Real-world Examples

ETL Pipeline

users = [
    {"name": "Alice", "age": 30, "active": True},
    {"name": "Bob",   "age": 17, "active": True},
    {"name": "Carol", "age": 25, "active": False},
]

result = (
    Stream(users)
    .filter(lambda u: u["active"])
    .filter(lambda u: u["age"] >= 18)
    .map(lambda u: u["name"].upper())
    .sort()
    .to(list)
)
# ["ALICE"]

Word Count

text = "hello world hello python world hello"

counts = (
    Stream(text.split())
    .sort()  # group_by only groups consecutive elements, so sort first
    .group_by(lambda w: w)
    .map(lambda kv: (kv[0], len(kv[1])))  # assumes each group is a list
    .sort(key=lambda kv: kv[1], reverse=True)
    .to(dict)
)
# {"hello": 3, "world": 2, "python": 1}
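
The API table describes group_by as grouping consecutive elements. Assuming it behaves like itertools.groupby from the standard library, the order of the input matters: equal words that are not adjacent end up in separate groups. The sketch below shows the effect with plain itertools rather than pipeflow itself.

```python
from itertools import groupby

words = "hello world hello python world hello".split()

# Grouping the words as they appear: every run has length 1.
fragmented = [(key, len(list(group))) for key, group in groupby(words)]

# Sorting first makes equal words adjacent, so each word forms one group.
grouped = [(key, len(list(group))) for key, group in groupby(sorted(words))]

# Order by count, highest first, then collect into a dict.
counts = dict(sorted(grouped, key=lambda kv: kv[1], reverse=True))

print(fragmented)
# [('hello', 1), ('world', 1), ('hello', 1), ('python', 1), ('world', 1), ('hello', 1)]
print(grouped)  # [('hello', 3), ('python', 1), ('world', 2)]
print(counts)   # {'hello': 3, 'world': 2, 'python': 1}
```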

Debugging mid-pipeline

result = (
    Stream(range(20))
    .filter(lambda x: x % 3 == 0)
    .tap(lambda x: print(f"after filter: {x}"))
    .map(lambda x: x * 10)
    .to(list)
)
# [0, 30, 60, 90, 120, 150, 180]
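
In a lazy pipeline, a tap step runs once per element as that element flows through, not once for the whole collection. The sketch below reproduces the example with a hypothetical tap generator and standard-library filter and map, assuming pipeflow's .tap() behaves the same way; it records messages in a list instead of printing so the interleaving is visible.

```python
def tap(func, iterable):
    """Hypothetical sketch: call func for its side effect, pass items through."""
    for item in iterable:
        func(item)
        yield item


log = []

filtered = filter(lambda x: x % 3 == 0, range(20))
tapped = tap(lambda x: log.append(f"after filter: {x}"), filtered)
scaled = map(lambda x: x * 10, tapped)

# Pull only two results; the tap has fired exactly twice so far.
first_two = [next(scaled), next(scaled)]
log_after_two = list(log)

# Consuming the rest drives the remaining elements through the tap.
result = first_two + list(scaled)

print(log_after_two)  # ['after filter: 0', 'after filter: 3']
print(result)         # [0, 30, 60, 90, 120, 150, 180]
print(len(log))       # 7
```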

Batch processing

# records is any iterable; bulk_insert is your own function and receives each batch as a list
Stream(records).batch(100).for_each(bulk_insert)
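
The batching step can be sketched with itertools.islice. The batch function and the bulk_insert callback below are hypothetical, and the sketch assumes the final batch may be shorter than n when the input does not divide evenly; pipeflow's handling of that case is not documented here.

```python
from itertools import islice


def batch(iterable, n):
    """Hypothetical sketch: yield lists of up to n items without loading all input."""
    if n < 1:
        raise ValueError("n must be at least 1")
    it = iter(iterable)
    # islice pulls at most n items per round; an empty list ends the loop.
    while chunk := list(islice(it, n)):
        yield chunk


inserted = []


def bulk_insert(rows):
    # Hypothetical stand-in for a database call; records each batch size.
    inserted.append(len(rows))


# A generator source: only one batch is held in memory at a time.
records = ({"id": i} for i in range(250))

for chunk in batch(records, 100):
    bulk_insert(chunk)

print(inserted)  # [100, 100, 50]
```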

License

MIT
