Skip to main content

Lightweight dataset library for distributed data processing

Project description

Zephyr

Simple data processing library for Marin pipelines. Build lazy dataset pipelines that run on Iris jobs or a local backend.

Quick Start

from zephyr import Dataset, ZephyrContext, load_jsonl

# Read, transform, write
ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_files("gs://input/", "**/*.jsonl.gz")
    .flat_map(load_jsonl)
    .filter(lambda x: x["score"] > 0.5)
    .map(lambda x: transform_record(x))
    .write_jsonl("gs://output/data-{shard:05d}-of-{total:05d}.jsonl.gz")
)
ctx.execute(pipeline)

Key Patterns

Dataset Creation:

  • Dataset.from_files(path, pattern) - glob files
  • Dataset.from_list(items) - explicit list

Loading Files

  • .load_{file,parquet,jsonl,vortex} - load rows from a file

Transformations:

  • .map(fn) - transform each item
  • .flat_map(fn) - expand items (e.g., load_jsonl)
  • .filter(fn) - filter items by function or expression
  • .select(columna, columnb) - select out the given columns
  • .window(n) - group into batches
  • .reshard(n) - redistribute across n shards

Output:

  • .write_jsonl(pattern) - write JSONL (gzip if .gz)
  • .write_parquet(pattern, schema) - write to a Parquet file
  • .write_vortex(pattern) - write to a Vortex file

Execution (ZephyrContext):

  • ZephyrContext(max_workers=N) — auto-detects the backend (Iris inside an Iris job, local otherwise) via fray.current_client()
  • ZephyrContext(client=LocalClient()) — explicit local backend (testing)
  • ctx.execute(pipeline) — runs the pipeline; returns a ZephyrExecutionResult(results, counters)

Real Usage

Wikipedia Processing:

from zephyr import Dataset, ZephyrContext, load_jsonl

ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_list(files)
    .load_jsonl()
    .map(lambda row: process_record(row, config))
    .filter(lambda x: x is not None)
    .write_jsonl(f"{output}/data-{{shard:05d}}-of-{{total:05d}}.jsonl.gz")
)
ctx.execute(pipeline)

Dataset Sampling:

from zephyr import Dataset, ZephyrContext

ctx = ZephyrContext(max_workers=1000)
pipeline = (
    Dataset.from_files(input_path, "**/*.jsonl.gz")
    .map(lambda path: sample_file(path, weights))
    .write_jsonl(f"{output}/sampled-{{shard:05d}}.jsonl.gz")
)
ctx.execute(pipeline)

Parallel Downloads:

from zephyr import Dataset, ZephyrContext

tasks = [(config, fs, src, dst) for src, dst in file_pairs]
ctx = ZephyrContext(max_workers=32)
pipeline = Dataset.from_list(tasks).map(lambda t: download(*t))
ctx.execute(pipeline)

Installation

# From Marin monorepo
uv sync

# Standalone
cd lib/zephyr
uv pip install -e .

Running Tests

Zephyr tests run against multiple execution backends to ensure correctness across different environments.

All Tests on Both Backends (Default)

uv run pytest lib/zephyr/tests
# Runs all tests on both Local and Iris backends
# Local Iris cluster is started automatically via ClusterManager

Run Specific Backend Only

uv run pytest lib/zephyr/tests -k "local"
uv run pytest lib/zephyr/tests -k "iris"

The Iris cluster is started once per test session and reused across all tests for efficiency.

Design

Zephyr consolidates ad-hoc distributed and Hugging Face dataset processing patterns in Marin into a simple abstraction.

Key Features:

  • Lazy evaluation with operation fusion
  • Disk-based inter-stage data flow for low memory footprint
  • Chunk-by-chunk streaming to minimize memory pressure
  • Distributed execution with bounded parallelism (Iris/local backends)
  • Automatic chunking to prevent large object overhead
  • fsspec integration (GCS, S3, local)
  • Type-safe operation chaining

See AGENTS.md for execution internals and source layout.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

marin_zephyr-0.2.14.dev202606120949.tar.gz (77.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

marin_zephyr-0.2.14.dev202606120949-py3-none-any.whl (85.0 kB view details)

Uploaded Python 3

File details

Details for the file marin_zephyr-0.2.14.dev202606120949.tar.gz.

File metadata

File hashes

Hashes for marin_zephyr-0.2.14.dev202606120949.tar.gz
Algorithm Hash digest
SHA256 a6a382d8bd602054825b32c1edf115ea49071588e2ec1e0391fc191f92134eaa
MD5 7bec8a44056d10a648a5bf863ec84b4c
BLAKE2b-256 11af6b8c42105f2fc6bced997f638a59167e936f1b59130f1fad574761110072

See more details on using hashes here.

Provenance

The following attestation bundles were made for marin_zephyr-0.2.14.dev202606120949.tar.gz:

Publisher: marin-release-libs-wheels.yaml on marin-community/marin

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file marin_zephyr-0.2.14.dev202606120949-py3-none-any.whl.

File metadata

File hashes

Hashes for marin_zephyr-0.2.14.dev202606120949-py3-none-any.whl
Algorithm Hash digest
SHA256 26b445747898999fa0b04e66b3b7a9556d58fc2b570e8348bf3923aae2297fb0
MD5 fbb42c6d7c8c12db68e2fd8a5e0f8ab7
BLAKE2b-256 fe403e517bcce62366041df5015a34d0a0bcdc32b25f19bf91b663b7d19c4d27

See more details on using hashes here.

Provenance

The following attestation bundles were made for marin_zephyr-0.2.14.dev202606120949-py3-none-any.whl:

Publisher: marin-release-libs-wheels.yaml on marin-community/marin

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page