Skip to main content

Lightweight dataset library for distributed data processing

Project description

Zephyr

Simple data processing library for Marin pipelines. Build lazy dataset pipelines that run on Iris jobs or a local backend.

Quick Start

from zephyr import Dataset, ZephyrContext, load_jsonl

# Read, transform, write
ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_files("gs://input/", "**/*.jsonl.gz")
    .flat_map(load_jsonl)
    .filter(lambda x: x["score"] > 0.5)
    .map(lambda x: transform_record(x))
    .write_jsonl("gs://output/data-{shard:05d}-of-{total:05d}.jsonl.gz")
)
ctx.execute(pipeline)

Key Patterns

Dataset Creation:

  • Dataset.from_files(path, pattern) - glob files
  • Dataset.from_list(items) - explicit list

Loading Files

  • .load_{file,parquet,jsonl,vortex} - load rows from a file

Transformations:

  • .map(fn) - transform each item
  • .flat_map(fn) - expand items (e.g., load_jsonl)
  • .filter(fn) - filter items by function or expression
  • .select(columna, columnb) - select out the given columns
  • .window(n) - group into batches
  • .reshard(n) - redistribute across n shards

Output:

  • .write_jsonl(pattern) - write JSONL (gzip if .gz)
  • .write_parquet(pattern, schema) - write to a Parquet file
  • .write_vortex(pattern) - write to a Vortex file

Execution (ZephyrContext):

  • ZephyrContext(max_workers=N) — auto-detects the backend (Iris inside an Iris job, local otherwise) via fray.current_client()
  • ZephyrContext(client=LocalClient()) — explicit local backend (testing)
  • ctx.execute(pipeline) — runs the pipeline; returns a ZephyrExecutionResult(results, counters)

Real Usage

Wikipedia Processing:

from zephyr import Dataset, ZephyrContext, load_jsonl

ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_list(files)
    .load_jsonl()
    .map(lambda row: process_record(row, config))
    .filter(lambda x: x is not None)
    .write_jsonl(f"{output}/data-{{shard:05d}}-of-{{total:05d}}.jsonl.gz")
)
ctx.execute(pipeline)

Dataset Sampling:

from zephyr import Dataset, ZephyrContext

ctx = ZephyrContext(max_workers=1000)
pipeline = (
    Dataset.from_files(input_path, "**/*.jsonl.gz")
    .map(lambda path: sample_file(path, weights))
    .write_jsonl(f"{output}/sampled-{{shard:05d}}.jsonl.gz")
)
ctx.execute(pipeline)

Parallel Downloads:

from zephyr import Dataset, ZephyrContext

tasks = [(config, fs, src, dst) for src, dst in file_pairs]
ctx = ZephyrContext(max_workers=32)
pipeline = Dataset.from_list(tasks).map(lambda t: download(*t))
ctx.execute(pipeline)

Installation

# From Marin monorepo
uv sync

# Standalone
cd lib/zephyr
uv pip install -e .

Running Tests

Zephyr tests run against multiple execution backends to ensure correctness across different environments.

All Tests on Both Backends (Default)

uv run pytest lib/zephyr/tests
# Runs all tests on both Local and Iris backends
# Local Iris cluster is started automatically via ClusterManager

Run Specific Backend Only

uv run pytest lib/zephyr/tests -k "local"
uv run pytest lib/zephyr/tests -k "iris"

The Iris cluster is started once per test session and reused across all tests for efficiency.

Design

Zephyr consolidates ad-hoc distributed and Hugging Face dataset processing patterns in Marin into a simple abstraction.

Key Features:

  • Lazy evaluation with operation fusion
  • Disk-based inter-stage data flow for low memory footprint
  • Chunk-by-chunk streaming to minimize memory pressure
  • Distributed execution with bounded parallelism (Iris/local backends)
  • Automatic chunking to prevent large object overhead
  • fsspec integration (GCS, S3, local)
  • Type-safe operation chaining

See AGENTS.md for execution internals and source layout.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

marin_zephyr-0.2.11.dev202606081009.tar.gz (76.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

marin_zephyr-0.2.11.dev202606081009-py3-none-any.whl (82.7 kB view details)

Uploaded Python 3

File details

Details for the file marin_zephyr-0.2.11.dev202606081009.tar.gz.

File metadata

File hashes

Hashes for marin_zephyr-0.2.11.dev202606081009.tar.gz
Algorithm Hash digest
SHA256 42b4c762653223a453442c461f22616e0b0ba2f31e9fff678ededa82e1875c99
MD5 4858e8439b268954288328b860e4e7b5
BLAKE2b-256 c130cbec6463014b1414f47689456247afb1bc3f43ebe0c19cda4c50eab526a7

See more details on using hashes here.

Provenance

The following attestation bundles were made for marin_zephyr-0.2.11.dev202606081009.tar.gz:

Publisher: marin-release-libs-wheels.yaml on marin-community/marin

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file marin_zephyr-0.2.11.dev202606081009-py3-none-any.whl.

File metadata

File hashes

Hashes for marin_zephyr-0.2.11.dev202606081009-py3-none-any.whl
Algorithm Hash digest
SHA256 686b43ce3a3a576f63a2500239f85643bf1787d90f5996ed3c5f9ee4bffb30d8
MD5 d3eb6946d7905d37d5db99ad4b675da5
BLAKE2b-256 bae0b095a35a5bfa85eb6afd2fc1f611cdcdceb8a9e3a144cae372c282b027b7

See more details on using hashes here.

Provenance

The following attestation bundles were made for marin_zephyr-0.2.11.dev202606081009-py3-none-any.whl:

Publisher: marin-release-libs-wheels.yaml on marin-community/marin

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page