Stream4Py: Functional, chainable streams for Python with lazy evaluation
Stream4Py
A Python library inspired by Java Streams, Haskell lists, and Linux pipes, providing a powerful, lazy-evaluated Stream class for functional-style data manipulation. Stream4Py makes it easy to work with iterables, files, subprocess output, and general data pipelines.
Features
- Lazy and eager evaluation methods.
- Chainable operations like `map`, `filter`, `flat_map`, `unique`, `enumerate`, `flatten`, `sections`.
- File I/O operations including text files, binary files, CSV, and JSON Lines (JSONL).
- File parsing and subprocess piping with `from_io` and `pipe`.
- Collection helpers like `to_list`, `to_dict`, `to_set`, and `cache`.
- Built-in itertools utilities (`islice`, `zip_longest`, `accumulate`, etc.).
- Inspired by Java Streams, Haskell functional programming, and Python itertools.
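To make the lazy versus eager distinction concrete, here is a minimal sketch of how a chainable stream can defer work by wrapping generators. `MiniStream` and `record` are hypothetical names invented for this illustration; this is not Stream4Py's actual implementation, only one plausible way such a class could behave.

```python
from typing import Callable, Generic, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class MiniStream(Generic[T]):
    """Hypothetical, stripped-down stand-in for a lazy stream (not Stream4Py's code)."""

    def __init__(self, items: Iterable[T]) -> None:
        self._items = items

    def __iter__(self) -> Iterator[T]:
        return iter(self._items)

    def map(self, func: Callable[[T], U]) -> "MiniStream[U]":
        # Lazy: wraps a generator expression, so func is not called yet.
        return MiniStream(func(x) for x in self._items)

    def filter(self, predicate: Callable[[T], bool]) -> "MiniStream[T]":
        # Lazy: the predicate only runs when the result is iterated.
        return MiniStream(x for x in self._items if predicate(x))

    def to_list(self) -> List[T]:
        # Eager: pulls every item through the whole chain.
        return list(self._items)


calls: List[int] = []


def record(x: int) -> int:
    """Remember which items were actually processed, then scale them."""
    calls.append(x)
    return x * 10


pipeline = MiniStream([1, 2, 3, 4, 5]).filter(lambda x: x % 2 == 0).map(record)
calls_before = list(calls)   # still empty: building the chain processed nothing
result = pipeline.to_list()  # [20, 40]; only now does record() run, on 2 and 4
```

Building the chain costs almost nothing; the work happens once, when an eager method such as `to_list` consumes the stream.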
Installation
Install via pip:
pip install stream4py
Quick Start
from stream4py import Stream
# Create a stream
s = Stream([1, 2, 3, 4, 5])
# Lazy operations
result = (
    s.filter(lambda x: x % 2 == 0)
    .map(lambda x: x * 10)
    .unique()
)
# Convert to list (triggers evaluation)
print(result.to_list()) # [20, 40]
# File I/O operations
Stream.open("data.txt").filter(lambda x: "error" in x).for_each(print)
Stream([{"name": "Alice", "age": 30}]).to_csv("output.csv")
users = Stream.open_csv("users.csv").map(lambda row: row["name"])
# Stream lines from a file
lines = Stream.from_io(open("file.txt"))
lines.filter(lambda x: "error" in x).for_each(print)
# Subprocess streaming
Stream.subprocess_run(("seq", "100")).pipe(("grep", "1")).for_each(print)
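For readers curious what subprocess streaming amounts to underneath, the following is a rough standard-library sketch, assuming only `subprocess.Popen`. The helper `stream_lines` is a hypothetical name, and nothing here is claimed to match how `subprocess_run` or `pipe` are implemented. To stay portable it uses the current Python interpreter as the child process instead of `seq`, and an ordinary generator filter in place of `grep`.

```python
import subprocess
import sys
from typing import Iterator, Sequence


def stream_lines(command: Sequence[str]) -> Iterator[str]:
    """Yield stdout lines from a child process one at a time (hypothetical helper)."""
    with subprocess.Popen(command, stdout=subprocess.PIPE, text=True) as proc:
        assert proc.stdout is not None
        for line in proc.stdout:
            yield line.rstrip("\n")


# Portable stand-in for `seq 100 | grep 1`: the child prints 1..100 and the
# "grep" step is a plain Python filter over the streamed lines.
child = (sys.executable, "-c", "for i in range(1, 101): print(i)")
matches = [line for line in stream_lines(child) if "1" in line]
```

Because `stream_lines` is a generator, lines are handed to the filter as the child produces them rather than after it exits.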
File I/O Operations
Stream4Py provides convenient methods for working with various file formats:
Text Files
# Reading text files
content = Stream.open("input.txt").to_list()
# Writing text files
Stream(["line 1\n", "line 2\n"]).to_file("output.txt")
# Processing large files lazily
(Stream.open("large_file.txt")
    .filter(lambda line: "ERROR" in line)
    .map(str.upper)
    .to_file("errors.txt"))
Binary Files
# Reading binary files
binary_data = Stream.open_binary("data.bin").to_list()
# Processing binary content
(Stream.open_binary("image.jpg")
    .take(1024)  # First 1024 items (lines), not necessarily 1 KB
    .to_list())
CSV Files
# Reading CSV files as dictionaries
users = Stream.open_csv("users.csv")
adult_names = users.filter(lambda row: int(row["age"]) >= 18).map(lambda row: row["name"])
# Writing CSV files from dictionaries
data = [
    {"name": "Alice", "age": 30, "city": "New York"},
    {"name": "Bob", "age": 25, "city": "London"},
]
Stream(data).to_csv("output.csv")
# Processing large CSV files efficiently
(Stream.open_csv("large_dataset.csv")
    .filter(lambda row: row["status"] == "active")
    .map(lambda row: {"id": row["id"], "score": float(row["score"]) * 1.1})
    .to_csv("processed.csv"))
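For comparison, the same filter, transform, and write pipeline can be sketched with only the standard `csv` module. The helpers `active_rows_with_bonus` and `write_csv`, the temporary file paths, and the sample rows are all made up for this illustration; this is not a description of how `open_csv` or `to_csv` work internally.

```python
import csv
import os
import tempfile
from typing import Dict, Iterator


def active_rows_with_bonus(path: str) -> Iterator[Dict[str, object]]:
    """Lazily yield transformed rows; csv.DictReader reads one row at a time."""
    with open(path, newline="") as src:
        for row in csv.DictReader(src):
            if row["status"] == "active":
                yield {"id": row["id"], "score": float(row["score"]) * 1.1}


def write_csv(rows: Iterator[Dict[str, object]], path: str) -> None:
    """Write dict rows, taking the header from the first row's keys."""
    first = next(rows, None)
    if first is None:
        return  # nothing to write
    with open(path, "w", newline="") as dst:
        writer = csv.DictWriter(dst, fieldnames=list(first))
        writer.writeheader()
        writer.writerow(first)
        writer.writerows(rows)


# Demo on a throwaway file (file names and sample rows are invented).
workdir = tempfile.mkdtemp()
src_path = os.path.join(workdir, "large_dataset.csv")
dst_path = os.path.join(workdir, "processed.csv")
with open(src_path, "w", newline="") as f:
    f.write("id,status,score\n1,active,10\n2,inactive,20\n3,active,30\n")

write_csv(active_rows_with_bonus(src_path), dst_path)

with open(dst_path, newline="") as f:
    processed = list(csv.DictReader(f))  # rows with id 1 and 3 only
```

Only one row is held in memory at a time, which is the property that makes the chained version suitable for large files.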
JSON Lines (JSONL) Files
# Reading JSONL files
events = Stream.open_jsonl("events.jsonl")
user_events = events.filter(lambda obj: obj["type"] == "user_action")
# Type casting for better type hints
from typing import TypedDict
class Event(TypedDict):
    type: str
    user_id: int
    timestamp: str
typed_events = Stream.open_jsonl("events.jsonl").typing_cast(Event)
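As background on the format itself, JSON Lines stores one JSON value per line, so a lazy reader can be sketched in a few lines of standard-library Python. The helper `read_jsonl` and the sample events below are hypothetical and shown only to illustrate the idea; they are not taken from Stream4Py.

```python
import io
import json
from typing import Any, Iterator, TextIO


def read_jsonl(handle: TextIO) -> Iterator[Any]:
    """Yield one decoded JSON value per non-blank line."""
    for line in handle:
        if line.strip():
            yield json.loads(line)


# Sample data is made up; any text-mode file object would work the same way.
sample = io.StringIO(
    '{"type": "user_action", "user_id": 1, "timestamp": "2024-01-01T00:00:00Z"}\n'
    '{"type": "system", "user_id": 0, "timestamp": "2024-01-01T00:00:01Z"}\n'
)
user_events = [event for event in read_jsonl(sample) if event["type"] == "user_action"]
```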
Working with IO Objects
import io
# From StringIO
content = io.StringIO("line1\nline2\nline3")
lines = Stream.from_io(content).to_list()
# From file handles (automatically closed)
with open("data.txt") as f:
    processed = Stream.from_io(f).map(str.strip).to_list()
Quick Reference
| Method | Type | Description | Example |
|---|---|---|---|
| `map(func)` | Lazy | Apply a function to each item | `Stream([1,2,3]).map(lambda x: x*2)` |
| `filter(predicate)` | Lazy | Keep items satisfying a predicate | `Stream([1,2,3]).filter(lambda x: x>1)` |
| `filterfalse(predicate)` | Lazy | Keep items for which predicate is False | `Stream([1,2,3]).filterfalse(lambda x: x>1)` |
| `flat_map(func)` | Lazy | Map then flatten iterables | `Stream([1,2]).flat_map(lambda x: (x,x*10))` |
| `flatten()` | Lazy | Flatten nested iterables | `Stream([[1,2],[3]]).flatten()` |
| `unique(key=None)` | Lazy | Keep only unique items | `Stream([1,2,2]).unique()` |
| `type_is(cls)` | Lazy | Keep items of a specific type | `Stream([1,'a']).type_is(int)` |
| `enumerate(start=0)` | Lazy | Enumerate items | `Stream(['a','b']).enumerate(1)` |
| `peek(func)` | Lazy | Apply function without changing items | `Stream([1,2]).peek(print)` |
| `islice(start, stop, step)` | Lazy | Slice like `itertools.islice` | `Stream([1,2,3]).islice(1,3)` |
| `batched(size)` | Lazy | Yield items in batches | `Stream(range(5)).batched(2)` |
| `drop(n)` | Lazy | Drop first `n` items | `Stream([1,2,3]).drop(1)` |
| `take(n)` | Lazy | Take first `n` items | `Stream([1,2,3]).take(2)` |
| `dropwhile(predicate)` | Lazy | Drop items while predicate is true | `Stream([1,2,3]).dropwhile(lambda x: x<2)` |
| `takewhile(predicate)` | Lazy | Take items while predicate is true | `Stream([1,2,3]).takewhile(lambda x: x<3)` |
| `reverse()` | Lazy | Reverse the items | `Stream([1,2,3]).reverse()` |
| `zip(*iterables)` | Lazy | Zip with other iterables | `Stream([1,2]).zip(['a','b'])` |
| `zip_longest(*iterables)` | Lazy | Zip with padding | `Stream([1]).zip_longest([2,3])` |
| `accumulate(func=None, initial=None)` | Lazy | Cumulative sums or function | `Stream([1,2,3]).accumulate()` |
| `subprocess_run(command)` | Lazy | Run a subprocess and stream output | `Stream.subprocess_run(('ls',))` |
| `pipe(command)` | Lazy | Pipe stream to subprocess | `Stream(['foo']).pipe(('grep','f'))` |
| `sum(start=0)` | Eager | Sum all items | `Stream([1,2,3]).sum()` |
| `min(key=None, default=None)` | Eager | Minimum value | `Stream([1,2,3]).min()` |
| `max(key=None, default=None)` | Eager | Maximum value | `Stream([1,2,3]).max()` |
| `sorted(key=None, reverse=False)` | Eager | Sort items | `Stream([3,1,2]).sorted()` |
| `first(default=None)` | Eager | First item | `Stream([1,2]).first()` |
| `find(func)` | Eager | Find first item matching function | `Stream([1,2,3]).find(lambda x: x>1)` |
| `group_by(key)` | Eager | Group items by key | `Stream([1,2,3,4]).group_by(lambda x: x%2)` |
| `for_each(func)` | Eager | Apply function to all items | `Stream([1,2]).for_each(print)` |
| `cache()` | Eager | Cache stream items | `Stream(range(3)).cache()` |
| `to_list()` | Eager | Collect as list | `Stream([1,2]).to_list()` |
| `to_tuple()` | Eager | Collect as tuple | `Stream([1,2]).to_tuple()` |
| `to_set()` | Eager | Collect as set | `Stream([1,2]).to_set()` |
| `to_dict()` | Eager | Collect as dict (from tuples) | `Stream([(1,'a')]).to_dict()` |
| `collect(func)` | Eager | Apply function to iterable | `Stream([1,2]).collect(sum)` |
| `from_io(io)` | Lazy | Stream lines from file or binary IO | `Stream.from_io(open('file.txt'))` |
| `open(file)` | Lazy | Open and stream lines from text file | `Stream.open('data.txt')` |
| `open_binary(file)` | Lazy | Open and stream lines from binary file | `Stream.open_binary('data.bin')` |
| `open_csv(file)` | Lazy | Open CSV file as stream of dictionaries | `Stream.open_csv('data.csv')` |
| `open_jsonl(file)` | Lazy | Open JSONL file as stream of objects | `Stream.open_jsonl('data.jsonl')` |
| `to_file(file)` | Eager | Write stream contents to text file | `Stream(['line1\n']).to_file('out.txt')` |
| `to_csv(file)` | Eager | Write stream of dicts to CSV file | `Stream([{'a':1}]).to_csv('out.csv')` |
| `sections(predicate)` | Lazy | Split into sections based on predicate | `Stream([1,1,2]).sections(lambda x:x==2)` |
| `range(start, stop, step=1)` | Lazy | Stream over a range | `Stream.range(1,5)` |
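Several methods in the table share names with functions in Python's `itertools`. Assuming they follow the same semantics (the table says so explicitly only for `islice`), the standard-library calls below show what the corresponding table examples would be expected to produce. Treat this as a reference for the `itertools` behavior, not as a guarantee about Stream4Py.

```python
from itertools import accumulate, dropwhile, filterfalse, islice, takewhile, zip_longest

data = [1, 2, 3]

sliced = list(islice(data, 1, 3))                     # [2, 3]
rejected = list(filterfalse(lambda x: x > 1, data))   # [1]
dropped = list(dropwhile(lambda x: x < 2, data))      # [2, 3]
taken = list(takewhile(lambda x: x < 3, data))        # [1, 2]
padded = list(zip_longest([1], [2, 3]))               # [(1, 2), (None, 3)]
running = list(accumulate(data))                      # [1, 3, 6]
```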
Contributing
Contributions are welcome! Please open an issue or pull request with improvements or bug fixes.
License
stream4py is distributed under the terms of the MIT license.