Skip to main content

Lightweight dataset library for distributed data processing

Project description

Zephyr

Simple data processing library for Marin pipelines. Build lazy dataset pipelines that run on Iris jobs or a local backend.

Quick Start

from zephyr import Dataset, ZephyrContext, load_jsonl

# Read, transform, write
ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_files("gs://input/", "**/*.jsonl.gz")
    .flat_map(load_jsonl)
    .filter(lambda x: x["score"] > 0.5)
    .map(lambda x: transform_record(x))
    .write_jsonl("gs://output/data-{shard:05d}-of-{total:05d}.jsonl.gz")
)
ctx.execute(pipeline)

Key Patterns

Dataset Creation:

  • Dataset.from_files(path, pattern) - glob files
  • Dataset.from_list(items) - explicit list

Loading Files

  • .load_{file,parquet,jsonl,vortex} - load rows from a file

Transformations:

  • .map(fn) - transform each item
  • .flat_map(fn) - expand items (e.g., load_jsonl)
  • .filter(fn) - filter items by function or expression
  • .select(columna, columnb) - select out the given columns
  • .window(n) - group into batches
  • .reshard(n) - redistribute across n shards

Output:

  • .write_jsonl(pattern) - write JSONL (gzip if .gz)
  • .write_parquet(pattern, schema) - write to a Parquet file
  • .write_vortex(pattern) - write to a Vortex file

Execution (ZephyrContext):

  • ZephyrContext(max_workers=N) — auto-detects the backend (Iris inside an Iris job, local otherwise) via fray.current_client()
  • ZephyrContext(client=LocalClient()) — explicit local backend (testing)
  • ctx.execute(pipeline) — runs the pipeline; returns a ZephyrExecutionResult(results, counters)

Real Usage

Wikipedia Processing:

from zephyr import Dataset, ZephyrContext, load_jsonl

ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_list(files)
    .load_jsonl()
    .map(lambda row: process_record(row, config))
    .filter(lambda x: x is not None)
    .write_jsonl(f"{output}/data-{{shard:05d}}-of-{{total:05d}}.jsonl.gz")
)
ctx.execute(pipeline)

Dataset Sampling:

from zephyr import Dataset, ZephyrContext

ctx = ZephyrContext(max_workers=1000)
pipeline = (
    Dataset.from_files(input_path, "**/*.jsonl.gz")
    .map(lambda path: sample_file(path, weights))
    .write_jsonl(f"{output}/sampled-{{shard:05d}}.jsonl.gz")
)
ctx.execute(pipeline)

Parallel Downloads:

from zephyr import Dataset, ZephyrContext

tasks = [(config, fs, src, dst) for src, dst in file_pairs]
ctx = ZephyrContext(max_workers=32)
pipeline = Dataset.from_list(tasks).map(lambda t: download(*t))
ctx.execute(pipeline)

Installation

# From Marin monorepo
uv sync

# Standalone
cd lib/zephyr
uv pip install -e .

Running Tests

Zephyr tests run against multiple execution backends to ensure correctness across different environments.

All Tests on Both Backends (Default)

uv run pytest lib/zephyr/tests
# Runs all tests on both Local and Iris backends
# Local Iris cluster is started automatically via ClusterManager

Run Specific Backend Only

uv run pytest lib/zephyr/tests -k "local"
uv run pytest lib/zephyr/tests -k "iris"

The Iris cluster is started once per test session and reused across all tests for efficiency.

Design

Zephyr consolidates ad-hoc distributed and Hugging Face dataset processing patterns in Marin into a simple abstraction.

Key Features:

  • Lazy evaluation with operation fusion
  • Disk-based inter-stage data flow for low memory footprint
  • Chunk-by-chunk streaming to minimize memory pressure
  • Distributed execution with bounded parallelism (Iris/local backends)
  • Automatic chunking to prevent large object overhead
  • fsspec integration (GCS, S3, local)
  • Type-safe operation chaining

See AGENTS.md for execution internals and source layout.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

marin_zephyr-0.2.8.dev202606050858.tar.gz (75.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

marin_zephyr-0.2.8.dev202606050858-py3-none-any.whl (81.3 kB view details)

Uploaded Python 3

File details

Details for the file marin_zephyr-0.2.8.dev202606050858.tar.gz.

File metadata

File hashes

Hashes for marin_zephyr-0.2.8.dev202606050858.tar.gz
Algorithm Hash digest
SHA256 b611d331f5ef6f66deb6365f0126060d59d5b3c5b2e8a35aaae527bb2680510a
MD5 a4a368a7eef714702f255c2b55e04472
BLAKE2b-256 5994b56d677325244ec50ff5b93b7bca0a993261b4d2b65b18712cf363a621f6

See more details on using hashes here.

Provenance

The following attestation bundles were made for marin_zephyr-0.2.8.dev202606050858.tar.gz:

Publisher: marin-release-libs-wheels.yaml on marin-community/marin

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file marin_zephyr-0.2.8.dev202606050858-py3-none-any.whl.

File metadata

File hashes

Hashes for marin_zephyr-0.2.8.dev202606050858-py3-none-any.whl
Algorithm Hash digest
SHA256 80d0c137db7ac6a08acb99d8f41fb33084900b80275316d2ce84f2ab6d436fac
MD5 29c093591b32c035cb411a6b138da6e3
BLAKE2b-256 863b630f47d56228c619b347e7ca508faaf51b3e59d2e14ad747d04bafdc2491

See more details on using hashes here.

Provenance

The following attestation bundles were made for marin_zephyr-0.2.8.dev202606050858-py3-none-any.whl:

Publisher: marin-release-libs-wheels.yaml on marin-community/marin

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page