
PipeCo

Type-safe, composable data pipelines built on Pydantic

PipeCo is a lightweight Python framework for building robust data processing pipelines with type checking at pipeline-construction time. Define your steps with clear input/output contracts, and PipeCo verifies that they connect correctly before any data flows.

✨ Features

  • Type Safety: Pydantic-based validation at every step boundary
  • Compile-Time Checks: Catches type mismatches before execution
  • Clean Abstractions: Simple Step base class for building reusable components
  • Registry System: Register and discover steps by name
  • Context Sharing: Pass loggers, resources, and cache between steps
  • Easy Composition: Chain steps together with confidence

📦 Installation

pip install pipeco

Or with uv:

uv add pipeco

🚀 Quick Start

1. Define Your Data Models

from pipeco import BaseModel

class InputData(BaseModel):
    text: str

class ProcessedData(BaseModel):
    upper_text: str
    length: int

2. Create Pipeline Steps

from pipeco import Step, Context, Nothing, register

@register("uppercase")
class UppercaseStep(Step[InputData, ProcessedData, Nothing]):
    input_model = InputData
    output_model = ProcessedData
    config_model = Nothing
    
    def process(self, data: InputData, ctx: Context) -> ProcessedData:
        return ProcessedData(
            upper_text=data.text.upper(),
            length=len(data.text)
        )

3. Build and Run Your Pipeline

from pipeco import Pipeline, get_step

# Create pipeline
pipeline = Pipeline(steps=[
    get_step("uppercase")()
])

# Run it
result = pipeline.run(InputData(text="hello world"))
print(result.upper_text)  # "HELLO WORLD"
print(result.length)       # 11

📚 Core Concepts

Step

The Step class is the building block of pipelines. Each step:

  • Declares its input type (I), output type (O), and configuration type (C)
  • Implements the process() method for business logic
  • Automatically validates inputs and outputs using Pydantic
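The validate → process → validate contract can be sketched in plain Python. This is a simplified stand-in using `isinstance` checks, not PipeCo's actual implementation, which validates with Pydantic models:

```python
# Simplified stand-in for the Step contract: validate the input,
# run the business logic, validate the output.
class MiniStep:
    input_model = object
    output_model = object

    def __call__(self, data):
        assert isinstance(data, self.input_model), "input failed validation"
        result = self.process(data)
        assert isinstance(result, self.output_model), "output failed validation"
        return result

    def process(self, data):
        raise NotImplementedError

class Upper(MiniStep):
    input_model = str
    output_model = str

    def process(self, data):
        return data.upper()
```

Here `Upper()("hello")` returns `"HELLO"`, while passing a non-string fails the input check before `process()` ever runs.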

Pipeline

The Pipeline chains steps together and:

  • Verifies type compatibility at initialization (compile-time checking)
  • Validates data at each step boundary (runtime checking)
  • Passes context through all steps
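The construction-time check amounts to comparing each step's declared output type with the next step's declared input type. A hypothetical sketch of that idea (the names `check_chain`, `ToText`, and `Count` are illustrative, not part of PipeCo's API):

```python
# Hypothetical sketch of the construction-time check: each step's
# output_model must match the next step's input_model.
def check_chain(steps):
    for current, nxt in zip(steps, steps[1:]):
        if current.output_model is not nxt.input_model:
            raise TypeError(
                f"{type(current).__name__} outputs {current.output_model.__name__}, "
                f"but {type(nxt).__name__} expects {nxt.input_model.__name__}"
            )

class ToText:   # demo stub: int -> str
    input_model, output_model = int, str

class Count:    # demo stub: str -> int
    input_model, output_model = str, int

check_chain([ToText(), Count()])      # compatible chain: no error

try:
    check_chain([Count(), Count()])   # int output feeding a str input
    mismatch_caught = False
except TypeError:
    mismatch_caught = True
```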

Context

The Context object flows through your pipeline, providing:

  • Logger: Centralized logging
  • Resources: Shared objects (DB connections, API clients, etc.)
  • Cache: Data sharing between non-adjacent steps
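A rough picture of what flows through each step, as a simplified stand-in assuming just the three attributes listed above (not PipeCo's actual `Context` class):

```python
import logging
from dataclasses import dataclass, field

# Simplified stand-in for Context: a logger plus two shared dicts.
@dataclass
class MiniContext:
    logger: logging.Logger = field(
        default_factory=lambda: logging.getLogger("pipeline"))
    resources: dict = field(default_factory=dict)
    cache: dict = field(default_factory=dict)

ctx = MiniContext(resources={"db": "a shared connection object"})
ctx.cache["row_count"] = 11   # visible to later, non-adjacent steps
ctx.logger.info("processed %d rows", ctx.cache["row_count"])
```

Because the same object is passed to every step, anything one step stores in `resources` or `cache` is available to every step that runs after it.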

Registry

Use @register() to make steps discoverable by name:

from pipeco import get_step

StepClass = get_step("step-name")
step_instance = StepClass(config)
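Conceptually, the registry is a name-to-class mapping populated by the decorator. The sketch below mirrors the `@register` / `get_step` behavior described above; it is illustrative, not PipeCo's source:

```python
# Illustrative name-to-class registry with a decorator front end.
_REGISTRY = {}

def register(name):
    def decorator(cls):
        _REGISTRY[name] = cls   # record the class under its name
        return cls              # leave the class itself unchanged
    return decorator

def get_step(name):
    try:
        return _REGISTRY[name]
    except KeyError:
        raise KeyError(f"no step registered under {name!r}") from None

@register("shout")
class ShoutStep:
    def process(self, data):
        return data.upper()
```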

🎯 Example: CSV Processing

See examples/pipelines.py for a complete example that:

  1. Reads a CSV file
  2. Transforms the data
  3. Saves the result

pipeline = Pipeline(steps=[
    CSVPathToDict(),
    ChangeFavoriteFood(),
    SaveDictToCSV({"save_path": "output.csv"})
])

pipeline.run(ExampleCSVModel(csv_path="input.csv"))

🛡️ Type Safety

PipeCo catches mismatches early:

# This raises TypeError at pipeline creation:
Pipeline(steps=[
    StepA(),  # outputs TypeX
    StepB(),  # expects TypeY (incompatible!)
])

📖 API Reference

Step[I, O, C]

Base class for pipeline steps.

  • process(data: I, ctx: Context) -> O: Override this method

Pipeline

  • __init__(steps: list[Step]): Create pipeline with type checking
  • run(data: BaseModel, ctx: Context | None) -> BaseModel: Execute pipeline

Context

  • logger: logging.Logger instance
  • resources: Shared resources dict
  • cache: Data cache dict

@register(name: str)

Decorator to register step classes.

get_step(name: str) -> type[Step]

Retrieve registered step class by name.

🤝 Contributing

Contributions welcome! This is a lightweight framework designed to stay simple.

📄 License

See the LICENSE file for details.
