ZPyFlow
Zero-allocation lazy query pipelines for Python, powered by Rust.
- Lazy & fused: filter + map + take run in a single pass with no intermediate lists
- SIMD-accelerated: float/int arrays execute in Rust with the GIL released, using f64x4
- Expression DSL: col > 5 eliminates Python callbacks entirely
- Python-friendly: numpy, pandas, dataclasses, plain lists, and generators all work as input
- Parallel execution: .parallel() enables Rayon work-stealing
ZPyFlow is not a DataFrame engine. It lets Python sequence hot paths (list[float], numpy arrays, dict record streams) run as fused Rust pipelines without moving the surrounding codebase into a tabular data model. See § 13 ZPyFlow vs Polars for the product-choice boundary.
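As a rough mental model of what "lazy & fused" means, the sketch below chains Python generators so that filter, map, and take run in one pass. It is plain Python, not ZPyFlow's Rust implementation, and fused_pipeline is a hypothetical helper name used only for illustration.

```python
from itertools import islice


def fused_pipeline(data, threshold, factor, k):
    """Pure-Python model of filter -> map -> take running as one lazy pass."""
    kept = (x for x in data if x > threshold)   # lazy filter, no list is built
    scaled = (x * factor for x in kept)         # lazy map layered on the filter
    return list(islice(scaled, k))              # only the final list is allocated


data = [1.5, -2.3, 0.7, 4.1, -0.5, 3.8, -1.1, 2.2]
result = fused_pipeline(data, threshold=0.0, factor=2.0, k=4)
print(result)  # [3.0, 1.4, 8.2, 7.6]
```

The generators hold the GIL for every element, which is the cost the Rust core is meant to avoid; the sketch only illustrates the evaluation order.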
Installation
pip install zpyflow
Optional extras:
pip install zpyflow[numpy] # NumPy integration
pip install zpyflow[arrow] # PyArrow integration
Development install (from source)
pip install maturin
git clone https://github.com/bigbangdash/zpyflow
cd zpyflow
maturin develop --release
Development (Make + Docker)
All development tasks run inside Docker — no local Rust or Python setup needed beyond Docker itself.
Quick reference
| Command | What it does |
|---|---|
| make dc-test | Build + run Python unit tests |
| make dc-test-k K="f64" | Run tests matching keyword f64 |
| make dc-bench | Build + run all benchmark suites |
| make dc-bench-filter | filter benchmarks only |
| make dc-bench-chained | chained pipeline benchmarks only |
| make dc-bench-agg | aggregation benchmarks (vs numpy / pandas / polars) |
| make dc-bench-numpy | numpy comparison benchmarks |
| make dc-bench-objects | Python object (dict / dataclass) benchmarks |
| make dc-bench-vector | vector search: top-K early stopping |
| make dc-bench-ml | ML feature preprocessing pipeline |
| make dc-bench-etl | ETL multi-stat aggregation (vs Polars / Pandas) |
| make dc-bench-fraud | fraud / risk scoring: review queue, exposure sum |
| make dc-bench-groupby | GroupBy and pagination (object path) |
| make dc-bench-null | null-mixed list benchmarks (None handling) |
| make dc-bench-rust | Rust (Criterion) benchmarks |
| make dc-bench-save | Save current results as baseline |
| make dc-bench-compare | Compare against saved baseline (fails on >10% regression) |
| make dc-shell | Interactive shell inside the container |
| make dc-image | Rebuild the Docker image |
| make dc-clean | Remove named volumes (reset Cargo + target cache) |
Typical workflow
# First time: build the image (takes a few minutes — downloads Rust toolchain)
make dc-image
# Edit Rust source, then test
make dc-test
# Measure performance against all libraries
make dc-bench-agg
# Save a baseline, make changes, then check for regressions against it
make dc-bench-save # save current run as baseline
# ... make changes ...
make dc-bench-compare # fails if mean regresses by >10%
# Open a shell to inspect the built extension
make dc-shell
(inside) python -c "from zpyflow import Query, col; print(Query([1,2,3]).filter(col>1).to_list())"
Local build (requires Rust + maturin on host)
make build # maturin develop --release
make test # build + pytest
make bench # build + all Python benchmarks
make bench-rust # cargo bench
make lint # cargo clippy
make fmt # cargo fmt + ruff format
make clean # remove build artifacts
Contents
- Basic usage
- Expression DSL vs lambda
- numpy
- pandas
- Dataclasses and custom objects
- Dict records and log processing
- GroupBy and aggregation
- CSV and JSON Lines streaming
- AI, embeddings, and LangChain / LangGraph
- Parallel execution
- Full API reference
- Performance
- ZPyFlow vs Polars
1. Basic usage
from zpyflow import Query, col
data = [1.5, -2.3, 0.7, 4.1, -0.5, 3.8, -1.1, 2.2]
result = (
Query(data)
.filter(col > 0) # keep positive values
.map(col * 2.0) # double them
.take(4) # first 4
.to_list()
)
# → [3.0, 1.4, 8.2, 7.6]
# Aggregations
total = Query(data).filter(col > 0).sum() # 12.3
count = Query(data).filter(col > 0).count() # 5
vmax = Query(data).max() # 4.1
vmin = Query(data).filter(col > 0).min() # 0.7
first_positive = Query(data).filter(col > 0).first() # 1.5
last_positive = Query(data).filter(col > 0).last() # 2.2
2. Expression DSL vs lambda
ZPyFlow supports two styles. Choose based on your data type.
Expression DSL — recommended for numeric data (GIL released, SIMD)
from zpyflow import Query, col
(
Query(data)
.filter(col > 0) # FilterGt → Rust, SIMD
.filter(col.between(0, 10)) # FilterBetween → Rust, SIMD
.map(col * 2.0) # MapMulScalar → Rust, SIMD
.map(col + 1.0) # MapAddScalar → Rust, SIMD
.map(col.abs()) # MapAbs → Rust
.map(col.sqrt()) # MapSqrt → Rust
.map(col ** 2) # MapPowScalar → Rust
.map(-col) # MapNeg → Rust, SIMD
.to_list()
)
Supported DSL operators
| Python expression | Internal op | SIMD | Description |
|---|---|---|---|
| col > x | FilterGt | ✅ | greater than |
| col >= x | FilterGe | ✅ | greater than or equal |
| col < x | FilterLt | ✅ | less than |
| col <= x | FilterLe | ✅ | less than or equal |
| col.between(a,b) | FilterBetween | ✅ | a ≤ v ≤ b |
| col * x | MapMulScalar | ✅ | multiply by scalar |
| col + x | MapAddScalar | ✅ | add scalar |
| col - x | MapSubScalar | ✅ | subtract scalar |
| col / x | MapDivScalar | ✅ | divide (via reciprocal) |
| col ** x | MapPowScalar | - | exponentiate |
| -col | MapNeg | ✅ | negate |
| col.abs() | MapAbs | - | absolute value |
| col.sqrt() | MapSqrt | - | square root |
| col.floor() | MapFloor | - | floor (round toward −∞) |
| col.ceil() | MapCeil | - | ceiling (round toward +∞) |
| col.round() | MapRound | - | round to nearest integer |
| col.reciprocal() | MapReciprocal | - | 1 / x |
Python lambda — for arbitrary Python objects (GIL held)
(
Query(records)
.filter(lambda r: r["score"] > 80)
.map(lambda r: r["name"].upper())
.to_list()
)
Rule of thumb: use the DSL for numeric arrays, use lambdas for arbitrary Python objects. The performance gap is roughly 20–40× at 1M elements.
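To see why a DSL expression can avoid per-element Python callbacks, here is a minimal sketch of operator overloading that turns col > 0 into a plain-data description of the step. Op, _Col, col_sketch, and run are hypothetical names for this illustration; only the op labels (FilterGt, MapMulScalar, and so on) come from the table above, and ZPyFlow's real internals may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Op:
    """Plain-data description of one pipeline step (hypothetical type)."""
    name: str
    arg: float


class _Col:
    """Stand-in for a column placeholder: operators build Op records."""
    def __gt__(self, x): return Op("FilterGt", float(x))
    def __lt__(self, x): return Op("FilterLt", float(x))
    def __mul__(self, x): return Op("MapMulScalar", float(x))
    def __add__(self, x): return Op("MapAddScalar", float(x))


col_sketch = _Col()


def run(ops, data):
    """Interpret the Op list in one loop; no user callback runs per element."""
    out = []
    for v in data:
        keep = True
        for op in ops:
            if op.name == "FilterGt":
                keep = v > op.arg
            elif op.name == "FilterLt":
                keep = v < op.arg
            elif op.name == "MapMulScalar":
                v = v * op.arg
            elif op.name == "MapAddScalar":
                v = v + op.arg
            if not keep:
                break          # element rejected, skip the remaining steps
        if keep:
            out.append(v)
    return out


ops = [col_sketch > 0, col_sketch * 2.0]
print(ops[0])                        # Op(name='FilterGt', arg=0.0)
print(run(ops, [1.5, -2.3, 0.7]))    # [3.0, 1.4]
```

Because the steps are data rather than closures, a native backend is free to execute them without re-entering the interpreter; a lambda cannot be inspected this way, which is consistent with the lambda path holding the GIL.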
3. numpy
import numpy as np
from zpyflow import Query, col, from_numpy
rng = np.random.default_rng(42)
arr = rng.standard_normal(1_000_000) # shape: (1M,), dtype: float64
# from_numpy() converts to Query using the f64 fast path
result = (
from_numpy(arr)
.filter(col > 0) # SIMD filter, GIL released
.map(col ** 2) # square
.take(10_000)
.to_list()
)
# Aggregations on large arrays
positive = from_numpy(arr).filter(col > 0)
mean_positive = positive.sum() / positive.count()
max_val = from_numpy(arr).max()
# Convert result back to numpy
result_arr = np.array(from_numpy(arr).filter(col.between(-1, 1)).to_list())
Integer arrays
ids = np.arange(1_000_000, dtype=np.int64)
# DSL path — fast, GIL released
big_ids = from_numpy(ids).filter(col > 500_000).to_list()
# Lambda fallback when DSL doesn't cover the operation
even_ids = from_numpy(ids).filter(lambda x: x % 2 == 0).take(100).to_list()
Float32 arrays (ML / embedding workloads)
from_numpy routes float32 arrays to a native f32x8 SIMD path (8 lanes per AVX2
register, twice the throughput of f64x4). Results are promoted to Python float (f64)
at collection time; to_numpy() preserves the original float32 dtype.
# Typical ML scenario: embedding similarity post-filter
scores = np.random.rand(1_000_000).astype(np.float32)
# Zero-copy buffer read → f32x8 SIMD filter → 0 intermediate allocations
n_high = from_numpy(scores).filter(col > 0.95).count() # count() returns a number, not indices
# to_numpy() returns float32, not float64 — no precision loss
filtered = from_numpy(scores).filter(col > 0.9).to_numpy()
assert filtered.dtype == np.float32
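One consequence of promoting float32 results to Python float is worth a small illustration: widening is lossless, but the widened value is generally not equal to the float64 literal you typed. The sketch below uses only the standard library (struct) to round a value to float32 and widen it again; as_float32 is a hypothetical helper, not part of ZPyFlow.

```python
import struct


def as_float32(x: float) -> float:
    """Round x to float32 precision, then widen it back to a Python float (f64)."""
    return struct.unpack("f", struct.pack("f", x))[0]


promoted = as_float32(0.1)
print(promoted)          # close to 0.1, but carrying float32 rounding
print(promoted == 0.1)   # False: the f64 literal 0.1 has more mantissa bits

# Widening loses nothing: narrowing the promoted value gives the same float32 bits.
print(struct.pack("f", promoted) == struct.pack("f", 0.1))   # True
```

When comparing collected float32 results against float64 constants, a tolerance (for example math.isclose) is usually safer than ==.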
Explicit typed constructors
When a list contains mixed numeric types (e.g. [1, 2, 3.0]), Query() falls back to
the generic Python path. Use the explicit constructors to force the fast path:
# Mixed-type list → force f64 fast path (SIMD, GIL released)
Query.f64([1, 2, 3.0]).filter(col > 1).sum() # → 5.0
# Force i64 fast path
Query.i64([1, 2, 3]).filter(col > 1).to_list() # → [2, 3]
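The routing rule described above (all floats go to f64, all ints go to i64, anything mixed falls back to the generic path) can be modelled in a few lines. This is a sketch of the rule as stated in this README, not ZPyFlow's actual detection code; pick_path is a hypothetical helper, and the treatment of empty input and bool values is an assumption.

```python
def pick_path(values):
    """Model of the documented routing rule for list inputs (illustrative only)."""
    kinds = {type(v) for v in values}
    if kinds == {float}:
        return "f64"
    if kinds == {int}:
        return "i64"
    return "python"   # mixed, non-numeric, or empty input (assumed behaviour)


print(pick_path([1.0, 2.0, 3.0]))   # f64
print(pick_path([1, 2, 3]))         # i64
print(pick_path([1, 2, 3.0]))       # python

# Normalizing the element type up front moves a mixed list back to a fast path.
print(pick_path([float(v) for v in [1, 2, 3.0]]))   # f64
```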
Speed comparison (1M float64)
import time
import numpy as np
from zpyflow import from_numpy, col
arr = np.random.randn(1_000_000)
# numpy — eager, creates 2 intermediate arrays
t0 = time.perf_counter()
r_np = arr[arr > 0] * 2
print(f"numpy: {(time.perf_counter()-t0)*1000:.1f}ms (2 allocations)")
# ZPyFlow — single fused pass, 1 allocation, GIL released
t0 = time.perf_counter()
r_zpf = from_numpy(arr).filter(col > 0).map(col * 2).to_list()
print(f"zpyflow: {(time.perf_counter()-t0)*1000:.1f}ms (1 allocation)")
4. pandas
ZPyFlow is not a pandas replacement. It accelerates numeric column preprocessing and row-level filtering within existing pandas workflows.
Processing a numeric column
import pandas as pd
from zpyflow import Query, col
df = pd.DataFrame({
"user_id": range(100_000),
"score": [float(i) / 100 for i in range(100_000)],
"active": [i % 3 != 0 for i in range(100_000)],
"region": ["JP" if i % 5 == 0 else "US" for i in range(100_000)],
})
# Extract, transform, and write back
scores = df["score"].tolist()
# Normalize scores above the median — single fused pass
median = df["score"].median()
normalized = (
Query(scores)
.filter(col > median)
.map(col / df["score"].max())
.to_list()
)
df_high = df[df["score"] > median].copy()
df_high["normalized"] = normalized
Row-level processing via to_dict("records")
records = df.to_dict("records") # list[dict]
result = (
Query(records)
.filter(lambda r: r["active"] and r["region"] == "JP")
.map(lambda r: {"user_id": r["user_id"], "score": r["score"] * 1.1})
.take(1_000)
.to_list()
)
result_df = pd.DataFrame(result)
Wrapping ZPyFlow as a pandas transform step
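def fast_clip_and_scale(series: pd.Series, lo: float, hi: float) -> list[float]:
    """Keep values in [lo, hi] and scale them to [0, 1]."""
    span = hi - lo
    return (
        Query(series.tolist())
        .filter(col.between(lo, hi))
        .map((col - lo) / span)
        .to_list()
    )
# The filter drops rows, so assign the result to the matching subset, not to df itself
in_range = df[df["score"].between(0.2, 0.8)].copy()
in_range["score_scaled"] = fast_clip_and_scale(in_range["score"], lo=0.2, hi=0.8)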
5. Dataclasses and custom objects
from dataclasses import dataclass
from zpyflow import Query
@dataclass
class Employee:
    id: int
    name: str
    department: str
    salary: float
    years: int
employees = [
Employee(1, "Alice", "Engineering", 120_000, 5),
Employee(2, "Bob", "Marketing", 85_000, 3),
Employee(3, "Carol", "Engineering", 140_000, 8),
Employee(4, "Dan", "HR", 75_000, 2),
Employee(5, "Eve", "Engineering", 110_000, 4),
Employee(6, "Frank", "Marketing", 90_000, 6),
Employee(7, "Grace", "HR", 80_000, 7),
]
# Filter + project
senior_engineers = (
Query(employees)
.filter(lambda e: e.department == "Engineering" and e.years >= 5)
.map(lambda e: e.name)
.to_list()
)
# → ["Alice", "Carol"]
# Top earners as (name, salary) tuples
top_earners = (
Query(employees)
.filter(lambda e: e.salary > 100_000)
.map(lambda e: (e.name, e.salary))
.to_list()
)
# → [("Alice", 120000), ("Carol", 140000), ("Eve", 110000)]
# Any / all
has_hr = Query(employees).any(lambda e: e.department == "HR") # True
all_ft = Query(employees).all(lambda e: e.years > 0) # True
# Reduce to compute total salary
total_salary = (
Query(employees)
.reduce(lambda acc, e: acc + e.salary, initial=0.0)
)
# Extract salaries as a float list → f64 fast path for aggregation
# (the salaries above are int literals, so convert explicitly)
salaries = Query([float(e.salary) for e in employees])
avg_salary = salaries.sum() / salaries.count() # GIL released for sum
Pydantic models
from pydantic import BaseModel
from zpyflow import Query
class Order(BaseModel):
    order_id: str
    amount: float
    status: str
    customer_id: int
# raw_orders: a list[dict] loaded elsewhere (for example, a parsed API response)
orders = [Order(**o) for o in raw_orders]
# Filter pending high-value orders
large_pending = (
Query(orders)
.filter(lambda o: o.status == "pending" and o.amount > 10_000)
.map(lambda o: {"order_id": o.order_id, "amount": o.amount})
.take(50)
.to_list()
)
# Fast aggregation on the amount field alone (f64 fast path)
amounts = Query([o.amount for o in orders])
total_amount = amounts.sum()
# Total of pending orders only, via the lambda path
total_pending = (
    Query(orders)
    .filter(lambda o: o.status == "pending")
    .map(lambda o: o.amount)
    .reduce(lambda acc, x: acc + x, initial=0.0)
)
6. Dict records and log processing
from zpyflow import Query, field
logs = [
{"ts": "2024-01-15T10:23:11", "level": "ERROR", "status": 500, "path": "/api/users", "latency_ms": 312},
{"ts": "2024-01-15T10:23:12", "level": "INFO", "status": 200, "path": "/api/items", "latency_ms": 45},
{"ts": "2024-01-15T10:23:13", "level": "WARN", "status": 429, "path": "/api/users", "latency_ms": 8},
{"ts": "2024-01-15T10:23:14", "level": "ERROR", "status": 500, "path": "/health", "latency_ms": 520},
# ... millions of records
]
# ✅ OK — field() DSL: GIL released after first filter, SIMD for numeric fields
slow_errors = (
Query(logs)
.filter(field("status") >= 500)
.filter(field("latency_ms") > 100)
.count()
)
# ✅ OK — field() DSL + map_field(): single fused Rust loop
slow_paths = (
Query(logs)
.filter(field("latency_ms") > 100)
.map_field("path")
.to_list()
)
# ✅ OK — map with dict construction: no DSL equivalent, lambda is the right choice
errors = (
Query(logs)
.filter(field("level") == "ERROR")
.map(lambda l: {"path": l["path"], "latency_ms": l["latency_ms"], "ts": l["ts"]})
.to_list()
)
errors.sort(key=lambda l: l["latency_ms"], reverse=True)
# NG — numeric filter via lambda: GIL held per element, same speed as Python
# Query(logs).filter(lambda l: l["latency_ms"] > 100).to_list()
# NG — single field extraction via lambda when map_field() exists
# Query(logs).filter(...).map(lambda l: l["path"]).to_list()
# ✅ OK — compound condition (AND of two fields): no DSL equivalent, lambda is correct
active_errors = (
Query(logs)
.filter(lambda l: l["level"] == "ERROR" and l["latency_ms"] > 100)
.to_list()
)
# Extract latency as float list → f64 fast path for aggregation
latencies = Query([float(l["latency_ms"]) for l in logs])
avg_latency = latencies.sum() / latencies.count()
max_latency = latencies.max()
Rule of thumb for dict records
- Single-field numeric/equality filter → field() DSL (GIL released, faster at large N)
- Single-field extraction → map_field("key") (fused with field filter in one Rust loop)
- Multi-field compound condition (and/or) → lambda (no DSL equivalent)
- Map that builds a new dict → lambda (no DSL equivalent)
Multiple queries on the same dataset — use .preload()
field() pays a one-time dict→RustObj conversion cost on the first filter call (GIL held,
O(N)). For a single query this cost amortizes across the filter pass itself. When you
run multiple queries over the same dataset, call .preload() first to pay the
conversion once:
# ✅ OK — preload() converts once, all subsequent filters run GIL-free
q = Query(logs).preload()
slow_count = q.filter(field("latency_ms") > 100).count()
error_count = q.filter(field("status") >= 500).count()
# Count by status code
from collections import Counter
status_counts = Counter(Query(logs).map(lambda l: l["status"]).to_list())
# → Counter({500: 2, 200: 1, 429: 1}) for the four records shown above
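The cost argument behind .preload() is ordinary amortization: convert once, query many times. The sketch below models that with a plain-Python stand-in that counts how often the conversion runs. ConvertedRecords and count_where are hypothetical names, and the column layout is an assumption made for the illustration; it says nothing about how ZPyFlow stores converted records.

```python
class ConvertedRecords:
    """Hypothetical stand-in for records converted once into a column layout."""
    conversions = 0   # class-level counter of how many conversions ran

    def __init__(self, records):
        ConvertedRecords.conversions += 1
        keys = records[0].keys() if records else []
        self.columns = {k: [r[k] for r in records] for k in keys}

    def count_where(self, key, predicate):
        return sum(1 for v in self.columns[key] if predicate(v))


logs = [
    {"status": 500, "latency_ms": 312},
    {"status": 200, "latency_ms": 45},
    {"status": 429, "latency_ms": 8},
    {"status": 500, "latency_ms": 520},
]

# Without reuse: each query converts the records again.
slow_a = ConvertedRecords(logs).count_where("latency_ms", lambda v: v > 100)
errors_a = ConvertedRecords(logs).count_where("status", lambda v: v >= 500)
conversions_without_reuse = ConvertedRecords.conversions   # 2

# With reuse: convert once, then run both queries on the converted form.
ConvertedRecords.conversions = 0
converted = ConvertedRecords(logs)
slow_b = converted.count_where("latency_ms", lambda v: v > 100)
errors_b = converted.count_where("status", lambda v: v >= 500)
conversions_with_reuse = ConvertedRecords.conversions      # 1
```

Both variants return the same counts; only the number of O(N) conversions differs, which is the part that grows with the number of queries.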
Reading directly from JSON Lines
from zpyflow import from_json_lines, col
# Extract a single numeric field — uses f64 fast path
latencies = from_json_lines("access.log.ndjson", field="latency_ms", dtype="float")
stats = {
"count": latencies.count(),
"total": latencies.sum(),
"max": latencies.max(),
"above_200ms": latencies.filter(col > 200).count(),
}
7. GroupBy and aggregation
from zpyflow import Query, GroupBy
transactions = [
{"user": "alice", "amount": 120.0, "category": "food"},
{"user": "bob", "amount": 45.0, "category": "transport"},
{"user": "alice", "amount": 300.0, "category": "shopping"},
{"user": "carol", "amount": 80.0, "category": "food"},
{"user": "bob", "amount": 200.0, "category": "shopping"},
{"user": "alice", "amount": 55.0, "category": "food"},
]
by_user = GroupBy(transactions, key_fn=lambda t: t["user"])
# Per-user count and total
summary = by_user.agg(
count=lambda g: g.count(),
total=lambda g: Query([t["amount"] for t in g.to_list()]).sum(),
)
# → [{"_key": "alice", "count": 3, "total": 475.0}, ...]
# Fetch a single group
alice_txns = by_user.get_group("alice").to_list()
# Sum per group using a field extractor
by_category = GroupBy(transactions, key_fn=lambda t: t["category"])
category_totals = by_category.sum_per_group(field=lambda t: t["amount"])
# → {"food": 255.0, "transport": 45.0, "shopping": 500.0}
# Count per group
counts = by_user.count_per_group()
# → {"alice": 3, "bob": 2, "carol": 1}
Single-pass aggregation with group_agg
group_agg runs count, sum, mean, max, and min in one Rust pass over the data,
avoiding intermediate list materialization.
from zpyflow import Query, field, agg_count, agg_sum, agg_mean
transactions = [
{"user": "alice", "amount": 120.0, "category": "food"},
{"user": "bob", "amount": 45.0, "category": "transport"},
{"user": "alice", "amount": 300.0, "category": "shopping"},
{"user": "carol", "amount": 80.0, "category": "food"},
{"user": "bob", "amount": 200.0, "category": "shopping"},
{"user": "alice", "amount": 55.0, "category": "food"},
]
# Lambda key
result = (
Query(transactions)
.group_agg(
lambda t: t["user"],
count = agg_count(),
total = agg_sum(lambda t: t["amount"]),
)
)
# → [{"_key": "alice", "count": 3, "total": 475.0},
# {"_key": "bob", "count": 2, "total": 245.0},
# {"_key": "carol", "count": 1, "total": 80.0}]
# field() DSL key — Rust-side key extraction after dict→RustObj conversion
by_category = (
Query(transactions)
.group_agg(
field("category"),
count = agg_count(),
revenue = agg_sum(lambda t: t["amount"]),
avg = agg_mean(lambda t: t["amount"]),
)
)
When to prefer group_agg over GroupBy: group_agg is a single Rust pass and returns immediately. Use GroupBy when you need per-group Query objects for multi-step operations (pagination, nested queries).
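For readers who want to see what "single pass" means for grouped aggregation, here is a pure-Python sketch that keeps one running (count, total) pair per key and derives the mean at the end. It mirrors the output shape shown above, but group_agg_sketch is a hypothetical helper and not ZPyFlow's implementation.

```python
def group_agg_sketch(records, key, value):
    """Single pass over records: one [count, total] accumulator per group key."""
    acc = {}   # group key -> [count, total]
    for rec in records:
        slot = acc.setdefault(rec[key], [0, 0.0])
        slot[0] += 1
        slot[1] += rec[value]
    return [
        {"_key": k, "count": n, "total": total, "mean": total / n}
        for k, (n, total) in acc.items()
    ]


transactions = [
    {"user": "alice", "amount": 120.0, "category": "food"},
    {"user": "bob", "amount": 45.0, "category": "transport"},
    {"user": "alice", "amount": 300.0, "category": "shopping"},
    {"user": "carol", "amount": 80.0, "category": "food"},
    {"user": "bob", "amount": 200.0, "category": "shopping"},
    {"user": "alice", "amount": 55.0, "category": "food"},
]

summary = group_agg_sketch(transactions, key="user", value="amount")
for row in summary:
    print(row["_key"], row["count"], row["total"])
# alice 3 475.0
# bob 2 245.0
# carol 1 80.0
```

No per-group list is ever built, which is the difference from grouping first and aggregating each group afterwards.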
8. CSV and JSON Lines streaming
from zpyflow import from_csv, from_json_lines, col
# Read a single numeric column from CSV — f64 fast path
prices = from_csv("products.csv", column="price", dtype="float")
discounted = (
prices
.filter(col > 10.0)
.map(col * 0.9)
.to_list()
)
# Read all rows as dicts
products = from_csv("products.csv") # column=None → list[dict]
premium = (
products
.filter(lambda p: float(p["price"]) > 100 and p["in_stock"] == "true")
.map(lambda p: {"name": p["name"], "price": float(p["price"])})
.to_list()
)
# JSON Lines — filter at the source level
events = from_json_lines("events.ndjson")
errors = (
events
.filter(lambda e: e.get("level") == "error")
.take(1_000)
.to_list()
)
9. AI and embedding pipelines
Similarity score arrays are a natural fit for ZPyFlow's f64 fast path: large, homogeneous, numeric, and the filtering threshold is known at query time.
import numpy as np
from zpyflow import Query, col, from_numpy
# Pre-computed cosine similarity scores against a query vector (1M documents)
scores = np.random.uniform(0, 1, size=1_000_000).astype(np.float64)
doc_ids = np.arange(1_000_000, dtype=np.int64)
# Filter by threshold, retrieve top-K candidates — SIMD, GIL released
THRESHOLD = 0.85
TOP_K = 100
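candidate_scores = from_numpy(scores).filter(col > THRESHOLD).to_list()
# Select ids with the same score mask so they stay aligned with candidate_scores
# (filtering doc_ids by THRESHOLD would compare the ids themselves, not the scores)
candidate_ids = doc_ids[scores > THRESHOLD].tolist()
# Pair each score with its document id and rank
candidates = sorted(
    zip(candidate_scores, candidate_ids),
    reverse=True,
)[:TOP_K]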
Batch inference scoring statistics
def score_batch_stats(batch_scores: list[float]) -> dict:
    q = Query(batch_scores)
    n = q.count()
    return {
        "n": n,
        "mean": q.sum() / n,
        "high_conf": q.filter(col > 0.9).count(),
        "low_conf": q.filter(col < 0.5).count(),
        "max": q.max(),
    }
# Apply to each inference batch (all Rust, no GIL for the numeric ops)
batches = [scores[i:i+10_000].tolist() for i in range(0, len(scores), 10_000)]
batch_stats = [score_batch_stats(b) for b in batches]
Embedding norm validation
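# Detect embeddings that slipped through without L2 normalization
# (embeddings: an iterable of vectors produced elsewhere)
norms = [float(np.linalg.norm(emb)) for emb in embeddings]
unnormalized_count = (
    Query(norms)
    .map(col - 1.0)      # deviation from unit norm
    .map(col.abs())
    .filter(col > 0.01)  # DSL path instead of a per-element lambda
    .count()
)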
print(f"{unnormalized_count} embeddings need re-normalization")
# Fast summary of norm distribution
norm_query = Query(norms)
print(f"min={norm_query.min():.4f} max={norm_query.max():.4f} "
f"mean={norm_query.sum()/norm_query.count():.4f}")
LangChain / LangGraph integration
ZPyFlow slots into LangChain and LangGraph wherever a node processes large numeric arrays. No special integration is needed — just use it in your node functions or tools.
RAG retrieval — filter similarity scores with early stopping
from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document
from zpyflow import Query
class ZPyFlowRetriever(BaseRetriever):
    """Retriever that uses ZPyFlow for threshold filtering with early stopping."""
    docs: list[Document]
    embeddings: object  # any embedding model
    def _get_relevant_documents(self, query: str) -> list[Document]:
        import numpy as np
        query_vec = self.embeddings.embed_query(query)
        # Embed all documents in one batch call rather than one call per document
        doc_vecs = self.embeddings.embed_documents([d.page_content for d in self.docs])
        scores = np.asarray(doc_vecs) @ np.asarray(query_vec)
        # Pair each score with its document position so the pipeline yields indices,
        # not scores. take(20) stops after the 20th hit; the lambda path holds the GIL.
        top_indices = (
            Query(list(enumerate(scores.tolist())))
            .filter(lambda pair: pair[1] > 0.7)
            .map(lambda pair: pair[0])
            .take(20)
            .to_list()
        )
        return [self.docs[i] for i in top_indices]
LangGraph node — aggregate tool results without materializing a full list
from langgraph.graph import StateGraph, MessagesState
from zpyflow import Query, col
def score_filter_node(state: MessagesState) -> dict:
    """LangGraph node: filter and aggregate a large score array from a tool call."""
    # Assumes the graph's state schema carries a "tool_scores" entry
    scores: list[float] = state["tool_scores"]  # e.g. 500K candidates
    q = Query(scores)
    return {
        "candidate_count": q.filter(col > 0.8).count(),  # stays in Rust
        "top_score": q.max(),
        "mean_score": q.sum() / q.count(),
    }
graph = StateGraph(MessagesState)
graph.add_node("score_filter", score_filter_node)
LangChain tool — return pre-aggregated stats to the LLM
from langchain_core.tools import tool
from zpyflow import Query, col
@tool
def analyze_search_results(scores: list[float]) -> dict:
    """Aggregate similarity scores from a vector search."""
    q = Query(scores)
    n = q.count()
    return {
        "total": n,
        "high_quality": q.filter(col > 0.85).count(),
        "low_quality": q.filter(col < 0.5).count(),
        "best_score": q.max(),
        "mean_score": round(q.sum() / n, 4) if n else 0.0,
    }
When ZPyFlow helps in AI pipelines: large numeric score arrays (similarity, confidence, reward, logprobs) where you threshold-filter and aggregate. It does not speed up LLM calls themselves (those are I/O-bound) or small lists (< 10K elements).
10. Parallel execution
.parallel() applies to numeric fast paths only (f64 / i64). Python object pipelines
ignore this hint.
from zpyflow import Query, col, from_numpy
import numpy as np
large = np.random.randn(10_000_000).tolist()
# Single-threaded: SIMD + GIL released
result_single = Query(large).filter(col > 0).map(col * 2).to_list()
# Multi-threaded: Rayon work-stealing, GIL fully released
result_parallel = Query(large).filter(col > 0).map(col * 2).parallel().to_list()
# Aggregation also parallelizes
total = Query(large).filter(col > 0).parallel().sum()
| Data size | Single-thread (SIMD) | Parallel (8 cores) |
|---|---|---|
| 1M f64 | ~3ms | ~0.8ms |
| 10M f64 | ~30ms | ~5ms |
| 100M f64 | ~300ms | ~45ms |
Note: threading overhead (split + join) means parallel mode is slower than single-threaded for inputs under ~500K elements. Profile before enabling it.
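The table above can be turned into speedup and per-core efficiency with a few lines of arithmetic. The timings below are copied from the table (they are approximate figures), and efficiency is defined here as speedup divided by the 8 cores the table mentions.

```python
# (single-thread ms, parallel ms) copied from the table above
timings_ms = {"1M": (3.0, 0.8), "10M": (30.0, 5.0), "100M": (300.0, 45.0)}
CORES = 8

summary = {}
for size, (single, parallel) in timings_ms.items():
    speedup = single / parallel
    efficiency = speedup / CORES
    summary[size] = (speedup, efficiency)
    print(f"{size:>4}: speedup {speedup:.2f}x, efficiency {efficiency:.0%}")
```

Speedup grows from about 3.75× at 1M to about 6.67× at 100M, so the fixed split + join overhead weighs less as the input grows. That trend is consistent with the note that parallel mode does not pay off for small inputs.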
11. Full API reference
Constructors / source adapters
from zpyflow import Query, from_numpy, from_arrow, from_csv, from_json_lines
Query([1.0, 2.0, 3.0]) # list[float] → f64 fast path
Query([1, 2, 3]) # list[int] → i64 fast path
Query(["a", "b"]) # list[str] → Python path
Query(x**2 for x in range(100)) # generator (consumed eagerly)
Query(dict_or_obj_list) # any iterable
from_numpy(np_array) # 1-D numpy array; bool/uint8 stay compact
from_arrow(pa_array_or_table) # PyArrow Array / ChunkedArray / Table
from_csv("data.csv", column="price") # CSV single-column
from_csv("data.csv") # CSV all rows as list[dict]
from_json_lines("log.ndjson") # NDJSON all rows as list[dict]
from_json_lines("log.ndjson",
field="value",
dtype="float") # NDJSON single numeric field
Lazy combinators (deferred until terminal call)
q.filter(col > 0) # DSL predicate — Rust, SIMD where possible
q.filter(lambda x: x > 0) # Python callable — GIL held
q.map(col * 2.0) # DSL transform — Rust, SIMD where possible
q.map(lambda x: x * 2) # Python callable — GIL held
q.map_field("name") # extract one field from dict records (fused with field() filter)
q.take(n) # keep first n elements
q.skip(n) # drop first n elements
q.take_while(pred) # take while pred holds, then stop
q.skip_while(pred) # skip while pred holds, then emit remainder
q.parallel() # request parallel execution (numeric only)
Terminal operations (trigger execution)
q.to_list() # collect to Python list
q.to_numpy() # collect to numpy ndarray (no per-element boxing)
q.to_dict(key=fn, value=fn) # collect to Python dict
q.to_bytes() # raw f64 bytes (for zero-copy numpy.frombuffer)
q.count() # number of elements
q.sum() # sum (numeric paths use SIMD)
q.min() # minimum value
q.max() # maximum value (SIMD for f64)
q.mean() # arithmetic mean, or None if empty
q.var() # population variance (ddof=0), or None if empty
q.std() # standard deviation, or None if empty
q.stats() # count/sum/mean/min/max in one pass — {"count": N, ...}
q.first() # first element, or None
q.last() # last element, or None
q.reduce(fn, initial=val) # general fold
q.for_each(fn) # consume with side effect, returns None
q.any(pred) # True if any element satisfies pred
q.all(pred) # True if all elements satisfy pred
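To make "one pass" and "population variance (ddof=0)" concrete, the sketch below computes count, sum, mean, min, max, and variance in a single loop using Welford's update. It is a pure-Python illustration checked against statistics.pvariance; one_pass_stats is a hypothetical helper, the "var" key is added for the illustration, and nothing here claims to be the algorithm ZPyFlow uses internally.

```python
import math
import statistics


def one_pass_stats(values):
    """Single loop over values; variance uses Welford's running update (ddof=0)."""
    count, total, mean, m2 = 0, 0.0, 0.0, 0.0
    lo, hi = math.inf, -math.inf
    for x in values:
        count += 1
        total += x
        delta = x - mean
        mean += delta / count
        m2 += delta * (x - mean)   # accumulates the sum of squared deviations
        lo = min(lo, x)
        hi = max(hi, x)
    if count == 0:
        return {"count": 0, "sum": 0.0, "mean": None, "min": None, "max": None, "var": None}
    return {"count": count, "sum": total, "mean": mean,
            "min": lo, "max": hi, "var": m2 / count}


positives = [1.5, 0.7, 4.1, 3.8, 2.2]
stats = one_pass_stats(positives)
print(stats["count"], stats["min"], stats["max"])
print(math.isclose(stats["var"], statistics.pvariance(positives)))   # True
```

Dividing by count (not count - 1) is what ddof=0 means; use statistics.variance as the reference instead if you need the sample variance.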
12. Performance
Benchmark: 1M float64 elements — filter(x > 0) + map(x * 2) + take(10_000)
| Approach | Time | Allocations | GIL |
|---|---|---|---|
| Python list comprehension | ~80ms | 2 lists | held |
| Python generator + take | ~40ms | 0 | held |
| numpy (arr[arr > 0] * 2) | ~8ms | 2 arrays | released |
| ZPyFlow lambda (Python callback) | ~70–80ms | 1 list | held |
| ZPyFlow Expression DSL (SIMD) | ~2–5ms | 1 list | released |
| ZPyFlow Expression DSL + parallel | ~0.5–1ms | 1 list | released |
Note: The lambda path (Python callback) is GIL-bound and offers throughput comparable to a plain list comprehension; the benefit over raw Python is the unified pipeline API, not speed. For maximum throughput, use the Expression DSL (e.g. col > 0, col * 2), which releases the GIL and uses SIMD.
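If you want to reproduce the two pure-Python baselines from the table on your own machine, a small timeit harness along these lines should do. It uses only the standard library and a smaller input (100K elements) so it runs quickly; the absolute numbers will differ from the table, and the function names are chosen for this sketch.

```python
import random
import timeit
from itertools import islice

random.seed(0)
data = [random.gauss(0.0, 1.0) for _ in range(100_000)]


def list_comprehension():
    kept = [x for x in data if x > 0]     # first intermediate list
    doubled = [x * 2 for x in kept]       # second intermediate list
    return doubled[:10_000]


def generator_take():
    # Lazy filter + map; islice stops after 10_000 results
    return list(islice((x * 2 for x in data if x > 0), 10_000))


baseline_a = list_comprehension()
baseline_b = generator_take()

for fn in (list_comprehension, generator_take):
    best = min(timeit.repeat(fn, number=5, repeat=3)) / 5
    print(f"{fn.__name__}: {best * 1000:.2f} ms per run")
```

Both functions return the same 10,000 values; the generator version usually finishes sooner because it stops scanning once the take limit is reached.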
Running the benchmarks
# Rust (Criterion) — detailed per-operation breakdown
cargo bench --bench pipeline
# SIMD selectivity analysis (10% / 25% / 50% / 75% / 90% pass rate)
cargo bench --bench simd_filter
# Python (pytest-benchmark)
pip install pytest-benchmark
pytest tests/test_performance.py -v --benchmark-autosave
Measuring memory
pip install memory-profiler
python -m memory_profiler your_script.py
Raw benchmark JSON results are saved in sandbox/benchmark/results/ after each make dc-bench run. Use make dc-bench-compare to diff against a saved baseline.
13. ZPyFlow vs Polars
ZPyFlow is not a Polars replacement.
When to use ZPyFlow
- Data already lives in Python as list[float], numpy arrays, dict records, or generators
- The hot path is a single fused pipeline: filter → take, filter → count, filter → sum
- Moving to a DataFrame model would require re-architecting surrounding code
- You need the GIL released from a Python list without changing the calling code
- Early-stop semantics matter: filter(col > t).take(K) scans only until K results are found
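The early-stop point in the last bullet is easy to check with a pure-Python model that counts how many input elements are actually read before K hits are collected. This is a sketch of the semantics only; first_k_above is a hypothetical helper and the counting wrapper is not part of ZPyFlow.

```python
from itertools import islice


def first_k_above(values, threshold, k):
    """Return (hits, scanned): the first k values above threshold and inputs read."""
    scanned = 0

    def counted():
        nonlocal scanned
        for v in values:
            scanned += 1
            yield v

    hits = list(islice((v for v in counted() if v > threshold), k))
    return hits, scanned


values = [i / 1000 for i in range(1000)]   # 0.000, 0.001, ..., 0.999
hits, scanned = first_k_above(values, threshold=0.5, k=3)
print(hits)      # [0.501, 0.502, 0.503]
print(scanned)   # 504 of 1000 elements were read
```

An eager filter would read all 1000 elements before slicing; with early stop the scan ends at the third hit, and the saving grows when hits are frequent and K is small.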
When to use Polars (or pandas)
- The workload involves multi-column joins or window functions
- You need stats across multiple columns at once, or complex GROUP BY analytics
- Data is table-shaped and loaded from a file from the start
- SQL-style GROUP BY over multiple columns with complex group logic
Product-choice comparison
| Scenario | ZPyFlow | Polars |
|---|---|---|
| filter(col > t).count() on 1M floats | ✅ ~2ms, SIMD, GIL released | ✅ ~5ms (columnar) |
| filter(col > t).take(K) with small K | ✅ Early-stop, scans only until K found | ⚠️ Scans all N first |
| filter + map + sum in one pass | ✅ Fused, 1 allocation | ✅ Columnar, 2–3 allocations |
| count + sum + mean + min + max in one pass | ✅ stats(): 1 SIMD pass, GIL released | ✅ 1 columnar pass |
| Multi-column join | ❌ Not supported | ✅ Native |
| Loading CSV + analyzing as a table | ⚠️ Awkward | ✅ Natural |
| Arbitrary Python objects (dicts, dataclasses) | ✅ Lambda path | ⚠️ Requires schema |
| Embedding threshold + top-K (ANN post-filter) | ✅ Fast early-stop | ⚠️ No early-stop |
Core rule: if data is already Python sequences and the pipeline is simple, ZPyFlow removes allocation and GIL cost with zero framework migration. If the data is tabular or the analysis spans multiple columns, use Polars.
The API-response case — Polars is not in the picture
Polars is rarely considered for API response processing, and for good reason:
import httpx
import polars as pl
# Typical API response: list[dict], schema unknown, nullable fields
resp = httpx.get("https://api.example.com/events")
events = resp.json() # list[dict], arbitrary structure
# Using Polars requires a round-trip through DataFrame
df = pl.DataFrame(events) # schema inference + full materialization
active = df.filter(pl.col("status") == "active").to_dicts() # back to list[dict]
That list[dict] → DataFrame → list[dict] round-trip pays schema inference and
columnar conversion costs — for simple filtering it is strictly slower and more
code than a list comprehension.
In practice, API response processing looks like one of these:
# Before ZPyFlow — plain Python
active = [e for e in events if e["status"] == "active"]
top = sorted(active, key=lambda e: e["score"], reverse=True)[:100]
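# With ZPyFlow: same filter semantics, GIL released after the first field() filter
active = Query(events).filter(field("status") == "active").to_list()
# First 100 events at or above a score threshold (early stop, not a sort);
# threshold is a value chosen by the caller
top = Query(events).filter(field("score") >= threshold).take(100).to_list()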
In this space ZPyFlow's real competitor is plain Python, not Polars.
The relevant comparison is not ZPyFlow vs Polars, but ZPyFlow vs list comprehensions
on list[dict] — and at N > 50K that gap is 3–8× in ZPyFlow's favour.
Design background
ZPyFlow is inspired by ZLinq (zero-allocation LINQ for C# / .NET).
ZLinq fuses Where().Select().Take() into a single loop at JIT time using CLR generic
specialization. ZPyFlow achieves the same fusion inside the Rust core using the
ZStream trait — every operator is a concrete generic type, LLVM inlines the full
chain at -O3. The PyO3 boundary is the only place dynamic dispatch appears, and it
is crossed once per terminal call, not once per element.
See ARCHITECTURE.md for the full design document.