ZPyFlow

Zero-allocation lazy query pipelines for Python, powered by Rust.

  • Lazy & fused — filter + map + take run in a single pass with no intermediate lists
  • SIMD-accelerated — float/int arrays execute in Rust with the GIL released, using f64x4
  • Expression DSL — col > 5 eliminates Python callbacks entirely
  • Python-friendly — numpy, pandas, dataclasses, plain lists, and generators all work as input
  • Parallel execution — .parallel() enables Rayon work-stealing

ZPyFlow is not a DataFrame engine. It lets Python sequence hot paths — list[float], numpy arrays, dict record streams — run as fused Rust pipelines without moving the surrounding codebase into a tabular data model. See § 13 ZPyFlow vs Polars for the product-choice boundary.


Installation

pip install zpyflow

Optional extras:

pip install zpyflow[numpy]   # NumPy integration
pip install zpyflow[arrow]   # PyArrow integration

Development install (from source)

pip install maturin
git clone https://github.com/bigbangdash/zpyflow
cd zpyflow
maturin develop --release

Development (Make + Docker)

All development tasks run inside Docker — no local Rust or Python setup needed beyond Docker itself.

Quick reference

Command                   What it does
make dc-test              Build + run Python unit tests
make dc-test-k K="f64"    Run tests matching keyword f64
make dc-bench             Build + run all benchmark suites
make dc-bench-filter      Filter benchmarks only
make dc-bench-chained     Chained pipeline benchmarks only
make dc-bench-agg         Aggregation benchmarks (vs numpy / pandas / polars)
make dc-bench-numpy       numpy comparison benchmarks
make dc-bench-objects     Python object (dict / dataclass) benchmarks
make dc-bench-vector      Vector search: top-K early stopping
make dc-bench-ml          ML feature preprocessing pipeline
make dc-bench-etl         ETL multi-stat aggregation (vs Polars / Pandas)
make dc-bench-fraud       Fraud / risk scoring: review queue, exposure sum
make dc-bench-groupby     GroupBy and pagination (object path)
make dc-bench-null        Null-mixed list benchmarks (None handling)
make dc-bench-rust        Rust (Criterion) benchmarks
make dc-bench-save        Save current results as baseline
make dc-bench-compare     Compare against saved baseline (fails on >10% regression)
make dc-shell             Interactive shell inside the container
make dc-image             Rebuild the Docker image
make dc-clean             Remove named volumes (reset Cargo + target cache)

Typical workflow

# First time: build the image (takes a few minutes — downloads Rust toolchain)
make dc-image

# Edit Rust source, then test
make dc-test

# Measure performance against all libraries
make dc-bench-agg

# Save a baseline, make changes, then compare the new run against it
make dc-bench-save             # save current run as baseline
# ... make changes ...
make dc-bench-compare          # fails if mean regresses by >10%

# Open a shell to inspect the built extension
make dc-shell
# Inside the container shell:
python -c "from zpyflow import Query, col; print(Query([1,2,3]).filter(col>1).to_list())"
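
The >10% regression gate applied by make dc-bench-compare can be pictured with a small sketch. The dictionary layout (benchmark name mapped to mean seconds) and the function name find_regressions are assumptions made for illustration; the repository's actual result files and comparison logic may differ.

```python
def find_regressions(baseline, current, tolerance=0.10):
    """Return benchmarks whose mean time grew by more than `tolerance`.

    Both arguments map a benchmark name to its mean runtime in seconds
    (a hypothetical layout, not the project's real JSON schema).
    """
    regressions = {}
    for name, base_mean in baseline.items():
        new_mean = current.get(name)
        if new_mean is None or base_mean <= 0:
            continue  # benchmark missing from the new run, or unusable baseline
        change = (new_mean - base_mean) / base_mean
        if change > tolerance:
            regressions[name] = change
    return regressions


baseline = {"filter_f64": 0.0030, "chained": 0.0050}
current = {"filter_f64": 0.0031, "chained": 0.0060}
slower = find_regressions(baseline, current)
print(slower)  # only "chained" exceeds the 10% tolerance
```

A gate like this would exit non-zero when the returned dictionary is non-empty.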

Local build (requires Rust + maturin on host)

make build          # maturin develop --release
make test           # build + pytest
make bench          # build + all Python benchmarks
make bench-rust     # cargo bench
make lint           # cargo clippy
make fmt            # cargo fmt + ruff format
make clean          # remove build artifacts

Contents

  1. Basic usage
  2. Expression DSL vs lambda
  3. numpy
  4. pandas
  5. Dataclasses and custom objects
  6. Dict records and log processing
  7. GroupBy and aggregation
  8. CSV and JSON Lines streaming
  9. AI and embedding pipelines
  10. Parallel execution
  11. Full API reference
  12. Performance
  13. ZPyFlow vs Polars

1. Basic usage

from zpyflow import Query, col

data = [1.5, -2.3, 0.7, 4.1, -0.5, 3.8, -1.1, 2.2]

result = (
    Query(data)
        .filter(col > 0)      # keep positive values
        .map(col * 2.0)       # double them
        .take(4)              # first 4
        .to_list()
)
# → [3.0, 1.4, 8.2, 7.6]

# Aggregations
total = Query(data).filter(col > 0).sum()    # 12.3
count = Query(data).filter(col > 0).count()  # 5
vmax  = Query(data).max()                    # 4.1
vmin  = Query(data).filter(col > 0).min()    # 0.7

first_positive = Query(data).filter(col > 0).first()  # 1.5
last_positive  = Query(data).filter(col > 0).last()   # 2.2
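
For readers who want to check the semantics, here is a plain-Python equivalent of the pipeline above, using only the standard library. It mirrors the laziness (nothing runs until values are pulled) but none of the Rust or SIMD execution; it is a reference for the expected results, not a description of how ZPyFlow is implemented.

```python
from itertools import islice
import math

data = [1.5, -2.3, 0.7, 4.1, -0.5, 3.8, -1.1, 2.2]

# Generator expressions are lazy: nothing runs until islice pulls values,
# and no intermediate list is built between the filter and the map.
positives = (x for x in data if x > 0)
doubled = (x * 2.0 for x in positives)
result = list(islice(doubled, 4))

# Aggregations over the same predicate
total = sum(x for x in data if x > 0)
count = sum(1 for x in data if x > 0)

print(result, total, count)
```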

2. Expression DSL vs lambda

ZPyFlow supports two styles. Choose based on your data type.

Expression DSL — recommended for numeric data (GIL released, SIMD)

from zpyflow import Query, col

(
    Query(data)
        .filter(col > 0)              # FilterGt     → Rust, SIMD
        .filter(col.between(0, 10))   # FilterBetween → Rust, SIMD
        .map(col * 2.0)               # MapMulScalar  → Rust, SIMD
        .map(col + 1.0)               # MapAddScalar  → Rust, SIMD
        .map(col.abs())               # MapAbs        → Rust
        .map(col.sqrt())              # MapSqrt       → Rust
        .map(col ** 2)                # MapPowScalar  → Rust
        .map(-col)                    # MapNeg        → Rust, SIMD
        .to_list()
)

Supported DSL operators

Python expression   Internal op      Description
col > x             FilterGt         greater than
col >= x            FilterGe         greater than or equal
col < x             FilterLt         less than
col <= x            FilterLe         less than or equal
col.between(a,b)    FilterBetween    a ≤ v ≤ b
col * x             MapMulScalar     multiply by scalar
col + x             MapAddScalar     add scalar
col - x             MapSubScalar     subtract scalar
col / x             MapDivScalar     divide (via reciprocal)
col ** x            MapPowScalar     exponentiate
-col                MapNeg           negate
col.abs()           MapAbs           absolute value
col.sqrt()          MapSqrt          square root
col.floor()         MapFloor         floor (round toward −∞)
col.ceil()          MapCeil          ceiling (round toward +∞)
col.round()         MapRound         round to nearest integer
col.reciprocal()    MapReciprocal    1 / x

Python lambda — for arbitrary Python objects (GIL held)

(
    Query(records)
        .filter(lambda r: r["score"] > 80)
        .map(lambda r: r["name"].upper())
        .to_list()
)

Rule of thumb: use the DSL for numeric arrays, use lambdas for arbitrary Python objects. The performance gap is roughly 20–40× at 1M elements.
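
One way to see why the DSL avoids per-element Python callbacks: an expression such as col > 0 can be built with operator overloading so that it records an operation instead of evaluating one. The sketch below is a toy model under that assumption. The class names Expr and Col are hypothetical and this is not ZPyFlow's actual implementation; only the op names are borrowed from the table above.

```python
class Expr:
    """A recorded operation: an op name, its argument, and a plain function."""

    def __init__(self, name, arg, fn):
        self.name, self.arg, self.fn = name, arg, fn

    def __repr__(self):
        return f"{self.name}({self.arg})"


class Col:
    """Placeholder for 'the current element'; operators return Expr objects."""

    def __gt__(self, x):
        return Expr("FilterGt", x, lambda v: v > x)

    def __lt__(self, x):
        return Expr("FilterLt", x, lambda v: v < x)

    def __mul__(self, x):
        return Expr("MapMulScalar", x, lambda v: v * x)

    def __add__(self, x):
        return Expr("MapAddScalar", x, lambda v: v + x)


col = Col()

pred = col > 0       # no comparison happens here; this builds a description
scale = col * 2.0

print(pred, scale)
# The toy evaluates via .fn; a native engine would instead dispatch on the
# recorded name and argument, never calling back into Python per element.
kept = [scale.fn(v) for v in [1.5, -2.3, 0.7] if pred.fn(v)]
```

Because the expression is data rather than a callable, an engine can inspect it once and choose a compiled loop for it.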


3. numpy

import numpy as np
from zpyflow import Query, col, from_numpy

rng = np.random.default_rng(42)
arr = rng.standard_normal(1_000_000)   # shape: (1M,), dtype: float64

# from_numpy() converts to Query using the f64 fast path
result = (
    from_numpy(arr)
        .filter(col > 0)        # SIMD filter, GIL released
        .map(col ** 2)          # square
        .take(10_000)
        .to_list()
)

# Aggregations on large arrays
positive = from_numpy(arr).filter(col > 0)
mean_positive = positive.sum() / positive.count()
max_val       = from_numpy(arr).max()

# Convert result back to numpy
result_arr = np.array(from_numpy(arr).filter(col.between(-1, 1)).to_list())

Integer arrays

ids = np.arange(1_000_000, dtype=np.int64)

# DSL path — fast, GIL released
big_ids = from_numpy(ids).filter(col > 500_000).to_list()

# Lambda fallback when DSL doesn't cover the operation
even_ids = from_numpy(ids).filter(lambda x: x % 2 == 0).take(100).to_list()

Float32 arrays (ML / embedding workloads)

from_numpy routes float32 arrays to a native f32x8 SIMD path (8 lanes per AVX2 register, twice the throughput of f64x4). Results are promoted to Python float (f64) at collection time; to_numpy() preserves the original float32 dtype.

# Typical ML scenario: embedding similarity post-filter
scores = np.random.rand(1_000_000).astype(np.float32)

# Zero-copy buffer read → f32x8 SIMD filter → 0 intermediate allocations
high_score_count = from_numpy(scores).filter(col > 0.95).count()

# to_numpy() returns float32, not float64 — no precision loss
filtered = from_numpy(scores).filter(col > 0.9).to_numpy()
assert filtered.dtype == np.float32

Explicit typed constructors

When a list contains mixed numeric types (e.g. [1, 2, 3.0]), Query() falls back to the generic Python path. Use the explicit constructors to force the fast path:

# Mixed-type list → force f64 fast path (SIMD, GIL released)
Query.f64([1, 2, 3.0]).filter(col > 1).sum()   # → 5.0

# Force i64 fast path
Query.i64([1, 2, 3]).filter(col > 1).to_list() # → [2, 3]
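
The fallback described above can be illustrated with a small type-dispatch sketch. The function names pick_path and as_f64 are hypothetical, and the detection rules are an assumption based on the text; ZPyFlow's real dispatch may inspect inputs differently.

```python
def pick_path(values):
    """Classify a list the way a type-dispatching constructor might.

    Returns "f64", "i64", or "python". bool is excluded on purpose because
    it is a subclass of int in Python.
    """
    kinds = {type(v) for v in values}
    if kinds == {float}:
        return "f64"
    if kinds == {int}:
        return "i64"
    return "python"


def as_f64(values):
    """Coerce a mixed int/float list to floats so it qualifies for "f64"."""
    return [float(v) for v in values]


print(pick_path([1.0, 2.0]), pick_path([1, 2, 3]), pick_path([1, 2, 3.0]))
print(pick_path(as_f64([1, 2, 3.0])))
```

Under this model, an explicit constructor plays the role of as_f64: it states the element type up front instead of leaving it to inference.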

Speed comparison (1M float64)

import time
import numpy as np
from zpyflow import from_numpy, col

arr = np.random.randn(1_000_000)

# numpy — eager, creates 2 intermediate arrays
t0 = time.perf_counter()
r_np = arr[arr > 0] * 2
print(f"numpy:   {(time.perf_counter()-t0)*1000:.1f}ms  (2 allocations)")

# ZPyFlow — single fused pass, 1 allocation, GIL released
t0 = time.perf_counter()
r_zpf = from_numpy(arr).filter(col > 0).map(col * 2).to_list()
print(f"zpyflow: {(time.perf_counter()-t0)*1000:.1f}ms  (1 allocation)")

4. pandas

ZPyFlow is not a pandas replacement. It accelerates numeric column preprocessing and row-level filtering within existing pandas workflows.

Processing a numeric column

import pandas as pd
from zpyflow import Query, col

df = pd.DataFrame({
    "user_id": range(100_000),
    "score":   [float(i) / 100 for i in range(100_000)],
    "active":  [i % 3 != 0 for i in range(100_000)],
    "region":  ["JP" if i % 5 == 0 else "US" for i in range(100_000)],
})

# Extract, transform, and write back
scores = df["score"].tolist()

# Normalize scores above the median — single fused pass
median = df["score"].median()
normalized = (
    Query(scores)
        .filter(col > median)
        .map(col / df["score"].max())
        .to_list()
)
df_high = df[df["score"] > median].copy()
df_high["normalized"] = normalized

Row-level processing via to_dict("records")

records = df.to_dict("records")   # list[dict]

result = (
    Query(records)
        .filter(lambda r: r["active"] and r["region"] == "JP")
        .map(lambda r: {"user_id": r["user_id"], "score": r["score"] * 1.1})
        .take(1_000)
        .to_list()
)

result_df = pd.DataFrame(result)

Wrapping ZPyFlow as a pandas transform step

def fast_clip_and_scale(series: pd.Series, lo: float, hi: float) -> list[float]:
    """Filter to [lo, hi] and scale to [0, 1]."""
    span = hi - lo
    return (
        Query(series.tolist())
            .filter(col.between(lo, hi))
            .map((col - lo) / span)
            .to_list()
    )

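# The filter drops rows outside [lo, hi], so assign to the matching subset
in_range = df["score"].between(0.2, 0.8)
df_mid = df[in_range].copy()
df_mid["score_scaled"] = fast_clip_and_scale(df["score"], lo=0.2, hi=0.8)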

5. Dataclasses and custom objects

from dataclasses import dataclass
from zpyflow import Query

@dataclass
class Employee:
    id: int
    name: str
    department: str
    salary: float
    years: int

employees = [
    Employee(1, "Alice",  "Engineering", 120_000, 5),
    Employee(2, "Bob",    "Marketing",    85_000, 3),
    Employee(3, "Carol",  "Engineering", 140_000, 8),
    Employee(4, "Dan",    "HR",           75_000, 2),
    Employee(5, "Eve",    "Engineering", 110_000, 4),
    Employee(6, "Frank",  "Marketing",    90_000, 6),
    Employee(7, "Grace",  "HR",           80_000, 7),
]

# Filter + project
senior_engineers = (
    Query(employees)
        .filter(lambda e: e.department == "Engineering" and e.years >= 5)
        .map(lambda e: e.name)
        .to_list()
)
# → ["Alice", "Carol"]

# Top earners as (name, salary) tuples
top_earners = (
    Query(employees)
        .filter(lambda e: e.salary > 100_000)
        .map(lambda e: (e.name, e.salary))
        .to_list()
)
# → [("Alice", 120000), ("Carol", 140000), ("Eve", 110000)]

# Any / all
has_hr = Query(employees).any(lambda e: e.department == "HR")       # True
all_ft = Query(employees).all(lambda e: e.years > 0)                 # True

# Reduce to compute total salary
total_salary = (
    Query(employees)
        .reduce(lambda acc, e: acc + e.salary, initial=0.0)
)

# Extract salaries as a float list → use f64 fast path for aggregation
salaries = Query([float(e.salary) for e in employees])
avg_salary = salaries.sum() / salaries.count()   # GIL released for sum

Pydantic models

from pydantic import BaseModel
from zpyflow import Query

class Order(BaseModel):
    order_id: str
    amount: float
    status: str
    customer_id: int

# raw_orders: list[dict] of order payloads, loaded elsewhere
orders = [Order(**o) for o in raw_orders]

# Filter pending high-value orders
large_pending = (
    Query(orders)
        .filter(lambda o: o.status == "pending" and o.amount > 10_000)
        .map(lambda o: {"order_id": o.order_id, "amount": o.amount})
        .take(50)
        .to_list()
)

# Fast aggregation on the amount field alone (f64 fast path)
amounts = Query([o.amount for o in orders])
total_amount = amounts.sum()

# Pending total via the lambda path (GIL held per element)
total_pending = (
    Query(orders)
        .filter(lambda o: o.status == "pending")
        .map(lambda o: o.amount)
        .reduce(lambda acc, x: acc + x, initial=0.0)
)

6. Dict records and log processing

from zpyflow import Query, field

logs = [
    {"ts": "2024-01-15T10:23:11", "level": "ERROR", "status": 500, "path": "/api/users", "latency_ms": 312},
    {"ts": "2024-01-15T10:23:12", "level": "INFO",  "status": 200, "path": "/api/items", "latency_ms": 45},
    {"ts": "2024-01-15T10:23:13", "level": "WARN",  "status": 429, "path": "/api/users", "latency_ms": 8},
    {"ts": "2024-01-15T10:23:14", "level": "ERROR", "status": 500, "path": "/health",    "latency_ms": 520},
    # ... millions of records
]

# ✅ OK — field() DSL: GIL released after first filter, SIMD for numeric fields
slow_errors = (
    Query(logs)
        .filter(field("status") >= 500)
        .filter(field("latency_ms") > 100)
        .count()
)

# ✅ OK — field() DSL + map_field(): single fused Rust loop
slow_paths = (
    Query(logs)
        .filter(field("latency_ms") > 100)
        .map_field("path")
        .to_list()
)

# ✅ OK — map with dict construction: no DSL equivalent, lambda is the right choice
errors = (
    Query(logs)
        .filter(field("level") == "ERROR")
        .map(lambda l: {"path": l["path"], "latency_ms": l["latency_ms"], "ts": l["ts"]})
        .to_list()
)
errors.sort(key=lambda l: l["latency_ms"], reverse=True)

# ❌ Avoid — numeric filter via lambda: GIL held per element, same speed as Python
# Query(logs).filter(lambda l: l["latency_ms"] > 100).to_list()

# ❌ Avoid — single field extraction via lambda when map_field() exists
# Query(logs).filter(...).map(lambda l: l["path"]).to_list()

# ✅ OK — compound condition (AND of two fields): no DSL equivalent, lambda is correct
active_errors = (
    Query(logs)
        .filter(lambda l: l["level"] == "ERROR" and l["latency_ms"] > 100)
        .to_list()
)

# Extract latency as float list → f64 fast path for aggregation
latencies = Query([float(l["latency_ms"]) for l in logs])
avg_latency = latencies.sum() / latencies.count()
max_latency = latencies.max()

Rule of thumb for dict records

  • Single-field numeric/equality filter → field() DSL (GIL released, faster at large N)
  • Single-field extraction → map_field("key") (fused with field filter in one Rust loop)
  • Multi-field compound condition (and / or) → lambda (no DSL equivalent)
  • Map that builds a new dict → lambda (no DSL equivalent)

Multiple queries on the same dataset — use .preload()

field() pays a one-time dict→RustObj conversion cost on the first filter call (GIL held, O(N)). For a single query this cost amortizes across the filter pass itself. When you run multiple queries over the same dataset, call .preload() first to pay the conversion once:

# ✅ OK — preload() converts once, all subsequent filters run GIL-free
q = Query(logs).preload()
slow_count   = q.filter(field("latency_ms") > 100).count()
error_count  = q.filter(field("status") >= 500).count()

# Count by status code
from collections import Counter
status_counts = Counter(Query(logs).map(lambda l: l["status"]).to_list())
# → Counter({500: 2, 200: 1, 429: 1})   (for the four sample records above)
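
The reasoning behind .preload() (pay a one-time O(N) conversion, then reuse it across queries) can be shown with a pure-Python analogy. The ColumnCache class below is hypothetical and only models the cost structure; it is not ZPyFlow's dict→RustObj conversion.

```python
class ColumnCache:
    """Convert list[dict] records to per-field lists once, then reuse them."""

    def __init__(self, records, fields):
        self.conversions = 0
        self.columns = {}
        for name in fields:
            self.columns[name] = [r[name] for r in records]
            self.conversions += 1  # one O(N) pass per field, paid up front

    def count_where(self, name, predicate):
        # Later queries read the cached column; no dict lookups per record.
        return sum(1 for v in self.columns[name] if predicate(v))


logs = [
    {"status": 500, "latency_ms": 312},
    {"status": 200, "latency_ms": 45},
    {"status": 429, "latency_ms": 8},
    {"status": 500, "latency_ms": 520},
]

cache = ColumnCache(logs, fields=["status", "latency_ms"])
slow_count = cache.count_where("latency_ms", lambda v: v > 100)
error_count = cache.count_where("status", lambda v: v >= 500)
print(slow_count, error_count, cache.conversions)
```

However many queries follow, the conversion counter stays at one pass per field, which is the saving that preloading targets.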

Reading directly from JSON Lines

from zpyflow import from_json_lines, col

# Extract a single numeric field — uses f64 fast path
latencies = from_json_lines("access.log.ndjson", field="latency_ms", dtype="float")

stats = {
    "count": latencies.count(),
    "total": latencies.sum(),
    "max":   latencies.max(),
    "above_200ms": latencies.filter(col > 200).count(),
}

7. GroupBy and aggregation

from zpyflow import Query, GroupBy

transactions = [
    {"user": "alice", "amount": 120.0, "category": "food"},
    {"user": "bob",   "amount":  45.0, "category": "transport"},
    {"user": "alice", "amount": 300.0, "category": "shopping"},
    {"user": "carol", "amount":  80.0, "category": "food"},
    {"user": "bob",   "amount": 200.0, "category": "shopping"},
    {"user": "alice", "amount":  55.0, "category": "food"},
]

by_user = GroupBy(transactions, key_fn=lambda t: t["user"])

# Per-user count and total
summary = by_user.agg(
    count=lambda g: g.count(),
    total=lambda g: Query([t["amount"] for t in g.to_list()]).sum(),
)
# → [{"_key": "alice", "count": 3, "total": 475.0}, ...]

# Fetch a single group
alice_txns = by_user.get_group("alice").to_list()

# Sum per group using a field extractor
by_category = GroupBy(transactions, key_fn=lambda t: t["category"])
category_totals = by_category.sum_per_group(field=lambda t: t["amount"])
# → {"food": 255.0, "transport": 45.0, "shopping": 500.0}

# Count per group
counts = by_user.count_per_group()
# → {"alice": 3, "bob": 2, "carol": 1}

Single-pass aggregation with group_agg

group_agg runs count, sum, mean, max, and min in one Rust pass over the data, avoiding intermediate list materialization.

from zpyflow import Query, field, agg_count, agg_sum, agg_mean

transactions = [
    {"user": "alice", "amount": 120.0, "category": "food"},
    {"user": "bob",   "amount":  45.0, "category": "transport"},
    {"user": "alice", "amount": 300.0, "category": "shopping"},
    {"user": "carol", "amount":  80.0, "category": "food"},
    {"user": "bob",   "amount": 200.0, "category": "shopping"},
    {"user": "alice", "amount":  55.0, "category": "food"},
]

# Lambda key
result = (
    Query(transactions)
        .group_agg(
            lambda t: t["user"],
            count   = agg_count(),
            total   = agg_sum(lambda t: t["amount"]),
        )
)
# → [{"_key": "alice", "count": 3, "total": 475.0},
#    {"_key": "bob",   "count": 2, "total": 245.0},
#    {"_key": "carol", "count": 1, "total":  80.0}]

# field() DSL key — Rust-side key extraction after dict→RustObj conversion
by_category = (
    Query(transactions)
        .group_agg(
            field("category"),
            count   = agg_count(),
            revenue = agg_sum(lambda t: t["amount"]),
            avg     = agg_mean(lambda t: t["amount"]),
        )
)

When to prefer group_agg over GroupBy: group_agg is a single Rust pass and returns immediately. Use GroupBy when you need per-group Query objects for multi-step operations (pagination, nested queries).
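
To make "single pass" concrete, here is a pure-Python sketch that keeps one accumulator per group and updates it as each record is seen, so no per-group list is ever materialized. The function name group_agg_one_pass and its output fields are illustrative assumptions; the real group_agg runs in Rust and its internals are not shown here.

```python
from collections import defaultdict


def group_agg_one_pass(records, key, value):
    """Accumulate count, sum, min, and max per group in a single loop."""
    acc = defaultdict(lambda: {"count": 0, "total": 0.0,
                               "min": float("inf"), "max": float("-inf")})
    for r in records:
        a = acc[r[key]]
        v = r[value]
        a["count"] += 1
        a["total"] += v
        a["min"] = min(a["min"], v)
        a["max"] = max(a["max"], v)
    # The mean is derived at the end from the running count and total.
    return [
        {"_key": k, **a, "mean": a["total"] / a["count"]}
        for k, a in acc.items()
    ]


transactions = [
    {"user": "alice", "amount": 120.0},
    {"user": "bob",   "amount":  45.0},
    {"user": "alice", "amount": 300.0},
    {"user": "carol", "amount":  80.0},
    {"user": "bob",   "amount": 200.0},
    {"user": "alice", "amount":  55.0},
]

results = group_agg_one_pass(transactions, key="user", value="amount")
by_key = {r["_key"]: r for r in results}
print(by_key["alice"])
```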


8. CSV and JSON Lines streaming

from zpyflow import from_csv, from_json_lines, col

# Read a single numeric column from CSV — f64 fast path
prices = from_csv("products.csv", column="price", dtype="float")

discounted = (
    prices
        .filter(col > 10.0)
        .map(col * 0.9)
        .to_list()
)

# Read all rows as dicts
products = from_csv("products.csv")   # column=None → list[dict]

premium = (
    products
        .filter(lambda p: float(p["price"]) > 100 and p["in_stock"] == "true")
        .map(lambda p: {"name": p["name"], "price": float(p["price"])})
        .to_list()
)

# JSON Lines — filter at the source level
events = from_json_lines("events.ndjson")
errors = (
    events
        .filter(lambda e: e.get("level") == "error")
        .take(1_000)
        .to_list()
)
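
As a standard-library illustration of streaming with early stop, the sketch below parses JSON Lines one line at a time and stops reading as soon as enough matches are found. The helper iter_json_lines is hypothetical, and io.StringIO stands in for a real file; how from_json_lines reads its input internally is not something this example claims to show.

```python
import io
import json
from itertools import islice


def iter_json_lines(stream):
    """Yield one parsed record per non-empty line, without loading the file."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


# io.StringIO stands in for an open file such as open("events.ndjson").
stream = io.StringIO(
    '{"level": "error", "msg": "a"}\n'
    '{"level": "info", "msg": "b"}\n'
    '{"level": "error", "msg": "c"}\n'
    '{"level": "error", "msg": "d"}\n'
)

errors = (e for e in iter_json_lines(stream) if e.get("level") == "error")
first_two = list(islice(errors, 2))   # stops reading after the second match
remaining_line = stream.readline()    # the fourth line was never consumed
print(first_two, remaining_line)
```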

9. AI and embedding pipelines

Similarity score arrays are a natural fit for ZPyFlow's f64 fast path: large, homogeneous, numeric, and the filtering threshold is known at query time.

import numpy as np
from zpyflow import Query, col, from_numpy

# Pre-computed cosine similarity scores against a query vector (1M documents)
scores = np.random.uniform(0, 1, size=1_000_000).astype(np.float64)
doc_ids = np.arange(1_000_000, dtype=np.int64)

# Filter by threshold, retrieve top-K candidates — SIMD, GIL released
THRESHOLD = 0.85
TOP_K = 100

candidate_scores = from_numpy(scores).filter(col > THRESHOLD).to_list()
# Select IDs with the same score predicate; order matches the filtered scores
candidate_ids    = doc_ids[scores > THRESHOLD].tolist()

# Pair each score with its document ID and rank
candidates = sorted(
    zip(candidate_scores, candidate_ids),
    reverse=True,
)[:TOP_K]
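
When only the K best candidates are needed, a bounded heap avoids sorting the whole candidate list. The sketch below uses only the standard library; the scores and document IDs are made up for illustration.

```python
import heapq

scores = [0.91, 0.42, 0.88, 0.97, 0.86, 0.10]
doc_ids = [10, 11, 12, 13, 14, 15]
THRESHOLD = 0.85
TOP_K = 3

# Keep (score, id) pairs that pass the threshold, in original order.
candidates = [(s, d) for s, d in zip(scores, doc_ids) if s > THRESHOLD]

# nlargest keeps a heap of size TOP_K instead of sorting every candidate.
top = heapq.nlargest(TOP_K, candidates)
print(top)
```

For small K relative to the number of candidates this does O(N log K) work rather than O(N log N), and it returns the same pairs as a full descending sort truncated to K.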

Batch inference scoring statistics

def score_batch_stats(batch_scores: list[float]) -> dict:
    q = Query(batch_scores)
    n = q.count()
    return {
        "n":          n,
        "mean":       q.sum() / n,
        "high_conf":  q.filter(col > 0.9).count(),
        "low_conf":   q.filter(col < 0.5).count(),
        "max":        q.max(),
    }

# Apply to each inference batch (all Rust, no GIL for the numeric ops)
batches = [scores[i:i+10_000].tolist() for i in range(0, len(scores), 10_000)]
batch_stats = [score_batch_stats(b) for b in batches]

Embedding norm validation

# Detect embeddings that slipped through without L2 normalization
norms = [float(np.linalg.norm(emb)) for emb in embeddings]

unnormalized_count = (
    Query(norms)
        .map(col - 1.0)          # deviation from unit length
        .map(col.abs())
        .filter(col > 0.01)      # DSL path instead of a per-element lambda
        .count()
)
print(f"{unnormalized_count} embeddings need re-normalization")

# Fast summary of norm distribution
norm_query = Query(norms)
print(f"min={norm_query.min():.4f}  max={norm_query.max():.4f}  "
      f"mean={norm_query.sum()/norm_query.count():.4f}")

LangChain / LangGraph integration

ZPyFlow slots into LangChain and LangGraph wherever a node processes large numeric arrays. No special integration is needed — just use it in your node functions or tools.

RAG retrieval — filter similarity scores with early stopping

from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document
from zpyflow import from_numpy, col

class ZPyFlowRetriever(BaseRetriever):
    """Retriever that uses ZPyFlow for fast threshold filtering."""

    docs: list[Document]
    embeddings: object  # any embedding model

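    def _get_relevant_documents(self, query: str) -> list[Document]:
        import numpy as np

        query_vec = np.asarray(self.embeddings.embed_query(query))
        # Embed all documents in one batched call instead of one call per document
        doc_vecs = np.asarray(
            self.embeddings.embed_documents([d.page_content for d in self.docs])
        )
        scores = doc_vecs @ query_vec

        # SIMD filter + early stopping — never scans beyond the K-th hit
        top_scores = (
            from_numpy(scores)
            .filter(col > 0.7)
            .take(20)
            .to_list()
        )
        # The pipeline yields scores, not positions, so recover the matching
        # document indices with the same predicate
        top_indices = np.flatnonzero(scores > 0.7)[: len(top_scores)]
        return [self.docs[int(i)] for i in top_indices]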

LangGraph node — aggregate tool results without materializing a full list

from langgraph.graph import StateGraph, MessagesState
from zpyflow import Query, col

def score_filter_node(state: MessagesState) -> dict:
    """LangGraph node: filter and aggregate a large score array from a tool call."""
    # Assumes the state schema has been extended with a "tool_scores" key
    scores: list[float] = state["tool_scores"]  # e.g. 500K candidates

    q = Query(scores)
    return {
        "candidate_count": q.filter(col > 0.8).count(),   # stays in Rust
        "top_score":       q.max(),
        "mean_score":      q.sum() / q.count(),
    }

graph = StateGraph(MessagesState)
graph.add_node("score_filter", score_filter_node)

LangChain tool — return pre-aggregated stats to the LLM

from langchain_core.tools import tool
from zpyflow import Query, col

@tool
def analyze_search_results(scores: list[float]) -> dict:
    """Aggregate similarity scores from a vector search."""
    q = Query(scores)
    n = q.count()
    return {
        "total":        n,
        "high_quality": q.filter(col > 0.85).count(),
        "low_quality":  q.filter(col < 0.5).count(),
        "best_score":   q.max(),
        "mean_score":   round(q.sum() / n, 4) if n else 0.0,
    }

When ZPyFlow helps in AI pipelines: large numeric score arrays (similarity, confidence, reward, logprobs) where you threshold-filter and aggregate. It does not speed up LLM calls themselves (those are I/O-bound) or small lists (< 10K elements).


10. Parallel execution

.parallel() applies to numeric fast paths only (f64 / i64). Python object pipelines ignore this hint.

from zpyflow import Query, col, from_numpy
import numpy as np

large = np.random.randn(10_000_000).tolist()

# Single-threaded: SIMD + GIL released
result_single   = Query(large).filter(col > 0).map(col * 2).to_list()

# Multi-threaded: Rayon work-stealing, GIL fully released
result_parallel = Query(large).filter(col > 0).map(col * 2).parallel().to_list()

# Aggregation also parallelizes
total = Query(large).filter(col > 0).parallel().sum()

Data size   Single-thread (SIMD)   Parallel (8 cores)
1M f64      ~3ms                   ~0.8ms
10M f64     ~30ms                  ~5ms
100M f64    ~300ms                 ~45ms

Note: threading overhead (split + join) means parallel mode is slower than single-threaded for inputs under ~500K elements. Profile before enabling it.
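
The split + join structure mentioned in the note can be sketched with the standard library. This is a structural illustration only: Python threads hold the GIL, so this version gains no speed, and the helper names are hypothetical. It shows where the fixed costs sit (chunking the input, scheduling tasks, combining partial results), which is what makes parallel mode unprofitable on small inputs.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(values, n_chunks):
    """Split `values` into at most n_chunks contiguous slices."""
    size = max(1, -(-len(values) // n_chunks))  # ceiling division
    return [values[i:i + size] for i in range(0, len(values), size)]


def positive_sum(chunk):
    return sum(v for v in chunk if v > 0)


def parallel_positive_sum(values, workers=4):
    # Split: one task per chunk. Join: combine the partial sums.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(positive_sum, chunked(values, workers)))
    return sum(partials)


values = [float(i % 7 - 3) for i in range(1_000)]
print(parallel_positive_sum(values), positive_sum(values))
```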


11. Full API reference

Constructors / source adapters

from zpyflow import Query, from_numpy, from_arrow, from_csv, from_json_lines

Query([1.0, 2.0, 3.0])               # list[float] → f64 fast path
Query([1, 2, 3])                      # list[int]   → i64 fast path
Query(["a", "b"])                     # list[str]   → Python path
Query(x**2 for x in range(100))      # generator (consumed eagerly)
Query(dict_or_obj_list)               # any iterable

from_numpy(np_array)                  # 1-D numpy array; bool/uint8 stay compact
from_arrow(pa_array_or_table)         # PyArrow Array / ChunkedArray / Table
from_csv("data.csv", column="price") # CSV single-column
from_csv("data.csv")                  # CSV all rows as list[dict]
from_json_lines("log.ndjson")         # NDJSON all rows as list[dict]
from_json_lines("log.ndjson",
                field="value",
                dtype="float")        # NDJSON single numeric field

Lazy combinators (deferred until terminal call)

q.filter(col > 0)               # DSL predicate — Rust, SIMD where possible
q.filter(lambda x: x > 0)       # Python callable — GIL held
q.map(col * 2.0)                 # DSL transform — Rust, SIMD where possible
q.map(lambda x: x * 2)          # Python callable — GIL held
q.map_field("name")              # extract one field from dict records (fused with field() filter)
q.take(n)                        # keep first n elements
q.skip(n)                        # drop first n elements
q.take_while(pred)               # take while pred holds, then stop
q.skip_while(pred)               # skip while pred holds, then emit remainder
q.parallel()                     # request parallel execution (numeric only)

Terminal operations (trigger execution)

q.to_list()                      # collect to Python list
q.to_numpy()                     # collect to numpy ndarray (no per-element boxing)
q.to_dict(key=fn, value=fn)      # collect to Python dict
q.to_bytes()                     # raw f64 bytes (for zero-copy numpy.frombuffer)
q.count()                        # number of elements
q.sum()                          # sum (numeric paths use SIMD)
q.min()                          # minimum value
q.max()                          # maximum value (SIMD for f64)
q.mean()                         # arithmetic mean, or None if empty
q.var()                          # population variance (ddof=0), or None if empty
q.std()                          # standard deviation, or None if empty
q.stats()                        # count/sum/mean/min/max in one pass — {"count": N, ...}
q.first()                        # first element, or None
q.last()                         # last element, or None
q.reduce(fn, initial=val)        # general fold
q.for_each(fn)                   # consume with side effect, returns None
q.any(pred)                      # True if any element satisfies pred
q.all(pred)                      # True if all elements satisfy pred
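
As a reference for what the one-pass statistics return, here is a pure-Python sketch that computes count, sum, mean, min, max, and population variance (ddof=0) in a single loop. It uses Welford's update for the variance, which is this example's choice; whether ZPyFlow uses the same algorithm is not stated in this document.

```python
import math


def one_pass_stats(values):
    """Count, sum, mean, min, max, and population variance in one loop."""
    n, total, mean, m2 = 0, 0.0, 0.0, 0.0
    lo, hi = math.inf, -math.inf
    for x in values:
        n += 1
        total += x
        delta = x - mean
        mean += delta / n
        m2 += delta * (x - mean)   # Welford's update for squared deviations
        lo, hi = min(lo, x), max(hi, x)
    if n == 0:
        return None                # mirrors "None if empty" above
    var = m2 / n                   # ddof=0: divide by n, not n - 1
    return {"count": n, "sum": total, "mean": mean,
            "min": lo, "max": hi, "var": var, "std": math.sqrt(var)}


summary = one_pass_stats([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
print(summary)
```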

12. Performance

Benchmark: 1M float64 elements — filter(x > 0) + map(x * 2) + take(10_000)

Approach                             Time        Allocations   GIL
Python list comprehension            ~80ms       2 lists       held
Python generator + take              ~40ms       0             held
numpy (arr[arr > 0] * 2)             ~8ms        2 arrays      released
ZPyFlow lambda (Python callback)     ~70–80ms    1 list        held
ZPyFlow Expression DSL (SIMD)        ~2–5ms      1 list        released
ZPyFlow Expression DSL + parallel    ~0.5–1ms    1 list        released

Note: The lambda path (Python callback) is GIL-bound and offers throughput comparable to a plain list comprehension — the benefit over raw Python is the unified pipeline API, not speed. For maximum throughput, use the Expression DSL (e.g. col > 0, col * 2) which releases the GIL and uses SIMD.

Running the benchmarks

# Rust (Criterion) — detailed per-operation breakdown
cargo bench --bench pipeline

# SIMD selectivity analysis (10% / 25% / 50% / 75% / 90% pass rate)
cargo bench --bench simd_filter

# Python (pytest-benchmark)
pip install pytest-benchmark
pytest tests/test_performance.py -v --benchmark-autosave

Measuring memory

pip install memory-profiler
python -m memory_profiler your_script.py

Raw benchmark JSON results are saved in sandbox/benchmark/results/ after each make dc-bench run. Use make dc-bench-compare to diff against a saved baseline.


13. ZPyFlow vs Polars

ZPyFlow is not a Polars replacement.

When to use ZPyFlow

  • Data already lives in Python as list[float], numpy arrays, dict records, or generators
  • The hot path is a single fused pipeline: filter → take, filter → count, filter → sum
  • Moving to a DataFrame model would require re-architecting surrounding code
  • You need the GIL released from a Python list without changing the calling code
  • Early-stop semantics matter: filter(col > t).take(K) scans only until K results are found
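
The early-stop point in the last bullet can be made measurable with a small counter. The sketch below is plain Python with a hypothetical helper name; it reports how many elements were inspected before K matches were found, which is the quantity an early-stopping filter + take keeps small.

```python
def first_k_matches(values, predicate, k):
    """Return the first k matches and how many elements were inspected."""
    matches, scanned = [], 0
    for v in values:
        scanned += 1
        if predicate(v):
            matches.append(v)
            if len(matches) == k:
                break  # early stop: the rest of the input is never touched
    return matches, scanned


values = [i / 1000 for i in range(1000)]          # 0.000 .. 0.999
matches, scanned = first_k_matches(values, lambda v: v > 0.1, k=5)
print(len(matches), scanned, len(values))
```

Here only a small prefix of the input is inspected, whereas an eager filter would evaluate the predicate on every element before truncating.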

When to use Polars (or pandas)

  • The workload involves multi-column joins or window functions
  • You need stats across multiple columns at once, or complex GROUP BY analytics
  • Data is table-shaped and loaded from a file from the start
  • SQL-style GROUP BY over multiple columns with complex group logic

Product-choice comparison

Scenario                                       ZPyFlow                                  Polars
filter(col > t).count() on 1M floats           ✅ ~2ms, SIMD, GIL released               ✅ ~5ms (columnar)
filter(col > t).take(K) with small K           ✅ Early-stop, scans only until K found   ⚠️ Scans all N first
filter + map + sum in one pass                 ✅ Fused, 1 allocation                    ✅ Columnar, 2–3 allocations
count + sum + mean + min + max in one pass     stats() — 1 SIMD pass, GIL released      ✅ 1 columnar pass
Multi-column join                              ❌ Not supported                          ✅ Native
Loading CSV + analyzing as a table             ⚠️ Awkward                                ✅ Natural
Arbitrary Python objects (dicts, dataclasses)  ✅ Lambda path                            ⚠️ Requires schema
Embedding threshold + top-K (ANN post-filter)  ✅ Fast early-stop                        ⚠️ No early-stop

Core rule: if data is already Python sequences and the pipeline is simple, ZPyFlow removes allocation and GIL cost with zero framework migration. If the data is tabular or the analysis spans multiple columns, use Polars.

The API-response case — Polars is not in the picture

Polars is rarely considered for API response processing, and for good reason:

import httpx
import polars as pl

# Typical API response: list[dict], schema unknown, nullable fields
resp = httpx.get("https://api.example.com/events")
events = resp.json()          # list[dict] — arbitrary structure

# Using Polars requires a round-trip through DataFrame
df = pl.DataFrame(events)     # schema inference + full materialization
active = df.filter(pl.col("status") == "active").to_dicts()  # back to list[dict]

That list[dict] → DataFrame → list[dict] round-trip pays schema inference and columnar conversion costs — for simple filtering it is strictly slower and more code than a list comprehension.

In practice, API response processing looks like one of these:

# Before ZPyFlow — plain Python
active = [e for e in events if e["status"] == "active"]
top    = sorted(active, key=lambda e: e["score"], reverse=True)[:100]

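# With ZPyFlow — same filter, GIL released after first field() filter
from zpyflow import Query, field

active = Query(events).filter(field("status") == "active").to_list()
# A threshold + early stop replaces the full sort; `threshold` is chosen by the caller
top    = Query(events).filter(field("score") >= threshold).take(100).to_list()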

In this space ZPyFlow's real competitor is plain Python, not Polars. The relevant comparison is not ZPyFlow vs Polars, but ZPyFlow vs list comprehensions on list[dict] — and at N > 50K that gap is 3–8× in ZPyFlow's favour.


Design background

ZPyFlow is inspired by ZLinq (zero-allocation LINQ for C# / .NET).

ZLinq fuses Where().Select().Take() into a single loop at JIT time using CLR generic specialization. ZPyFlow achieves the same fusion inside the Rust core using the ZStream trait: every operator is a concrete generic type, so LLVM can inline the full chain at -O3. The PyO3 boundary is the only place dynamic dispatch appears, and it is crossed once per terminal call, not once per element.

See ARCHITECTURE.md for the full design document.
