
Streaming lets users create PyTorch-compatible datasets that can be streamed from cloud-based object stores.

Project description


Fast, accurate streaming of training data from cloud storage




👋 Welcome

We built StreamingDataset to make training on large datasets from cloud storage as fast, cheap, and scalable as possible.

It’s specially designed for multi-node, distributed training of large models, maximizing correctness guarantees, performance, and ease of use. Now you can train efficiently anywhere, independent of where your training data is stored. Just stream in the data you need, when you need it. To learn more about why we built StreamingDataset, read our announcement blog post.

StreamingDataset is compatible with any data type, including images, text, video, and multimodal data.

With support for major cloud storage providers (AWS, OCI, GCS, Azure, Databricks, and any S3-compatible object store such as Cloudflare R2, CoreWeave, and Backblaze B2) and designed as a drop-in replacement for your PyTorch IterableDataset class, StreamingDataset integrates seamlessly into your existing training workflows.

The flow of samples from shards in the cloud to devices in your cluster

🚀 Getting Started

💾 Installation

Streaming can be installed with pip:

pip install mosaicml-streaming

🏁 Quick Start

1. Prepare Your Data

Convert your raw dataset into one of our supported streaming formats:

  • MDS (Mosaic Data Shard) format which can encode and decode any Python object
  • CSV / TSV
  • JSONL
import numpy as np
from PIL import Image
from streaming import MDSWriter

# Local or remote directory in which to store the compressed output files
data_dir = 'path-to-dataset'

# A dictionary mapping input fields to their data types
columns = {
    'image': 'jpeg',
    'class': 'int'
}

# Shard compression, if any
compression = 'zstd'

# Save the samples as shards using MDSWriter
with MDSWriter(out=data_dir, columns=columns, compression=compression) as out:
    for i in range(10000):
        sample = {
            'image': Image.fromarray(np.random.randint(0, 256, (32, 32, 3), np.uint8)),
            'class': np.random.randint(10),
        }
        out.write(sample)

2. Upload Your Data to Cloud Storage

Upload your streaming dataset to the cloud storage of your choice (for example AWS S3, OCI, GCS, or Azure). The example below uploads a local directory to an S3 bucket using the AWS CLI.

$ aws s3 cp --recursive path-to-dataset s3://my-bucket/path-to-dataset

3. Build a StreamingDataset and DataLoader

from torch.utils.data import DataLoader
from streaming import StreamingDataset

# Remote path where full dataset is persistently stored
remote = 's3://my-bucket/path-to-dataset'

# Local working dir where dataset is cached during operation
local = '/tmp/path-to-dataset'

# Create streaming dataset
dataset = StreamingDataset(local=local, remote=remote, shuffle=True)

# Let's see what is in sample #1337...
sample = dataset[1337]
img = sample['image']
cls = sample['class']

# Create PyTorch DataLoader
dataloader = DataLoader(dataset)

📚 What next?

Getting started guides, examples, API references, and other useful information can be found in our docs.

We have end-to-end tutorials for training a model on:

We also have starter code for the following popular datasets, which can be found in the streaming directory:

Dataset      Task                  Read  Write
LAION-400M   Text and image        Read  Write
WebVid       Text and video        Read  Write
C4           Text                  Read  Write
EnWiki       Text                  Read  Write
Pile         Text                  Read  Write
ADE20K       Image segmentation    Read  Write
CIFAR10      Image classification  Read  Write
COCO         Object detection      Read  Write
ImageNet     Image classification  Read  Write

To start training on these datasets:

  1. Convert raw data into .mds format using the corresponding script from the convert directory.

For example:

$ python -m streaming.multimodal.convert.webvid --in <CSV file> --out <MDS output directory>
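  2. Import the dataset class to start training the model.
from streaming.multimodal import StreamingInsideWebVid
remote = 's3://my-bucket/path-to-webvid-mds'  # placeholder: remote path holding the converted MDS shards
local = '/tmp/path-to-webvid-mds'  # placeholder: local cache directory
dataset = StreamingInsideWebVid(local=local, remote=remote, shuffle=True)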

🔑 Key Features


Seamless data mixing

Easily experiment with dataset mixtures using Stream. Sampling from each dataset can be controlled in relative terms (proportion) or absolute terms (repeat or number of samples). During streaming, the different datasets are streamed, shuffled, and mixed seamlessly just-in-time.

from streaming import Stream, StreamingDataset

# Mix C4, GitHub code, and an internal dataset
streams = [
    Stream(remote='s3://datasets/c4', proportion=0.4),
    Stream(remote='s3://datasets/github', proportion=0.1),
    Stream(remote='gs://datasets/my_internal', proportion=0.5),
]

# Draw 100M samples per epoch, split across the streams by proportion
dataset = StreamingDataset(
    streams=streams,
    epoch_size=100_000_000,
)
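How relative proportions turn into per-stream sample counts can be illustrated with a small sketch. It assumes a hypothetical StreamSpec record (not streaming's Stream class) and a fixed number of samples per epoch, splits that budget by proportion using largest-remainder rounding (a choice made for this sketch), and then shuffles the draws so the sources are interleaved. StreamingDataset's actual sampling and mixing logic may differ.

```python
import random
from collections import Counter
from dataclasses import dataclass


@dataclass
class StreamSpec:
    """Hypothetical stand-in for a data stream: a name plus its relative proportion."""
    name: str
    proportion: float


def samples_per_stream(streams, epoch_size):
    """Split epoch_size samples across streams by proportion (largest-remainder rounding)."""
    total = sum(s.proportion for s in streams)
    exact = [epoch_size * s.proportion / total for s in streams]
    counts = [int(x) for x in exact]
    # Give the samples lost to rounding down to the streams with the largest fractional parts.
    leftover = epoch_size - sum(counts)
    by_remainder = sorted(range(len(streams)), key=lambda i: exact[i] - counts[i], reverse=True)
    for i in by_remainder[:leftover]:
        counts[i] += 1
    return {s.name: c for s, c in zip(streams, counts)}


def mixed_epoch(streams, epoch_size, seed=0):
    """One entry per sample drawn this epoch, naming its source stream, in shuffled order."""
    counts = samples_per_stream(streams, epoch_size)
    draws = [name for name, count in counts.items() for _ in range(count)]
    random.Random(seed).shuffle(draws)
    return draws


if __name__ == "__main__":
    streams = [StreamSpec('c4', 0.4), StreamSpec('github', 0.1), StreamSpec('my_internal', 0.5)]
    print(samples_per_stream(streams, 1000))  # {'c4': 400, 'github': 100, 'my_internal': 500}
    print(Counter(mixed_epoch(streams, 1000)))
```

Largest-remainder rounding guarantees the per-stream counts always add up to exactly the epoch budget, even when the proportions do not divide it evenly.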

True Determinism

A unique feature of our solution: samples are in the same order regardless of the number of GPUs, nodes, or CPU workers. This makes it easier to:

  • Reproduce and debug training runs and loss spikes
  • Load a checkpoint trained on 64 GPUs and debug on 8 GPUs with reproducibility

As the figure below shows, training a model on 1, 8, 16, 32, or 64 GPUs yields the exact same loss curve (up to the limitations of floating-point math!)

Plot of elastic determinism
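A minimal sketch of why sample order can be independent of device count, under two simplifying assumptions: one global order is derived only from the dataset size and a seed, and ranks take positions from it round-robin. Interleaving the per-rank streams step by step then recovers the same global order for any number of devices. The helper names are hypothetical, and StreamingDataset's real partitioning across nodes, ranks, and workers is more involved.

```python
import random


def global_order(num_samples, seed):
    """One shuffled order for the whole epoch, derived only from dataset size and seed."""
    order = list(range(num_samples))
    random.Random(seed).shuffle(order)
    return order


def partition(order, world_size):
    """Round-robin assignment: rank r takes global positions r, r + world_size, ..."""
    return [order[rank::world_size] for rank in range(world_size)]


def reassemble(parts):
    """Interleave per-rank streams step by step, the way a global batch sees them."""
    merged = []
    for step in range(max(len(part) for part in parts)):
        for part in parts:
            if step < len(part):
                merged.append(part[step])
    return merged


if __name__ == "__main__":
    order = global_order(1000, seed=17)
    for world_size in (1, 8, 16, 32, 64):
        # Same seed, different device counts: the global sequence of samples is unchanged.
        assert reassemble(partition(order, world_size)) == order
    print("identical global order for 1, 8, 16, 32, and 64 devices")
```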

Instant mid-epoch resumption

It can be expensive, and annoying, to wait for your job to resume while your dataloader spins after a hardware failure or loss spike. Thanks to our deterministic sample ordering, StreamingDataset lets you resume training in the middle of a long run in seconds, not hours.

Minimizing resumption latency can save thousands of dollars in egress fees and idle GPU compute time compared to existing solutions.
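If the sample order is a pure function of the seed and epoch, a checkpoint only needs to record the epoch and how many samples were consumed; resuming means recomputing the order and slicing it, rather than replaying the data already seen. The sketch below assumes exactly that; the helper names and the dictionary state are hypothetical, and the library's actual resumption state and mechanism may differ.

```python
import random


def epoch_order(num_samples, seed, epoch):
    """Deterministic sample order for one epoch, derived only from (seed, epoch)."""
    order = list(range(num_samples))
    random.Random(seed * 1_000_003 + epoch).shuffle(order)
    return order


def checkpoint_state(epoch, samples_seen):
    """Everything this sketch needs to resume: the epoch and how far into it training got."""
    return {'epoch': epoch, 'samples_seen': samples_seen}


def resume(num_samples, seed, state):
    """Yield the rest of the epoch without re-reading the samples already consumed."""
    order = epoch_order(num_samples, seed, state['epoch'])
    yield from order[state['samples_seen']:]


if __name__ == "__main__":
    full = epoch_order(100, seed=42, epoch=3)
    state = checkpoint_state(epoch=3, samples_seen=37)  # e.g. saved just before a failure
    remaining = list(resume(100, seed=42, state=state))
    assert full[:37] + remaining == full
    print(f"resumed with {len(remaining)} samples left in epoch {state['epoch']}")
```

Recomputing a permutation is cheap compared with re-downloading or re-iterating over the consumed part of an epoch, which is where the resumption latency savings come from in this model.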

High throughput

Our MDS format cuts extraneous work to the bone, resulting in ultra-low sample latency and higher throughput compared to alternatives for workloads bottlenecked by the dataloader.

Tool              Throughput
StreamingDataset  ~19,000 img/sec
ImageFolder       ~18,000 img/sec
WebDataset        ~16,000 img/sec

Results are from ImageNet + ResNet-50 training, collected over 5 repetitions and measured after the data was cached during the first epoch.

Equal convergence

Model convergence from using StreamingDataset is just as good as using local disk, thanks to our shuffling algorithm.

Plot of equal convergence

Below are results from ImageNet + ResNet-50 training, collected over 5 repetitions.

Tool              Top-1 Accuracy
StreamingDataset  76.51% ± 0.09
ImageFolder       76.57% ± 0.10
WebDataset        76.23% ± 0.17

StreamingDataset shuffles across all samples assigned to a node, whereas alternative solutions only shuffle samples in a smaller pool (within a single process). Shuffling across a wider pool spreads out adjacent samples more. In addition, our shuffling algorithm minimizes dropped samples. We have found both of these shuffling features advantageous for model convergence.
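The claim that a wider shuffle pool spreads adjacent samples further apart can be checked with a toy measurement. This sketch shuffles only within consecutive pools (standing in for a small per-process buffer versus a node-wide pool) and measures how far apart originally adjacent samples land. It illustrates the effect only; it is not StreamingDataset's shuffling algorithm, and the helper names are hypothetical.

```python
import random
from statistics import mean


def pooled_shuffle(items, pool_size, seed=0):
    """Shuffle only within consecutive pools of pool_size items; no sample is dropped."""
    rng = random.Random(seed)
    out = []
    for start in range(0, len(items), pool_size):
        pool = items[start:start + pool_size]  # slicing copies, so items is left untouched
        rng.shuffle(pool)
        out.extend(pool)
    return out


def mean_neighbor_gap(shuffled):
    """Average output distance between samples i and i + 1 (assumes samples are 0..n-1)."""
    position = {item: pos for pos, item in enumerate(shuffled)}
    return mean(abs(position[i + 1] - position[i]) for i in range(len(shuffled) - 1))


if __name__ == "__main__":
    items = list(range(10_000))
    narrow = mean_neighbor_gap(pooled_shuffle(items, pool_size=100))    # small per-process pool
    wide = mean_neighbor_gap(pooled_shuffle(items, pool_size=10_000))   # node-wide pool
    print(f"mean gap with pool of 100: {narrow:.0f}; with pool of 10,000: {wide:.0f}")
```

With a pool of 100, neighbors stay within a few dozen positions of each other; with a pool covering all 10,000 samples, they typically end up thousands of positions apart.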

Random access

Access the data you need when you need it.

Even if a sample isn’t downloaded yet, you can access dataset[i] to get sample i. The download starts immediately and the sample is returned once it completes, much like a map-style PyTorch dataset whose samples are numbered sequentially and accessible in any order.

dataset = StreamingDataset(...)
sample = dataset[19543]
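Random access needs a way to map a global sample index to the shard that holds it. One common approach, assumed here rather than taken from StreamingDataset's source, is a binary search over cumulative per-shard sample counts. The ShardIndex class is hypothetical, and the "download" is simulated by recording shard ids in a set.

```python
from bisect import bisect_right
from itertools import accumulate


class ShardIndex:
    """Map a global sample index to (shard id, offset within shard) via cumulative counts."""

    def __init__(self, samples_per_shard):
        # ends[k] is one past the last global index stored in shard k.
        self.ends = list(accumulate(samples_per_shard))
        self.fetched = set()  # shard ids "downloaded" so far (simulated, nothing is fetched)

    def __len__(self):
        return self.ends[-1] if self.ends else 0

    def locate(self, index):
        if not 0 <= index < len(self):
            raise IndexError(index)
        shard = bisect_right(self.ends, index)  # first shard whose end is past index
        start = self.ends[shard - 1] if shard else 0
        return shard, index - start

    def get(self, index):
        """Resolve a sample, 'downloading' its shard on first access only."""
        shard, offset = self.locate(index)
        if shard not in self.fetched:
            self.fetched.add(shard)  # a real loader would fetch the shard file here
        return shard, offset


if __name__ == "__main__":
    index = ShardIndex([1000] * 20)  # 20 hypothetical shards of 1,000 samples each
    print(index.get(19543))          # (19, 543)
    print(sorted(index.fetched))     # [19]
```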

No divisibility requirements

StreamingDataset will happily iterate over any number of samples. You do not have to permanently delete samples so that the dataset divides evenly across a fixed number of devices. Instead, each epoch a different selection of samples is repeated (none are dropped) so that each device processes the same number of samples.

dataset = StreamingDataset(...)
dl = DataLoader(dataset, num_workers=...)
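The padding idea can be sketched in a few lines: round the per-device count up, fill the extra slots by repeating randomly chosen samples, and seed that choice with the epoch so the repeated samples can change from epoch to epoch. The pad_and_split helper is hypothetical; StreamingDataset's actual partitioning differs in detail.

```python
import math
import random


def pad_and_split(num_samples, num_devices, epoch, seed=0):
    """Give every device the same number of samples by repeating a few; none are dropped."""
    per_device = math.ceil(num_samples / num_devices)
    padding = per_device * num_devices - num_samples
    # Seeding with the epoch means the repeated samples can change from epoch to epoch.
    rng = random.Random(seed * 1_000_003 + epoch)
    order = list(range(num_samples))
    rng.shuffle(order)
    repeats = rng.choices(order, k=padding)
    padded = order + repeats
    return [padded[d * per_device:(d + 1) * per_device] for d in range(num_devices)]


if __name__ == "__main__":
    for epoch in range(3):
        parts = pad_and_split(num_samples=10, num_devices=4, epoch=epoch)
        print(epoch, parts)  # 4 devices x 3 samples = 12 slots, so 2 samples repeat
```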

Disk usage limits

Dynamically delete least recently used shards in order to keep disk usage under a specified limit. This is enabled by setting the StreamingDataset argument cache_limit. See the shuffling guide for more details.

dataset = StreamingDataset(
    cache_limit='100gb',
    ...
)
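A minimal least-recently-used eviction sketch, assuming a plain byte limit (the '100gb' string form is not parsed here) and shard names and sizes supplied by the caller. ShardCache is hypothetical, and StreamingDataset's own eviction logic may differ.

```python
from collections import OrderedDict


class ShardCache:
    """Keep total cached bytes under a limit by evicting least recently used shards."""

    def __init__(self, limit_bytes):
        self.limit_bytes = limit_bytes
        self.shards = OrderedDict()  # shard name -> size in bytes, least recently used first
        self.used_bytes = 0
        self.evicted = []

    def access(self, name, size):
        if name in self.shards:
            self.shards.move_to_end(name)  # mark as most recently used
            return
        # Evict LRU shards until the new one fits (a real cache would delete files here).
        # A shard larger than the whole limit is still admitted once the cache is empty.
        while self.shards and self.used_bytes + size > self.limit_bytes:
            old_name, old_size = self.shards.popitem(last=False)
            self.used_bytes -= old_size
            self.evicted.append(old_name)
        self.shards[name] = size
        self.used_bytes += size


if __name__ == "__main__":
    cache = ShardCache(limit_bytes=100)
    for name in ['a', 'b', 'a', 'c']:
        cache.access(name, size=40)
    print(list(cache.shards), cache.used_bytes, cache.evicted)  # ['a', 'c'] 80 ['b']
```

Touching shard a again moves it to the most-recently-used end, so b is the one evicted when c no longer fits.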

🏆 Project Showcase

Here are some projects and experiments that used StreamingDataset. Got something to add? Email community@mosaicml.com or join our Community Slack.

  • BioMedLM: a Domain Specific Large Language Model for BioMedicine by MosaicML and Stanford CRFM
  • Mosaic Diffusion Models: Training Stable Diffusion from Scratch Costs <$160k
  • Mosaic LLMs: GPT-3 quality for <$500k
  • Mosaic ResNet: Blazingly Fast Computer Vision Training with the Mosaic ResNet and Composer
  • Mosaic DeepLabv3: 5x Faster Image Segmentation Training with MosaicML Recipes
  • …more to come! Stay tuned!

💫 Contributors

We welcome any contributions, pull requests, or issues.

To start contributing, see our Contributing page.

P.S.: We're hiring!

If you like this project, give us a star and check out our other projects:

  • Composer - a modern PyTorch library that makes scalable, efficient neural network training easy
  • MosaicML Examples - reference examples for training ML models quickly and to high accuracy - featuring starter code for GPT / Large Language Models, Stable Diffusion, BERT, ResNet-50, and DeepLabV3
  • MosaicML Cloud - our training platform built to minimize training costs for LLMs, Diffusion Models, and other large models - featuring multi-cloud orchestration, effortless multi-node scaling, and under-the-hood optimizations for speeding up training time

✍️ Citation

@misc{mosaicml2022streaming,
    author = {The Mosaic ML Team},
    title = {streaming},
    year = {2022},
    howpublished = {\url{https://github.com/mosaicml/streaming/}},
}



Download files


Source Distribution

mosaicml-streaming-0.7.5.tar.gz (216.4 kB)

Uploaded Source

Built Distribution

mosaicml_streaming-0.7.5-py3-none-any.whl (253.7 kB)

Uploaded Python 3

File details

Details for the file mosaicml-streaming-0.7.5.tar.gz.

File metadata

  • Download URL: mosaicml-streaming-0.7.5.tar.gz
  • Upload date:
  • Size: 216.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.12.2

File hashes

Hashes for mosaicml-streaming-0.7.5.tar.gz
Algorithm Hash digest
SHA256 0acf31002e1709d8ce79b0889a9dcac03bea73d49802fe37cb349c7423aae95f
MD5 4332f3cbc76d0d73ca5541c6e9791d89
BLAKE2b-256 08013b27e240a799b0415f2444c66e35bbf89c012db420a7d0923742a4827d08


File details

Details for the file mosaicml_streaming-0.7.5-py3-none-any.whl.

File metadata

File hashes

Hashes for mosaicml_streaming-0.7.5-py3-none-any.whl
Algorithm Hash digest
SHA256 e85cb8cad286ea7ebc30b2dc9e868c0ba987d3c4c1245971097958a9762a79fc
MD5 091c27a6d2bce68c9073381e3da0614b
BLAKE2b-256 c59da92ec4edd0d585b384c071477361671d092d7b43c45fa2ce538f16b5b4a8

