PipeCo

An easy-to-use pipeline builder.
Type-safe, composable data pipelines built on Pydantic
PipeCo is a lightweight Python framework for building robust data processing pipelines with compile-time type checking. Define your steps with clear input/output contracts, and PipeCo ensures they connect correctly.
✨ Features
- Type Safety: Pydantic-based validation at every step boundary
- Compile-Time Checks: Catches type mismatches before execution
- Clean Abstractions: Simple `Step` base class for building reusable components
- Registry System: Register and discover steps by name
- Context Sharing: Pass loggers, resources, and cache between steps
- Easy Composition: Chain steps together with confidence
📦 Installation

```
pip install pipeco
```

Or with uv:

```
uv add pipeco
```
🚀 Quick Start
1. Define Your Data Models
```python
from pipeco import BaseModel

class InputData(BaseModel):
    text: str

class ProcessedData(BaseModel):
    upper_text: str
    length: int
```
2. Create Pipeline Steps
```python
from pipeco import Step, Context, Nothing, register

@register("uppercase")
class UppercaseStep(Step[InputData, ProcessedData, Nothing]):
    input_model = InputData
    output_model = ProcessedData
    config_model = Nothing

    def process(self, data: InputData, ctx: Context) -> ProcessedData:
        return ProcessedData(
            upper_text=data.text.upper(),
            length=len(data.text),
        )
```
3. Build and Run Your Pipeline
```python
from pipeco import Pipeline, get_step

# Create the pipeline
pipeline = Pipeline(steps=[
    get_step("uppercase")(),
])

# Run it
result = pipeline.run(InputData(text="hello world"))
print(result.upper_text)  # "HELLO WORLD"
print(result.length)      # 11
```
📚 Core Concepts
Step
The Step class is the building block of pipelines. Each step:
- Declares its input type (`I`), output type (`O`), and configuration type (`C`)
- Implements the `process()` method for its business logic
- Automatically validates inputs and outputs using Pydantic
Pipeline
The Pipeline chains steps together and:
- Verifies type compatibility at initialization (compile-time checking)
- Validates data at each step boundary (runtime checking)
- Passes context through all steps
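The run loop can be pictured with a minimal sketch. Plain `isinstance` checks stand in for PipeCo's Pydantic validation, and `MiniStep`/`MiniPipeline` are illustrative names, not PipeCo's real classes:

```python
# Sketch of the run loop: validate at each step boundary, then hand off.

class MiniStep:
    input_model = object  # subclasses declare what they accept

    def process(self, data):
        raise NotImplementedError

class MiniPipeline:
    def __init__(self, steps):
        self.steps = steps

    def run(self, data):
        for step in self.steps:
            if not isinstance(data, step.input_model):  # runtime boundary check
                raise TypeError(f"{type(data).__name__} is not valid input")
            data = step.process(data)  # each output feeds the next step
        return data

class Shout(MiniStep):
    input_model = str
    def process(self, data):
        return data.upper()

class Exclaim(MiniStep):
    input_model = str
    def process(self, data):
        return data + "!"

print(MiniPipeline([Shout(), Exclaim()]).run("hello"))  # HELLO!
```

The same shape applies in PipeCo, except the boundary check is full Pydantic model validation rather than a bare `isinstance`.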
Context
The Context object flows through your pipeline, providing:
- Logger: Centralized logging
- Resources: Shared objects (DB connections, API clients, etc.)
- Cache: Data sharing between non-adjacent steps
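The cache is handy when a step early in the pipeline computes something a much later step needs, without threading it through every intermediate model. A sketch with a stand-in context (the field names follow the list above, but `MiniContext` and its constructor are assumptions, not PipeCo's actual signature):

```python
import logging
from dataclasses import dataclass, field

# Minimal stand-in for PipeCo's Context.
@dataclass
class MiniContext:
    logger: logging.Logger = field(default_factory=lambda: logging.getLogger("pipeline"))
    resources: dict = field(default_factory=dict)
    cache: dict = field(default_factory=dict)

ctx = MiniContext(resources={"db": "fake-connection"})

# An upstream step stashes a value for a non-adjacent downstream step.
ctx.cache["row_count"] = 42

# A later step reads it back and logs through the shared logger.
ctx.logger.info("rows seen: %s", ctx.cache["row_count"])
print(ctx.cache["row_count"])  # 42
```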
Registry
Use `@register()` to make steps discoverable by name:

```python
from pipeco import get_step

StepClass = get_step("step-name")
step_instance = StepClass(config)
```
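Under the hood, a registry like this is typically just a name-to-class dict populated by the decorator. A minimal sketch (illustrative only, not PipeCo's actual implementation):

```python
# Minimal name -> class registry built on a module-level dict.
_REGISTRY: dict[str, type] = {}

def register(name):
    def deco(cls):
        _REGISTRY[name] = cls
        return cls  # class is returned unchanged, so normal use is unaffected
    return deco

def get_step(name):
    try:
        return _REGISTRY[name]
    except KeyError:
        raise KeyError(f"no step registered under {name!r}")

@register("shout")
class ShoutStep:
    def process(self, data):
        return data.upper()

StepClass = get_step("shout")
print(StepClass().process("hi"))  # HI
```

Because the decorator returns the class unchanged, registration has no effect on how the class is defined or instantiated directly.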
🎯 Example: CSV Processing
See `examples/pipelines.py` for a complete example that:
- Reads a CSV file
- Transforms the data
- Saves the result
```python
pipeline = Pipeline(steps=[
    CSVPathToDict(),
    ChangeFavoriteFood(),
    SaveDictToCSV({"save_path": "output.csv"}),
])
pipeline.run(ExampleCSVModel(csv_path="input.csv"))
```
🛡️ Type Safety
PipeCo catches mismatches early:
```python
# This raises TypeError at pipeline creation:
Pipeline(steps=[
    StepA(),  # outputs TypeX
    StepB(),  # expects TypeY (incompatible!)
])
```
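One way such an init-time check can work (a sketch under assumed names like `check_compat`, not PipeCo's actual code): walk adjacent pairs and compare each step's declared `output_model` against the next step's `input_model` before anything runs:

```python
# Sketch: verify adjacent steps are compatible before the pipeline executes.

def check_compat(steps):
    for producer, consumer in zip(steps, steps[1:]):
        if not issubclass(producer.output_model, consumer.input_model):
            raise TypeError(
                f"{type(producer).__name__} outputs {producer.output_model.__name__}, "
                f"but {type(consumer).__name__} expects {consumer.input_model.__name__}"
            )

class TypeX: pass
class TypeY: pass

class StepA:
    input_model = TypeX
    output_model = TypeX

class StepB:
    input_model = TypeY   # incompatible with StepA's output
    output_model = TypeY

check_compat([StepA(), StepA()])      # compatible: no error
try:
    check_compat([StepA(), StepB()])  # incompatible: raises TypeError
except TypeError as e:
    print(e)
```

Because the check only reads class attributes, no data ever flows through a mis-wired pipeline.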
📖 API Reference
Step[I, O, C]
Base class for pipeline steps.
- `process(data: I, ctx: Context) -> O`: Override this method with the step's logic
Pipeline
- `__init__(steps: list[Step])`: Create a pipeline with type checking
- `run(data: BaseModel, ctx: Context | None) -> BaseModel`: Execute the pipeline
Context
- `logger`: `logging.Logger` instance
- `resources`: Shared resources dict
- `cache`: Data cache dict
@register(name: str)
Decorator to register step classes.
get_step(name: str) -> type[Step]
Retrieve registered step class by name.
🤝 Contributing
Contributions welcome! This is a lightweight framework designed to stay simple.
📄 License
See LICENSE file for details.