A functional programming library for Python mimicking Java Streams and JS Arrays.

fpstreams

A robust, type-safe functional programming library for Python.

fpstreams brings the power of Java Streams, Rust Results, and JavaScript Array methods to Python. It provides a fluent interface for data processing, null safety, and error handling without the boilerplate, all while remaining fully typed for IDE autocompletion.

Features

  • Fluent Streams: Lazy evaluation chains (map, filter, reduce, zip).
  • Structure Operations: Powerful chunking with .batch(), .window(), and .zip_longest().
  • Parallel Processing: Memory-safe multi-core distribution with .parallel() and auto-batching.
  • Advanced Statistics: One-pass summary stats (.summarizing()) and SQL-like grouping (.grouping_by(..., downstream=...)).
  • Clean Code Syntax: Syntactic sugar like .pick() and .filter_none() to replace lambdas.
  • Data Science Ready: Convert streams directly to Pandas DataFrames, NumPy arrays, or CSV/JSON files.
  • Null Safety: An Option container that removes the need for manual None checks.
  • Error Handling: A Result container (Success/Failure) that replaces scattered try/except blocks.

Installation

pip install fpstreams

Optional Rust Acceleration

Some list-based operations (like distinct and batch) can use a Rust extension when available. Build the extension from the rust/ directory with your preferred tooling (maturin or a custom build pipeline) and the Python runtime will automatically pick it up if fpstreams_rust is importable.
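
The automatic pick-up described above matches the usual optional-import pattern. The following is a minimal sketch of that pattern, not the library's actual loader: only the module name fpstreams_rust comes from the text, while load_accelerator, distinct_python, and the idea that the extension exposes a function named distinct are assumptions made for illustration.

```python
import importlib


def load_accelerator(module_name="fpstreams_rust"):
    """Return the optional extension module, or None if it is not importable."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None


def distinct_python(items):
    """Pure-Python fallback: drop duplicates while keeping first-seen order."""
    return list(dict.fromkeys(items))


def distinct(items, accelerator=None):
    # Hypothetical: assumes the extension would expose a `distinct` function.
    fast = getattr(accelerator, "distinct", None)
    return fast(items) if fast is not None else distinct_python(items)


accelerator = load_accelerator()
print("accelerated" if accelerator is not None else "pure Python")
print(distinct([3, 1, 3, 2, 1]))  # fallback path prints [3, 1, 2]
```

Callers never need to know which path ran, which is what keeps the Python API unchanged whether or not the extension is built.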

Quick Start

1. Stream Factories

Create streams directly from values, functions, or algorithmic sequences.

import random

from fpstreams import Stream

# Fixed values
Stream.of(1, 2, 3, 4, 5)

# Seed 1, function x * 2 -> 1, 2, 4, 8, 16...
Stream.iterate(1, lambda x: x * 2).limit(10)

# Infinite supplier (e.g., polling an API), truncated to 5 values
Stream.generate(lambda: random.random()).limit(5)
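
To make the semantics of the two infinite factories concrete, here is a standard-library sketch using plain generators. It illustrates the behaviour described in the comments above and is not how fpstreams implements it; the function names iterate and generate are local helpers that only mirror the method names.

```python
import itertools
import random


def iterate(seed, func):
    """Yield seed, func(seed), func(func(seed)), ... without end."""
    value = seed
    while True:
        yield value
        value = func(value)


def generate(supplier):
    """Yield supplier() repeatedly without end."""
    while True:
        yield supplier()


# islice plays the role of .limit(n): it stops pulling after n items.
powers = list(itertools.islice(iterate(1, lambda x: x * 2), 10))
print(powers)  # [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]

samples = list(itertools.islice(generate(random.random), 5))
print(len(samples))  # 5
```

Because both generators never terminate on their own, a limiting step is required before collecting them into a list.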

2. Basic Processing

Replace messy loops with clean, readable pipelines.

from fpstreams import Stream, Collectors

data = ["apple", "banana", "cherry", "apricot", "blueberry"]

# Filter, transform, and group in a single pipeline
result = (
    Stream(data)
    .filter(lambda s: s.startswith("a") or s.startswith("b"))
    .map(str.upper)
    .collect(Collectors.grouping_by(lambda s: s[0]))
)
# Output: {'A': ['APPLE', 'APRICOT'], 'B': ['BANANA', 'BLUEBERRY']}
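
For comparison, this is the kind of loop the pipeline above replaces. It is a plain-Python sketch that produces the same result for the same input, written without fpstreams so it can be run on its own.

```python
from collections import defaultdict

data = ["apple", "banana", "cherry", "apricot", "blueberry"]

grouped = defaultdict(list)
for word in data:
    # filter step: keep words starting with "a" or "b"
    if word.startswith(("a", "b")):
        # map step: upper-case the word
        upper = word.upper()
        # grouping step: bucket by first letter
        grouped[upper[0]].append(upper)

result = dict(grouped)
print(result)  # {'A': ['APPLE', 'APRICOT'], 'B': ['BANANA', 'BLUEBERRY']}
```

The loop interleaves all three concerns in one body, whereas the stream version names each step separately.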

3. Structure & Windowing

Process data in chunks or sliding windows—essential for time-series analysis or bulk API processing.

data = range(10)

# Batching: Process 3 items at a time
# Result: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Stream(data).batch(3).to_list()

# Sliding Window: View of size 3, sliding by 1
# Result: [[0, 1, 2], [1, 2, 3], [2, 3, 4]...]
Stream(data).window(size=3, step=1).to_list()
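
The two chunking modes can be sketched with the standard library as follows. This is an illustration of the results shown in the comments above, not the library's code. One assumption is made explicit: the window helper here only emits full windows, and whether fpstreams also emits trailing partial windows is not stated in this document.

```python
from itertools import islice


def batch(iterable, size):
    """Yield consecutive, non-overlapping chunks; the last one may be shorter."""
    iterator = iter(iterable)
    while chunk := list(islice(iterator, size)):
        yield chunk


def window(iterable, size, step=1):
    """Yield overlapping windows of exactly `size` items (assumption: no partials)."""
    items = list(iterable)  # materialised for simplicity
    for start in range(0, len(items) - size + 1, step):
        yield items[start:start + size]


print(list(batch(range(10), 3)))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

print(list(window(range(10), size=3, step=1))[:3])
# [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
```

Batches partition the input, so every item appears once; windows overlap, so with step=1 most items appear in three consecutive windows.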

4. Clean Code Shortcuts

Stop writing repetitive lambdas for dictionaries.

users = [
    {"id": 1, "name": "Alice", "role": "admin"},
    {"id": 2, "name": "Bob", "role": None},
    {"id": 3, "name": None, "role": "user"},
]

names = (
    Stream(users)
    .pick("name")      # Extract "name" key
    .filter_none()     # Remove None values
    .to_list()
)
# Output: ["Alice", "Bob"]
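
Spelled out without the shortcuts, the same extraction looks like the sketch below. It uses only built-ins and assumes a missing key should be treated like None; how .pick() itself handles missing keys is not specified here.

```python
users = [
    {"id": 1, "name": "Alice", "role": "admin"},
    {"id": 2, "name": "Bob", "role": None},
    {"id": 3, "name": None, "role": "user"},
]

# Equivalent of .pick("name"): pull one key out of each dict.
# Assumption: a missing key yields None; fpstreams may behave differently.
picked = [user.get("name") for user in users]

# Equivalent of .filter_none(): keep everything that is not None.
names = [name for name in picked if name is not None]

print(names)  # ['Alice', 'Bob']
```

Filtering on "is not None" rather than truthiness matters: empty strings and zeros are kept, and only None is dropped.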

5. Parallel Processing

fpstreams can automatically distribute heavy workloads across all CPU cores using the .parallel() method. It uses an optimized Map-Reduce architecture to minimize memory usage.

import math
from fpstreams import Stream

def heavy_task_batch(numbers):
    # Process a whole list of numbers at once (Vectorization or bulk API)
    return [math.factorial(n) for n in numbers]

# Memory Efficient: "batch(100)" sends chunks to workers
# instead of pickling 10,000 individual tasks.
results = (
    Stream(range(10000))
    .parallel()
    .batch(100) 
    .map(heavy_task_batch)
    .to_list()
)
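
The reason batching helps is that each task handed to a worker carries scheduling and serialization overhead, so fewer, larger tasks are cheaper than many tiny ones. The sketch below shows the idea with concurrent.futures and is not a description of fpstreams internals. It uses a thread pool so that it runs anywhere without a main guard; for CPU-bound work, a ProcessPoolExecutor would be passed instead. The helper names chunked and map_in_batches are made up for this example, and unlike the pipeline above, the helper flattens the per-batch results into one list.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def chunked(items, size):
    """Split a list into consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def heavy_task_batch(numbers):
    """Process a whole batch in one call."""
    return [math.factorial(n) for n in numbers]


def map_in_batches(func, items, batch_size, executor):
    """Submit one task per batch and flatten the results in input order."""
    batches = list(chunked(items, batch_size))
    results = []
    for batch_result in executor.map(func, batches):
        results.extend(batch_result)
    return results, len(batches)


numbers = list(range(1000))
with ThreadPoolExecutor(max_workers=4) as pool:
    results, task_count = map_in_batches(heavy_task_batch, numbers, 100, pool)

print(task_count)   # 10 tasks instead of 1000
print(results[:5])  # [1, 1, 2, 6, 24]
```

executor.map returns results in submission order, so the flattened output lines up with the input even though batches may finish out of order.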

6. Data Science & I/O

Seamlessly integrate with the scientific stack.

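from fpstreams import Stream, Collectors

# Illustrative sample records for the examples below
users = [
    {"name": "Alice", "age": 34},
    {"name": "Bob", "age": 27},
]
employees = [
    {"dept": "eng", "salary": 120_000},
    {"dept": "eng", "salary": 100_000},
    {"dept": "ops", "salary": 80_000},
]

# 1. One-pass statistics (count, sum, min, max, average)
stats = Stream(users).collect(Collectors.summarizing(lambda u: u['age']))
print(f"Average Age: {stats.average}, Max: {stats.max}")

# 2. Advanced grouping (SQL-style)
# Group by department, then average the salary within each group
avg_salaries = Stream(employees).collect(
    Collectors.grouping_by(
        lambda e: e['dept'],
        downstream=Collectors.averaging(lambda e: e['salary'])
    )
)

# 3. Export to a Pandas DataFrame or a CSV file
Stream(users).to_df()
Stream(users).to_csv("output.csv")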
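
What "one-pass" statistics and grouping with a downstream collector mean can be shown with a small standard-library sketch. It is an illustration under stated assumptions rather than the Collectors implementation: the Summary class, its field names, the group_average helper, and the sample data are all hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Summary:
    """Running statistics updated one value at a time."""
    count: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    def accept(self, value):
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    @property
    def average(self):
        return self.total / self.count if self.count else 0.0


def summarize(values):
    """Single pass over the data: no intermediate list is built."""
    summary = Summary()
    for value in values:
        summary.accept(value)
    return summary


def group_average(items, key, value):
    """Group by key(item), keeping only a running Summary per group."""
    groups = defaultdict(Summary)
    for item in items:
        groups[key(item)].accept(value(item))
    return {name: summary.average for name, summary in groups.items()}


ages = summarize(iter([30, 25, 41]))
print(ages.count, ages.average, ages.maximum)  # 3 32.0 41

staff = [
    {"dept": "eng", "salary": 100},
    {"dept": "eng", "salary": 120},
    {"dept": "ops", "salary": 80},
]
print(group_average(staff, lambda e: e["dept"], lambda e: e["salary"]))
# {'eng': 110.0, 'ops': 80.0}
```

Keeping an accumulator per group instead of a list per group is what lets a downstream collector work on inputs that do not fit in memory.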

Infinite Streams & Lazy Evaluation

Process massive datasets efficiently. Operations are only executed when needed.

# Infinite stream of even numbers using .iterate()
evens = (
    Stream.iterate(0, lambda n: n + 1)
    .filter(lambda x: x % 2 == 0)
    .limit(10)
    .to_list()
)
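
Lazy evaluation can be observed directly by recording which elements the filter actually inspects. The sketch below uses itertools in place of fpstreams to show the principle; the examined list and is_even helper exist only for this demonstration.

```python
from itertools import count, islice

examined = []  # records every element the predicate is asked about


def is_even(n):
    examined.append(n)
    return n % 2 == 0


# Building the pipeline does no work yet.
pipeline = islice(filter(is_even, count(0)), 10)
print(len(examined))  # 0

# Collecting pulls exactly as many source elements as needed.
evens = list(pipeline)
print(evens)          # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print(len(examined))  # 19 (the numbers 0 through 18)
```

The source is infinite, yet only 19 elements are ever touched, because the limiting step stops requesting items once it has ten.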

Benchmark

Comparison between sequential streams and streams using .parallel() on a 4-core machine:

Task                                Sequential (s)  Parallel (s)  Speedup
Heavy Calculation (Factorials)             24.8358        9.5575    2.60x
I/O Simulation (Sleep)                      2.1053        0.8101    2.60x
Light Calculation (Multiplication)          0.0135        0.3109    0.04x

Note: Parallel streams have overhead. Use them for CPU-intensive tasks or slow I/O, not simple arithmetic.
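
The speedup column is the sequential time divided by the parallel time. As a quick check of the figures above, this short calculation reproduces them from the reported timings; the speedup helper is just a name for the division.

```python
def speedup(sequential_s, parallel_s):
    """Speedup factor: values below 1.0 mean the parallel run was slower."""
    return sequential_s / parallel_s


timings = {
    "Heavy Calculation (Factorials)": (24.8358, 9.5575),
    "I/O Simulation (Sleep)": (2.1053, 0.8101),
    "Light Calculation (Multiplication)": (0.0135, 0.3109),
}

for task, (sequential_s, parallel_s) in timings.items():
    print(f"{task}: {speedup(sequential_s, parallel_s):.2f}x")
# Heavy Calculation (Factorials): 2.60x
# I/O Simulation (Sleep): 2.60x
# Light Calculation (Multiplication): 0.04x
```

A speedup of 0.04x means the parallel run took roughly 23 times longer than the sequential one, which is the overhead the note refers to.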

Project Structure

  • Stream: The core wrapper for sequential data processing.
  • ParallelStream: A multi-core wrapper for heavy parallel processing.
  • Option: Null-safe container.
  • Result: Error-handling container.
  • Collectors: Accumulation utilities (grouping, joining, summary stats).
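
Since Option and Result are listed here without an example, the following sketch shows the general idea behind such containers. It is a from-scratch illustration and not the fpstreams API: every class, method, and function name below (Option, Success, Failure, attempt, map, or_else, get_or_else) is an assumption chosen for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Option:
    """Wraps a value that may be None so callers can chain without checks."""
    value: object = None

    def map(self, func):
        return Option(func(self.value)) if self.value is not None else self

    def or_else(self, default):
        return self.value if self.value is not None else default


@dataclass(frozen=True)
class Success:
    value: object

    def map(self, func):
        # A failing step turns the chain into a Failure instead of raising.
        return attempt(lambda: func(self.value))

    def get_or_else(self, default):
        return self.value


@dataclass(frozen=True)
class Failure:
    error: Exception

    def map(self, func):
        return self  # skip later steps once something has failed

    def get_or_else(self, default):
        return default


def attempt(func):
    """Run func() and capture either its value or its exception."""
    try:
        return Success(func())
    except Exception as exc:
        return Failure(exc)


print(Option("admin").map(str.upper).or_else("GUEST"))  # ADMIN
print(Option(None).map(str.upper).or_else("GUEST"))     # GUEST

print(attempt(lambda: int("42")).map(lambda n: n * 2).get_or_else(0))    # 84
print(attempt(lambda: int("oops")).map(lambda n: n * 2).get_or_else(0))  # 0
```

In both containers the "empty" case simply passes through later map calls, so the None check or try/except happens once at the end of the chain.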

Functional Coverage & Roadmap

fpstreams already delivers composable pipelines, collectors, and Option/Result containers, but a few areas are worth extending: additional combinators, richer statistics, and more ergonomic Option/Result helpers. A longer-term path is to extend the optional Rust extension, which currently covers some list-based operations such as distinct and batch, to numeric-heavy collectors and parallel operations while keeping the Python-first API intact. See the roadmap for details: docs/roadmap.md.

License

This project is licensed under the MIT License - see the LICENSE file for details.
