A functional programming library for Python mimicking Java Streams and JS Arrays.

fpstreams

A robust, type-safe functional programming library for Python.

fpstreams brings the power of Java Streams, Rust Results, and JavaScript Array methods to Python. It provides a fluent interface for data processing, null safety, and error handling without the boilerplate, all while remaining fully typed for IDE autocompletion.

Features

  • Fluent Streams: Lazy evaluation chains (map, filter, reduce, zip).
  • Structure Operations: Powerful chunking with .batch(), .window(), and .zip_longest().
  • Parallel Processing: Memory-safe multi-core distribution with .parallel() and auto-batching.
  • Advanced Statistics: One-pass summary stats (.summarizing()) and SQL-like grouping (.grouping_by(..., downstream=...)).
  • Clean Code Syntax: Syntactic sugar like .pick() and .filter_none() to replace lambdas.
  • Data Science Ready: Convert streams directly to Pandas DataFrames, NumPy arrays, or CSV/JSON files.
  • Null Safety: An Option container that removes the need for manual None checks.
  • Error Handling: Result (Success/Failure) to replace ugly try/except blocks.

Installation

pip install fpstreams

Quick Start

1. Stream Factories

Create streams directly from values, functions, or algorithmic sequences.

import random

from fpstreams import Stream

# From explicit values
Stream.of(1, 2, 3, 4, 5)

# Seed 1, function x * 2 -> 1, 2, 4, 8, 16, ...
Stream.iterate(1, lambda x: x * 2).limit(10)

# Infinite supplier (e.g., polling an API), capped at 5 values
Stream.generate(lambda: random.random()).limit(5)
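
For readers who want to see what the factories above amount to, here is a minimal standard-library sketch. It is not the fpstreams implementation; the helper names iterate and generate are hypothetical stand-ins, and itertools.islice plays the role of .limit().

```python
import itertools
import random


def iterate(seed, step):
    """Yield seed, step(seed), step(step(seed)), ... without end."""
    value = seed
    while True:
        yield value
        value = step(value)


def generate(supplier):
    """Yield supplier() forever; each value is produced only on demand."""
    while True:
        yield supplier()


# islice stops pulling after n items, so the infinite generators are safe.
powers_of_two = list(itertools.islice(iterate(1, lambda x: x * 2), 10))
samples = list(itertools.islice(generate(random.random), 5))

print(powers_of_two)  # [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
print(len(samples))   # 5
```

Because both helpers are generators, nothing is computed until a consumer asks for the next value.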

2. Basic Processing

Replace messy loops with clean, readable pipelines.

from fpstreams import Stream, Collectors

data = ["apple", "banana", "cherry", "apricot", "blueberry"]

# Filter, transform, and group in one pipeline
result = (
    Stream(data)
    .filter(lambda s: s.startswith("a") or s.startswith("b"))
    .map(str.upper)
    .collect(Collectors.grouping_by(lambda s: s[0]))
)
# Output: {'A': ['APPLE', 'APRICOT'], 'B': ['BANANA', 'BLUEBERRY']}
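
For comparison, the loop that this pipeline replaces might look like the following. This is a plain-Python sketch using only the standard library, written to produce the same dictionary as the output shown above.

```python
from collections import defaultdict

data = ["apple", "banana", "cherry", "apricot", "blueberry"]

grouped = defaultdict(list)
for s in data:
    if s.startswith(("a", "b")):         # filter step
        upper = s.upper()                # map step
        grouped[upper[0]].append(upper)  # grouping step, keyed by first letter

result = dict(grouped)
print(result)
# {'A': ['APPLE', 'APRICOT'], 'B': ['BANANA', 'BLUEBERRY']}
```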

3. Structure & Windowing

Process data in chunks or sliding windows, which is useful for time-series analysis or bulk API processing.

data = range(10)

# Batching: Process 3 items at a time
# Result: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Stream(data).batch(3).to_list()

# Sliding Window: View of size 3, sliding by 1
# Result: [[0, 1, 2], [1, 2, 3], [2, 3, 4]...]
Stream(data).window(size=3, step=1).to_list()
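
The two operations can be sketched with the standard library as shown below. This is an illustration of the idea rather than the library's code: the function names are hypothetical, and the sketch assumes that an incomplete trailing window is dropped, which the text above does not specify.

```python
from collections import deque
from itertools import islice


def batch(iterable, size):
    """Yield consecutive lists of up to `size` items; the last may be shorter."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk


def window(iterable, size, step=1):
    """Yield full windows of `size` items, advancing `step` items each time."""
    buf = deque(maxlen=size)  # the deque discards the oldest item automatically
    wait = 0                  # items still to skip before the next emission
    for item in iterable:
        buf.append(item)
        if len(buf) < size:
            continue
        if wait == 0:
            yield list(buf)
            wait = step - 1
        else:
            wait -= 1


print(list(batch(range(10), 3)))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(list(window(range(10), size=3, step=1))[:3])
# [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
```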

4. Clean Code Shortcuts

Stop writing repetitive lambdas for dictionaries.

users = [
    {"id": 1, "name": "Alice", "role": "admin"},
    {"id": 2, "name": "Bob", "role": None},
    {"id": 3, "name": None, "role": "user"},
]

names = (
    Stream(users)
    .pick("name")      # Extract "name" key
    .filter_none()     # Remove None values
    .to_list()
)
# Output: ["Alice", "Bob"]
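
Without the shortcuts, the same extraction is a comprehension over the dictionaries. The sketch below uses dict.get, which is an assumption about how a missing key would be treated; the text above only shows records where the key is present.

```python
users = [
    {"id": 1, "name": "Alice", "role": "admin"},
    {"id": 2, "name": "Bob", "role": None},
    {"id": 3, "name": None, "role": "user"},
]

# Pick the "name" value from each record, then drop the None entries.
picked = [u.get("name") for u in users]
names = [name for name in picked if name is not None]

print(names)  # ['Alice', 'Bob']
```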

5. Parallel Processing

fpstreams can automatically distribute heavy workloads across all CPU cores using the .parallel() method. It uses an optimized Map-Reduce architecture to minimize memory usage.

import math
from fpstreams import Stream

def heavy_task_batch(numbers):
    # Process a whole list of numbers at once (Vectorization or bulk API)
    return [math.factorial(n) for n in numbers]

# Memory Efficient: "batch(100)" sends chunks to workers
# instead of pickling 10,000 individual tasks.
results = (
    Stream(range(10000))
    .parallel()
    .batch(100)
    .map(heavy_task_batch)
    .to_list()
)
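
The batching idea can be illustrated with concurrent.futures from the standard library. This is a sketch of the general technique, not a description of how fpstreams schedules work: the helper names are hypothetical, and a thread pool is used here only to keep the example easy to run. For CPU-bound work, a ProcessPoolExecutor (created under an `if __name__ == "__main__":` guard) would be the usual choice.

```python
import math
from concurrent.futures import Executor, ThreadPoolExecutor
from itertools import islice


def chunks(iterable, size):
    """Split an iterable into lists of up to `size` items."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk


def heavy_task_batch(numbers):
    """Process a whole batch in one call."""
    return [math.factorial(n) for n in numbers]


def map_in_batches(func, items, batch_size, executor: Executor):
    """Submit one task per batch; Executor.map returns results in input order."""
    return list(executor.map(func, chunks(items, batch_size)))


with ThreadPoolExecutor(max_workers=4) as pool:
    results = map_in_batches(heavy_task_batch, range(1000), 100, pool)

print(len(results))     # 10
print(len(results[0]))  # 100
```

Submitting 10 batches instead of 1,000 single items means far fewer tasks to schedule and, with a process pool, far fewer objects to serialize.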

6. Data Science & I/O

Seamlessly integrate with the scientific stack.

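from fpstreams import Stream, Collectors

# Sample records (illustrative); the earlier `users` list has no "age" key
people = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
]
employees = [
    {"dept": "eng", "salary": 120},
    {"dept": "eng", "salary": 100},
    {"dept": "ops", "salary": 90},
]

# 1. One-pass statistics (count, sum, min, max, average)
stats = Stream(people).collect(Collectors.summarizing(lambda p: p["age"]))
print(f"Average Age: {stats.average}, Max: {stats.max}")

# 2. Advanced grouping (SQL-style): group by dept, then average salary
avg_salaries = Stream(employees).collect(
    Collectors.grouping_by(
        lambda e: e["dept"],
        downstream=Collectors.averaging(lambda e: e["salary"]),
    )
)

# 3. Export to a Pandas DataFrame or a CSV file
Stream(people).to_df()
Stream(people).to_csv("output.csv")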
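
To make "one-pass" and "downstream" concrete, here is a standard-library sketch of both ideas. The Summary class, its field names, and the helper functions are hypothetical and chosen for illustration; they are not the collectors shipped with fpstreams.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Summary:
    """Running statistics updated one value at a time."""
    count: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    def add(self, value):
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    @property
    def average(self):
        return self.total / self.count if self.count else 0.0


def summarize(values):
    """Consume the values once, keeping only the running statistics."""
    summary = Summary()
    for value in values:
        summary.add(value)
    return summary


def group_average(rows, key, value):
    """Group rows by key(row), then reduce each group to its average."""
    groups = defaultdict(Summary)
    for row in rows:
        groups[key(row)].add(value(row))
    return {k: s.average for k, s in groups.items()}


employees = [
    {"dept": "eng", "salary": 100},
    {"dept": "eng", "salary": 120},
    {"dept": "ops", "salary": 90},
]

stats = summarize(e["salary"] for e in employees)
print(f"{stats.count} rows, average {stats.average:.2f}")
# 3 rows, average 103.33
print(group_average(employees, lambda e: e["dept"], lambda e: e["salary"]))
# {'eng': 110.0, 'ops': 90.0}
```

The grouping step never stores the rows of a group; each group holds only its running Summary, which is what keeps the reduction to a single pass.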

Infinite Streams & Lazy Evaluation

Process massive datasets efficiently. Operations are only executed when needed.

# Infinite stream of even numbers using .iterate()
evens = (
    Stream.iterate(0, lambda n: n + 1)
    .filter(lambda x: x % 2 == 0)
    .limit(10)
    .to_list()
)
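
Lazy evaluation can be observed directly with standard-library iterators. The sketch below mirrors the pipeline above using itertools.count, filter, and islice instead of fpstreams, and records every value the predicate actually inspects.

```python
from itertools import count, islice

inspected = []


def is_even(n):
    inspected.append(n)  # record each value the filter actually sees
    return n % 2 == 0


# count(0) is infinite; islice stops the pipeline after 10 matches.
evens = list(islice(filter(is_even, count(0)), 10))

print(evens)           # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print(len(inspected))  # 19
```

Only the values 0 through 18 are ever produced, even though the source is unbounded.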

Benchmark

Comparison between standard streams and fpstreams.parallel() on a 4-core machine:

Task                                Sequential (s)  Parallel (s)  Speedup
Heavy Calculation (Factorials)      24.8358         9.5575        2.60x
I/O Simulation (Sleep)              2.1053          0.8101        2.60x
Light Calculation (Multiplication)  0.0135          0.3109        0.04x

Note: Parallel streams have overhead. Use them for CPU-intensive tasks or slow I/O, not simple arithmetic.
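
The Speedup column is the sequential time divided by the parallel time. The short check below recomputes it from the timings in the table; a value below 1 means the parallel run was slower.

```python
# (sequential seconds, parallel seconds) as reported in the benchmark table
timings = {
    "Heavy Calculation (Factorials)": (24.8358, 9.5575),
    "I/O Simulation (Sleep)": (2.1053, 0.8101),
    "Light Calculation (Multiplication)": (0.0135, 0.3109),
}

speedups = {task: seq / par for task, (seq, par) in timings.items()}

for task, speedup in speedups.items():
    print(f"{task}: {speedup:.2f}x")
# Heavy Calculation (Factorials): 2.60x
# I/O Simulation (Sleep): 2.60x
# Light Calculation (Multiplication): 0.04x
```

For the light calculation, 0.3109 / 0.0135 is roughly 23, so the parallel run took about 23 times longer than the sequential one.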

Project Structure

  • Stream: The core wrapper for sequential data processing.
  • ParallelStream: A multi-core wrapper for heavy parallel processing.
  • Option: Null-safe container.
  • Result: Error-handling container.
  • Collectors: Accumulation utilities (grouping, joining, summary stats).
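
This page does not show the Option and Result interfaces, so the following is only a conceptual sketch of what a null-safe container and an error-handling container do. The class names SketchOption and SketchResult and all of their methods are hypothetical and should not be read as the fpstreams API.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class SketchOption:
    """Hypothetical null-safe container: holds a value or nothing."""
    value: Any = None

    def map(self, fn: Callable[[Any], Any]) -> "SketchOption":
        # Apply fn only when a value is present; otherwise stay empty.
        return SketchOption(fn(self.value)) if self.value is not None else self

    def or_else(self, default: Any) -> Any:
        return self.value if self.value is not None else default


@dataclass(frozen=True)
class SketchResult:
    """Hypothetical error-handling container: a value or a captured exception."""
    value: Any = None
    error: Optional[Exception] = None

    @classmethod
    def attempt(cls, fn: Callable[..., Any], *args: Any) -> "SketchResult":
        # Run fn and capture any exception instead of letting it propagate.
        try:
            return cls(value=fn(*args))
        except Exception as exc:
            return cls(error=exc)

    @property
    def is_success(self) -> bool:
        return self.error is None


print(SketchOption("alice").map(str.upper).or_else("unknown"))  # ALICE
print(SketchOption(None).map(str.upper).or_else("unknown"))     # unknown
print(SketchResult.attempt(int, "42").value)                    # 42
print(SketchResult.attempt(int, "x").is_success)                # False
```

In both cases the caller chains operations and decides once, at the end, what to do about a missing value or a failure.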

License

This project is licensed under the MIT License - see the LICENSE file for details.
