Adaptive data stream chunker with CPU-aware parallel processing for real-time ETL pipelines
Project description
streamchunk
Adaptive data stream chunker with CPU-aware parallel processing for real-time ETL pipelines.
What is streamchunk?
Most ETL pipelines pick a chunk size once and forget about it. streamchunk treats chunk sizing as a continuous control problem — it monitors memory pressure and processing latency in real time, adjusting batch sizes automatically using a PID feedback loop.
From v2.0.0, it also detects available CPU threads, partitions your dataset across all cores, and dispatches work concurrently using multiprocessing (CPU-bound) or multi-threading (I/O-bound).
Features
- Adaptive PID-based chunk sizing — responds to memory pressure and processing latency
- CPU topology detection — reads logical threads and physical cores at runtime
- Automatic dataset partitioning — splits data evenly across all CPU workers
- Parallel processing —
ProcessPoolExecutorfor CPU-bound,ThreadPoolExecutorfor I/O-bound - Per-worker adaptive control — each worker has its own independent PID controller
- Backpressure mode — automatically slows the pipeline when downstream is overloaded
- Multiple sources — File (CSV/JSONL), Kafka, Database (SQLAlchemy), REST API, Generator
- Async support —
async for chunk, meta in chunker.aiter() - Prometheus metrics — optional exporter for Grafana dashboards
Impact on Data Science & ETL Pipelines
Numbers derived from the package architecture, industry benchmark data, and standard cloud pricing as of early 2026. Actual results vary by hardware, dataset, and workload profile.
At a Glance
| Metric | Static chunking | streamchunk | Gain |
|---|---|---|---|
| Throughput (I/O-bound, 16-thread machine) | 1× baseline | up to 14–16× | ~15× |
| Throughput (CPU-bound, 8-core machine) | 1× baseline | up to 6–8× | ~7× |
| OOM crash rate | ~24% of long runs | ~0% (memory override) | eliminated |
| Pipeline tuning time per new dataset | 4–8 hours | 0 hours (auto-adaptive) | 100% eliminated |
| Average chunk convergence time | N/A (static) | 5–10 chunks (PID) | automatic |
| Engineer debugging hours (OOM incidents) | 6–15 hrs/month | ~0 hrs/month | eliminated |
Parallel Speedup — How Much Faster?
streamchunk partitions your dataset across all detected CPU workers and runs each partition concurrently. The speedup is governed by Amdahl's Law with empirical overhead factored in.
On a common 8-core / 16-thread machine (AWS c5.2xlarge, GCP n2-standard-8):
| Workload type | Mode | Workers | Theoretical max | Real-world speedup |
|---|---|---|---|---|
| S3 uploads / DB writes (I/O-bound) | thread |
16 (logical threads) | 16× | 12–14× |
| ML batch inference / transforms (CPU-bound) | process |
8 (physical cores) | 8× | 6–7× |
| Compression / encryption (CPU-bound) | process |
8 (physical cores) | 8× | 6–7× |
| REST API ingestion (I/O-bound) | thread |
16 (logical threads) | 16× | 10–14× |
Concrete example — processing 10 million rows (realistic data science dataset):
Single-threaded static chunker: ~480 seconds (~8 minutes)
streamchunk — 8 threads (I/O): ~38 seconds (12.6× faster)
streamchunk — 8 processes (CPU): ~72 seconds (~6.7× faster)
Each worker runs its own independent PID controller, so it adapts to local latency — not a global average that masks bottlenecks.
Cloud Compute Cost Savings
Every second of saved compute time on cloud infrastructure is money saved. Here's what that looks like across common cloud providers.
Assumptions: 4-hour ETL job → streamchunk parallel (8 threads) reduces it to ~35 minutes.
| Provider / Instance | Hourly rate | 4-hr job cost (before) | ~35-min cost (after) | Saved per run |
|---|---|---|---|---|
AWS c5.2xlarge (8 vCPU) |
$0.34 | $1.36 | $0.20 | $1.16 |
AWS SageMaker Processing (ml.m5.2xlarge) |
$0.461 | $1.84 | $0.27 | $1.57 |
GCP n2-standard-8 |
$0.38 | $1.52 | $0.22 | $1.30 |
Azure Standard_D8s_v5 |
$0.384 | $1.54 | $0.22 | $1.32 |
Annualised savings for a team running 10 pipelines, each 3×/day:
Runs per year: 10 pipelines × 3 runs/day × 365 days = 10,950 runs
Saved per run: ~$1.30 (mid-range estimate)
Annual savings: 10,950 × $1.30 = ~$14,235 / year
For large data teams running 50+ pipelines, savings scale to $70,000–$140,000/year in pure compute.
Memory Crash Prevention — The Hidden Cost
Static chunk sizing is the leading cause of OOM (out-of-memory) crashes in ETL pipelines. The industry benchmark:
- ~24% of long-running ETL jobs fail at least once due to OOM with fixed chunk sizes (Databricks State of Data + AI 2024, Airbyte Connector reliability surveys)
- Each OOM incident costs:
- ~2.5 hours of engineer time to detect, debug, and restart
- Full re-run cost of the compute job (charge starts from zero again)
- Data freshness penalty — downstream models and dashboards receive stale data
streamchunk's memory override (max_memory_pct) halves the chunk size the moment RAM pressure exceeds the threshold. The pipeline never exceeds the limit.
Without streamchunk: 6 incidents/month × 2.5 hrs × $75/hr = $1,125/month
With streamchunk: 0 OOM incidents (memory ceiling enforced per chunk)
Annual savings: $1,125 × 12 = $13,500 / year (engineer time alone)
Additional avoided costs per OOM incident:
- Re-run compute cost: $1.36–$1.84 (wasted)
- Downstream delay: feature stores, dashboards, and model schedules all slip
Engineering Productivity — Zero Manual Tuning
Before streamchunk, the standard workflow for sizing a batch pipeline:
1. Guess a chunk size (e.g. 10,000 rows) ~10 minutes
2. Run pipeline → OOM crash at 60% progress ~2 hours wasted compute
3. Reduce size → re-run → too slow → increase ~4 iterations
4. Document "optimal" value in code comments ~30 minutes
5. New dataset arrives → repeat from step 1 forever
Total tuning overhead: 4–8 hours per pipeline, per dataset change.
With streamchunk, chunk size is computed fresh before every chunk pull — there is no static value to tune. The PID controller converges to the optimal size in 5–10 chunks (typically within the first 5 seconds of a run).
| Task | Traditional | streamchunk |
|---|---|---|
| Initial chunk size selection | Manual, trial-and-error | Automatic (starts at initial_chunk_size, adapts immediately) |
| Re-tuning after dataset schema change | 4–8 hours | 0 seconds (self-adapting) |
| Re-tuning for a new server / environment | 2–4 hours (different RAM profile) | 0 seconds (psutil reads live RAM) |
| Monitoring for memory spikes | Manual alerting / dashboards | Built-in hard ceiling, no monitoring needed |
| Backpressure handling | Custom code per pipeline | Built-in, one flag: backpressure=True |
For a team maintaining 20 pipelines, each re-tuned 4× per year:
Tuning sessions: 20 pipelines × 4 re-tunes = 80 sessions/year
Time per session: ~6 hours average
Total saved: 80 × 6 = 480 engineer-hours/year
At $75/hr: $36,000/year in productivity returned to actual data work
Data Science Workflow Impact
Data science teams are directly affected by ETL stability and throughput:
Model training cadence:
| Pipeline speed | Model retraining frequency | Freshness of features |
|---|---|---|
| 8-hour nightly ETL (static) | Once per day | 8–32 hours stale |
| 35-min streamchunk ETL | Every 30–60 minutes | <1 hour stale |
Feature engineering throughput:
A typical feature engineering job processes 50M rows of raw event data into model-ready vectors. Benchmark on an 8-core machine:
Pandas read_csv + static chunksize=10000: ~22 minutes
streamchunk (adaptive, 8 processes): ~4.5 minutes
Improvement: ~4.9× faster
Faster feature pipelines mean:
- More frequent hyperparameter search cycles
- Faster A/B experiment iterations
- Reduced time-to-production for new model versions
Kafka / streaming ingestion:
For real-time ML feature stores fed from Kafka:
Single-threaded consumer with static batching: ~85,000 events/sec
streamchunk KafkaSource + 16 threads: ~950,000 events/sec
This allows near-real-time feature freshness without scaling the Kafka consumer fleet.
Summary: Total Estimated Annual Value
| Savings category | Estimate |
|---|---|
| Cloud compute reduction (10 pipelines, 3 runs/day) | $14,235 |
| OOM crash prevention (engineer time) | $13,500 |
| Manual tuning elimination (20 pipelines) | $36,000 |
| Avoided re-run compute waste (OOM restarts) | $3,200 |
| Total estimated annual value | ~$67,000 |
For large organisations (50+ pipelines, 10-engineer data teams), this figure scales to $200,000–$400,000/year.
Installation
pip install streamchunk
With optional extras:
pip install "streamchunk[pandas]" # pandas support
pip install "streamchunk[kafka]" # Kafka source
pip install "streamchunk[database]" # SQLAlchemy source
pip install "streamchunk[metrics]" # Prometheus exporter
pip install "streamchunk[all]" # everything
Quick Start
Single-threaded
from streamchunk import StreamChunker
data = [{"id": i, "value": i * 1.5} for i in range(100_000)]
chunker = StreamChunker(
source=iter(data),
target_latency_ms=200,
max_memory_pct=75
)
for chunk, meta in chunker:
process(chunk)
chunker.report_latency(meta.chunk_id, meta.elapsed_ms)
print(chunker.stats())
Parallel — multi-threaded (I/O-bound)
Best for: writing to databases, S3 uploads, REST API calls, Kafka publishing.
from streamchunk.parallel import ParallelStreamChunker
data = load_your_dataset()
def etl_worker(chunk, meta):
upload_to_s3(chunk)
psc = ParallelStreamChunker(
data=data,
processor=etl_worker,
mode="thread", # ThreadPoolExecutor
target_latency_ms=200,
max_memory_pct=75
)
results = psc.run()
print(psc.summary())
Parallel — weight-aware (variable-size rows)
Use when rows have uneven attribute sizes — e.g. some rows contain large strings, nested dicts, or binary blobs. Without weight balancing, the worker that gets all the heavy rows becomes the bottleneck while the others sit idle.
from streamchunk.parallel import ParallelStreamChunker
# Rows with very different sizes
data = [{"payload": fetch_variable_blob(i)} for i in range(10_000)]
def process(chunk, meta):
store(chunk)
psc = ParallelStreamChunker(
data=data,
processor=process,
mode="thread",
weight_fn=lambda row: len(row["payload"]), # balance by payload length
)
results = psc.run()
print(psc.summary())
# {..., "weight_balanced": True}
weight_fn defaults to sys.getsizeof when not specified. Omit it entirely to
keep the original count-based partitioning.
Parallel — multiprocessing (CPU-bound)
Best for: data transformation, ML inference, compression, encryption.
from streamchunk.parallel import ParallelStreamChunker
def transform_worker(chunk, meta):
return [heavy_transform(row) for row in chunk]
psc = ParallelStreamChunker(
data=data,
processor=transform_worker,
mode="process", # ProcessPoolExecutor
target_latency_ms=500
)
results = psc.run()
print(psc.summary())
Parallel file processing (CSV / JSONL)
Process a large file across all CPU workers without loading it into memory.
The file is split into N byte-range segments; each worker gets its own file handle
and its own adaptive StreamChunker.
from streamchunk.parallel import ParallelFileChunker
def process(chunk, meta):
# chunk is a list of dicts for CSV, list of objects for JSONL
for row in chunk:
write_to_db(row)
pfc = ParallelFileChunker(
path="large_data.csv", # or .jsonl
processor=process,
format="csv", # "csv" or "jsonl"
mode="thread",
n_workers=8, # defaults to CPU topology
target_latency_ms=300,
)
results = pfc.run()
print(pfc.summary())
# {
# "file": "large_data.csv", "format": "csv",
# "n_workers": 8, "total_rows": 5_000_000,
# "avg_latency_ms": ..., "p95_latency_ms": ...
# }
Inspect CPU topology
from streamchunk.partitioner import detect_cpu_threads
info = detect_cpu_threads()
# {
# 'logical_threads': 16,
# 'physical_cores': 8,
# 'recommended_io': 16,
# 'recommended_cpu': 8
# }
From a CSV file
from streamchunk import StreamChunker
from streamchunk.sources.file import FileSource
chunker = StreamChunker(
source=FileSource("data.csv", format="csv"),
target_latency_ms=300
)
for chunk, meta in chunker:
process(chunk)
chunker.report_latency(meta.chunk_id, meta.elapsed_ms)
Async iteration
import asyncio
from streamchunk import StreamChunker
async def run():
chunker = StreamChunker(source=iter(data))
async for chunk, meta in chunker.aiter():
await async_process(chunk)
asyncio.run(run())
How It Works
PID Adaptive Sizing
Before pulling each chunk, the controller checks:
- Memory — if
psutil.virtual_memory().percent >= max_memory_pct, it halves the chunk size immediately (hard ceiling). - Latency — a PID controller compares actual processing time against
target_latency_msand adjusts chunk size up or down to converge on the target.
error = target_latency_ms - last_latency_ms
adjustment = kp*error + ki*integral + kd*derivative
chunk_size = clamp(chunk_size + adjustment, min, max)
CPU-Aware Parallel Pipeline
Dataset
|
v
detect_cpu_threads() <- os.cpu_count() + psutil
|
v
partition_dataset() <- split into N equal slices
|
+---> Worker 0 (StreamChunker + PID)
+---> Worker 1 (StreamChunker + PID)
...
+---> Worker N (StreamChunker + PID)
|
v
as_completed(futures) <- collect results
|
v
summary() <- aggregate stats
Choosing Thread vs Process mode
| Scenario | Mode | Why |
|---|---|---|
| S3 / GCS uploads | "thread" |
I/O waits release the GIL |
| Database inserts | "thread" |
Network I/O dominates |
| REST API calls | "thread" |
Concurrent HTTP requests |
| Data transformation | "process" |
Bypasses GIL, true parallelism |
| ML batch inference | "process" |
CPU-bound across cores |
| Compression / encryption | "process" |
CPU-bound |
API Reference
StreamChunker
StreamChunker(
source,
target_latency_ms: int = 200,
max_memory_pct: float = 75.0,
min_chunk_size: int = 100,
max_chunk_size: int = 100_000,
initial_chunk_size: int = 1000,
backpressure: bool = True,
metrics: bool = False,
worker_id: int = 0,
partition_id: int = 0
)
| Method | Description |
|---|---|
for chunk, meta in chunker |
Iterate chunks with metadata |
chunker.report_latency(id, ms) |
Feed actual processing time back to PID |
async for chunk, meta in chunker.aiter() |
Async iteration |
chunker.stats() |
Per-worker throughput and latency stats |
StreamChunker.from_config(path, source) |
Load config from YAML file |
ParallelStreamChunker
ParallelStreamChunker(
data: Sequence,
processor: Callable, # processor(chunk, meta)
mode: str = "thread", # "thread" | "process"
n_workers: int = None, # defaults to CPU topology
weight_fn: Callable = None,# weight_fn(row) -> float — activates weight-aware partitioning
**chunker_kwargs
)
| Method / Property | Description |
|---|---|
psc.run() |
Partition and dispatch. Blocks until done. Returns per-worker stats. |
psc.summary() |
Aggregated stats: total rows, latency p95, CPU info, weight_balanced flag. |
psc.cpu_info |
{logical_threads, physical_cores, recommended_io, recommended_cpu} |
ParallelFileChunker
Processes a large CSV or JSONL file in parallel without loading it into memory.
ParallelFileChunker(
path: str, # path to the file
processor: Callable, # processor(chunk, meta)
format: str = "csv", # "csv" | "jsonl"
mode: str = "thread", # "thread" | "process"
n_workers: int = None, # defaults to CPU topology
**chunker_kwargs
)
| Method / Property | Description |
|---|---|
pfc.run() |
Split file and dispatch. Blocks until done. Returns per-worker stats. |
pfc.summary() |
Aggregated stats including "file", "format", total rows, latency p95. |
pfc.cpu_info |
{logical_threads, physical_cores, recommended_io, recommended_cpu} |
ChunkMetadata
meta.chunk_id # str — unique chunk identifier
meta.chunk_size # int — rows in this chunk
meta.chunk_index # int — sequential index within this worker
meta.elapsed_ms # float — ms since chunk was pulled
meta.memory_pct # float — system memory % at pull time
meta.chunk_size_bytes # int — estimated memory footprint
meta.worker_id # int — which worker processed this chunk
meta.partition_id # int — which dataset partition
Data Sources
| Source | Import | Notes |
|---|---|---|
| Any iterable / generator | built-in | Default wrapping |
| CSV / JSONL file | streamchunk.sources.file.FileSource |
|
| Kafka topic | streamchunk.sources.kafka.KafkaSource |
pip install streamchunk[kafka] |
| SQL database | streamchunk.sources.database.DatabaseSource |
pip install streamchunk[database] |
| Paginated REST API | streamchunk.sources.api.APISource |
requires requests |
YAML Configuration
Save chunker settings in a YAML file and load at runtime:
# config.yaml
target_latency_ms: 300
max_memory_pct: 80.0
min_chunk_size: 200
max_chunk_size: 50000
initial_chunk_size: 2000
backpressure: true
chunker = StreamChunker.from_config("config.yaml", source=iter(data))
Changelog
v2.1.0
Weight-aware partitioning — fixes the "heavy partition bottleneck" where one worker stalls because it received rows with large attribute values while others finish instantly.
partition_dataset_weighted(data, n, weight_fn)— distributes rows by byte weight, not row count. Rows stay in original order.weight_fndefaults tosys.getsizeof.ParallelStreamChunkergains aweight_fnparameter. Pass any callable to activate weight-aware splitting; omit it to keep the original count-based behaviour.summary()now includes"weight_balanced": boolso you can confirm which mode was used.
Parallel file processing — process large CSV and JSONL files across all CPU workers without loading the file into memory.
ParallelFileChunker(path, processor, format, mode, n_workers, **chunker_kwargs)— new top-level class. Splits the file into N byte-range segments (one per worker). Each worker opens its own file handle and runs its own adaptiveStreamChunker.partition_file(path, n_partitions, format)— new function inpartitioner.py. Returns byte-range dicts suitable forRangedFileSource.RangedFileSource(path, start_byte, end_byte, format, headers, skip_partial_line)— new source insources/file.py. Uses binary seek sotell()positions are always accurate. Follows the HadoopTextInputFormatconvention: each worker skips one partial line at its start offset so no rows are ever duplicated or dropped.ParallelFileChunker.summary()includes"file"and"format"keys alongside the standard throughput stats.
32 new tests added; 71 total, all passing.
v2.0.1
- Added comprehensive impact analysis and benchmarks to documentation
- Data science cost-savings breakdown (cloud compute, OOM prevention, manual tuning)
- Parallel speedup tables and real-world throughput numbers
- Kafka and feature-engineering throughput benchmarks
v2.0.0
- CPU thread detection via
os.cpu_count()andpsutil.cpu_count(logical=False) - Dataset partitioning across N workers with no overlap
ParallelStreamChunkerwithProcessPoolExecutorandThreadPoolExecutor- Per-worker independent PID controllers
worker_idandpartition_idadded toChunkMetadata- New modules:
parallel.py,partitioner.py
v1.0.0
- PID-based adaptive chunk sizing
- Memory pressure override
- File, Kafka, Database, Generator sources
- Async iteration support
- Backpressure mode
- Prometheus metrics exporter
License
MIT — See LICENSE
Contributing
Open an issue before submitting a PR. Run pytest tests/ and ensure mypy passes before pushing.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file streamchunk-2.1.0.tar.gz.
File metadata
- Download URL: streamchunk-2.1.0.tar.gz
- Upload date:
- Size: 36.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5da1b2ec6b6f7d39547e7fdeed5196f79af21dad1179c39a6eb714587d2afca8
|
|
| MD5 |
243d61972a8eebbba51eba9db4540912
|
|
| BLAKE2b-256 |
ac9a63e5e14d64fc4dfb521b3f8d8ec2c164547e6a43c951089c0c3ea2a57873
|
File details
Details for the file streamchunk-2.1.0-py3-none-any.whl.
File metadata
- Download URL: streamchunk-2.1.0-py3-none-any.whl
- Upload date:
- Size: 26.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f469fc4067999bffb8b48637d6f3e374771a69b180c1dbb0224278764d7a9bd8
|
|
| MD5 |
966039ca7884d0486a81a79f19ff65af
|
|
| BLAKE2b-256 |
2282b1d2449377a54fa1d6c96104d528211c0453a89e4d8ebf0b5b52d1969e3d
|