Skip to main content

Lightweight dataset library for distributed data processing

Project description

Zephyr

Simple data processing library for Marin pipelines. Build lazy dataset pipelines that run on Iris jobs or a local backend.

Quick Start

from zephyr import Dataset, ZephyrContext, load_jsonl

# Read, transform, write
ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_files("gs://input/", "**/*.jsonl.gz")
    .flat_map(load_jsonl)
    .filter(lambda x: x["score"] > 0.5)
    .map(lambda x: transform_record(x))
    .write_jsonl("gs://output/data-{shard:05d}-of-{total:05d}.jsonl.gz")
)
ctx.execute(pipeline)

Key Patterns

Dataset Creation:

  • Dataset.from_files(path, pattern) - glob files
  • Dataset.from_list(items) - explicit list

Loading Files

  • .load_{file,parquet,jsonl,vortex} - load rows from a file

Transformations:

  • .map(fn) - transform each item
  • .flat_map(fn) - expand items (e.g., load_jsonl)
  • .filter(fn) - filter items by function or expression
  • .select(columna, columnb) - select out the given columns
  • .window(n) - group into batches
  • .reshard(n) - redistribute across n shards

Output:

  • .write_jsonl(pattern) - write JSONL (gzip if .gz)
  • .write_parquet(pattern, schema) - write to a Parquet file
  • .write_vortex(pattern) - write to a Vortex file

Execution (ZephyrContext):

  • ZephyrContext(max_workers=N) — auto-detects the backend (Iris inside an Iris job, local otherwise) via fray.current_client()
  • ZephyrContext(client=LocalClient()) — explicit local backend (testing)
  • ctx.execute(pipeline) — runs the pipeline; returns a ZephyrExecutionResult(results, counters)

Real Usage

Wikipedia Processing:

from zephyr import Dataset, ZephyrContext, load_jsonl

ctx = ZephyrContext(max_workers=100)
pipeline = (
    Dataset.from_list(files)
    .load_jsonl()
    .map(lambda row: process_record(row, config))
    .filter(lambda x: x is not None)
    .write_jsonl(f"{output}/data-{{shard:05d}}-of-{{total:05d}}.jsonl.gz")
)
ctx.execute(pipeline)

Dataset Sampling:

from zephyr import Dataset, ZephyrContext

ctx = ZephyrContext(max_workers=1000)
pipeline = (
    Dataset.from_files(input_path, "**/*.jsonl.gz")
    .map(lambda path: sample_file(path, weights))
    .write_jsonl(f"{output}/sampled-{{shard:05d}}.jsonl.gz")
)
ctx.execute(pipeline)

Parallel Downloads:

from zephyr import Dataset, ZephyrContext

tasks = [(config, fs, src, dst) for src, dst in file_pairs]
ctx = ZephyrContext(max_workers=32)
pipeline = Dataset.from_list(tasks).map(lambda t: download(*t))
ctx.execute(pipeline)

Installation

# From Marin monorepo
uv sync

# Standalone
cd lib/zephyr
uv pip install -e .

Running Tests

Zephyr tests run against multiple execution backends to ensure correctness across different environments.

All Tests on Both Backends (Default)

uv run pytest lib/zephyr/tests
# Runs all tests on both Local and Iris backends
# Local Iris cluster is started automatically via ClusterManager

Run Specific Backend Only

uv run pytest lib/zephyr/tests -k "local"
uv run pytest lib/zephyr/tests -k "iris"

The Iris cluster is started once per test session and reused across all tests for efficiency.

Design

Zephyr consolidates ad-hoc distributed and Hugging Face dataset processing patterns in Marin into a simple abstraction.

Key Features:

  • Lazy evaluation with operation fusion
  • Disk-based inter-stage data flow for low memory footprint
  • Chunk-by-chunk streaming to minimize memory pressure
  • Distributed execution with bounded parallelism (Iris/local backends)
  • Automatic chunking to prevent large object overhead
  • fsspec integration (GCS, S3, local)
  • Type-safe operation chaining

See AGENTS.md for execution internals and source layout.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

marin_zephyr-0.2.9.dev202606060818.tar.gz (75.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

marin_zephyr-0.2.9.dev202606060818-py3-none-any.whl (81.5 kB view details)

Uploaded Python 3

File details

Details for the file marin_zephyr-0.2.9.dev202606060818.tar.gz.

File metadata

File hashes

Hashes for marin_zephyr-0.2.9.dev202606060818.tar.gz
Algorithm Hash digest
SHA256 ea6e341358ca18c468550cb922b83b3836a23d6af10d40ed04035c4ff1595e66
MD5 dd0672f15d416a6614d219baac15f56b
BLAKE2b-256 bccd16b889d1c2acefef7a83d1969faf772b47e60ddb0fe52f12529f10e87fbb

See more details on using hashes here.

Provenance

The following attestation bundles were made for marin_zephyr-0.2.9.dev202606060818.tar.gz:

Publisher: marin-release-libs-wheels.yaml on marin-community/marin

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file marin_zephyr-0.2.9.dev202606060818-py3-none-any.whl.

File metadata

File hashes

Hashes for marin_zephyr-0.2.9.dev202606060818-py3-none-any.whl
Algorithm Hash digest
SHA256 13c0d22bbb33877286a440cd042dcf9e28d48bd18fc6b30dfce57cd1e9508aa9
MD5 c826dcc31f0c8e7ef317ce9585dde58a
BLAKE2b-256 4131bdd6f70722b571ea436f025f5b717a63b6f55cf5df2b2b9ca91dc24cd388

See more details on using hashes here.

Provenance

The following attestation bundles were made for marin_zephyr-0.2.9.dev202606060818-py3-none-any.whl:

Publisher: marin-release-libs-wheels.yaml on marin-community/marin

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page