ZooPipe is a data processing framework that allows you to process data in a declarative way.

ZooPipe

ZooPipe is a lightweight, high-performance data processing framework for Python that combines the simplicity of Pydantic validation with the power of parallel processing.

Whether you're migrating data, cleaning CSVs, or processing streams, ZooPipe provides a structured way to handle validation, transformation, and error management without the complexity of big data frameworks.


✨ Key Features

  • 🔍 Declarative Validation: Use Pydantic models to define and validate your data structures
  • 🔌 Pluggable Architecture: Easily swap Input Adapters, Output Adapters, and Executors
  • ⚡ Parallel Processing: Scale from single-threaded to distributed computing with MultiprocessingExecutor, ThreadExecutor, DaskExecutor and RayExecutor
  • 🗜️ High-Performance Serialization: Uses msgpack and optional LZ4 compression for efficient inter-process communication
  • 📊 Built-in Format Support: Direct support for CSV, JSON (array & JSONL), and Parquet files
  • 🚨 Automated Error Handling: Dedicated error output adapter to capture records that fail validation
  • 🪝 Hooks System: Transform and enrich data at various pipeline stages with built-in and custom hooks
  • 🔄 Async Ready: Base adapters provided for async implementations
  • 🛡️ Type Safe: Fully type-hinted for a better developer experience
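The two JSON layouts named in the feature list differ in how records are delimited: a JSON array is a single document that holds every record, while JSONL (JSON Lines) puts one JSON object on each line so records can be read one at a time. The sketch below shows the difference using only the standard library. It is not ZooPipe's input adapter, and `read_json_array` and `read_jsonl` are hypothetical names.

```python
import io
import json
from typing import Iterator, TextIO


def read_json_array(stream: TextIO) -> Iterator[dict]:
    """Parse a whole JSON array document, then yield its records one at a time."""
    # json.load reads the entire document into memory before yielding anything.
    for record in json.load(stream):
        yield record


def read_jsonl(stream: TextIO) -> Iterator[dict]:
    """Yield one record per non-blank line (JSON Lines format)."""
    for line in stream:
        line = line.strip()
        if line:  # skip blank lines, e.g. a trailing newline at end of file
            yield json.loads(line)


array_text = '[{"name": "Ada", "age": 36}, {"name": "Alan", "age": 41}]'
jsonl_text = '{"name": "Ada", "age": 36}\n{"name": "Alan", "age": 41}\n'

array_rows = list(read_json_array(io.StringIO(array_text)))
jsonl_rows = list(read_jsonl(io.StringIO(jsonl_text)))
print(array_rows == jsonl_rows)  # True
```

Both readers produce the same records. Because JSONL never requires the whole file in memory, it is generally the better fit for very large inputs or streams.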

🚀 Quick Start

Installation

uv add zoopipe

Or using pip:

pip install zoopipe

Simple Example

from pydantic import BaseModel, ConfigDict
from zoopipe import Pipe
from zoopipe.executor.sync_fifo import SyncFifoExecutor
from zoopipe.input_adapter.csv import CSVInputAdapter
from zoopipe.output_adapter.csv import CSVOutputAdapter

class UserSchema(BaseModel):
    model_config = ConfigDict(extra="ignore")
    name: str
    last_name: str
    age: int

pipe = Pipe(
    input_adapter=CSVInputAdapter("users.csv"),
    output_adapter=CSVOutputAdapter("processed_users.csv"),
    error_output_adapter=CSVOutputAdapter("errors.csv"),
    executor=SyncFifoExecutor(UserSchema),
)

# Start the pipe with a context manager to ensure cleanup
with pipe:
    report = pipe.start()
    report.wait()

print(f"Finished! Processed {report.total_processed} items.")
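You can preview the per-row checks that `SyncFifoExecutor(UserSchema)` performs by using Pydantic on its own. The sketch below is not ZooPipe's executor. It only assumes Pydantic v2 is installed (for `ConfigDict` and `model_validate`). It shows three things: `extra="ignore"` drops unknown columns, CSV-style strings such as `"36"` are coerced to `int`, and a failing row can be routed to a separate list, which is the role the error output adapter plays. The sample `rows` and the `split_rows` helper are hypothetical.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class UserSchema(BaseModel):
    # Same schema as in the example above: unknown columns are silently dropped.
    model_config = ConfigDict(extra="ignore")
    name: str
    last_name: str
    age: int


def split_rows(rows: list[dict]) -> tuple[list[UserSchema], list[dict]]:
    """Hypothetical helper: validate each row, separating good rows from failures."""
    valid: list[UserSchema] = []
    errors: list[dict] = []
    for row in rows:
        try:
            valid.append(UserSchema.model_validate(row))
        except ValidationError as exc:
            # Keep the raw row plus Pydantic's error details, like a dead-letter record.
            errors.append({"row": row, "errors": exc.errors()})
    return valid, errors


# CSV readers yield strings, so "age" arrives as text; Pydantic coerces "36" to 36.
rows = [
    {"name": "Ada", "last_name": "Lovelace", "age": "36", "email": "ada@example.com"},
    {"name": "Alan", "last_name": "Turing", "age": "forty-one"},
]
valid, errors = split_rows(rows)
print(valid[0].age, len(errors))  # 36 1
```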

📚 Documentation

Getting Started

Core Concepts

  • Executors - Learn about SyncFifoExecutor, MultiprocessingExecutor, ThreadExecutor, DaskExecutor and RayExecutor
  • Adapters - Input and Output adapters for various data sources
  • Examples - Practical examples for common use cases

🎯 Use Cases

ZooPipe excels at:

  • Legacy Data Migrations: Moving data between heterogeneous databases with validation
  • ETL Pipelines: Extract, Transform, Load workflows with error handling
  • Data Cleaning: Processing manually generated files (Excel/CSV) with inconsistent formats
  • Quality Filters: Acting as a validation layer before loading data into Data Lakes or ML models
  • Batch Processing: Processing large datasets with parallel execution

🧩 Architecture

ZooPipe uses a decoupled architecture based on four components:

┌─────────────────┐      ┌──────────────┐      ┌─────────────────┐
│  Input Adapter  │─────▶│   Executor   │─────▶│ Output Adapter  │
│ (CSV, JSON,     │      │ (Validation  │      │ (CSV, JSON,     │
│  Parquet, DB,   │      │  & Transform)│      │  Parquet, DB,   │
│    API, etc)    │      │              │      │   API, etc)     │
└─────────────────┘      └──────────────┘      └─────────────────┘
                                │
                                │ (errors)
                                ▼
                         ┌──────────────┐
                         │ Error Output │
                         │   Adapter    │
                         └──────────────┘

  • Input Adapter: Reads data from sources (CSV, JSON, Parquet, SQL, API)
  • Executor: Validates with Pydantic and processes data (sequential or parallel)
  • Output Adapter: Persists validated data
  • Error Output Adapter: Captures failed validations (Dead Letter Queue)

Learn more in the Architecture RFC.
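The decoupling described above can be sketched in plain Python. Every name here (`run_pipeline`, `validate_age`, and the list-based sinks) is hypothetical and only mirrors the four roles. ZooPipe's real adapters and executors have their own interfaces, and plain lists stand in for both output adapters.

```python
from typing import Callable, Iterable


def run_pipeline(
    source: Iterable[dict],               # input adapter role: yields raw records
    validate: Callable[[dict], dict],     # executor role: returns a clean record or raises
    write: Callable[[dict], None],        # output adapter role: persists valid records
    write_error: Callable[[dict], None],  # error output adapter role: dead-letter queue
) -> dict[str, int]:
    """Hypothetical sketch: pull each record through validation and route it."""
    counts = {"processed": 0, "errors": 0}
    for record in source:
        try:
            clean = validate(record)
        except Exception as exc:  # any validation failure is diverted, not fatal
            write_error({"record": record, "error": repr(exc)})
            counts["errors"] += 1
            continue
        write(clean)
        counts["processed"] += 1
    return counts


def validate_age(record: dict) -> dict:
    """Stand-in for schema validation: require an integer-like 'age' field."""
    return {**record, "age": int(record["age"])}


output: list[dict] = []
dead_letters: list[dict] = []
counts = run_pipeline(
    [{"name": "Ada", "age": "36"}, {"name": "Bob", "age": "n/a"}],
    validate_age,
    output.append,
    dead_letters.append,
)
print(counts)  # {'processed': 1, 'errors': 1}
```

Because each role is only an iterable or a callable, any one of them can be swapped without touching the others. This mirrors the pluggable design the diagram describes.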


🔧 Executors

ZooPipe provides five execution strategies:

Executor                  Best For                          Parallelism
SyncFifoExecutor          Small datasets, debugging         Single-threaded
MultiprocessingExecutor   Large datasets on single machine  Multi-process (CPU cores)
ThreadExecutor            IO-bound tasks (network/DB)       Multi-thread
DaskExecutor              ETL pipelines, Dask users         Dask cluster
RayExecutor               Massive datasets, distributed     Ray cluster

See the Executors documentation for detailed information.
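The table's split between threads for I/O-bound work and processes for CPU-bound work can be illustrated with the standard library's `concurrent.futures`, which exposes both strategies behind the same `map` interface. This is only an analogy for why swapping executors leaves the rest of a pipeline unchanged. It does not describe how ZooPipe's executors are built, and `parse_record` and `process_batch` are hypothetical names.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional


def parse_record(row: dict) -> dict:
    """Hypothetical per-record work: coerce 'age' to int."""
    return {**row, "age": int(row["age"])}


def process_batch(rows: list[dict], executor: Optional[Executor] = None) -> list[dict]:
    """Run parse_record over rows, sequentially or on the given executor."""
    if executor is None:
        # Sequential, in-order processing: the easiest mode to debug.
        return [parse_record(row) for row in rows]
    # Executor.map returns results in input order, whichever worker finishes first.
    return list(executor.map(parse_record, rows))


rows = [{"name": f"user{i}", "age": str(20 + i)} for i in range(5)]

sequential = process_batch(rows)
with ThreadPoolExecutor(max_workers=4) as pool:  # threads suit I/O-bound work
    threaded = process_batch(rows, pool)

print(sequential == threaded)  # True
```

Passing a `ProcessPoolExecutor` instead (created inside an `if __name__ == "__main__":` guard) moves the same work into separate processes, which avoids the GIL for CPU-bound tasks. The calling code stays the same.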


🛠 Development

Setup

git clone https://github.com/albertobadia/zoopipe.git
cd zoopipe
uv sync

Running Tests

uv run pytest -v

Linting

./lint.sh

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.


🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.


📧 Contact

Alberto Daniel Badia
Email: alberto_badia@enlacepatagonia.com
GitHub: @albertobadia


Download files


Source Distribution

zoopipe-2026.1.9.tar.gz (30.9 kB)

Uploaded Source

Built Distribution


zoopipe-2026.1.9-py3-none-any.whl (31.7 kB)

Uploaded Python 3

File details

Details for the file zoopipe-2026.1.9.tar.gz.

File metadata

  • Download URL: zoopipe-2026.1.9.tar.gz
  • Upload date:
  • Size: 30.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Arch Linux ARM","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for zoopipe-2026.1.9.tar.gz
Algorithm    Hash digest
SHA256       ee8af7772f4fac9d3187c1e1d579c79d3c615958fc784c747ba40c18c9392a4e
MD5          340ab3262b566bb901b56a9fd8033ae1
BLAKE2b-256  8327dc507156f393b282fad1ebe3d9467afe676c1b040b715ac4632e6352fd3f
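To check a downloaded source distribution against the SHA256 digest listed above, Python's `hashlib` is enough. The sketch assumes the file was saved as `zoopipe-2026.1.9.tar.gz` in the current directory. `sha256_of` is a hypothetical helper name.

```python
import hashlib
from pathlib import Path

# SHA256 digest published for zoopipe-2026.1.9.tar.gz
EXPECTED_SHA256 = "ee8af7772f4fac9d3187c1e1d579c79d3c615958fc784c747ba40c18c9392a4e"


def sha256_of(path: Path, chunk_size: int = 1 << 16) -> str:
    """Hash the file in chunks so large downloads need not fit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


if __name__ == "__main__":
    archive = Path("zoopipe-2026.1.9.tar.gz")  # assumed download location
    if archive.exists():
        print("OK" if sha256_of(archive) == EXPECTED_SHA256 else "MISMATCH")
    else:
        print(f"{archive} not found")
```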


File details

Details for the file zoopipe-2026.1.9-py3-none-any.whl.

File metadata

  • Download URL: zoopipe-2026.1.9-py3-none-any.whl
  • Upload date:
  • Size: 31.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Arch Linux ARM","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for zoopipe-2026.1.9-py3-none-any.whl
Algorithm    Hash digest
SHA256       f51bf0636b67323c70eeb660fbd85c21fc225d4d559470a9163ebb38dc39d68d
MD5          d6a984e4f121973a0b58f7b754507ece
BLAKE2b-256  bff98f5c194398e91e488e54cc78eb326ceeabb1050e04e95a151411b27a23be

