ArrayRecord Python - High-Performance Data Storage for ML
ArrayRecord is a high-performance file format designed for machine learning workloads, offering parallel I/O, random access by record index, and efficient compression. Built on top of Riegeli, ArrayRecord sets a new frontier of I/O efficiency for training and evaluating ML models.
Key Features
- High Performance: Optimized for both sequential and random access patterns
- Parallel I/O: Built-in support for concurrent read and write operations
- Random Access: Efficient access to any record by index without full file scanning
- Advanced Compression: Multiple algorithms (Brotli, Zstd, Snappy) with configurable levels
- Apache Beam Integration: Seamless integration for large-scale data processing
- Cross-Platform: Available for Linux (x86_64, aarch64) and macOS (aarch64)
- Framework Integration: Works with TensorFlow, JAX, PyTorch, and more
Quick Start
Installation
# Basic installation
pip install array-record-python
# With Apache Beam support for large-scale processing
pip install array-record-python[beam]
# With all optional dependencies
pip install array-record-python[beam,test]
Note: This is a source distribution that includes Python bindings and Apache Beam integration. The C++ components need to be built separately using Bazel. For pre-compiled binaries, consider using the original array_record package or building from source following the development guide.
Basic Usage
ArrayRecord provides a simple and intuitive API for high-performance data storage:
from array_record.python import array_record_module, array_record_data_source

# Writing data
writer = array_record_module.ArrayRecordWriter('dataset.array_record')
for i in range(1000):
    data = f"Record {i}".encode('utf-8')
    writer.write(data)
writer.close()

# Reading data with random access
with array_record_data_source.ArrayRecordDataSource('dataset.array_record') as ds:
    print(f"Dataset contains {len(ds)} records")

    # Random access
    record_100 = ds[100]

    # Batch access
    batch = ds[[10, 50, 100, 500]]

    # Sequential access
    for i in range(len(ds)):
        record = ds[i]
Performance Optimization
Configure ArrayRecord for your specific use case:
# High compression for storage efficiency
writer = array_record_module.ArrayRecordWriter(
    'compressed.array_record',
    'group_size:10000,brotli:9,max_parallelism:4'
)

# Optimized for random access
reader_options = {
    'readahead_buffer_size': '0',
    'max_parallelism': '0'
}
ds = array_record_data_source.ArrayRecordDataSource(
    'dataset.array_record',
    reader_options=reader_options
)

# High throughput sequential processing
reader_options = {
    'readahead_buffer_size': '64MB',
    'max_parallelism': '8'
}
Apache Beam Integration
Process large datasets efficiently with Apache Beam:
import apache_beam as beam
from array_record.beam.pipelines import convert_tf_to_arrayrecord_gcs

# Convert TFRecords to ArrayRecord on Google Cloud
pipeline = convert_tf_to_arrayrecord_gcs(
    args=[
        '--input', 'gs://source-bucket/tfrecords/*',
        '--output', 'gs://dest-bucket/arrayrecords/'
    ],
    pipeline_options=beam.options.pipeline_options.PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',
        '--region=us-central1'
    ])
)
pipeline.run().wait_until_finish()
Framework Integration
TensorFlow
import tensorflow as tf
from array_record.python import array_record_data_source

def create_tf_dataset(filename):
    def generator():
        with array_record_data_source.ArrayRecordDataSource(filename) as ds:
            for i in range(len(ds)):
                record = ds[i]
                # Process record as needed
                yield record
    return tf.data.Dataset.from_generator(
        generator,
        output_signature=tf.TensorSpec(shape=(), dtype=tf.string)
    )

dataset = create_tf_dataset('data.array_record')
dataset = dataset.batch(32).prefetch(tf.data.AUTOTUNE)
JAX/Grain
ArrayRecord works seamlessly with Grain for JAX workflows:
import grain
from array_record.python import array_record_data_source

# Use ArrayRecord as a Grain data source
data_source = array_record_data_source.ArrayRecordDataSource('data.array_record')

# process_record is a user-defined transform applied to each raw record.
dataset = (
    grain.MapDataset.source(data_source)
    .shuffle(seed=42)
    .map(lambda x: process_record(x))
    .batch(batch_size=32)
)

for batch in dataset:
    # Training step
    pass
PyTorch
import torch
from torch.utils.data import Dataset, DataLoader
from array_record.python import array_record_data_source

class ArrayRecordDataset(Dataset):
    def __init__(self, filename):
        self.data_source = array_record_data_source.ArrayRecordDataSource(filename)

    def __len__(self):
        return len(self.data_source)

    def __getitem__(self, idx):
        record = self.data_source[idx]
        # Process record as needed
        return record

dataset = ArrayRecordDataset('data.array_record')
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)
File Format and Architecture
ArrayRecord files are organized for optimal performance:
+-----------------------+
|  User Data            |  <- Your records, compressed in chunks
|  Riegeli Chunk 1      |
+-----------------------+
|  User Data            |
|  Riegeli Chunk 2      |
+-----------------------+
|  ...                  |
+-----------------------+
|  Footer Chunk         |  <- Index for random access
|  (Index Data)         |
+-----------------------+
|  Postscript           |  <- File metadata (64KB)
|  (File Metadata)      |
+-----------------------+
Key Concepts
- Records: Basic units of data (any binary data, protocol buffers, etc.)
- Chunks: Groups of records compressed together (configurable group size)
- Index: Enables O(1) random access to any record
- Compression: Multiple algorithms with different speed/size trade-offs
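The sketch below ties these concepts to the API shown earlier: each write() call stores one record, the group_size writer option controls how many records are compressed together into a chunk, and the footer index is what lets the data source jump straight to any record. File names are hypothetical; the option string mirrors the ones used elsewhere in this README.

from array_record.python import array_record_module, array_record_data_source

# One record per write(); 'group_size' sets how many records share a
# compressed chunk; the footer index keeps per-record lookup O(1).
writer = array_record_module.ArrayRecordWriter(
    'concepts_demo.array_record',    # hypothetical file name
    'group_size:1000,brotli:6'       # records per chunk, algorithm:level
)
for i in range(5000):
    writer.write(f"record {i}".encode('utf-8'))
writer.close()

# Jump straight to record 4321 without scanning the preceding chunks.
with array_record_data_source.ArrayRecordDataSource('concepts_demo.array_record') as ds:
    print(len(ds))     # 5000
    print(ds[4321])    # b'record 4321'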
Performance Guide
Access Patterns
| Pattern | Configuration | Use Case |
|---|---|---|
| Sequential | `readahead_buffer_size: 16MB`, `max_parallelism: auto` | Training loops, data validation |
| Random | `readahead_buffer_size: 0`, `max_parallelism: 0` | Inference, sampling, debugging |
| Batch | `readahead_buffer_size: 4MB`, `max_parallelism: 2` | Mini-batch processing |
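As a quick illustration, the batch row above maps onto the same reader_options dictionary used in the Performance Optimization section; the file name below is hypothetical and the values are taken directly from the table.

from array_record.python import array_record_data_source

# Reader options for the batch access pattern from the table above.
batch_reader_options = {
    'readahead_buffer_size': '4MB',
    'max_parallelism': '2'
}

ds = array_record_data_source.ArrayRecordDataSource(
    'dataset.array_record',           # hypothetical file name
    reader_options=batch_reader_options
)
mini_batch = ds[[0, 1, 2, 3]]         # batch access by a list of indices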
Compression Comparison
| Algorithm | Ratio | Speed | Use Case |
|---|---|---|---|
| Uncompressed | 1.0x | ⚡⚡⚡⚡ | Maximum speed |
| Snappy | 2-4x | ⚡⚡⚡ | High throughput |
| Brotli (default) | 4-6x | ⚡⚡ | Balanced |
| Zstd | 4-6x | ⚡⚡⚡ | Fast alternative |
| Brotli (max) | 5-7x | ⚡ | Maximum compression |
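As a rough sketch, the codec is chosen through the same writer options string used earlier. Only `brotli:<level>` appears elsewhere in this README; the `zstd:<level>` and `snappy` spellings below are assumptions, so check the ArrayRecord/Riegeli documentation for the exact option names and level ranges.

from array_record.python import array_record_module

# 'brotli:9' matches the earlier example; the other two option strings are
# assumed spellings for the Zstd and Snappy codecs.
small_writer = array_record_module.ArrayRecordWriter(
    'small.array_record', 'group_size:10000,brotli:9'    # maximum compression
)
fast_writer = array_record_module.ArrayRecordWriter(
    'fast.array_record', 'group_size:10000,snappy'       # assumed: high throughput
)
balanced_writer = array_record_module.ArrayRecordWriter(
    'balanced.array_record', 'group_size:10000,zstd:3'   # assumed: fast alternative
)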
Benchmarking
import random
import time

from array_record.python import array_record_data_source

def benchmark_access_pattern(filename, access_pattern='sequential'):
    start_time = time.time()

    if access_pattern == 'sequential':
        reader_options = {'readahead_buffer_size': '16MB'}
    else:  # random
        reader_options = {'readahead_buffer_size': '0', 'max_parallelism': '0'}

    with array_record_data_source.ArrayRecordDataSource(
        filename, reader_options=reader_options
    ) as ds:
        if access_pattern == 'sequential':
            for i in range(len(ds)):
                _ = ds[i]
        else:  # random
            indices = random.sample(range(len(ds)), 1000)
            for idx in indices:
                _ = ds[idx]

    elapsed = time.time() - start_time
    print(f"{access_pattern.title()} access: {elapsed:.2f}s")

# Benchmark your data
benchmark_access_pattern('your_data.array_record', 'sequential')
benchmark_access_pattern('your_data.array_record', 'random')
Platform Support
| Platform | x86_64 | aarch64 |
|---|---|---|
| Linux | ✅ | ✅ |
| macOS | ❌ | ✅ |
| Windows | ❌ | ❌ |
Examples and Use Cases
Machine Learning Datasets
import json
from array_record.python import array_record_module

# Store structured ML data
def create_ml_dataset(output_file, samples):
    writer = array_record_module.ArrayRecordWriter(
        output_file,
        'group_size:1000,brotli:6'  # Balanced compression
    )
    for sample in samples:
        # Store as JSON for flexibility
        record = {
            'features': sample['features'],
            'label': sample['label'],
            'metadata': sample.get('metadata', {})
        }
        writer.write(json.dumps(record).encode('utf-8'))
    writer.close()

# Usage
samples = [
    {'features': [1.0, 2.0, 3.0], 'label': 0, 'metadata': {'id': 'sample_1'}},
    {'features': [4.0, 5.0, 6.0], 'label': 1, 'metadata': {'id': 'sample_2'}},
    # ... more samples
]
create_ml_dataset('ml_dataset.array_record', samples)
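Reading the dataset back only needs the data source API shown in the Quick Start; each record comes back as JSON-encoded bytes:

import json
from array_record.python import array_record_data_source

with array_record_data_source.ArrayRecordDataSource('ml_dataset.array_record') as ds:
    sample = json.loads(ds[0])   # decode the first record
    print(sample['label'], sample['features'], sample['metadata']['id'])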
Large-Scale Data Processing
import json

import apache_beam as beam
from array_record.beam.arrayrecordio import WriteToArrayRecord

# parse_csv_line and process_record are user-defined parsing/processing functions.

# Process and store large datasets
with beam.Pipeline() as pipeline:
    # Read from various sources
    data = (
        pipeline
        | 'ReadCSV' >> beam.io.ReadFromText('gs://bucket/data/*.csv')
        | 'ParseCSV' >> beam.Map(parse_csv_line)
        | 'ProcessData' >> beam.Map(process_record)
        | 'SerializeRecords' >> beam.Map(lambda x: json.dumps(x).encode('utf-8'))
    )

    # Write to ArrayRecord
    data | WriteToArrayRecord(
        file_path_prefix='gs://output-bucket/processed/data',
        file_name_suffix='.array_record',
        num_shards=100
    )
Multi-Modal Data Storage
import base64
import json

from array_record.python import array_record_module

def store_multimodal_data(output_file, samples):
    writer = array_record_module.ArrayRecordWriter(output_file)
    for sample in samples:
        record = {
            'text': sample['text'],
            'image_data': base64.b64encode(sample['image_bytes']).decode(),
            'audio_features': sample['audio_features'],
            'labels': sample['labels']
        }
        writer.write(json.dumps(record).encode('utf-8'))
    writer.close()
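Reading a record back reverses the encoding; the file name below is hypothetical, and the base64 step restores the raw image bytes:

import base64
import json
from array_record.python import array_record_data_source

with array_record_data_source.ArrayRecordDataSource('multimodal.array_record') as ds:
    record = json.loads(ds[0])
    image_bytes = base64.b64decode(record['image_data'])   # raw image bytes again
    print(record['text'], len(image_bytes), record['labels'])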
Documentation
- Full Documentation - Complete API reference and guides
- Quick Start Guide - Get started quickly
- Performance Guide - Optimization strategies
- Apache Beam Integration - Large-scale processing
- Examples - Real-world use cases
Development
Building from Source
# Clone the repository
git clone https://github.com/bzantium/array_record.git
cd array_record
# Install development dependencies
pip install -e .[test,beam]
# Build C++ components (requires Bazel)
bazel build //...
# Run tests
bazel test //...
pytest python/ -v
Contributing
We welcome contributions! Please see our Contributing Guide for details.
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Make your changes
- Add tests for new functionality
- Run the test suite
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
Issues and Support
- Bug Reports - Report bugs or request features
- Discussions - Ask questions and share ideas
- Documentation Issues - Help improve our docs
License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
Acknowledgments
- Built on top of Riegeli compression library
- Inspired by Grain for JAX data processing
- Originally developed by the Google ArrayRecord team
- Enhanced and maintained by the community
Related Projects
- Grain - JAX-focused data processing library
- TensorFlow - Machine learning framework with tf.data integration
- Apache Beam - Unified programming model for batch and streaming data
- Riegeli - Fast and efficient file format
ArrayRecord Python - Making high-performance data storage accessible for machine learning workflows.
Star this repository if you find it useful!