A functional programming library for Python mimicking Java Streams and JS Arrays.
fpstreams
A robust, type-safe functional programming library for Python.
fpstreams brings the power of Java Streams, Rust Results, and JavaScript Array methods to Python. It provides a fluent interface for data processing, null safety, and error handling without the boilerplate, all while remaining fully typed for IDE autocompletion.
Features
- Fluent Streams: Lazy evaluation chains (map, filter, reduce, zip).
- Structure Operations: Powerful chunking with .batch(), .window(), and .zip_longest().
- Parallel Processing: Memory-safe multi-core distribution with .parallel() and auto-batching.
- Advanced Statistics: One-pass summary stats (.summarizing()) and SQL-like grouping (.grouping_by(..., downstream=...)).
- Clean Code Syntax: Syntactic sugar like .pick() and .filter_none() to replace lambdas.
- Data Science Ready: Convert streams directly to Pandas DataFrames, NumPy arrays, or CSV/JSON files.
- Null Safety: Option to eliminate None checks.
- Error Handling: Result (Success/Failure) to replace ugly try/except blocks.
Installation
pip install fpstreams
Quick Start
1. Stream Factories
Create streams directly from values, functions, or algorithmic sequences.
import random
from fpstreams import Stream
# Fixed values
Stream.of(1, 2, 3, 4, 5)
# Seed 1, function x * 2 -> 1, 2, 4, 8, 16, ...
Stream.iterate(1, lambda x: x * 2).limit(10)
# Infinite supplier (e.g., polling an API), capped at 5 values
Stream.generate(lambda: random.random()).limit(5)
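For readers new to these factories, the behavior described in the comments above can be approximated with the standard library alone. This is a sketch of the semantics, not the fpstreams implementation; the helper names `iterate` and `generate` below are local stand-ins, and `itertools.islice` plays the role of `.limit(n)`.

```python
import itertools
import random

def iterate(seed, fn):
    """Yield seed, fn(seed), fn(fn(seed)), ... without end."""
    value = seed
    while True:
        yield value
        value = fn(value)

def generate(supplier):
    """Call supplier() forever, yielding each result."""
    while True:
        yield supplier()

# islice stops an infinite generator after a fixed number of items.
powers = list(itertools.islice(iterate(1, lambda x: x * 2), 10))
samples = list(itertools.islice(generate(random.random), 5))

print(powers)        # [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
print(len(samples))  # 5
```

Note that the seed itself is the first element, which matches the "1, 2, 4, 8, 16..." sequence in the comment above.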
2. Basic Processing
Replace messy loops with clean, readable pipelines.
from fpstreams import Stream, Collectors
data = ["apple", "banana", "cherry", "apricot", "blueberry"]
# Filter, transform, and group in one
result = (
    Stream(data)
    .filter(lambda s: s.startswith("a") or s.startswith("b"))
    .map(str.upper)
    .collect(Collectors.grouping_by(lambda s: s[0]))
)
# Output: {'A': ['APPLE', 'APRICOT'], 'B': ['BANANA', 'BLUEBERRY']}
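For comparison, here is roughly the loop that the pipeline above replaces, written with only the standard library. It is meant to show what each stage contributes, not how fpstreams works internally.

```python
from collections import defaultdict

data = ["apple", "banana", "cherry", "apricot", "blueberry"]

groups = defaultdict(list)
for s in data:
    if s.startswith(("a", "b")):         # filter step
        upper = s.upper()                # map step
        groups[upper[0]].append(upper)   # group by first letter

result = dict(groups)
print(result)
# {'A': ['APPLE', 'APRICOT'], 'B': ['BANANA', 'BLUEBERRY']}
```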
3. Structure & Windowing
Process data in chunks or sliding windows—essential for time-series analysis or bulk API processing.
data = range(10)
# Batching: Process 3 items at a time
# Result: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Stream(data).batch(3).to_list()
# Sliding Window: View of size 3, sliding by 1
# Result: [[0, 1, 2], [1, 2, 3], [2, 3, 4]...]
Stream(data).window(size=3, step=1).to_list()
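The two operations above can be sketched with plain generators to make the chunking rules concrete. This is an illustration, not the library's code: the function names are local, and dropping an incomplete trailing window is an assumption, since the README does not say how .window() treats a partial final window.

```python
from collections import deque
from itertools import islice

def batch(iterable, size):
    """Yield consecutive lists of up to `size` items; the last may be shorter."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk

def window(iterable, size, step=1):
    """Yield full windows of `size` items, advancing `step` items each time."""
    it = iter(iterable)
    buf = deque(islice(it, size), maxlen=size)
    while len(buf) == size:
        yield list(buf)
        advance = list(islice(it, step))
        if len(advance) < step:
            break  # not enough items left for another full window
        buf.extend(advance)  # maxlen discards the oldest items

batches = list(batch(range(10), 3))
windows = list(window(range(10), size=3, step=1))

print(batches)      # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(windows[:3])  # [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
print(len(windows)) # 8
```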
4. Clean Code Shortcuts
Stop writing repetitive lambdas for dictionaries.
users = [
    {"id": 1, "name": "Alice", "role": "admin"},
    {"id": 2, "name": "Bob", "role": None},
    {"id": 3, "name": None, "role": "user"},
]
names = (
    Stream(users)
    .pick("name")      # Extract "name" key
    .filter_none()     # Remove None values
    .to_list()
)
# Output: ["Alice", "Bob"]
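As a point of reference, the same result in plain Python takes two comprehensions. Using `dict.get` here is an assumption: the README does not say whether .pick() raises on a missing key or yields None.

```python
users = [
    {"id": 1, "name": "Alice", "role": "admin"},
    {"id": 2, "name": "Bob", "role": None},
    {"id": 3, "name": None, "role": "user"},
]

picked = [u.get("name") for u in users]        # comparable to .pick("name")
names = [n for n in picked if n is not None]   # comparable to .filter_none()

print(names)  # ['Alice', 'Bob']
```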
5. Parallel Processing
fpstreams can automatically distribute heavy workloads across all CPU cores using the .parallel() method. It uses an optimized Map-Reduce architecture to minimize memory usage.
import math
from fpstreams import Stream
def heavy_task_batch(numbers):
    # Process a whole list of numbers at once (vectorization or bulk API)
    return [math.factorial(n) for n in numbers]
# Memory efficient: batch(100) sends chunks of 100 to the workers
# instead of pickling 10,000 individual tasks.
results = (
    Stream(range(10000))
    .parallel()
    .batch(100)
    .map(heavy_task_batch)
    .to_list()
)
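The idea behind batching, one task per chunk rather than one per item, can be sketched with `concurrent.futures` from the standard library. This is not how fpstreams is implemented; `chunks` and `map_batched` are hypothetical helpers, and flattening the per-batch results is a choice made for this sketch. A thread pool is used so the snippet runs anywhere; for CPU-bound work you would pass a `ProcessPoolExecutor` instead, created under an `if __name__ == "__main__":` guard.

```python
import math
from concurrent.futures import Executor, ThreadPoolExecutor
from itertools import islice

def chunks(iterable, size):
    """Yield consecutive lists of up to `size` items."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk

def heavy_task_batch(numbers):
    """Process a whole batch in one call."""
    return [math.factorial(n) for n in numbers]

def map_batched(executor: Executor, fn, items, batch_size):
    """Submit one task per batch and flatten the results in input order."""
    results = []
    for batch_result in executor.map(fn, chunks(items, batch_size)):
        results.extend(batch_result)
    return results

# 200 items in batches of 50 means 4 tasks are submitted, not 200.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = map_batched(pool, heavy_task_batch, range(200), batch_size=50)

print(len(results))  # 200
print(results[:5])   # [1, 1, 2, 6, 24]
```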
6. Data Science & I/O
Seamlessly integrate with the scientific stack.
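from fpstreams import Stream, Collectors
# Sample records for illustration (the earlier users list has no 'age' key)
users = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
]
employees = [
    {"dept": "eng", "salary": 120},
    {"dept": "eng", "salary": 100},
    {"dept": "ops", "salary": 90},
]
# 1. One-pass statistics (count, sum, min, max, average)
stats = Stream(users).collect(Collectors.summarizing(lambda u: u['age']))
print(f"Average Age: {stats.average}, Max: {stats.max}")
# 2. Advanced grouping (SQL-style)
# Group by department, then average the salary
avg_salaries = Stream(employees).collect(
    Collectors.grouping_by(
        lambda e: e['dept'],
        downstream=Collectors.averaging(lambda e: e['salary'])
    )
)
# 3. Export (to_df needs pandas installed)
Stream(users).to_df()
Stream(users).to_csv("output.csv")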
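To make "one-pass" and "downstream" concrete, the sketch below accumulates count, sum, min, and max in a single loop, then reuses the same accumulator per group. The `Summary` class and helper names are hypothetical and only mirror the idea; they are not the fpstreams Collectors API, and the sample records are invented for the example.

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class Summary:
    """Running statistics updated one value at a time."""
    count: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    def add(self, value):
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    @property
    def average(self):
        return self.total / self.count if self.count else 0.0

def summarize(items, key):
    """Single pass over items, feeding key(item) into one Summary."""
    summary = Summary()
    for item in items:
        summary.add(key(item))
    return summary

def group_average(items, group_key, value_key):
    """Group items, keeping one Summary per group, then report averages."""
    groups = defaultdict(Summary)
    for item in items:
        groups[group_key(item)].add(value_key(item))
    return {name: s.average for name, s in groups.items()}

employees = [
    {"dept": "eng", "salary": 120},
    {"dept": "eng", "salary": 100},
    {"dept": "ops", "salary": 90},
]

stats = summarize(employees, lambda e: e["salary"])
averages = group_average(employees, lambda e: e["dept"], lambda e: e["salary"])

print(stats.count, stats.minimum, stats.maximum)  # 3 90 120
print(averages)  # {'eng': 110.0, 'ops': 90.0}
```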
Infinite Streams & Lazy Evaluation
Process massive datasets efficiently. Operations are only executed when needed.
# Infinite stream of even numbers using .iterate()
evens = (
    Stream.iterate(0, lambda n: n + 1)
    .filter(lambda x: x % 2 == 0)
    .limit(10)
    .to_list()
)
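Laziness is easiest to see by counting how much work actually happens. The sketch below uses standard-library iterators rather than fpstreams, recording every value the predicate inspects; even though the source is infinite, only the values needed to produce ten results are touched.

```python
from itertools import count, islice

checked = []  # every value the predicate was asked about

def is_even(n):
    checked.append(n)
    return n % 2 == 0

# count(0) is infinite; filter and islice pull values only on demand.
evens = list(islice(filter(is_even, count(0)), 10))

print(evens)         # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print(len(checked))  # 19
```

The predicate runs on 0 through 18 and then stops, because the tenth even number has been found and nothing downstream asks for more.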
Benchmark
Comparison between sequential streams and .parallel() streams on a 4-core machine:
| Task | Sequential (s) | Parallel (s) | Speedup |
|---|---|---|---|
| Heavy Calculation (Factorials) | 24.8358 | 9.5575 | 2.60x |
| I/O Simulation (Sleep) | 2.1053 | 0.8101 | 2.60x |
| Light Calculation (Multiplication) | 0.0135 | 0.3109 | 0.04x |
Note: Parallel streams have overhead. Use them for CPU-intensive tasks or slow I/O, not simple arithmetic.
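The Speedup column is the sequential time divided by the parallel time, so a value below 1 means the parallel run was slower. The figures can be rechecked from the table with a few lines of Python:

```python
# (sequential seconds, parallel seconds), copied from the table above
timings = {
    "Heavy Calculation (Factorials)": (24.8358, 9.5575),
    "I/O Simulation (Sleep)": (2.1053, 0.8101),
    "Light Calculation (Multiplication)": (0.0135, 0.3109),
}

speedups = {task: round(seq / par, 2) for task, (seq, par) in timings.items()}

for task, speedup in speedups.items():
    print(f"{task}: {speedup:.2f}x")
# Heavy Calculation (Factorials): 2.60x
# I/O Simulation (Sleep): 2.60x
# Light Calculation (Multiplication): 0.04x
```

For the light task, 0.04x means the parallel run took roughly 23 times longer than the sequential one.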
Project Structure
- Stream: The core wrapper for sequential data processing.
- ParallelStream: A multi-core wrapper for heavy parallel processing.
- Option: Null-safe container.
- Result: Error-handling container.
- Collectors: Accumulation utilities (grouping, joining, summary stats).
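This README does not show Option or Result in use, so the sketch below only illustrates the general pattern behind null-safe and error-handling containers. Every name in it (`Maybe`, `Success`, `Failure`, `attempt`, `or_else`) is hypothetical and deliberately different from the library's classes; check the fpstreams source for the real method names.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")

@dataclass(frozen=True)
class Maybe(Generic[T]):
    """Illustrative null-safe container (not the fpstreams Option class)."""
    value: Optional[T] = None

    def map(self, fn):
        # Apply fn only when a value is present; otherwise stay empty.
        return Maybe(fn(self.value)) if self.value is not None else self

    def or_else(self, default):
        return self.value if self.value is not None else default

@dataclass(frozen=True)
class Success(Generic[T]):
    value: T

@dataclass(frozen=True)
class Failure:
    error: Exception

def attempt(fn, *args):
    """Run fn and capture any exception as a Failure instead of raising."""
    try:
        return Success(fn(*args))
    except Exception as exc:
        return Failure(exc)

print(Maybe("alice").map(str.upper).or_else("unknown"))  # ALICE
print(Maybe(None).map(str.upper).or_else("unknown"))     # unknown
print(attempt(int, "42"))                                # Success(value=42)
print(type(attempt(int, "x")).__name__)                  # Failure
```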
License
This project is licensed under the MIT License - see the LICENSE file for details.