
ZooPipe

ZooPipe is a lightweight, high-performance data processing framework for Python that combines the simplicity of Pydantic validation with the power of parallel processing.

Whether you're migrating data, cleaning CSVs, or processing streams, ZooPipe provides a structured way to handle validation, transformation, and error management without the complexity of big data frameworks.


✨ Key Features

  • ๐Ÿ” Declarative Validation: Use Pydantic models to define and validate your data structures
  • ๐Ÿ”Œ Pluggable Architecture: Easily swap Input Adapters, Output Adapters, and Executors
  • โšก Parallel Processing: Scale from single-threaded to distributed computing with MultiprocessingExecutor, ThreadExecutor, DaskExecutor and RayExecutor
  • ๐Ÿ—œ๏ธ High-Performance Serialization: Uses msgpack and optional LZ4 compression for efficient inter-process communication
  • ๐Ÿ“Š Built-in Format Support: Direct support for CSV, JSON (array & JSONL), Parquet, and S3-compatible storage (Boto3, MinIO)
  • ๐Ÿšจ Automated Error Handling: Dedicated error output adapter to capture records that fail validation
  • ๐Ÿช Hooks System: Transform and enrich data at various pipeline stages with built-in and custom hooks
  • ๐Ÿ”„ Async Ready: Base adapters provided for async implementations
  • ๐Ÿ›ก๏ธ Type Safe: Fully type-hinted for a better developer experience

🚀 Quick Start

Installation

uv add zoopipe

Or using pip:

pip install zoopipe

Simple Example

from pydantic import BaseModel, ConfigDict
from zoopipe import Pipe
from zoopipe.executor.sync_fifo import SyncFifoExecutor
from zoopipe.input_adapter.csv import CSVInputAdapter
from zoopipe.output_adapter.csv import CSVOutputAdapter

class UserSchema(BaseModel):
    model_config = ConfigDict(extra="ignore")
    name: str
    last_name: str
    age: int

pipe = Pipe(
    input_adapter=CSVInputAdapter("users.csv"),
    output_adapter=CSVOutputAdapter("processed_users.csv"),
    error_output_adapter=CSVOutputAdapter("errors.csv"),
    executor=SyncFifoExecutor(UserSchema),
)

# Start the pipe with a context manager to ensure cleanup
with pipe:
    report = pipe.start()
    report.wait()

print(f"Finished! Processed {report.total_processed} items.")

💡 Common Data Flows

CSV → PostgreSQL (via SQLAlchemy)

from zoopipe import Pipe
from zoopipe.executor.multiprocessing import MultiProcessingExecutor
from zoopipe.input_adapter.csv import CSVInputAdapter
from zoopipe.output_adapter.sqlalchemy import SQLAlchemyOutputAdapter

pipe = Pipe(
    input_adapter=CSVInputAdapter("data.csv"),
    output_adapter=SQLAlchemyOutputAdapter("postgresql://user:pass@localhost/db", "users"),
    executor=MultiProcessingExecutor(UserSchema, max_workers=4),
)

S3 → DuckDB (with JIT fetching)

from zoopipe import Pipe
from zoopipe.executor.ray import RayExecutor
from zoopipe.input_adapter.boto3 import Boto3InputAdapter
from zoopipe.output_adapter.duckdb import DuckDBOutputAdapter

pipe = Pipe(
    input_adapter=Boto3InputAdapter("my-bucket", prefix="data/", jit=True),
    output_adapter=DuckDBOutputAdapter("analytics.duckdb", "events", batch_size=5000),
    executor=RayExecutor(EventSchema),
)

SQLAlchemy → Parquet

from zoopipe import Pipe
from zoopipe.executor.dask import DaskExecutor
from zoopipe.input_adapter.sqlalchemy import SQLAlchemyInputAdapter
from zoopipe.output_adapter.arrow import ArrowOutputAdapter

pipe = Pipe(
    input_adapter=SQLAlchemyInputAdapter("mysql://localhost/olddb", "legacy_table"),
    output_adapter=ArrowOutputAdapter("output.parquet", format="parquet"),
    executor=DaskExecutor(MigrationSchema),
)

📚 Documentation

Getting Started

Core Concepts

  • Executors - Learn about SyncFifoExecutor, MultiprocessingExecutor, ThreadExecutor, DaskExecutor and RayExecutor
  • Adapters - Input and Output adapters for various data sources
  • Examples - Practical examples for common use cases

🎯 Use Cases

ZooPipe excels at:

  • Legacy Data Migrations: Moving data between heterogeneous databases with validation
  • ETL Pipelines: Extract, Transform, Load workflows with error handling
  • Data Cleaning: Processing manually generated files (Excel/CSV) with inconsistent formats
  • Quality Filters: Acting as a validation layer before loading data into Data Lakes or ML models
  • Batch Processing: Processing large datasets with parallel execution
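
For the data-cleaning case, Pydantic itself does most of the normalization: validators on the schema repair messy values, and anything that still fails validation lands in the error output adapter. A minimal, ZooPipe-independent sketch (the model and fields are illustrative):

```python
from pydantic import BaseModel, field_validator

class CleanUser(BaseModel):
    name: str
    age: int

    @field_validator("name")
    @classmethod
    def strip_and_title(cls, v: str) -> str:
        # Normalize inconsistent casing/whitespace from hand-edited files
        return v.strip().title()

    @field_validator("age", mode="before")
    @classmethod
    def strip_age(cls, v):
        # Hand-edited CSVs often carry padded numeric strings like " 36 "
        if isinstance(v, str):
            v = v.strip()
        return v

user = CleanUser(name="  aDa  ", age=" 36 ")
print(user.name, user.age)  # Ada 36
```

Passing a schema like this to an executor gives you cleaning and quality filtering in one step: valid rows flow to the output adapter already normalized, invalid rows go to the error output adapter.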

🧩 Architecture

ZooPipe uses a decoupled architecture based on four components:

┌─────────────────┐      ┌──────────────┐      ┌─────────────────┐
│  Input Adapter  │─────▶│   Executor   │─────▶│ Output Adapter  │
│ (CSV, JSON,     │      │ (Validation  │      │ (CSV, JSON,     │
│  Parquet, DB,   │      │  & Transform)│      │  Parquet, DB,   │
│  S3, API, etc)  │      │              │      │  S3, API, etc)  │
└─────────────────┘      └──────────────┘      └─────────────────┘
                                │
                                │ (errors)
                                ▼
                         ┌──────────────┐
                         │ Error Output │
                         │   Adapter    │
                         └──────────────┘
  • Input Adapter: Reads data from sources (CSV, JSON, Parquet, SQL, S3, API)
  • Executor: Validates with Pydantic and processes data (sequential or parallel)
  • Output Adapter: Persists validated data
  • Error Output Adapter: Captures failed validations (Dead Letter Queue)

🔧 Executors

ZooPipe provides six execution strategies:

Executor                 Best For                          Parallelism
SyncFifoExecutor         Small datasets, debugging         Single-threaded
MultiprocessingExecutor  Large datasets on single machine  Multi-process (CPU cores)
ThreadExecutor           IO-bound tasks (network/DB)       Multi-thread
AsyncIOExecutor          Async workflows, async hooks      Asyncio concurrency
DaskExecutor             ETL pipelines, Dask users         Dask cluster
RayExecutor              Massive datasets, distributed     Ray cluster

See the Executors documentation for detailed information.


🛠 Development

Setup

git clone https://github.com/albertobadia/zoopipe.git
cd zoopipe
uv sync

Running Tests

uv run pytest -v

Linting

./lint.sh

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.


๐Ÿค Contributing

Contributions are welcome! Please feel free to submit a Pull Request.


📧 Contact

Alberto Daniel Badia
Email: alberto_badia@enlacepatagonia.com
GitHub: @albertobadia
