
An easy-to-use pipeline builder.

Project description

PipeCo 🔗

Type-safe, composable data pipelines built on Pydantic

PipeCo is a lightweight Python framework for building robust data-processing pipelines with type checking at pipeline construction time. Define your steps with clear input/output contracts, and PipeCo ensures they connect correctly before any data flows.

✨ Features

  • Type Safety: Pydantic-based validation at every step boundary
  • Compile-Time Checks: Catches type mismatches before execution
  • Clean Abstractions: Simple Step base class for building reusable components
  • Registry System: Register and discover steps by name
  • Context Sharing: Pass loggers, resources, and cache between steps
  • Easy Composition: Chain steps together with confidence

📦 Installation

pip install pipeco

Or with uv:

uv add pipeco

🚀 Quick Start

1. Define Your Data Models

from pipeco import BaseModel

class InputData(BaseModel):
    text: str

class ProcessedData(BaseModel):
    upper_text: str
    length: int

2. Create Pipeline Steps

from pipeco import Step, Context, register, Nothing

@register("uppercase")
class UppercaseStep(Step[InputData, ProcessedData, Nothing]):
    input_model = InputData
    output_model = ProcessedData
    config_model = Nothing
    
    def process(self, data: InputData, ctx: Context) -> ProcessedData:
        return ProcessedData(
            upper_text=data.text.upper(),
            length=len(data.text)
        )

3. Build and Run Your Pipeline

from pipeco import Pipeline, get_step

# Create pipeline
pipeline = Pipeline(steps=[
    get_step("uppercase")()
])

# Run it
result = pipeline.run(InputData(text="hello world"))
print(result.upper_text)  # "HELLO WORLD"
print(result.length)      # 11

📚 Core Concepts

Step

The Step class is the building block of pipelines. Each step:

  • Declares its input type (I), output type (O), and configuration type (C)
  • Implements the process() method for business logic
  • Automatically validates inputs and outputs using Pydantic
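To make the contract concrete, here is a minimal stand-in for what a `Step` with boundary validation can look like. PipeCo validates with Pydantic models; this sketch uses plain `isinstance` checks so it runs with the standard library alone, and every name in it (`Step`, `Shout`) is illustrative rather than PipeCo's actual implementation.

```python
# Illustrative sketch of the Step contract: declared input/output types,
# a process() hook, and validation at both boundaries.
class Step:
    input_model: type   # I: declared input type
    output_model: type  # O: declared output type

    def __call__(self, data, ctx=None):
        # Validate the input boundary before running business logic.
        if not isinstance(data, self.input_model):
            raise TypeError(f"expected {self.input_model.__name__} as input")
        out = self.process(data, ctx)
        # Validate the output boundary before handing data downstream.
        if not isinstance(out, self.output_model):
            raise TypeError(f"step produced a non-{self.output_model.__name__}")
        return out

    def process(self, data, ctx):
        raise NotImplementedError  # subclasses put business logic here

class Shout(Step):
    input_model = str
    output_model = str

    def process(self, data, ctx):
        return data.upper()

print(Shout()("hello"))  # HELLO
```

Because validation lives in the base class, a subclass only ever writes `process()`, and a wrong input type fails loudly instead of propagating bad data.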

Pipeline

The Pipeline chains steps together and:

  • Verifies type compatibility at initialization (compile-time checking)
  • Validates data at each step boundary (runtime checking)
  • Passes context through all steps
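The construction-time compatibility check described above can be sketched as a walk over adjacent step pairs. This is an assumed, stdlib-only illustration of the idea, not PipeCo's source: the real library compares Pydantic models, and the `Pipeline`, `input_model`, and `output_model` names here simply mirror the documented API.

```python
# Stand-in model types; in PipeCo these would be Pydantic models.
class TextIn: ...
class TextOut: ...

class StepA:
    input_model = TextIn
    output_model = TextOut

class StepB:
    input_model = TextOut   # matches StepA's output, so the chain is valid
    output_model = TextIn

class Pipeline:
    def __init__(self, steps):
        # Reject any incompatible boundary up front, before run() exists.
        for prev, nxt in zip(steps, steps[1:]):
            if prev.output_model is not nxt.input_model:
                raise TypeError(
                    f"{type(prev).__name__} outputs {prev.output_model.__name__}, "
                    f"but {type(nxt).__name__} expects {nxt.input_model.__name__}")
        self.steps = steps

Pipeline([StepA(), StepB()])    # OK: TextOut -> TextOut
# Pipeline([StepA(), StepA()])  # would raise TypeError at creation
```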

Context

The Context object flows through your pipeline, providing:

  • Logger: Centralized logging
  • Resources: Shared objects (DB connections, API clients, etc.)
  • Cache: Data sharing between non-adjacent steps
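A context carrying exactly those three fields can be sketched with a dataclass. The field names match the docs above; the implementation and the `"db"`/`"row_count"` keys are illustrative assumptions, not PipeCo internals.

```python
import logging
from dataclasses import dataclass, field
from typing import Any

@dataclass
class Context:
    # One logger shared by every step in the pipeline.
    logger: logging.Logger = field(
        default_factory=lambda: logging.getLogger("pipeline"))
    resources: dict[str, Any] = field(default_factory=dict)  # shared clients, connections
    cache: dict[str, Any] = field(default_factory=dict)      # cross-step scratch space

ctx = Context()
ctx.resources["db"] = object()   # e.g. a DB connection opened once, used by all steps
ctx.cache["row_count"] = 42      # a later, non-adjacent step can read this back
ctx.logger.info("processed %d rows", ctx.cache["row_count"])
```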

Registry

Use @register() to make steps discoverable by name:

from pipeco import get_step

StepClass = get_step("step-name")
step_instance = StepClass(config)
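Under the hood, a name-based registry like this is usually a module-level dict plus a decorator. The sketch below is a hypothetical stdlib-only version whose `register`/`get_step` names mirror PipeCo's public API; the `_REGISTRY` dict and the error handling are assumptions, not PipeCo's code.

```python
# Minimal registry sketch: map names to step classes.
_REGISTRY: dict[str, type] = {}

def register(name: str):
    """Class decorator that stores the decorated class under `name`."""
    def decorator(cls: type) -> type:
        _REGISTRY[name] = cls
        return cls
    return decorator

def get_step(name: str) -> type:
    """Look up a registered step class; fail loudly on unknown names."""
    try:
        return _REGISTRY[name]
    except KeyError:
        raise KeyError(f"No step registered under {name!r}") from None

@register("uppercase")
class UppercaseStep:
    def process(self, text: str) -> str:
        return text.upper()

step = get_step("uppercase")()
print(step.process("hello"))  # HELLO
```

Registering at class-definition time means any module that merely imports your steps makes them discoverable by name, which is what lets pipelines be assembled from configuration.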

🎯 Example: CSV Processing

See examples/pipelines.py for a complete example that:

  1. Reads a CSV file
  2. Transforms the data
  3. Saves the result

pipeline = Pipeline(steps=[
    CSVPathToDict(),
    ChangeFavoriteFood(),
    SaveDictToCSV({"save_path": "output.csv"})
])

pipeline.run(ExampleCSVModel(csv_path="input.csv"))

🛡️ Type Safety

PipeCo catches mismatches early:

# This raises TypeError at pipeline creation:
Pipeline(steps=[
    StepA(),  # outputs TypeX
    StepB(),  # expects TypeY (incompatible!)
])

📖 API Reference

Step[I, O, C]

Base class for pipeline steps.

  • process(data: I, ctx: Context) -> O: Override this method

Pipeline

  • __init__(steps: list[Step]): Create pipeline with type checking
  • run(data: BaseModel, ctx: Context | None) -> BaseModel: Execute pipeline

Context

  • logger: logging.Logger instance
  • resources: Shared resources dict
  • cache: Data cache dict

@register(name: str)

Decorator to register step classes.

get_step(name: str) -> type[Step]

Retrieve registered step class by name.

🤝 Contributing

Contributions welcome! This is a lightweight framework designed to stay simple.

📄 License

See LICENSE file for details.



Download files

Download the file for your platform.

Source Distribution

pipeco-0.1.1.tar.gz (4.8 kB)

Uploaded Source

Built Distribution


pipeco-0.1.1-py3-none-any.whl (5.5 kB)

Uploaded Python 3

File details

Details for the file pipeco-0.1.1.tar.gz.

File metadata

  • Download URL: pipeco-0.1.1.tar.gz
  • Upload date:
  • Size: 4.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.1

File hashes

Hashes for pipeco-0.1.1.tar.gz

  • SHA256: e557f89a11b2af6f8144265f7e4a591ec051635dda2ae07d347c6b5ad4d9ae2b
  • MD5: 1f52ae71a1dec4e8e548ac96c8c4278c
  • BLAKE2b-256: 4e6a8673e81a1ee5dba462e2b62b10642a9981a611be0794f300883e12b6f311


File details

Details for the file pipeco-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: pipeco-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 5.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.1

File hashes

Hashes for pipeco-0.1.1-py3-none-any.whl

  • SHA256: 6dbfb8bfae1647c632b666c0950b5909cf695e08ece41d37e39fd4ffc83b9a45
  • MD5: 85824e520b5694d86904b1c0b299cb28
  • BLAKE2b-256: 4c5e6d0d747f3aefbf30223479e1c082edceaf5c53266b0b408ae123d03c784e

