cryoflow

A plugin-driven columnar data processing CLI tool built on Polars LazyFrame.

cryoflow is a powerful tool for processing Apache Arrow (IPC/Parquet) format data through a customizable chain of plugins. It enables you to perform complex data transformations, validations, and output operations with a simple configuration file.

Features

  • 🔌 Plugin-Driven Architecture: Extend functionality by writing custom plugins
  • Lazy Evaluation: Powered by Polars LazyFrame for efficient data processing
  • 📋 Configuration-Based: Define your data pipeline in TOML configuration files
  • Dry-Run Mode: Validate your pipeline configuration and schema before processing
  • 🛡️ Robust Error Handling: Railway-oriented programming with Result types
  • 🔄 Streaming Support: Process both Parquet and Apache Arrow IPC formats
  • 🎯 Type-Safe: End-to-end type safety from data loading to output

Prerequisites

  • Python 3.14 or higher
  • uv (recommended) or pip

Installation

Using uv (recommended)

uv tool install cryoflow

Try without installing:

uvx cryoflow --help

Using pip

pip install cryoflow

Using Nix

If you have Nix installed, you can run cryoflow directly:

nix run github:yasunori0418/cryoflow -- --help

Or add cryoflow to your NixOS configuration or flake.nix:

inputs = {
  cryoflow.url = "github:yasunori0418/cryoflow";
};

From Source

git clone https://github.com/yasunori0418/cryoflow
cd cryoflow
uv sync  # or pip install -e .

Development with Nix Flake and direnv

For development purposes, you can use either direnv or nix CLI:

Using direnv (recommended)

# Copy the example configuration
cp example.envrc .envrc

# Allow direnv to load the configuration
direnv allow

This loads the Nix development environment defined in dev/flake.nix, which includes uv, ruff, and pyright. direnv activates it automatically whenever you enter the directory.

Using Nix CLI

# Enter the development environment from dev/flake.nix
nix develop ./dev

Quick Start

1. Create a Configuration File

Create a config.toml file:

[[input_plugins]]
name = "parquet-scan"
module = "cryoflow_plugin_collections.input.parquet_scan"
label = "default"
[input_plugins.options]
input_path = "data/input.parquet"

[[transform_plugins]]
name = "column-multiplier"
module = "cryoflow_plugin_collections.transform.multiplier"
enabled = true
[transform_plugins.options]
column_name = "amount"
multiplier = 2

[[output_plugins]]
name = "parquet-writer"
module = "cryoflow_plugin_collections.output.parquet_writer"
enabled = true
[output_plugins.options]
output_path = "data/output.parquet"

2. Run the Pipeline

cryoflow run -c config.toml

3. Validate Configuration

Before running, validate your configuration:

cryoflow check -c config.toml

Usage

CLI Commands

run - Execute Data Processing Pipeline

Executes the complete data processing pipeline as defined in the configuration file.

# Use the default configuration file ($XDG_CONFIG_HOME/cryoflow/config.toml)
cryoflow run

# Specify custom configuration file
cryoflow run -c path/to/config.toml

# Output detailed logs for debugging
cryoflow run -c config.toml -v

check - Validate Configuration & Schema

Validates pipeline configuration and schema without processing actual data. This is useful for pre-flight checks.

# Verify configuration validity
cryoflow check -c config.toml

# Verify with detailed logs
cryoflow check -c config.toml -v

Use cases for the check command:

  • Validate TOML syntax of configuration file
  • Verify that all required plugins can be loaded
  • Validate schema transformations (confirm transformed column types)
  • Pre-flight validation before running large data processing jobs

Configuration File

The configuration file uses TOML format and defines:

  • input_plugins: Array of input plugin configurations (data source definitions)
  • transform_plugins: Array of transform plugin configurations
  • output_plugins: Array of output plugin configurations

Each plugin entry specifies:

  • name: Plugin identifier
  • module: Python module path to load the plugin from
  • label: Data stream label for routing (default: "default")
  • enabled: Whether the plugin should be executed (true/false)
  • options: Plugin-specific configuration options

Path Resolution

Important: All file paths in the configuration are resolved relative to the directory containing the configuration file, not the current working directory.

  • Absolute paths: Used as-is (e.g., /absolute/path/to/file.parquet)
  • Relative paths: Resolved relative to the config file's directory (e.g., data/input.parquet)

This design ensures that configuration files are portable - you can move your entire project directory without breaking path references.

Example:

project/
  config/
    config.toml         # Configuration file here
    data/
      input.parquet     # Referenced as "data/input.parquet"
      output.parquet    # Referenced as "data/output.parquet"

Recommendation: Use relative paths in your configuration files to maximize portability.
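
The resolution rule described above can be sketched in a few lines. This is a minimal illustration of the rule, not cryoflow's own code; the function name `resolve_option_path` is hypothetical.

```python
from pathlib import PurePosixPath


def resolve_option_path(config_file: str, raw_path: str) -> PurePosixPath:
    """Resolve a path option relative to the directory of the config file.

    PurePosixPath keeps this example platform-independent; real code
    would more likely use pathlib.Path.
    """
    candidate = PurePosixPath(raw_path)
    if candidate.is_absolute():
        # Absolute paths are used as-is
        return candidate
    # Relative paths are anchored at the config file's directory,
    # not at the current working directory
    return PurePosixPath(config_file).parent / candidate


print(resolve_option_path("/project/config/config.toml", "data/input.parquet"))
print(resolve_option_path("/project/config/config.toml", "/absolute/path/to/file.parquet"))
```

With the directory layout shown in the example, the first call yields /project/config/data/input.parquet regardless of where the command is run from.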

Example Configuration

# Input plugin: data source (relative to config file directory)
[[input_plugins]]
name = "parquet-scan"
module = "cryoflow_plugin_collections.input.parquet_scan"
label = "default"
[input_plugins.options]
input_path = "data/sample_sales.parquet"

# Transform plugin: data transformation
[[transform_plugins]]
name = "column-multiplier"
module = "cryoflow_plugin_collections.transform.multiplier"
enabled = true
[transform_plugins.options]
column_name = "total_amount"
multiplier = 2

# Output plugin: write result (path is also relative to config file directory)
[[output_plugins]]
name = "parquet-writer"
module = "cryoflow_plugin_collections.output.parquet_writer"
enabled = true
[output_plugins.options]
output_path = "data/output.parquet"

Configuration File Locations

cryoflow searches for configuration files in the following order:

  1. Explicitly specified path via -c option
  2. $XDG_CONFIG_HOME/cryoflow/config.toml (typically ~/.config/cryoflow/config.toml)
  3. Default example configuration (if available)

Plugin System

Plugins are the core extension mechanism in cryoflow. There are three types of plugins:

InputPlugin

Loads data from a source. Takes no arguments and returns a FrameData result.

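import polars as pl
from returns.result import Result, Success

# InputPlugin and FrameData are provided by cryoflow
class MyInputPlugin(InputPlugin):
    def execute(self) -> Result[FrameData, Exception]:
        # Build a lazy scan; no rows are read until the pipeline is executed
        return Success(pl.scan_parquet(self.options['input_path']))

    def dry_run(self) -> Result[dict[str, pl.DataType], Exception]:
        # Return the expected schema without loading data;
        # expected_schema is a placeholder for a column-name-to-dtype mapping
        return Success(expected_schema)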

TransformPlugin

Transforms data in the pipeline. Receives a DataFrame/LazyFrame and returns the transformed result.

class MyTransformPlugin(TransformPlugin):
    def execute(self, df: FrameData) -> Result[FrameData, Exception]:
        # Your transformation logic
        return Success(df.with_columns(...))

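    def dry_run(self, schema: dict[str, pl.DataType]) -> Result[dict[str, pl.DataType], Exception]:
        # Validate the transformation against the incoming schema;
        # new_schema is a placeholder for the schema after the transformation
        return Success(new_schema)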

OutputPlugin

Outputs data to storage. Receives the final DataFrame/LazyFrame and handles output operations.

class MyOutputPlugin(OutputPlugin):
    def execute(self, df: FrameData) -> Result[None, Exception]:
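        # Your output logic. sink_parquet exists only on LazyFrame;
        # .lazy() converts a DataFrame and leaves a LazyFrame unchanged
        df.lazy().sink_parquet("output.parquet")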
        return Success(None)

    def dry_run(self, schema: dict[str, pl.DataType]) -> Result[dict[str, pl.DataType], Exception]:
        # Validate output capability
        return Success(schema)

All plugins have a dry_run method that enables schema validation without processing actual data.
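
The idea behind dry_run can be illustrated without Polars: each step receives a schema, checks it, and returns the schema it would produce, so no rows are ever read. The sketch below is a simplified stand-in, not cryoflow's API. Dtypes are plain strings, errors are raised as exceptions instead of being wrapped in Result, and the names `multiplier_dry_run` and `check_pipeline` are hypothetical.

```python
from typing import Callable

# Column name -> dtype name; a simplified stand-in for dict[str, pl.DataType]
Schema = dict[str, str]


def multiplier_dry_run(column_name: str) -> Callable[[Schema], Schema]:
    """Build a schema check for a hypothetical column-multiplier transform."""

    def check(schema: Schema) -> Schema:
        if column_name not in schema:
            raise KeyError(f"column {column_name!r} not found in schema")
        if schema[column_name] not in {"Int64", "Float64"}:
            raise TypeError(f"column {column_name!r} is not numeric")
        # For this sketch the output schema is assumed to equal the input schema
        return dict(schema)

    return check


def check_pipeline(schema: Schema, checks: list[Callable[[Schema], Schema]]) -> Schema:
    """Thread the schema through every check, as a dry run would."""
    for check in checks:
        schema = check(schema)
    return schema


input_schema = {"amount": "Int64", "region": "String"}
final_schema = check_pipeline(input_schema, [multiplier_dry_run("amount")])
print(final_schema)
```

A misconfigured column name fails at the check stage, before any data processing starts.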

Error Handling

cryoflow uses the returns library for robust error handling through railway-oriented programming with the Result type.

  • Data passed between plugins is wrapped in Result[FrameData, Exception]
  • Pipeline control uses flow/bind combinators and immediately halts processing when a Failure occurs
  • No silent failures - all errors are explicitly handled
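
The halting behavior described above can be sketched with a tiny stand-in for the Result type. This example deliberately does not use the returns library, so that it runs with the standard library alone. `Success`, `Failure`, and `bind_all` here are simplified, hypothetical versions written for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Success:
    value: Any


@dataclass(frozen=True)
class Failure:
    error: Exception


def bind_all(initial: Any, steps: list[Callable[[Any], Any]]) -> Any:
    """Feed each Success value into the next step; stop at the first Failure."""
    result: Any = Success(initial)
    for step in steps:
        if isinstance(result, Failure):
            break  # later steps are never called once a Failure occurs
        result = step(result.value)
    return result


calls: list[str] = []


def double(x: int) -> Success:
    calls.append("double")
    return Success(x * 2)


def reject(x: int) -> Failure:
    calls.append("reject")
    return Failure(ValueError(f"cannot process {x}"))


outcome = bind_all(21, [double, reject, double])
print(type(outcome).__name__, calls)
```

The third step is skipped because the second one returned a Failure, and the error travels to the caller as a value rather than being swallowed.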

Data Flow Architecture

Config Load
    ↓
Plugin Discovery
    ↓
Pipeline Construction
    ↓
Execution / Output

  1. Config Load: Load and validate configuration from TOML file using Pydantic
  2. Plugin Discovery: Load specified modules via importlib and register with pluggy
  3. Pipeline Construction: Convert source data to LazyFrame, execute TransformPlugin hooks to build the computation graph
  4. Execution / Output: Execute OutputPlugin hooks where collect() or sink_*() is called and processing actually runs
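
The dynamic-loading part of the Plugin Discovery step can be sketched with importlib. This is an illustration under assumptions, not cryoflow's implementation: registration with pluggy is not shown, the function name `load_enabled_modules` is hypothetical, entries are plain dictionaries, and standard-library modules stand in for real plugin modules.

```python
import importlib
from types import ModuleType
from typing import Any


def load_enabled_modules(entries: list[dict[str, Any]]) -> dict[str, ModuleType]:
    """Import the module of every enabled entry, keyed by plugin name."""
    loaded: dict[str, ModuleType] = {}
    for entry in entries:
        # Treating a missing "enabled" key as true is an assumption of this sketch
        if not entry.get("enabled", True):
            continue
        # import_module raises ModuleNotFoundError if the dotted path is wrong
        loaded[entry["name"]] = importlib.import_module(entry["module"])
    return loaded


modules = load_enabled_modules(
    [
        {"name": "json-demo", "module": "json"},
        {"name": "skipped", "module": "csv", "enabled": False},
    ]
)
print(sorted(modules))
```

A wrong module path surfaces at this stage as an import error, which is the situation described under "Plugin Not Found" in the Troubleshooting section.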

Technology Stack

Category | Library/Technology | Purpose
Core     | Polars             | Columnar data processing engine (LazyFrame-based)
CLI      | Typer              | Modern CLI framework and command definitions
Plugin   | pluggy + importlib | Plugin mechanism, hook management, dynamic loading
Config   | Pydantic + TOML    | Type-safe configuration definition and validation
Path     | xdg-base-dirs      | XDG Base Directory specification compliance
Error    | returns            | Functional error handling via Result Monad
Base     | ABC (Standard Lib) | Plugin interface definitions

Examples

Sample data and configuration files are provided in the examples/ directory:

# Run the sample pipeline
cryoflow run -c examples/config.toml

# Check the sample configuration
cryoflow check -c examples/config.toml

The examples directory includes:

  • config.toml: Sample pipeline configuration
  • data/sample_sales.parquet: Sample sales data (Parquet format)
  • data/sample_sales.ipc: Same data in Arrow IPC format
  • data/sensor_readings.parquet: Sensor data example

Documentation

For detailed information, see the docs/ directory.

English documentation is available as docs/{filename}.md. Japanese documentation is available as docs/{filename}_ja.md.

Troubleshooting

Configuration File Not Found

Error: Configuration file not found

Solution:

  • Check that the file path is correct
  • Use the -c option to specify the configuration file explicitly
  • Ensure ~/.config/cryoflow/config.toml exists if you rely on the default location

Plugin Not Found

Error: Module not found: cryoflow_plugin_collections

Solution:

  • Install required plugins: pip install cryoflow-plugin-collections
  • Verify the module path in the configuration file

Schema Validation Error

Error: Schema validation failed

Solution:

  • Run cryoflow check -c config.toml -v to see detailed validation logs
  • Verify column names and types match your input data
  • Check plugin options are correct

For More Help

  • Check the example configurations in the examples/ directory
  • Run with the -v flag for verbose logging
  • See the documentation in the docs/ directory

License

This project is licensed under the MIT License. See LICENSE file for details.

Credits

cryoflow re-exports APIs from the following libraries to simplify plugin development. We gratefully acknowledge the authors and contributors of these projects.

Polars

cryoflow_plugin_collections.libs.polars re-exports the complete Polars public API to reduce dependency management overhead for plugin developers.

Polars is licensed under the MIT License.

Copyright (c) 2025 Ritchie Vink
Copyright (c) 2024 (Some portions) NVIDIA CORPORATION & AFFILIATES. All rights reserved.

returns

cryoflow_plugin_collections.libs.returns re-exports Result, Success, Failure, ResultE, safe, Maybe, Some, Nothing, and maybe from the returns library to provide railway-oriented programming utilities for plugin developers.

returns is licensed under the BSD 2-Clause License.

Copyright 2016-2021 dry-python organization

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Support

For issues, questions, or suggestions, please open an issue on the GitHub repository.
