
Stream4Py: Functional, chainable streams for Python with lazy evaluation


Stream4Py


A Python library inspired by Java Streams, Haskell lists, and Linux pipes. It provides a lazily evaluated Stream class for functional-style data manipulation and makes it easy to work with iterables, files, subprocess output, and general data pipelines.


Features

  • Lazy (chainable) and eager (terminal) evaluation methods.
  • Chainable operations such as map, filter, flat_map, unique, enumerate, flatten, and sections.
  • File I/O operations for text files, binary files, CSV, and JSON Lines (JSONL).
  • File parsing and subprocess piping with from_io and pipe.
  • Collection helpers such as to_list, to_dict, to_set, and cache.
  • itertools-style methods (islice, zip_longest, accumulate, etc.).
  • Inspired by Java Streams, Haskell functional programming, and Python itertools.
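
To make the lazy/eager distinction concrete, here is a minimal sketch of how a generator-backed chainable stream can defer all work until an eager method runs. MiniStream and times_ten are hypothetical names; the class only imitates the behaviour described above and is not stream4py's implementation.

```python
from typing import Callable, Generic, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class MiniStream(Generic[T]):
    """Hypothetical minimal stream: lazy methods wrap generators, eager ones consume."""

    def __init__(self, iterable: Iterable[T]) -> None:
        self._it: Iterator[T] = iter(iterable)

    # Lazy: each method returns a new stream immediately; nothing is computed yet.
    def map(self, func: Callable[[T], U]) -> "MiniStream[U]":
        return MiniStream(func(x) for x in self._it)

    def filter(self, predicate: Callable[[T], bool]) -> "MiniStream[T]":
        return MiniStream(x for x in self._it if predicate(x))

    def unique(self) -> "MiniStream[T]":
        def gen() -> Iterator[T]:
            seen = set()
            for x in self._it:
                if x not in seen:
                    seen.add(x)
                    yield x

        return MiniStream(gen())

    # Eager: pulls every item through the whole chain and materialises the result.
    def to_list(self) -> List[T]:
        return list(self._it)


calls: List[int] = []


def times_ten(x: int) -> int:
    calls.append(x)  # records when the mapping function actually runs
    return x * 10


pipeline = MiniStream([1, 2, 3, 4, 5]).filter(lambda x: x % 2 == 0).map(times_ten).unique()
print(calls)               # [] : nothing has run yet
print(pipeline.to_list())  # [20, 40]
print(calls)               # [2, 4]
```

Because each lazy step only wraps the previous iterator, the odd numbers are rejected by the filter before times_ten is ever called on them.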

Installation

Install via pip:

pip install stream4py

Quick Start

from stream4py import Stream

# Create a stream
s = Stream([1, 2, 3, 4, 5])

# Lazy operations
result = (
    s.filter(lambda x: x % 2 == 0)
     .map(lambda x: x * 10)
     .unique()
)

# Convert to list (triggers evaluation)
print(result.to_list())  # [20, 40]

# File I/O operations
Stream.open("data.txt").filter(lambda x: "error" in x).for_each(print)
Stream([{"name": "Alice", "age": 30}]).to_csv("output.csv")
users = Stream.open_csv("users.csv").map(lambda row: row["name"])

# Stream lines from a file
with open("file.txt") as f:
    Stream.from_io(f).filter(lambda x: "error" in x).for_each(print)

# Subprocess streaming
Stream.subprocess_run(("seq", "100")).pipe(("grep", "1")).for_each(print)
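
The pipe step can be approximated with the standard library alone. The sketch below feeds an iterable of lines to a subprocess's stdin from a background thread while lazily yielding its stdout. The helper name pipe_lines is hypothetical, and this is not necessarily how stream4py implements pipe. The usage line replaces grep with the running Python interpreter so the example does not depend on seq or grep being installed.

```python
import subprocess
import sys
import threading
from typing import Iterable, Iterator, Sequence


def pipe_lines(lines: Iterable[str], command: Sequence[str]) -> Iterator[str]:
    """Hypothetical helper: stream `lines` through `command`, yielding its output lines."""
    proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
    stdin, stdout = proc.stdin, proc.stdout
    assert stdin is not None and stdout is not None

    def feed() -> None:
        # Writing from a separate thread avoids a deadlock when both pipe buffers fill up.
        try:
            for line in lines:
                stdin.write(line if line.endswith("\n") else line + "\n")
        except BrokenPipeError:
            pass  # the command exited before reading all of its input
        finally:
            try:
                stdin.close()  # signals EOF to the command
            except BrokenPipeError:
                pass

    writer = threading.Thread(target=feed, daemon=True)
    writer.start()
    try:
        for out in stdout:  # read lazily, one output line at a time
            yield out.rstrip("\n")
    finally:
        stdout.close()
        writer.join()
        proc.wait()


# Usage: a portable stand-in for `seq 100 | grep 1`, with Python itself as the filter.
grep_one = [sys.executable, "-c", "import sys\nfor l in sys.stdin:\n    if '1' in l: sys.stdout.write(l)"]
matches = list(pipe_lines((str(n) for n in range(1, 101)), grep_one))
print(matches[:5])  # ['1', '10', '11', '12', '13']
```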

File I/O Operations

Stream4Py provides convenient methods for working with various file formats:

Text Files

# Reading text files
content = Stream.open("input.txt").to_list()

# Writing text files
Stream(["line 1\n", "line 2\n"]).to_file("output.txt")

# Processing large files lazily
(Stream.open("large_file.txt")
    .filter(lambda line: "ERROR" in line)
    .map(str.upper)
    .to_file("errors.txt"))
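
For comparison, the same lazy filter-and-transform can be written with plain generators from the standard library: each line is read, tested, and written before the next one is read, so memory use does not grow with file size. The helper copy_errors and the sample file contents are hypothetical.

```python
import os
import tempfile
from pathlib import Path


def copy_errors(src: str, dst: str) -> int:
    """Hypothetical helper: write upper-cased lines containing "ERROR"; return the count."""
    written = 0
    with open(src, encoding="utf-8") as fin, open(dst, "w", encoding="utf-8") as fout:
        # Generator expressions are lazy: a line is read only when the loop asks for it.
        matching = (line for line in fin if "ERROR" in line)
        for line in map(str.upper, matching):
            fout.write(line)
            written += 1
    return written


# Usage with a small throwaway file in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "large_file.txt")
    dst = os.path.join(tmp, "errors.txt")
    Path(src).write_text("ok\nERROR: disk full\nwarn\nERROR: timeout\n", encoding="utf-8")
    n = copy_errors(src, dst)
    result = Path(dst).read_text(encoding="utf-8")

print(n, repr(result))
```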

Binary Files

# Reading binary files
binary_data = Stream.open_binary("data.bin").to_list()

# Processing binary content
first_lines = (Stream.open_binary("image.jpg")
    .take(1024)  # First 1024 lines (chunks ending in b"\n"), not the first 1 KB
    .to_list())
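
Since the reference table describes open_binary as streaming lines, take(1024) presumably counts lines (byte chunks ending in b"\n"), not bytes. This standard-library sketch, using made-up in-memory data, contrasts line-wise iteration of a binary file object with reading a fixed number of bytes. For a true first-1 KB read, f.read(1024) on a file opened in "rb" mode is the plain-Python route.

```python
import io
from itertools import islice

payload = b"first\nsecond\n" + b"x" * 2000  # 2013 bytes containing two newlines

# Line-oriented: each item is a bytes chunk ending at b"\n" (or at end of file).
lines = list(islice(io.BytesIO(payload), 1024))

# Byte-oriented: read exactly the first 1024 bytes.
first_kib = io.BytesIO(payload).read(1024)

print(len(lines))      # 3
print(len(first_kib))  # 1024
```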

CSV Files

# Reading CSV files as dictionaries
users = Stream.open_csv("users.csv")
adult_names = users.filter(lambda row: int(row["age"]) >= 18).map(lambda row: row["name"])

# Writing CSV files from dictionaries
data = [
    {"name": "Alice", "age": 30, "city": "New York"},
    {"name": "Bob", "age": 25, "city": "London"}
]
Stream(data).to_csv("output.csv")

# Processing large CSV files efficiently
(Stream.open_csv("large_dataset.csv")
    .filter(lambda row: row["status"] == "active")
    .map(lambda row: {"id": row["id"], "score": float(row["score"]) * 1.1})
    .to_csv("processed.csv"))
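
A standard-library sketch of the filter-and-rewrite pipeline above, using csv.DictReader and csv.DictWriter. It makes explicit what the int() and float() calls in the examples above suggest: every CSV field is read as a string. The helper process_rows and the in-memory sample data are hypothetical.

```python
import csv
import io
from typing import Dict, Iterator, TextIO


def process_rows(src: TextIO, dst: TextIO) -> None:
    """Hypothetical helper: keep active rows and scale their score by 1.1."""
    reader = csv.DictReader(src)
    rows: Iterator[Dict[str, object]] = (
        {"id": row["id"], "score": float(row["score"]) * 1.1}  # fields arrive as str
        for row in reader
        if row["status"] == "active"
    )
    writer = csv.DictWriter(dst, fieldnames=["id", "score"])
    writer.writeheader()
    writer.writerows(rows)  # consumes the generator one row at a time


src = io.StringIO("id,status,score\n1,active,10\n2,inactive,20\n3,active,5\n")
dst = io.StringIO()
process_rows(src, dst)
print(dst.getvalue())
```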

JSON Lines (JSONL) Files

# Reading JSONL files
events = Stream.open_jsonl("events.jsonl")
user_events = events.filter(lambda obj: obj["type"] == "user_action")

# Type casting for better type hints
from typing import TypedDict

class Event(TypedDict):
    type: str
    user_id: int
    timestamp: str

typed_events = Stream.open_jsonl("events.jsonl").typing_cast(Event)
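
JSON Lines is simply one JSON document per line. The following is a minimal standard-library reader, written as a hypothetical read_jsonl generator, that parses lines lazily and skips blank ones (skipping blanks is a choice made for this sketch, not documented stream4py behaviour). typing.cast only informs the type checker and performs no runtime validation; that typing_cast behaves the same way is an assumption. The event data is made up.

```python
import io
import json
from typing import Iterator, List, TextIO, TypedDict, cast


class Event(TypedDict):
    type: str
    user_id: int
    timestamp: str


def read_jsonl(f: TextIO) -> Iterator[Event]:
    """Hypothetical helper: yield one parsed object per non-blank line."""
    for line in f:
        if line.strip():
            # cast() is a no-op at runtime; it only informs static type checkers.
            yield cast(Event, json.loads(line))


sample = io.StringIO(
    '{"type": "user_action", "user_id": 1, "timestamp": "2024-01-01T00:00:00"}\n'
    "\n"
    '{"type": "system", "user_id": 0, "timestamp": "2024-01-01T00:00:05"}\n'
)
user_events: List[Event] = [e for e in read_jsonl(sample) if e["type"] == "user_action"]
print(user_events[0]["user_id"])  # 1
```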

Working with IO Objects

import io

# From StringIO
content = io.StringIO("line1\nline2\nline3")
lines = Stream.from_io(content).to_list()

# From file handles (the with statement closes the file, so consume the stream inside it)
with open("data.txt") as f:
    processed = Stream.from_io(f).map(str.strip).to_list()
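
Because lazy pipelines read from the underlying file only when consumed, an eager step such as to_list() has to run while the file is still open. The plain-generator sketch below shows what happens when consumption is deferred past the end of a with block. It uses only the standard library and does not exercise stream4py itself.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data.txt")
    with open(path, "w", encoding="utf-8") as f:
        f.write("  a  \n  b  \n")

    # Eager: consumed inside the with block, so this works.
    with open(path, encoding="utf-8") as f:
        processed = [line.strip() for line in f]

    # Lazy: the generator is created inside the block but consumed after it.
    with open(path, encoding="utf-8") as f:
        deferred = (line.strip() for line in f)

    try:
        list(deferred)
        error = None
    except ValueError as exc:  # reading from a closed file raises ValueError
        error = exc

print(processed)          # ['a', 'b']
print(error is not None)  # True
```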

Quick Reference

| Method | Evaluation | Description | Example |
|---|---|---|---|
| map(func) | Lazy | Apply a function to each item | Stream([1,2,3]).map(lambda x: x*2) |
| filter(predicate) | Lazy | Keep items that satisfy the predicate | Stream([1,2,3]).filter(lambda x: x>1) |
| filterfalse(predicate) | Lazy | Keep items for which the predicate is false | Stream([1,2,3]).filterfalse(lambda x: x>1) |
| flat_map(func) | Lazy | Map, then flatten the resulting iterables | Stream([1,2]).flat_map(lambda x: (x,x*10)) |
| flatten() | Lazy | Flatten nested iterables | Stream([[1,2],[3]]).flatten() |
| unique(key=None) | Lazy | Keep only unique items | Stream([1,2,2]).unique() |
| type_is(cls) | Lazy | Keep items of a specific type | Stream([1,'a']).type_is(int) |
| enumerate(start=0) | Lazy | Pair each item with its index | Stream(['a','b']).enumerate(1) |
| peek(func) | Lazy | Call a function on each item without changing it | Stream([1,2]).peek(print) |
| islice(start, stop, step) | Lazy | Slice like itertools.islice | Stream([1,2,3]).islice(1,3) |
| batched(size) | Lazy | Yield items in batches of size | Stream(range(5)).batched(2) |
| drop(n) | Lazy | Drop the first n items | Stream([1,2,3]).drop(1) |
| take(n) | Lazy | Take the first n items | Stream([1,2,3]).take(2) |
| dropwhile(predicate) | Lazy | Drop items while the predicate is true | Stream([1,2,3]).dropwhile(lambda x: x<2) |
| takewhile(predicate) | Lazy | Take items while the predicate is true | Stream([1,2,3]).takewhile(lambda x: x<3) |
| reverse() | Lazy | Reverse the items | Stream([1,2,3]).reverse() |
| zip(*iterables) | Lazy | Zip with other iterables | Stream([1,2]).zip(['a','b']) |
| zip_longest(*iterables) | Lazy | Zip with other iterables, padding the shorter ones | Stream([1]).zip_longest([2,3]) |
| accumulate(func=None, initial=None) | Lazy | Running totals, or running results of func | Stream([1,2,3]).accumulate() |
| Stream.subprocess_run(command) | Lazy | Run a subprocess and stream its output | Stream.subprocess_run(('ls',)) |
| pipe(command) | Lazy | Pipe the stream into a subprocess and stream its output | Stream(['foo']).pipe(('grep','f')) |
| re_search(pattern) | Lazy | Regex search, yielding matches | Stream(["foo bar"]).re_search(r"foo") |
| extend(items) | Lazy | Concatenate another iterable | Stream([1,2]).extend([3,4]) |
| append(item) | Lazy | Append an item to the end | Stream([1,2]).append(3) |
| prepend(item) | Lazy | Prepend an item to the beginning | Stream([2,3]).prepend(1) |
| sum(start=0) | Eager | Sum all items | Stream([1,2,3]).sum() |
| min(key=None, default=None) | Eager | Minimum value | Stream([1,2,3]).min() |
| max(key=None, default=None) | Eager | Maximum value | Stream([1,2,3]).max() |
| sorted(key=None, reverse=False) | Eager | Sort items | Stream([3,1,2]).sorted() |
| first(default=None) | Eager | First item, or default if the stream is empty | Stream([1,2]).first() |
| find(func) | Eager | First item for which func returns true | Stream([1,2,3]).find(lambda x: x>1) |
| group_by(key) | Eager | Group items by a key function | Stream([1,2,3,4]).group_by(lambda x: x%2) |
| for_each(func) | Eager | Apply a function to every item | Stream([1,2]).for_each(print) |
| cache() | Eager | Cache stream items | Stream(range(3)).cache() |
| to_list() | Eager | Collect as list | Stream([1,2]).to_list() |
| to_tuple() | Eager | Collect as tuple | Stream([1,2]).to_tuple() |
| to_set() | Eager | Collect as set | Stream([1,2]).to_set() |
| to_dict() | Eager | Collect (key, value) pairs into a dict | Stream([(1,'a')]).to_dict() |
| count(item) | Eager | Count occurrences of an item | Stream([1,2,2]).count(2) |
| collect(func) | Eager | Pass the whole iterable to func and return the result | Stream([1,2]).collect(sum) |
| collect_and_continue(func) | Eager | Collect result, continue as stream | Stream([1,2,3]).collect_and_continue(sum) |
| Stream.from_io(io) | Lazy | Stream lines from a text or binary IO object | Stream.from_io(open('file.txt')) |
| Stream.open(file) | Lazy | Open a text file and stream its lines | Stream.open('data.txt') |
| Stream.open_binary(file) | Lazy | Open a binary file and stream its lines | Stream.open_binary('data.bin') |
| Stream.open_csv(file) | Lazy | Open a CSV file as a stream of dictionaries | Stream.open_csv('data.csv') |
| Stream.open_jsonl(file) | Lazy | Open a JSONL file as a stream of objects | Stream.open_jsonl('data.jsonl') |
| to_file(file) | Eager | Write stream contents to a text file | Stream(['line1\n']).to_file('out.txt') |
| to_csv(file) | Eager | Write a stream of dicts to a CSV file | Stream([{'a':1}]).to_csv('out.csv') |
| sections(predicate) | Lazy | Split into sections based on a predicate | Stream([1,1,2]).sections(lambda x:x==2) |
| Stream.range(start, stop, step=1) | Lazy | Stream over a range | Stream.range(1,5) |
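
Several of the lazy methods in the table share names with itertools functions. The snippet below applies the standard-library counterparts to the table's own examples; it assumes the stream4py methods mirror itertools semantics, which the names suggest but this sketch does not verify. batched is written out by hand because itertools.batched only exists in Python 3.12 and later.

```python
from itertools import accumulate, dropwhile, islice, takewhile, zip_longest
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    """Yield tuples of up to `size` items (like itertools.batched in Python 3.12+)."""
    it = iter(iterable)
    while batch := tuple(islice(it, size)):
        yield batch


results = {
    "islice": list(islice([1, 2, 3], 1, 3)),                   # [2, 3]
    "dropwhile": list(dropwhile(lambda x: x < 2, [1, 2, 3])),  # [2, 3]
    "takewhile": list(takewhile(lambda x: x < 3, [1, 2, 3])),  # [1, 2]
    "accumulate": list(accumulate([1, 2, 3])),                 # [1, 3, 6]
    "zip_longest": list(zip_longest([1], [2, 3])),             # [(1, 2), (None, 3)]
    "batched": list(batched(range(5), 2)),                     # [(0, 1), (2, 3), (4,)]
}
for name, value in results.items():
    print(name, value)
```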

Contributing

Contributions are welcome! Please open an issue or pull request with improvements or bug fixes.


License

stream4py is distributed under the terms of the MIT license.



Download files


Source Distribution

stream4py-0.1.4.tar.gz (30.3 kB)

Built Distribution


stream4py-0.1.4-py3-none-any.whl (17.3 kB)

File details

Details for the file stream4py-0.1.4.tar.gz.

File metadata

  • Download URL: stream4py-0.1.4.tar.gz
  • Upload date:
  • Size: 30.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for stream4py-0.1.4.tar.gz

  • SHA256: f15af7a82b950bb1960b875292e70eaa426629bc7fd4b9e44a1a957d2f7f2713
  • MD5: 0b864f3ac0471802102bea615057e5f7
  • BLAKE2b-256: a41eaa5faee2988b008b281005c903ebf82c0c266887b58b52838da3da29b266


Provenance

The following attestation bundles were made for stream4py-0.1.4.tar.gz:

Publisher: main.yaml on FlavioAmurrioCS/stream4py


File details

Details for the file stream4py-0.1.4-py3-none-any.whl.

File metadata

  • Download URL: stream4py-0.1.4-py3-none-any.whl
  • Upload date:
  • Size: 17.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for stream4py-0.1.4-py3-none-any.whl

  • SHA256: 0741e53c66b1fccff3a77a062e851493a3e5ec54a638ad23b6b5df97f98604ea
  • MD5: 3ea9fea1f0235e09b543773d1fa87a60
  • BLAKE2b-256: 6e6574f03e025ce5ef7bc910ad5eb4638e402572a8de068057a4c44943f58ee4


Provenance

The following attestation bundles were made for stream4py-0.1.4-py3-none-any.whl:

Publisher: main.yaml on FlavioAmurrioCS/stream4py

