ZooPipe
ZooPipe is a lightweight, high-performance data processing framework for Python that lets you process data declaratively, combining the simplicity of Pydantic validation with the power of parallel processing.
Whether you're migrating data, cleaning CSVs, or processing streams, ZooPipe provides a structured way to handle validation, transformation, and error management without the complexity of big data frameworks.
Key Features
- Declarative Validation: Use Pydantic models to define and validate your data structures
- Pluggable Architecture: Easily swap Input Adapters, Output Adapters, and Executors
- Parallel Processing: Scale from single-threaded to distributed computing with MultiprocessingExecutor, ThreadExecutor, DaskExecutor, and RayExecutor
- High-Performance Serialization: Uses msgpack and optional LZ4 compression for efficient inter-process communication
- Built-in Format Support: Direct support for CSV, JSON (array & JSONL), Parquet, and S3-compatible storage (Boto3, MinIO)
- Automated Error Handling: Dedicated error output adapter to capture records that fail validation
- Hooks System: Transform and enrich data at various pipeline stages with built-in and custom hooks
- Async Ready: Base adapters provided for async implementations
- Type Safe: Fully type-hinted for a better developer experience
Quick Start
Installation
uv add zoopipe
Or using pip:
pip install zoopipe
Simple Example
from pydantic import BaseModel, ConfigDict
from zoopipe import Pipe
from zoopipe.executor.sync_fifo import SyncFifoExecutor
from zoopipe.input_adapter.csv import CSVInputAdapter
from zoopipe.output_adapter.csv import CSVOutputAdapter
class UserSchema(BaseModel):
    model_config = ConfigDict(extra="ignore")

    name: str
    last_name: str
    age: int

pipe = Pipe(
    input_adapter=CSVInputAdapter("users.csv"),
    output_adapter=CSVOutputAdapter("processed_users.csv"),
    error_output_adapter=CSVOutputAdapter("errors.csv"),
    executor=SyncFifoExecutor(UserSchema),
)

# Start the pipe with a context manager to ensure cleanup
with pipe:
    report = pipe.start()
    report.wait()
    print(f"Finished! Processed {report.total_processed} items.")
Common Data Flows
CSV → PostgreSQL (via SQLAlchemy)
from zoopipe import Pipe
from zoopipe.executor.multiprocessing import MultiProcessingExecutor
from zoopipe.input_adapter.csv import CSVInputAdapter
from zoopipe.output_adapter.sqlalchemy import SQLAlchemyOutputAdapter
pipe = Pipe(
    input_adapter=CSVInputAdapter("data.csv"),
    output_adapter=SQLAlchemyOutputAdapter("postgresql://user:pass@localhost/db", "users"),
    executor=MultiProcessingExecutor(UserSchema, max_workers=4),  # UserSchema: a Pydantic model, as in the Quick Start
)
S3 → DuckDB (with JIT fetching)
from zoopipe import Pipe
from zoopipe.executor.ray import RayExecutor
from zoopipe.input_adapter.boto3 import Boto3InputAdapter
from zoopipe.output_adapter.duckdb import DuckDBOutputAdapter
pipe = Pipe(
    input_adapter=Boto3InputAdapter("my-bucket", prefix="data/", jit=True),
    output_adapter=DuckDBOutputAdapter("analytics.duckdb", "events", batch_size=5000),
    executor=RayExecutor(EventSchema),  # EventSchema: a Pydantic model for the incoming records
)
SQLAlchemy → Parquet
from zoopipe import Pipe
from zoopipe.executor.dask import DaskExecutor
from zoopipe.input_adapter.sqlalchemy import SQLAlchemyInputAdapter
from zoopipe.output_adapter.arrow import ArrowOutputAdapter
pipe = Pipe(
    input_adapter=SQLAlchemyInputAdapter("mysql://localhost/olddb", "legacy_table"),
    output_adapter=ArrowOutputAdapter("output.parquet", format="parquet"),
    executor=DaskExecutor(MigrationSchema),  # MigrationSchema: a Pydantic model describing the legacy rows
)
Documentation
Getting Started
- Installation & First Steps - Get up and running quickly
Core Concepts
- Executors - Learn about SyncFifoExecutor, MultiprocessingExecutor, ThreadExecutor, AsyncIOExecutor, DaskExecutor, and RayExecutor
- Adapters - Input and Output adapters for various data sources
- Examples - Practical examples for common use cases
Use Cases
ZooPipe excels at:
- Legacy Data Migrations: Moving data between heterogeneous databases with validation
- ETL Pipelines: Extract, Transform, Load workflows with error handling
- Data Cleaning: Processing manually generated files (Excel/CSV) with inconsistent formats (see the sketch after this list)
- Quality Filters: Acting as a validation layer before loading data into Data Lakes or ML models
- Batch Processing: Processing large datasets with parallel execution
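As a minimal sketch of the data-cleaning case: normalization can live in the Pydantic schema itself, so that messy but fixable values are cleaned up and genuinely broken rows are routed to the error output. The field names, normalization rules, and file names below are illustrative assumptions; the Pipe wiring reuses only the adapters and executor shown in the Quick Start.

from pydantic import BaseModel, ConfigDict, field_validator
from zoopipe import Pipe
from zoopipe.executor.sync_fifo import SyncFifoExecutor
from zoopipe.input_adapter.csv import CSVInputAdapter
from zoopipe.output_adapter.csv import CSVOutputAdapter

class ContactSchema(BaseModel):
    # Hypothetical schema for a hand-maintained contact list with inconsistent formatting
    model_config = ConfigDict(extra="ignore")

    email: str
    country: str

    @field_validator("email")
    @classmethod
    def normalize_email(cls, value: str) -> str:
        # Trim stray whitespace and lowercase the address
        return value.strip().lower()

    @field_validator("country")
    @classmethod
    def normalize_country(cls, value: str) -> str:
        # Collapse common spelling variants into a single code (illustrative rule)
        aliases = {"usa": "US", "u.s.a.": "US", "united states": "US"}
        cleaned = value.strip()
        return aliases.get(cleaned.lower(), cleaned.upper())

pipe = Pipe(
    input_adapter=CSVInputAdapter("contacts_raw.csv"),
    output_adapter=CSVOutputAdapter("contacts_clean.csv"),
    error_output_adapter=CSVOutputAdapter("contacts_errors.csv"),
    executor=SyncFifoExecutor(ContactSchema),
)

with pipe:
    report = pipe.start()
    report.wait()

Rows that cannot be coerced into the schema at all (for example, a missing email column) should end up in contacts_errors.csv rather than silently corrupting the clean output.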
Architecture
ZooPipe uses a decoupled architecture based on four components:
┌─────────────────┐     ┌──────────────┐     ┌─────────────────┐
│  Input Adapter  │────▶│   Executor   │────▶│  Output Adapter │
│  (CSV, JSON,    │     │ (Validation  │     │  (CSV, JSON,    │
│  Parquet, DB,   │     │  & Transform)│     │  Parquet, DB,   │
│  S3, API, etc)  │     │              │     │  S3, API, etc)  │
└─────────────────┘     └──────────────┘     └─────────────────┘
                               │
                               │ (errors)
                               ▼
                        ┌──────────────┐
                        │ Error Output │
                        │   Adapter    │
                        └──────────────┘
- Input Adapter: Reads data from sources (CSV, JSON, Parquet, SQL, S3, API)
- Executor: Validates with Pydantic and processes data (sequential or parallel)
- Output Adapter: Persists validated data
- Error Output Adapter: Captures failed validations (Dead Letter Queue)
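Because the error output is just another adapter, the dead-letter file can be replayed once the offending records are fixed. The sketch below assumes the errors.csv produced by the Quick Start example can be read back as plain CSV records and reuses UserSchema from that example; adjust it to however your error adapter actually serializes failures.

from zoopipe import Pipe
from zoopipe.executor.sync_fifo import SyncFifoExecutor
from zoopipe.input_adapter.csv import CSVInputAdapter
from zoopipe.output_adapter.csv import CSVOutputAdapter

# Replay the dead-letter file: corrected rows go to the main output,
# rows that still fail validation land in a fresh error file.
retry_pipe = Pipe(
    input_adapter=CSVInputAdapter("errors.csv"),
    output_adapter=CSVOutputAdapter("processed_users.csv"),
    error_output_adapter=CSVOutputAdapter("errors_round2.csv"),
    executor=SyncFifoExecutor(UserSchema),  # UserSchema from the Quick Start example
)

with retry_pipe:
    report = retry_pipe.start()
    report.wait()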
Executors
ZooPipe provides six execution strategies:
| Executor | Best For | Parallelism |
|---|---|---|
| SyncFifoExecutor | Small datasets, debugging | Single-threaded |
| MultiprocessingExecutor | Large datasets on single machine | Multi-process (CPU cores) |
| ThreadExecutor | IO-bound tasks (network/DB) | Multi-thread |
| AsyncIOExecutor | Async workflows, async hooks | Asyncio concurrency |
| DaskExecutor | ETL pipelines, Dask users | Dask cluster |
| RayExecutor | Massive datasets, distributed | Ray cluster |
See the Executors documentation for detailed information.
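As a small illustration of how executors are swapped (a sketch using only the classes and import paths shown in this README, with UserSchema from the Quick Start), the rest of the pipe definition stays unchanged while the executor is selected per run:

from zoopipe import Pipe
from zoopipe.executor.multiprocessing import MultiProcessingExecutor
from zoopipe.executor.sync_fifo import SyncFifoExecutor
from zoopipe.input_adapter.csv import CSVInputAdapter
from zoopipe.output_adapter.csv import CSVOutputAdapter

# Choose the executor per run; everything else in the pipe stays the same.
debug_run = True
executor = (
    SyncFifoExecutor(UserSchema)  # single-threaded, easiest to step through
    if debug_run
    else MultiProcessingExecutor(UserSchema, max_workers=4)  # parallel across CPU cores
)

pipe = Pipe(
    input_adapter=CSVInputAdapter("users.csv"),
    output_adapter=CSVOutputAdapter("processed_users.csv"),
    error_output_adapter=CSVOutputAdapter("errors.csv"),
    executor=executor,
)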
Development
Setup
git clone https://github.com/albertobadia/zoopipe.git
cd zoopipe
uv sync
Running Tests
uv run pytest -v
Linting
./lint.sh
License
This project is licensed under the MIT License - see the LICENSE file for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Contact
Alberto Daniel Badia
Email: alberto_badia@enlacepatagonia.com
GitHub: @albertobadia