# parquet-stream-writer

Write streaming data to Parquet files with automatic sharding.

parquet-stream-writer lets you write streaming data to Parquet files with automatic sharding (splitting the data across multiple files). When a file reaches a user-defined size limit, the writer automatically starts a new one. This prevents the accumulation of unwieldy, monolithic Parquet files during stream processing.
## Installation

You can install parquet-stream-writer from PyPI using pip or from conda-forge with Pixi.

### Using pip

```shell
pip install parquet-stream-writer
```

### Using pixi

```shell
pixi init my_workspace && cd my_workspace
pixi add parquet-stream-writer
```
## Usage

The library's core class is ParquetStreamWriter, which works as a context manager and lets you write data incrementally using its write_batch method.

```python
import pyarrow as pa
from parquet_stream_writer import ParquetStreamWriter

# Define your schema
schema = pa.schema(
    [("col_a", pa.int64()), ("col_b", pa.string()), ("col_c", pa.bool_())]
)

# Simulate a data stream
def data_stream():
    for i in range(1_000):
        yield {"col_a": [i, i + 1], "col_b": ["foo", "bar"], "col_c": [True, False]}

# Initialize a `ParquetStreamWriter` and write data to `output_data.parquet`
with ParquetStreamWriter("output_data.parquet", schema, overwrite=True) as writer:
    for batch in data_stream():
        writer.write_batch(batch)
```
## Writing with automatic sharding

By default, ParquetStreamWriter writes to a single Parquet file. However, you can enable automatic sharding to split the output into multiple files based on a size threshold: use the shard_size_bytes parameter to set the approximate maximum uncompressed size for each file. In this mode, path acts as the base directory where shards will be written.

When sharding is enabled, the prefix of the generated files defaults to the name of the output directory. For example, if path="my_dataset", the files will be named my_dataset-0.parquet, my_dataset-1.parquet, and so on. You can override this with the file_prefix parameter.

```python
with ParquetStreamWriter(
    "my_dataset",  # Base directory path
    schema,
    shard_size_bytes=50 * 1024 * 1024,  # Shards will be approx. 50 MiB
    file_prefix="prefix",  # Custom prefix
) as writer:
    for batch in data_stream():
        writer.write_batch(batch)
```
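The naming rule described above can be sketched in a few lines. Note that `shard_filename` below is an illustrative helper written for this example, not part of the library's API:

```python
from pathlib import Path

def shard_filename(path, index, file_prefix=None):
    # The prefix defaults to the final component of the base directory path
    prefix = file_prefix if file_prefix is not None else Path(path).name
    return f"{prefix}-{index}.parquet"

print(shard_filename("my_dataset", 0))            # my_dataset-0.parquet
print(shard_filename("my_dataset", 1, "prefix"))  # prefix-1.parquet
```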
## Configuring buffer size

By default, ParquetStreamWriter uses a 16 MiB in-memory buffer to accumulate data before writing it to disk. You can adjust this size with the buffer_size_bytes parameter. A larger buffer can improve write performance by reducing the number of write operations, but it also increases memory usage. A smaller buffer leads to more frequent writes and larger output files, since encoding overhead is incurred on each flush.

```python
with ParquetStreamWriter(
    "output_data.parquet",
    schema,
    overwrite=True,
    buffer_size_bytes=200 * 1024 * 1024,  # The in-memory buffer will be approx. 200 MiB
) as writer:
    for batch in data_stream():
        writer.write_batch(batch)
```
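To get a feel for the trade-off, here is a toy model of how the buffer size affects the number of disk writes a stream triggers. `count_flushes` is hypothetical illustration code, not the library's implementation, and it ignores encoding details:

```python
def count_flushes(batch_sizes, buffer_size_bytes):
    """Count disk writes when batches accumulate in a buffer that is
    flushed whenever it reaches buffer_size_bytes (a simplified model)."""
    flushes = 0
    buffered = 0
    for size in batch_sizes:
        buffered += size
        if buffered >= buffer_size_bytes:
            flushes += 1
            buffered = 0
    if buffered:
        flushes += 1  # final flush when the writer closes
    return flushes

# A stream of 1,000 batches of 1 MiB each:
batches = [1024 * 1024] * 1000
print(count_flushes(batches, 16 * 1024 * 1024))   # 63 writes with the default 16 MiB buffer
print(count_flushes(batches, 200 * 1024 * 1024))  # 5 writes with a 200 MiB buffer
```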
## Configuring row group size

The row_group_size parameter controls how many rows are grouped together within the file. By default it is None, meaning the group size is the smaller of the total number of rows and 1,048,576. Setting a specific value, such as 10,000, can make searching and filtering faster because it allows readers to skip row groups that don't match a query.

```python
with ParquetStreamWriter(
    "output_data.parquet",
    schema,
    overwrite=True,
    row_group_size=10_000,
) as writer:
    for batch in data_stream():
        writer.write_batch(batch)
```
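The default behavior described above can be sketched as follows. `effective_row_group_size` is an illustrative function written for this example, not the library's actual code:

```python
DEFAULT_MAX_ROW_GROUP_SIZE = 1_048_576  # 1024 * 1024 rows

def effective_row_group_size(total_rows, row_group_size=None):
    # With row_group_size=None, use the smaller of the row count and 1,048,576
    if row_group_size is not None:
        return row_group_size
    return min(total_rows, DEFAULT_MAX_ROW_GROUP_SIZE)

print(effective_row_group_size(2_000))              # 2000
print(effective_row_group_size(5_000_000))          # 1048576
print(effective_row_group_size(5_000_000, 10_000))  # 10000
```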
## Passing additional parameters to ParquetWriter

ParquetStreamWriter uses PyArrow's ParquetWriter class under the hood. You can further customize the Parquet writing behavior by passing any additional parameters supported by ParquetWriter via **kwargs.

```python
with ParquetStreamWriter(
    "output_data.parquet",
    schema,
    overwrite=True,
    compression="zstd",  # Use ZSTD compression
    use_content_defined_chunking=True,  # Align data pages with content-defined chunk boundaries
) as writer:
    for batch in data_stream():
        writer.write_batch(batch)
```
## Accessing created files

After the writer closes, you can inspect which files it created via the written_files attribute.

```python
# The 'writer' object stores a list of the files it created
print("Data was written to the following files:")
for file_path in writer.written_files:
    print(f"{file_path}: {file_path.stat().st_size} bytes")
```
## ParquetStreamWriter API reference

```text
A writer for writing streaming data to Parquet files with automatic file rollover.

This class manages writing large or infinite datasets to multiple Parquet files
(shards), automatically creating new files when a size threshold is reached.

Parameters
----------
path : str or Path
    Path where Parquet files will be written. If shard_size_bytes is None,
    this is the path to the single output file. If shard_size_bytes is set,
    this is the base directory where shards will be created.
schema : pa.Schema
    PyArrow schema defining the structure of the data to be written.
shard_size_bytes : int or None, default None
    Approximate maximum uncompressed memory size in bytes for each shard
    before starting to write to a new file. If None (default), sharding is
    disabled and a single file is written to path. If set to an integer,
    path is treated as a base directory and shards are created inside it.
row_group_size : int or None, default None
    Maximum number of rows in each written row group.
buffer_size_bytes : int, default 16_777_216
    Maximum size in bytes of the in-memory buffer before flushing to disk.
    Must be <= shard_size_bytes.
file_prefix : str or None, default None
    Prefix to use for generated filenames (only used when sharding is
    enabled). If None (default), the name of the path directory is used as
    the prefix. Files are named '{file_prefix}-{index}.parquet'.
overwrite : bool, default False
    If True, deletes any existing output file or directory before writing.
    If False, raises FileExistsError when the output already exists.
**kwargs : dict, optional
    Additional keyword arguments passed to pyarrow.parquet.ParquetWriter.

Attributes
----------
path : Path
    The output path.
schema : pa.Schema
    The PyArrow schema for the data.
shard_size_bytes : int or None
    Maximum uncompressed size threshold for each file.
row_group_size : int or None
    Maximum number of rows in each written row group.
buffer_size_bytes : int
    Maximum size of the in-memory buffer before flushing.
file_prefix : str
    Prefix used for naming files when sharding is enabled.
writer : pq.ParquetWriter or None
    Currently active Parquet writer instance.
written_files : list[Path]
    List of absolute paths to all successfully created Parquet files.

Methods
-------
write_batch
    Write a data batch to the output.
flush
    Flush buffered data to the current shard.

Raises
------
FileExistsError
    If the output path already exists and overwrite is False.
FileNotFoundError
    If the parent directory of the output path does not exist.
ValueError
    If shard_size_bytes or buffer_size_bytes is negative.
```
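The documented Raises behavior can be approximated with a standalone sketch. `validate_writer_args`, the order of the checks, and the error messages are all hypothetical, inferred from the docs above rather than taken from the library's source:

```python
from pathlib import Path

def validate_writer_args(path, shard_size_bytes=None,
                         buffer_size_bytes=16_777_216, overwrite=False):
    """Hypothetical sketch of the argument checks implied by the docs above."""
    path = Path(path)
    # ValueError for negative sizes
    if shard_size_bytes is not None and shard_size_bytes < 0:
        raise ValueError("shard_size_bytes must be non-negative")
    if buffer_size_bytes < 0:
        raise ValueError("buffer_size_bytes must be non-negative")
    # The buffer must fit within a single shard
    if shard_size_bytes is not None and buffer_size_bytes > shard_size_bytes:
        raise ValueError("buffer_size_bytes must be <= shard_size_bytes")
    # FileExistsError unless overwrite=True
    if path.exists() and not overwrite:
        raise FileExistsError(f"{path} already exists; pass overwrite=True")
    # FileNotFoundError for a missing parent directory
    if not path.parent.exists():
        raise FileNotFoundError(f"parent directory {path.parent} does not exist")
```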
## File details

Details for the file parquet_stream_writer-0.2.0.tar.gz.

- Download URL: parquet_stream_writer-0.2.0.tar.gz
- Upload date:
- Size: 6.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes:

| Algorithm | Hash digest |
|---|---|
| SHA256 | 2b74fc464c41722498e5ce8481f8cd6bddd1607db7d39b87760a6e50740760cc |
| MD5 | d7a5f34ab727242d49e50a310fa7c7bc |
| BLAKE2b-256 | 1dc0e830aa314ba46c4813f4b8a287a530d59512813473ab4425a55a04ffd9b8 |
## File details

Details for the file parquet_stream_writer-0.2.0-py3-none-any.whl.

- Download URL: parquet_stream_writer-0.2.0-py3-none-any.whl
- Upload date:
- Size: 8.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes:

| Algorithm | Hash digest |
|---|---|
| SHA256 | 044e0132827de9e66545c4d80876aba97f553ca1dcdaa9ac36aa49eef34c36ba |
| MD5 | aefaef99e92102981a93b0ee9c877c5a |
| BLAKE2b-256 | f5c0606f6053311b2e78ba1677e3ed8ac36eb6ba7c063127db2a6a2fd5e38d95 |