Write streaming data to Parquet files with automatic sharding.


parquet-stream-writer

parquet-stream-writer provides a memory-efficient way to write streaming data to Parquet. It buffers incoming records and writes them incrementally to disk; when a configurable size threshold is reached, it starts a new Parquet shard, so the full dataset never has to fit in memory at once. This makes the library suitable for datasets larger than available memory and for continuously generated data.

Installation

You can install parquet-stream-writer from PyPI with pip or from conda-forge with pixi.

Using pip

pip install parquet-stream-writer

Using pixi

pixi init my_workspace && cd my_workspace
pixi add parquet-stream-writer

Usage

The core class is ParquetStreamWriter. It works as a context manager to ensure files are closed properly.

import pyarrow as pa
from parquet_stream_writer import ParquetStreamWriter

# Define your schema
schema = pa.schema([
    ("timestamp", pa.int64()),
    ("event_type", pa.string()),
    ("value", pa.float64())
])

# Simulate a data stream
def data_stream():
    for i in range(100):
        yield {
            "timestamp": [i],
            "event_type": ["reading"],
            "value": [float(i)]
        }

# Initialize the writer
# This will write to ./output_data/shard-0.parquet, ./output_data/shard-1.parquet, etc.
with ParquetStreamWriter("output_data", schema, overwrite=True) as writer:
    for batch in data_stream():
        writer.write_batch(batch)

Configuring file size and naming

You can configure when new shards are started and how the output files are named. Additional keyword arguments are passed through to the underlying PyArrow writer.

with ParquetStreamWriter(
    "data_stream",
    schema,
    shard_size_bytes=50 * 1024 * 1024,   # Shards will be approx. 50 MiB each
    file_prefix="events",                # Output name: events-0.parquet
    compression="snappy"
) as writer:
    for batch in stream:
        writer.write_batch(batch)

Accessing created files

After the writer closes, you can inspect which files were actually created. This is useful for logging or triggering downstream processes.

with ParquetStreamWriter("output", schema) as writer:
    writer.write_batch(batch_1)
    writer.write_batch(batch_2)

# The 'writer' object stores a list of the files it created
print("Data was written to the following files:")
for file_path in writer.written_files:
    print(file_path)
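A common follow-up is to log shard sizes before handing the paths to a downstream job. `summarize` below is an illustrative helper, not part of the library; the demo uses stand-in files of known size.

```python
import os
import tempfile


def summarize(written_files):
    """Print per-shard sizes and return the total bytes written."""
    total = 0
    for path in written_files:
        size = os.path.getsize(path)
        print(f"{path}: {size} bytes")
        total += size
    return total


# Demo with three stand-in shard files of 1024 bytes each.
with tempfile.TemporaryDirectory() as d:
    paths = []
    for i in range(3):
        p = os.path.join(d, f"shard-{i}.parquet")
        with open(p, "wb") as f:
            f.write(b"\x00" * 1024)
        paths.append(p)
    total = summarize(paths)
```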
