Lightweight dataset library for distributed data processing
Zephyr
Simple data processing library for Marin pipelines. Build lazy dataset pipelines that run on Iris jobs or a local backend.
Quick Start
from zephyr import Dataset, ZephyrContext, load_jsonl
# Read, transform, write
ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_files("gs://input/", "**/*.jsonl.gz")
    .flat_map(load_jsonl)  # expand each matched file into its rows
    .filter(lambda x: x["score"] > 0.5)
    .map(lambda x: transform_record(x))  # transform_record is your own per-record function
    .write_jsonl("gs://output/data-{shard:05d}-of-{total:05d}.jsonl.gz")
)
ctx.execute(pipeline)
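The output path above carries `{shard}` and `{total}` placeholders. As a rough sketch, assuming they are filled with ordinary Python format-spec semantics and zero-based shard indices (neither is spelled out here), the expansion can be reproduced with `str.format`. The shard count of 4 is made up for illustration.

```python
# Sketch only: assumes standard str.format semantics and zero-based shard indices.
pattern = "gs://output/data-{shard:05d}-of-{total:05d}.jsonl.gz"
total = 4  # hypothetical number of output shards

# One concrete path per shard; :05d pads each number to five digits.
shard_paths = [pattern.format(shard=i, total=total) for i in range(total)]
for path in shard_paths:
    print(path)
```

Zero-padded names keep the shards in order under a plain lexicographic listing.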
Key Patterns
Dataset Creation:
- Dataset.from_files(path, pattern) - glob files
- Dataset.from_list(items) - explicit list
Loading Files:
- .load_{file,parquet,jsonl,vortex} - load rows from a file
Transformations:
- .map(fn) - transform each item
- .flat_map(fn) - expand items (e.g., load_jsonl)
- .filter(fn) - filter items by function or expression
- .select(columna, columnb) - keep only the given columns
- .window(n) - group into batches
- .reshard(n) - redistribute across n shards
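For readers new to these operators, the following plain-Python sketch mirrors what each transformation does to a small in-memory list. It does not use Zephyr: the helpers `window` and `reshard` are hypothetical stand-ins, and the round-robin strategy in `reshard` is an assumption rather than a description of how Zephyr redistributes data.

```python
from itertools import islice

def window(items, n):
    """Group an iterable into lists of at most n items."""
    it = iter(items)
    while batch := list(islice(it, n)):
        yield batch

def reshard(items, n):
    """Spread items round-robin across n shards (one possible strategy)."""
    shards = [[] for _ in range(n)]
    for i, item in enumerate(items):
        shards[i % n].append(item)
    return shards

rows = [{"id": i, "score": i / 10, "text": f"doc {i}"} for i in range(10)]

mapped = [{**r, "text": r["text"].upper()} for r in rows]      # like .map(fn)
expanded = [w for r in rows for w in r["text"].split()]        # like .flat_map(fn)
kept = [r for r in rows if r["score"] > 0.5]                   # like .filter(fn)
selected = [{k: r[k] for k in ("id", "score")} for r in rows]  # like .select("id", "score")
batches = list(window(rows, 4))                                # like .window(4)
shards = reshard(rows, 3)                                      # like .reshard(3)
```

The difference in the real library is that these steps are recorded lazily and run per shard, rather than materializing a list at every step.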
Output:
- .write_jsonl(pattern) - write JSONL (gzip if .gz)
- .write_parquet(pattern, schema) - write to a Parquet file
- .write_vortex(pattern) - write to a Vortex file
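The "gzip if .gz" behaviour can be pictured with the standard library alone. This is a minimal sketch of extension-based compression, not Zephyr's writer: `open_for_write`, `write_jsonl_shard`, and `read_jsonl_shard` are hypothetical helper names, and the example writes to a local temporary directory instead of a remote filesystem.

```python
import gzip
import json
import os
import tempfile

def open_for_write(path):
    """Open a text handle, gzip-compressed when the path ends in .gz."""
    if path.endswith(".gz"):
        return gzip.open(path, "wt", encoding="utf-8")
    return open(path, "w", encoding="utf-8")

def write_jsonl_shard(rows, path):
    """Write one JSON object per line."""
    with open_for_write(path) as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")

def read_jsonl_shard(path):
    """Read a JSONL file back into a list of dicts, skipping blank lines."""
    opener = gzip.open if path.endswith(".gz") else open
    with opener(path, "rt", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

rows = [{"id": 1, "text": "hello"}, {"id": 2, "text": "world"}]
out_dir = tempfile.mkdtemp()
gz_path = os.path.join(out_dir, "data-00000-of-00001.jsonl.gz")

write_jsonl_shard(rows, gz_path)
round_trip = read_jsonl_shard(gz_path)
```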
Execution (ZephyrContext):
- ZephyrContext(max_workers=N) - auto-detects the backend (Iris inside an Iris job, local otherwise) via fray.current_client()
- ZephyrContext(client=LocalClient()) - explicit local backend (testing)
- ctx.execute(pipeline) - runs the pipeline; returns a ZephyrExecutionResult(results, counters)
Real Usage
Wikipedia Processing:
from zephyr import Dataset, ZephyrContext
ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_list(files)
    .load_jsonl()
    .map(lambda row: process_record(row, config))
    .filter(lambda x: x is not None)  # drop records that process_record rejected
    .write_jsonl(f"{output}/data-{{shard:05d}}-of-{{total:05d}}.jsonl.gz")
)
ctx.execute(pipeline)
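Note the doubled braces in the f-string above. In an f-string, `{{` and `}}` are escapes for literal braces, so `output` is interpolated immediately while the `shard` and `total` placeholders survive for later substitution. A quick check, using a made-up output prefix:

```python
output = "gs://bucket/wiki"  # hypothetical output prefix

# {output} is filled in now; {{...}} collapses to a literal {...} placeholder.
pattern = f"{output}/data-{{shard:05d}}-of-{{total:05d}}.jsonl.gz"
print(pattern)  # gs://bucket/wiki/data-{shard:05d}-of-{total:05d}.jsonl.gz
```

With single braces, Python would try to evaluate `shard` and `total` when the string is built and fail with a NameError if they are not defined.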
Dataset Sampling:
from zephyr import Dataset, ZephyrContext
ctx = ZephyrContext(max_workers=1000)
pipeline = (
    Dataset.from_files(input_path, "**/*.jsonl.gz")
    .map(lambda path: sample_file(path, weights))
    .write_jsonl(f"{output}/sampled-{{shard:05d}}.jsonl.gz")
)
ctx.execute(pipeline)
Parallel Downloads:
from zephyr import Dataset, ZephyrContext
tasks = [(config, fs, src, dst) for src, dst in file_pairs]
ctx = ZephyrContext(max_workers=32)
pipeline = Dataset.from_list(tasks).map(lambda t: download(*t))
ctx.execute(pipeline)
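The shape of this pattern, a list of argument tuples mapped through a function with a cap on concurrency, can be tried locally with the standard library. The sketch below is an analogy, not Zephyr: `ThreadPoolExecutor` stands in for the context, and `download`, `config`, `fs`, and `file_pairs` are placeholder definitions invented for the example.

```python
from concurrent.futures import ThreadPoolExecutor

def download(config, fs, src, dst):
    """Stand-in for a real download; returns a description of the copy."""
    return f"{src} -> {dst}"

config, fs = {"retries": 3}, None  # placeholders for real config and filesystem objects
file_pairs = [("a.txt", "out/a.txt"), ("b.txt", "out/b.txt"), ("c.txt", "out/c.txt")]

# Each task is a tuple that is unpacked into download's positional arguments.
tasks = [(config, fs, src, dst) for src, dst in file_pairs]

# max_workers bounds how many downloads run at once; map preserves input order.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(lambda t: download(*t), tasks))
```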
Installation
# From Marin monorepo
uv sync
# Standalone
cd lib/zephyr
uv pip install -e .
Running Tests
Zephyr tests run against multiple execution backends to ensure correctness across different environments.
All Tests on Both Backends (Default)
uv run pytest lib/zephyr/tests
# Runs all tests on both Local and Iris backends
# Local Iris cluster is started automatically via ClusterManager
Run Specific Backend Only
uv run pytest lib/zephyr/tests -k "local"
uv run pytest lib/zephyr/tests -k "iris"
The Iris cluster is started once per test session and reused across all tests for efficiency.
Design
Zephyr consolidates ad-hoc distributed and Hugging Face dataset processing patterns in Marin into a simple abstraction.
Key Features:
- Lazy evaluation with operation fusion
- Disk-based inter-stage data flow for low memory footprint
- Chunk-by-chunk streaming to minimize memory pressure
- Distributed execution with bounded parallelism (Iris/local backends)
- Automatic chunking to prevent large object overhead
- fsspec integration (GCS, S3, local)
- Type-safe operation chaining
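To make "lazy evaluation with operation fusion" concrete, here is a toy sketch of the idea. It is not Zephyr's implementation: `LazyDataset` is a hypothetical class that only records `map` and `filter` steps, then applies all of them to each item in a single pass when iterated, so no intermediate list is built between steps.

```python
class LazyDataset:
    """Toy stand-in that records operations instead of running them."""

    def __init__(self, source, ops=()):
        self.source = source
        self.ops = tuple(ops)

    def map(self, fn):
        # Return a new dataset with the op appended; nothing executes yet.
        return LazyDataset(self.source, self.ops + (("map", fn),))

    def filter(self, fn):
        return LazyDataset(self.source, self.ops + (("filter", fn),))

    def __iter__(self):
        # Fused execution: each item flows through every recorded op
        # before the next item is read from the source.
        for item in self.source:
            keep = True
            for kind, fn in self.ops:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                yield item

calls = []

def double(x):
    calls.append(x)  # record when the function actually runs
    return x * 2

pipeline = LazyDataset(range(5)).map(double).filter(lambda x: x > 2)
calls_before_execution = list(calls)  # still empty: building the pipeline ran nothing
result = list(pipeline)               # iteration triggers the fused pass
```

Because each item is processed end to end before the next is read, peak memory depends on the size of one item rather than on the size of the dataset.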
See AGENTS.md for execution internals and source layout.
File details
Details for the file marin_zephyr-0.99.tar.gz.
File metadata
- Download URL: marin_zephyr-0.99.tar.gz
- Upload date:
- Size: 71.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.17
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 152b61ecae82d823639d9f8cbc7dda0484bc9f6b5db12d0d5f4bead79301ddec |
| MD5 | ce7e974eaa3b0620261dd8b00d3c4b54 |
| BLAKE2b-256 | 9fa2e9e3eb4232842ae6d32b912359f6a92a27d2b1682855003096fe3f4cf3b7 |
File details
Details for the file marin_zephyr-0.99-py3-none-any.whl.
File metadata
- Download URL: marin_zephyr-0.99-py3-none-any.whl
- Upload date:
- Size: 77.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.17
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | c23f622d454d2f9b0a6015b1b871eaa7d3056f411fb1bab5028690bd02d98542 |
| MD5 | f6e2a52ada5eaffb1624f314dcba4f40 |
| BLAKE2b-256 | 88913cdb97560e7806cd11ad24abd4f398126f97108c939f044a2b073a22fe0a |