An easy-to-use pipeline builder.

PipeCo

Type-safe, composable data pipelines built on Pydantic

PipeCo is a lightweight Python framework for building robust data processing pipelines with type checking at pipeline construction time. Define your steps with clear input/output contracts, and PipeCo verifies that they connect correctly before any data flows.

✨ Features

  • Type Safety: Pydantic-based validation at every step boundary
  • Compile-Time Checks: Catches type mismatches before execution
  • Clean Abstractions: Simple Step base class for building reusable components
  • Registry System: Register and discover steps by name
  • Context Sharing: Pass loggers, resources, and cache between steps
  • Easy Composition: Chain steps together with confidence

📦 Installation

pip install pipeco

Or with uv:

uv add pipeco

🚀 Quick Start

1. Define Your Data Models

from pipeco import BaseModel

class InputData(BaseModel):
    text: str

class ProcessedData(BaseModel):
    upper_text: str
    length: int

2. Create Pipeline Steps

from pipeco import Step, Context, register, Nothing

@register("uppercase")
class UppercaseStep(Step[InputData, ProcessedData, Nothing]):
    input_model = InputData
    output_model = ProcessedData
    config_model = Nothing
    
    def process(self, data: InputData, ctx: Context) -> ProcessedData:
        return ProcessedData(
            upper_text=data.text.upper(),
            length=len(data.text)
        )

3. Build and Run Your Pipeline

from pipeco import Pipeline, get_step

# Create pipeline
pipeline = Pipeline(steps=[
    get_step("uppercase")()
])

# Run it
result = pipeline.run(InputData(text="hello world"))
print(result.upper_text)  # "HELLO WORLD"
print(result.length)       # 11

📚 Core Concepts

Step

The Step class is the building block of pipelines. Each step:

  • Declares its input type (I), output type (O), and configuration type (C)
  • Implements the process() method for business logic
  • Automatically validates inputs and outputs using Pydantic
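The boundary validation described above can be sketched in plain Python. This is an illustrative sketch of the pattern, not PipeCo's actual source: `SketchStep`, `TextIn`, and `TextOut` are hypothetical names, and plain dataclasses with `isinstance` checks stand in for Pydantic validation.

```python
# Sketch of a Step-like class that enforces its declared input/output
# contracts around process(). PipeCo uses Pydantic models for this;
# here, plain dataclasses and isinstance checks illustrate the idea.
from dataclasses import dataclass


@dataclass
class TextIn:
    text: str


@dataclass
class TextOut:
    upper_text: str
    length: int


class SketchStep:
    input_model = TextIn
    output_model = TextOut

    def __call__(self, data):
        # Validate the input against the declared contract...
        if not isinstance(data, self.input_model):
            raise TypeError(
                f"expected {self.input_model.__name__}, got {type(data).__name__}"
            )
        result = self.process(data)
        # ...and the output, so a buggy step fails loudly at the boundary.
        if not isinstance(result, self.output_model):
            raise TypeError(
                f"step returned {type(result).__name__}, "
                f"not {self.output_model.__name__}"
            )
        return result

    def process(self, data: TextIn) -> TextOut:
        return TextOut(upper_text=data.text.upper(), length=len(data.text))


print(SketchStep()(TextIn(text="hi")).upper_text)  # prints "HI"
```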

Pipeline

The Pipeline chains steps together and:

  • Verifies type compatibility at initialization (compile-time checking)
  • Validates data at each step boundary (runtime checking)
  • Passes context through all steps

Context

The Context object flows through your pipeline, providing:

  • Logger: Centralized logging
  • Resources: Shared objects (DB connections, API clients, etc.)
  • Cache: Data sharing between non-adjacent steps
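A minimal sketch of how such a context object might be shaped and used, assuming (not quoting) PipeCo's implementation: `SketchContext` and the two step functions are hypothetical, but the three attributes mirror the list above.

```python
# Sketch of a Context carrying a logger, shared resources, and a cache.
# An early step stashes a value in the cache for a later, non-adjacent step.
import logging
from dataclasses import dataclass, field


@dataclass
class SketchContext:
    logger: logging.Logger = field(
        default_factory=lambda: logging.getLogger("pipeline")
    )
    resources: dict = field(default_factory=dict)
    cache: dict = field(default_factory=dict)


def count_rows(data, ctx):
    ctx.cache["row_count"] = len(data)  # share with a later step
    return data


def report(data, ctx):
    ctx.logger.info("rows seen earlier: %s", ctx.cache["row_count"])
    return data


ctx = SketchContext()
report(count_rows([1, 2, 3], ctx), ctx)
print(ctx.cache["row_count"])  # prints 3
```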

Registry

Use @register() to make steps discoverable by name:

from pipeco import get_step

StepClass = get_step("step-name")
step_instance = StepClass(config)
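Under the hood, a registry like this is typically a name-to-class dictionary populated by the decorator. The following is an illustrative sketch of that pattern, not PipeCo's internals; `_REGISTRY` and `UppercaseSketch` are hypothetical names.

```python
# Sketch of a decorator-driven registry mirroring @register()/get_step().
_REGISTRY: dict[str, type] = {}


def register(name: str):
    def wrap(cls: type) -> type:
        _REGISTRY[name] = cls  # map the name to the step class
        return cls
    return wrap


def get_step(name: str) -> type:
    try:
        return _REGISTRY[name]
    except KeyError:
        raise KeyError(f"no step registered under {name!r}") from None


@register("uppercase")
class UppercaseSketch:
    def process(self, text: str) -> str:
        return text.upper()


print(get_step("uppercase")().process("hi"))  # prints "HI"
```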

🎯 Example: CSV Processing

See examples/pipelines.py for a complete example that:

  1. Reads a CSV file
  2. Transforms the data
  3. Saves the result

pipeline = Pipeline(steps=[
    CSVPathToDict(),
    ChangeFavoriteFood(),
    SaveDictToCSV({"save_path": "output.csv"})
])

pipeline.run(ExampleCSVModel(csv_path="input.csv"))

🛡️ Type Safety

PipeCo catches mismatches early:

# This raises TypeError at pipeline creation:
Pipeline(steps=[
    StepA(),  # outputs TypeX
    StepB(),  # expects TypeY (incompatible!)
])
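One plausible mechanism for this check, sketched in plain Python under the assumption (not a quote of PipeCo's source) that each step exposes `input_model` and `output_model`: walk adjacent step pairs at construction and compare the declared models.

```python
# Sketch: compare each step's output_model to the next step's input_model
# at pipeline creation, so a mismatch fails before any data is processed.
class A:
    pass


class B:
    pass


class StepX:
    input_model = A
    output_model = A  # outputs A


class StepY:
    input_model = B  # expects B -- incompatible with StepX's output
    output_model = B


def check_pipeline(steps):
    for prev, nxt in zip(steps, steps[1:]):
        if not issubclass(prev.output_model, nxt.input_model):
            raise TypeError(
                f"{type(prev).__name__} outputs {prev.output_model.__name__}, "
                f"but {type(nxt).__name__} expects {nxt.input_model.__name__}"
            )


try:
    check_pipeline([StepX(), StepY()])
except TypeError as exc:
    print(exc)  # mismatch reported before execution
```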

📖 API Reference

Step[I, O, C]

Base class for pipeline steps.

  • process(data: I, ctx: Context) -> O: Override this method

Pipeline

  • __init__(steps: list[Step]): Create pipeline with type checking
  • run(data: BaseModel, ctx: Context | None) -> BaseModel: Execute pipeline

Context

  • logger: logging.Logger instance
  • resources: Shared resources dict
  • cache: Data cache dict

@register(name: str)

Decorator to register step classes.

get_step(name: str) -> type[Step]

Retrieve registered step class by name.

🤝 Contributing

Contributions welcome! This is a lightweight framework designed to stay simple.

📄 License

See LICENSE file for details.
