cryoflow
A plugin-driven columnar data processing CLI tool built on Polars LazyFrame.
cryoflow is a powerful tool for processing Apache Arrow (IPC/Parquet) format data through a customizable chain of plugins. It enables you to perform complex data transformations, validations, and output operations with a simple configuration file.
Features
- 🔌 Plugin-Driven Architecture: Extend functionality by writing custom plugins
- ⚡ Lazy Evaluation: Powered by Polars LazyFrame for efficient data processing
- 📋 Configuration-Based: Define your data pipeline in TOML configuration files
- ✅ Dry-Run Mode: Validate your pipeline configuration and schema before processing
- 🛡️ Robust Error Handling: Railway-oriented programming with Result types
- 🔄 Streaming Support: Process both Parquet and Apache Arrow IPC formats
- 🎯 Type-Safe: End-to-end type safety from data loading to output
Prerequisites
- Python 3.14 or higher
- uv (recommended) or pip
Installation
Using uv (recommended)
uv tool install cryoflow
Try without installing:
uvx cryoflow --help
Using pip
pip install cryoflow
Using Nix
If you have Nix installed, you can run cryoflow directly:
nix run github:yasunori0418/cryoflow -- --help
Or add cryoflow to your NixOS configuration or flake.nix:
inputs = {
  cryoflow.url = "github:yasunori0418/cryoflow";
};
From Source
git clone https://github.com/yasunori0418/cryoflow
cd cryoflow
uv sync # or pip install -e .
Development with Nix Flake and direnv
For development purposes, you can use either direnv or nix CLI:
Using direnv (recommended)
# Copy the example configuration
cp example.envrc .envrc
# Allow direnv to load the configuration
direnv allow
This automatically loads the development environment with Nix, including uv, ruff, and pyright tools. When you enter the directory, direnv automatically activates the environment defined in dev/flake.nix.
Using Nix CLI
# Enter the development environment from dev/flake.nix
nix develop ./dev
Quick Start
1. Create a Configuration File
Create a config.toml file:
[[input_plugins]]
name = "parquet-scan"
module = "cryoflow_plugin_collections.input.parquet_scan"
label = "default"
[input_plugins.options]
input_path = "data/input.parquet"
[[transform_plugins]]
name = "column-multiplier"
module = "cryoflow_plugin_collections.transform.multiplier"
enabled = true
[transform_plugins.options]
column_name = "amount"
multiplier = 2
[[output_plugins]]
name = "parquet-writer"
module = "cryoflow_plugin_collections.output.parquet_writer"
enabled = true
[output_plugins.options]
output_path = "data/output.parquet"
2. Run the Pipeline
cryoflow run -c config.toml
3. Validate Configuration
Before running, validate your configuration:
cryoflow check -c config.toml
Usage
CLI Commands
run - Execute Data Processing Pipeline
Executes the complete data processing pipeline as defined in the configuration file.
# Use default configuration file (searches XDG_CONFIG_HOME/cryoflow/config.toml)
cryoflow run
# Specify custom configuration file
cryoflow run -c path/to/config.toml
# Output detailed logs for debugging
cryoflow run -c config.toml -v
check - Validate Configuration & Schema
Validates pipeline configuration and schema without processing actual data. This is useful for pre-flight checks.
# Verify configuration validity
cryoflow check -c config.toml
# Verify with detailed logs
cryoflow check -c config.toml -v
Use cases for the check command:
- Validate TOML syntax of configuration file
- Verify that all required plugins can be loaded
- Validate schema transformations (confirm transformed column types)
- Pre-flight validation before running large data processing jobs
Configuration File
The configuration file uses TOML format and defines:
- input_plugins: Array of input plugin configurations (data source definitions)
- transform_plugins: Array of transform plugin configurations
- output_plugins: Array of output plugin configurations
Each plugin entry specifies:
- name: Plugin identifier
- module: Python module path to load the plugin from
- label: Data stream label for routing (default: "default")
- enabled: Whether the plugin should be executed (true/false)
- options: Plugin-specific configuration options
Path Resolution
Important: All file paths in the configuration are resolved relative to the directory containing the configuration file, not the current working directory.
- Absolute paths: Used as-is (e.g., /absolute/path/to/file.parquet)
- Relative paths: Resolved relative to the config file's directory (e.g., data/input.parquet)
This design keeps configuration files portable: you can move your entire project directory without breaking path references.
Example:
project/
  config/
    config.toml         # Configuration file here
    data/
      input.parquet     # Referenced as "data/input.parquet"
      output.parquet    # Referenced as "data/output.parquet"
Recommendation: Use relative paths in your configuration files to maximize portability.
Example Configuration
# Input plugin: data source (relative to config file directory)
[[input_plugins]]
name = "parquet-scan"
module = "cryoflow_plugin_collections.input.parquet_scan"
label = "default"
[input_plugins.options]
input_path = "data/sample_sales.parquet"
# Transform plugin: data transformation
[[transform_plugins]]
name = "column-multiplier"
module = "cryoflow_plugin_collections.transform.multiplier"
enabled = true
[transform_plugins.options]
column_name = "total_amount"
multiplier = 2
# Output plugin: write result (path is also relative to config file directory)
[[output_plugins]]
name = "parquet-writer"
module = "cryoflow_plugin_collections.output.parquet_writer"
enabled = true
[output_plugins.options]
output_path = "data/output.parquet"
Configuration File Locations
cryoflow searches for configuration files in the following order:
1. Explicitly specified path via the -c option
2. $XDG_CONFIG_HOME/cryoflow/config.toml (typically ~/.config/cryoflow/config.toml)
3. Default examples configuration (if available)
Plugin System
Plugins are the core extension mechanism in cryoflow. There are three types of plugins:
InputPlugin
Loads data from a source. Takes no arguments and returns a FrameData result.
class MyInputPlugin(InputPlugin):
    def execute(self) -> Result[FrameData, Exception]:
        # Your data loading logic
        return Success(pl.scan_parquet(self.options['input_path']))

    def dry_run(self) -> Result[dict[str, pl.DataType], Exception]:
        # Return expected schema without loading data;
        # expected_schema is a placeholder mapping column names to Polars dtypes
        return Success(expected_schema)
TransformPlugin
Transforms data in the pipeline. Receives a DataFrame/LazyFrame and returns the transformed result.
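class MyTransformPlugin(TransformPlugin):
    def execute(self, df: FrameData) -> Result[FrameData, Exception]:
        # Your transformation logic; as an example, scale one column by a configured factor
        column = self.options['column_name']
        return Success(df.with_columns(pl.col(column) * self.options['multiplier']))

    def dry_run(self, schema: dict[str, pl.DataType]) -> Result[dict[str, pl.DataType], Exception]:
        # Validate the transformation against the incoming schema;
        # new_schema is a placeholder for the schema after the transformation
        return Success(new_schema)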
OutputPlugin
Outputs data to storage. Receives the final DataFrame/LazyFrame and handles output operations.
class MyOutputPlugin(OutputPlugin):
    def execute(self, df: FrameData) -> Result[None, Exception]:
        # Your output logic; .lazy() accepts both DataFrame and LazyFrame inputs,
        # since sink_parquet is only available on LazyFrame
        df.lazy().sink_parquet(self.options['output_path'])
        return Success(None)

    def dry_run(self, schema: dict[str, pl.DataType]) -> Result[dict[str, pl.DataType], Exception]:
        # Validate output capability
        return Success(schema)
All plugins have a dry_run method that enables schema validation without processing actual data.
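One way to picture how dry_run supports the check command is to thread a schema through each step without touching any data. The sketch below is stdlib-only and is not cryoflow's implementation: schemas map column names to type-name strings as a stand-in for `pl.DataType`, `multiply_column` and `check_pipeline` are hypothetical names, and it raises exceptions for brevity where cryoflow's plugins return `Result` values.

```python
from typing import Callable

# Column name -> type name; a plain-string stand-in for pl.DataType.
Schema = dict[str, str]
DryRun = Callable[[Schema], Schema]

# ASSUMPTION: an illustrative set of numeric type names.
NUMERIC_TYPES = {"Int64", "Float64"}


def multiply_column(column_name: str) -> DryRun:
    """Build a hypothetical dry_run for a transform that multiplies one column."""

    def dry_run(schema: Schema) -> Schema:
        if column_name not in schema:
            raise KeyError(f"column {column_name!r} not found in schema")
        if schema[column_name] not in NUMERIC_TYPES:
            raise TypeError(f"column {column_name!r} is not numeric: {schema[column_name]}")
        # In this sketch the multiplication leaves every column type unchanged.
        return dict(schema)

    return dry_run


def check_pipeline(input_schema: Schema, steps: list[DryRun]) -> Schema:
    """Pass the schema through every step in order and return the final schema."""
    schema = dict(input_schema)
    for step in steps:
        schema = step(schema)
    return schema


if __name__ == "__main__":
    source = {"id": "Int64", "amount": "Float64", "note": "String"}
    print(check_pipeline(source, [multiply_column("amount")]))
```

A misspelled column name or a non-numeric column surfaces here as an error before any rows are read, which is the kind of problem a pre-flight check is meant to catch.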
Error Handling
cryoflow uses the returns library for robust error handling through railway-oriented programming with the Result type.
- Data passed between plugins is wrapped in Result[FrameData, Exception]
- Pipeline control uses flow/bind combinators and immediately halts processing when a Failure occurs
- No silent failures: all errors are explicitly handled
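The halt-on-first-failure behavior described above can be illustrated without any dependencies. The sketch below is a simplified stand-in, not the returns library's API: `Ok` and `Err` play the roles of `Success` and `Failure`, and `run_steps` is a hypothetical helper that mimics chaining steps with bind.

```python
from dataclasses import dataclass
from typing import Any, Callable, Union


@dataclass(frozen=True)
class Ok:
    """Stand-in for a successful result."""
    value: Any


@dataclass(frozen=True)
class Err:
    """Stand-in for a failed result carrying the exception."""
    error: Exception


Outcome = Union[Ok, Err]
Step = Callable[[Any], Outcome]


def run_steps(initial: Any, steps: list[Step]) -> Outcome:
    """Feed each successful value into the next step; stop at the first Err."""
    value = initial
    for step in steps:
        outcome = step(value)
        if isinstance(outcome, Err):
            # Later steps never run once a failure occurs.
            return outcome
        value = outcome.value
    return Ok(value)


def parse_int(text: str) -> Outcome:
    try:
        return Ok(int(text))
    except ValueError as exc:
        return Err(exc)


def double(number: int) -> Outcome:
    return Ok(number * 2)


if __name__ == "__main__":
    print(run_steps("21", [parse_int, double]))  # Ok(value=42)
    print(type(run_steps("oops", [parse_int, double])).__name__)  # Err
```

The error travels as an ordinary return value, so the caller decides how to report it and no step has to guard against a failed predecessor.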
Data Flow Architecture
Config Load
↓
Plugin Discovery
↓
Pipeline Construction
↓
Execution / Output
- Config Load: Load and validate configuration from TOML file using Pydantic
- Plugin Discovery: Load specified modules via importlib and register with pluggy
- Pipeline Construction: Convert source data to LazyFrame, execute TransformPlugin hooks to build the computation graph
- Execution / Output: Execute OutputPlugin hooks where collect() or sink_*() is called and processing actually runs
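The dynamic-loading part of the Plugin Discovery step can be sketched with `importlib` alone. This is only an illustration: `load_attribute` is a hypothetical helper, the pluggy registration is omitted, and how cryoflow locates the plugin class inside a module is not described here, so the attribute name is passed in explicitly. A standard-library module stands in for a plugin module so the example runs anywhere.

```python
import importlib
from typing import Any


def load_attribute(module_path: str, attribute: str) -> Any:
    """Import a module by its dotted path and return one attribute from it."""
    # Raises ModuleNotFoundError if the module is not installed.
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attribute)
    except AttributeError as exc:
        raise ImportError(f"{module_path!r} has no attribute {attribute!r}") from exc


if __name__ == "__main__":
    # "json" stands in for a plugin module path taken from the `module` field.
    dumps = load_attribute("json", "dumps")
    print(dumps({"loaded": True}))
```

A module path that cannot be imported fails at this step, before any pipeline is constructed.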
Technology Stack
| Category | Library/Technology | Purpose |
|---|---|---|
| Core | Polars | Columnar data processing engine (LazyFrame-based) |
| CLI | Typer | Modern CLI framework and command definitions |
| Plugin | pluggy + importlib | Plugin mechanism, hook management, dynamic loading |
| Config | Pydantic + TOML | Type-safe configuration definition and validation |
| Path | xdg-base-dirs | XDG Base Directory specification compliance |
| Error | returns | Functional error handling via Result Monad |
| Base | ABC (Standard Lib) | Plugin interface definitions |
Examples
Sample data and configuration files are provided in the examples/ directory:
# Run the sample pipeline
cryoflow run -c examples/config.toml
# Check the sample configuration
cryoflow check -c examples/config.toml
The examples directory includes:
- config.toml: Sample pipeline configuration
- data/sample_sales.parquet: Sample sales data (Parquet format)
- data/sample_sales.ipc: Same data in Arrow IPC format
- data/sensor_readings.parquet: Sensor data example
Documentation
For detailed information, see:
- Specification - Complete API specification and interface design
- Plugin Development Guide - Guide for developing custom plugins
- CI/CD Documentation - Workflow and automation details
- Implementation Plan - Early development planning document (archived)
- Progress - Early development progress tracking (archived)
English documentation is available as docs/{filename}.md.
Japanese documentation is available as docs/{filename}_ja.md.
Troubleshooting
Configuration File Not Found
Error: Configuration file not found
Solution:
- Check that the file path is correct
- Use the -c option to specify the configuration file explicitly
- Ensure ~/.config/cryoflow/config.toml exists if using the default location
Plugin Not Found
Error: Module not found: cryoflow_plugin_collections
Solution:
- Install required plugins: pip install cryoflow-plugin-collections
- Verify the module path in the configuration file
Schema Validation Error
Error: Schema validation failed
Solution:
- Run cryoflow check -c config.toml -v to see detailed validation logs
- Verify column names and types match your input data
- Check plugin options are correct
For More Help
- Check example configurations in the examples/ directory
- Run with the -v flag for verbose logging
- See documentation in the docs/ directory
License
This project is licensed under the MIT License. See LICENSE file for details.
Credits
cryoflow re-exports APIs from the following libraries to simplify plugin development. We gratefully acknowledge the authors and contributors of these projects.
Polars
cryoflow_plugin_collections.libs.polars re-exports the complete Polars public API to reduce dependency management overhead for plugin developers.
Polars is licensed under the MIT License.
Copyright (c) 2025 Ritchie Vink
Copyright (c) 2024 (Some portions) NVIDIA CORPORATION & AFFILIATES. All rights reserved.
returns
cryoflow_plugin_collections.libs.returns re-exports Result, Success, Failure, ResultE, safe, Maybe, Some, Nothing, and maybe from the returns library to provide railway-oriented programming utilities for plugin developers.
returns is licensed under the BSD 2-Clause License.
Copyright 2016-2021 dry-python organization
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
For issues, questions, or suggestions, please open an issue on the GitHub repository.