Skip to main content

Lightweight dataset library for distributed data processing

Project description

Zephyr

Simple data processing library for Marin pipelines. Build lazy dataset pipelines that run on Iris jobs or a local backend.

Quick Start

from zephyr import Dataset, ZephyrContext, load_jsonl

# Read, transform, write
ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_files("gs://input/", "**/*.jsonl.gz")
    .flat_map(load_jsonl)
    .filter(lambda x: x["score"] > 0.5)
    .map(lambda x: transform_record(x))
    .write_jsonl("gs://output/data-{shard:05d}-of-{total:05d}.jsonl.gz")
)
ctx.execute(pipeline)

Key Patterns

Dataset Creation:

  • Dataset.from_files(path, pattern) - glob files
  • Dataset.from_list(items) - explicit list

Loading Files

  • .load_{file,parquet,jsonl,vortex} - load rows from a file

Transformations:

  • .map(fn) - transform each item
  • .flat_map(fn) - expand items (e.g., load_jsonl)
  • .filter(fn) - filter items by function or expression
  • .select(columna, columnb) - select out the given columns
  • .window(n) - group into batches
  • .reshard(n) - redistribute across n shards

Output:

  • .write_jsonl(pattern) - write JSONL (gzip if .gz)
  • .write_parquet(pattern, schema) - write to a Parquet file
  • .write_vortex(pattern) - write to a Vortex file

Execution (ZephyrContext):

  • ZephyrContext(max_workers=N) — auto-detects the backend (Iris inside an Iris job, local otherwise) via fray.current_client()
  • ZephyrContext(client=LocalClient()) — explicit local backend (testing)
  • ctx.execute(pipeline) — runs the pipeline; returns a ZephyrExecutionResult(results, counters)

Real Usage

Wikipedia Processing:

from zephyr import Dataset, ZephyrContext, load_jsonl

ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_list(files)
    .load_jsonl()
    .map(lambda row: process_record(row, config))
    .filter(lambda x: x is not None)
    .write_jsonl(f"{output}/data-{{shard:05d}}-of-{{total:05d}}.jsonl.gz")
)
ctx.execute(pipeline)

Dataset Sampling:

from zephyr import Dataset, ZephyrContext

ctx = ZephyrContext(max_workers=1000)
pipeline = (
    Dataset.from_files(input_path, "**/*.jsonl.gz")
    .map(lambda path: sample_file(path, weights))
    .write_jsonl(f"{output}/sampled-{{shard:05d}}.jsonl.gz")
)
ctx.execute(pipeline)

Parallel Downloads:

from zephyr import Dataset, ZephyrContext

tasks = [(config, fs, src, dst) for src, dst in file_pairs]
ctx = ZephyrContext(max_workers=32)
pipeline = Dataset.from_list(tasks).map(lambda t: download(*t))
ctx.execute(pipeline)

Installation

# From Marin monorepo
uv sync

# Standalone
cd lib/zephyr
uv pip install -e .

Running Tests

Zephyr tests run against multiple execution backends to ensure correctness across different environments.

All Tests on Both Backends (Default)

uv run pytest lib/zephyr/tests
# Runs all tests on both Local and Iris backends
# Local Iris cluster is started automatically via ClusterManager

Run Specific Backend Only

uv run pytest lib/zephyr/tests -k "local"
uv run pytest lib/zephyr/tests -k "iris"

The Iris cluster is started once per test session and reused across all tests for efficiency.

Design

Zephyr consolidates ad-hoc distributed and Hugging Face dataset processing patterns in Marin into a simple abstraction.

Key Features:

  • Lazy evaluation with operation fusion
  • Disk-based inter-stage data flow for low memory footprint
  • Chunk-by-chunk streaming to minimize memory pressure
  • Distributed execution with bounded parallelism (Iris/local backends)
  • Automatic chunking to prevent large object overhead
  • fsspec integration (GCS, S3, local)
  • Type-safe operation chaining

See AGENTS.md for execution internals and source layout.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

marin_zephyr-0.2.20.dev202606180959.tar.gz (84.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

marin_zephyr-0.2.20.dev202606180959-py3-none-any.whl (94.3 kB view details)

Uploaded Python 3

File details

Details for the file marin_zephyr-0.2.20.dev202606180959.tar.gz.

File metadata

File hashes

Hashes for marin_zephyr-0.2.20.dev202606180959.tar.gz
Algorithm Hash digest
SHA256 3c8aceb574fb22f9b2f63e5bf882d1429f9d425752e0ff33388f16c24b673bf2
MD5 8eaf0008408d637094c7e88ff5ef77c1
BLAKE2b-256 4d58c76316f08fa5cd016fb8561a056ab1ba92c587da6ee52a4f7f73ca901f8a

See more details on using hashes here.

Provenance

The following attestation bundles were made for marin_zephyr-0.2.20.dev202606180959.tar.gz:

Publisher: marin-release-libs-wheels.yaml on marin-community/marin

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file marin_zephyr-0.2.20.dev202606180959-py3-none-any.whl.

File metadata

File hashes

Hashes for marin_zephyr-0.2.20.dev202606180959-py3-none-any.whl
Algorithm Hash digest
SHA256 f8a34b7477ae4aa94f7858ae2963e8957142c350f634d11abc565162b581dff3
MD5 e3b7b525650ed7fed2a1f276be4dd65d
BLAKE2b-256 e89eb709ba705fed73f5a126d5cc5f87caea732fe2f11551e9c3cd021151d998

See more details on using hashes here.

Provenance

The following attestation bundles were made for marin_zephyr-0.2.20.dev202606180959-py3-none-any.whl:

Publisher: marin-release-libs-wheels.yaml on marin-community/marin

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page