Lucid Pipeline

A clean, expressive pipeline pattern for Python — pass data through a series of steps with elegance.

Stop writing deeply nested function calls and messy for loops for multi-step data processing. Lucid Pipeline gives Python a clean, expressive way to pass data through a series of steps — readable, testable, and beautiful.


Before & After

Without Lucid Pipeline:

data = get_request_data()
data = strip_whitespace(data)
data = validate_fields(data)
result = normalize_emails(data)
if user.is_premium:
    result = apply_premium_features(result)
try:
    result = save_to_database(result)
except Exception as error:
    result = handle_error(result, error)

With Lucid Pipeline:

from lucid_pipeline import Pipeline

result = (
    Pipeline(get_request_data())
    .through([strip_whitespace, validate_fields, normalize_emails])
    .when(user.is_premium, [apply_premium_features])
    .on_failure(handle_error)
    .then(save_to_database)
)

Installation

pip install lucid-pipeline

Requires Python 3.10 or higher. No dependencies.


Quick Start

Simple function pipeline

from lucid_pipeline import Pipeline

def double(value):
    return value * 2

def add_ten(value):
    return value + 10

def to_string(value):
    return f"Result: {value}"

result = Pipeline(5).through([double, add_ten, to_string]).then_return()
# result = "Result: 20"

With a final destination

result = Pipeline(5).through([double, add_ten]).then(to_string)
# result = "Result: 20"

Class-based pipes

from lucid_pipeline import Pipe

class TrimStrings(Pipe):
    def handle(self, data, next_pipe):
        if isinstance(data, dict):
            data = {k: v.strip() if isinstance(v, str) else v for k, v in data.items()}
        return next_pipe(data)

class ConvertEmptyStringsToNone(Pipe):
    def handle(self, data, next_pipe):
        if isinstance(data, dict):
            data = {k: (None if v == "" else v) for k, v in data.items()}
        return next_pipe(data)

result = (
    Pipeline({"name": "  John  ", "age": 30, "bio": ""})
    .through([TrimStrings(), ConvertEmptyStringsToNone()])
    .then_return()
)
# result = {"name": "John", "age": 30, "bio": None}

Full API Reference

Pipeline(passable)

Creates a new pipeline with the data to be passed through.

Parameter  Type  Description
passable   Any   The data that will flow through the pipeline.

pipeline = Pipeline({"email": "JOHN@EXAMPLE.COM"})

.through(pipes)

Sets the list of pipes the data will travel through. Can be called multiple times — each call appends to the existing pipe list (does not replace).

Parameter  Type                   Description
pipes      list[Callable | Pipe]  List of functions or Pipe instances.

A pipe can be any of:

  1. A plain function: receives the passable and returns the transformed passable.
  2. A Pipe subclass instance: must implement handle(self, data, next_pipe).
  3. A lambda: for inline transforms.

Pipeline(data).through([step_one, step_two]).through([step_three])
# All three steps run in order: step_one → step_two → step_three

.pipe(single_pipe)

Appends a single pipe. Syntactic sugar for .through([single_pipe]).

Pipeline(data).pipe(validate).pipe(transform).pipe(save).then_return()

.when(condition, pipes)

Conditionally adds pipes. The pipes only run if condition is truthy. If condition is a callable, it receives the current passable and is evaluated at execution time.

Parameter  Type                          Description
condition  bool | Callable[[Any], bool]  Static value or callable returning bool.
pipes      list[Callable | Pipe]         Pipes to add if condition is met.

# Static condition (evaluated immediately)
Pipeline(order).when(order.has_coupon, [apply_discount]).then_return()

# Dynamic condition (evaluated at pipeline execution time)
Pipeline(order).when(lambda data: data.total > 100, [apply_bulk_discount]).then_return()

.unless(condition, pipes)

Inverse of .when() — pipes run only if the condition is falsy.

Pipeline(user).unless(user.is_verified, [send_verification_prompt]).then_return()

.tap(callback)

Runs a side effect without modifying the passable. Useful for logging, debugging, and event emission.

Parameter  Type                   Description
callback   Callable[[Any], None]  Called with the current passable. Return value is ignored.

# Parentheses are required so the chained calls can span several lines
result = (
    Pipeline(request)
    .through([authenticate])
    .tap(lambda data: logger.info(f"Authenticated: {data.user_id}"))
    .through([authorize, process])
    .then_return()
)
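
A pass-through step of this kind can be sketched in a few lines. This is only an illustration of the behaviour described for .tap(), not the library's source; the helper name tap and the sample callbacks are assumptions made for the example.

```python
from typing import Any, Callable


def tap(callback: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Turn a side-effect callback into a pass-through pipeline step (hypothetical helper)."""
    def step(data: Any) -> Any:
        callback(data)  # whatever the callback returns is discarded
        return data     # the same passable object continues down the pipeline
    return step


seen = []
log_step = tap(seen.append)                                    # records what flows through
noisy_step = tap(lambda data: "this return value is ignored")  # returns a value on purpose

output = noisy_step(log_step({"user_id": 7}))
print(output, seen)
```

Note that the callback still receives the real object, so a callback that mutates it would change the passable; the sketch only guarantees that the return value is ignored.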

.on_failure(handler)

Registers an exception handler. If any pipe raises an exception, the handler is called with (passable, exception) instead of the pipeline crashing.

Parameter  Type                             Description
handler    Callable[[Any, Exception], Any]  Receives the passable and the exception.

def handle_error(data, error):
    log_error(error)
    return {"success": False, "error": str(error)}

result = (
    Pipeline(data)
    .through([risky_step_one, risky_step_two])
    .on_failure(handle_error)
    .then_return()
)

If no failure handler is set, exceptions propagate normally.
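
The failure path can be pictured as a try/except around the whole run. The sketch below is an assumption-laden illustration for plain function pipes, not the library's implementation: run_with_handler, increment, and explode are hypothetical names, and passing the original passable to the handler follows the behaviour listed under the error-handling test cases.

```python
from typing import Any, Callable, Optional

Handler = Callable[[Any, Exception], Any]


def run_with_handler(passable: Any, pipes: list, handler: Optional[Handler] = None) -> Any:
    """Run function pipes in order, delegating to handler if any of them raises."""
    data = passable
    try:
        for pipe in pipes:
            data = pipe(data)
        return data
    except Exception as error:
        if handler is None:
            raise  # no handler registered: the exception propagates normally
        # Assumption: the handler sees the original passable, not the intermediate value.
        return handler(passable, error)


def increment(value):
    return value + 1


def explode(value):
    raise ValueError(f"cannot process {value}")


recovered = run_with_handler(
    1,
    [increment, explode],
    lambda data, error: {"input": data, "error": str(error)},
)
print(recovered)  # {'input': 1, 'error': 'cannot process 2'}
```

The handler's return value becomes the result of the run, which is why the caller gets a dictionary back rather than an exception.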


.then(destination)

Executes the pipeline and passes the final result to a destination function.

Parameter    Type                  Description
destination  Callable[[Any], Any]  Final function that receives the result.

result = Pipeline(data).through([validate, transform]).then(save_to_database)

.then_return()

Executes the pipeline and returns the result directly without a destination.

result = Pipeline(data).through([validate, transform]).then_return()

Pipe (Base Class)

Abstract base class for class-based pipes.

from lucid_pipeline import Pipe

class MyPipe(Pipe):
    def handle(self, data, next_pipe):
        # Transform data
        data["processed"] = True
        # MUST call next_pipe to continue the pipeline
        return next_pipe(data)

The next_pipe callback:

Class-based pipes receive a next_pipe callable. This is what makes them powerful — you can run logic before AND after the rest of the pipeline (like middleware):

import time

class TimingPipe(Pipe):
    def handle(self, data, next_pipe):
        start = time.perf_counter()               # Monotonic clock, suited to measuring intervals
        result = next_pipe(data)                  # Run remaining pipes
        elapsed = time.perf_counter() - start     # This runs AFTER downstream pipes
        print(f"Pipeline took {elapsed:.2f}s")
        return result

Short-circuiting: Returning without calling next_pipe stops the pipeline early.

class AuthorizePipe(Pipe):
    def handle(self, data, next_pipe):
        if not data.get("is_authorized"):
            return {"error": "Unauthorized"}  # Pipeline stops here
        return next_pipe(data)

Async Support

All pipeline methods have async equivalents. Use AsyncPipeline for async pipes.

from lucid_pipeline import AsyncPipeline, AsyncPipe

async def fetch_user(data):
    data["user"] = await database.get_user(data["user_id"])
    return data

async def enrich_profile(data):
    data["profile"] = await external_api.get_profile(data["user"])
    return data

class AsyncCachePipe(AsyncPipe):
    async def handle(self, data, next_pipe):
        cached = await cache.get(data["key"])
        if cached:
            return cached
        result = await next_pipe(data)
        await cache.set(data["key"], result, ttl=300)
        return result

result = await (
    AsyncPipeline({"user_id": 42})
    .through([AsyncCachePipe(), fetch_user, enrich_profile])
    .then_return()
)

The AsyncPipeline accepts both sync and async callables — sync pipes are executed normally within the async pipeline.
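
One common way to mix sync and async callables is to call each pipe and await the result only when it is awaitable. The sketch below shows that technique for plain function pipes; it is an assumption about how such mixing could work, not a description of AsyncPipeline's internals, and run_async is a hypothetical name.

```python
import asyncio
import inspect
from typing import Any


async def run_async(passable: Any, pipes: list) -> Any:
    """Run sync and async function pipes in order (hypothetical helper)."""
    data = passable
    for pipe in pipes:
        result = pipe(data)
        if inspect.isawaitable(result):
            result = await result  # async pipe: wait for the coroutine to finish
        data = result              # sync pipe: the value is already final
    return data


def double(value):
    return value * 2


async def add_ten(value):
    await asyncio.sleep(0)  # stands in for real asynchronous I/O
    return value + 10


result = asyncio.run(run_async(5, [double, add_ten]))
print(result)  # 20
```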


Real-World Examples

HTTP Request Middleware

class CORSMiddleware(Pipe):
    def handle(self, request, next_pipe):
        response = next_pipe(request)
        response.headers["Access-Control-Allow-Origin"] = "*"
        return response

class RateLimitMiddleware(Pipe):
    def __init__(self, max_requests: int = 100):
        self.max_requests = max_requests

    def handle(self, request, next_pipe):
        if get_request_count(request.ip) > self.max_requests:
            return Response(status=429)
        return next_pipe(request)

response = (
    Pipeline(incoming_request)
    .through([
        CORSMiddleware(),
        RateLimitMiddleware(max_requests=60),
        AuthenticationMiddleware(),
    ])
    .then(route_to_controller)
)

ETL / Data Processing

result = (
    Pipeline(raw_csv_rows)
    .through([
        remove_empty_rows,
        parse_dates,
        normalize_currencies,
    ])
    .when(config.remove_outliers, [filter_outliers])
    .tap(lambda data: print(f"Processing {len(data)} rows"))
    .through([aggregate_by_region])
    .then(write_to_parquet)
)

Form Validation

import re

class ValidateRequired(Pipe):
    def __init__(self, fields: list[str]):
        self.fields = fields

    def handle(self, data, next_pipe):
        missing = [f for f in self.fields if not data.get(f)]
        if missing:
            raise ValidationError(f"Missing required fields: {missing}")
        return next_pipe(data)

class ValidateEmail(Pipe):
    def handle(self, data, next_pipe):
        email = data.get("email", "")
        if not re.match(r"^[^@]+@[^@]+\.[^@]+$", email):
            raise ValidationError("Invalid email address")
        return next_pipe(data)

class NormalizeData(Pipe):
    def handle(self, data, next_pipe):
        data["email"] = data["email"].lower().strip()
        data["name"] = data["name"].strip().title()
        return next_pipe(data)

def on_validation_error(data, error):
    return {"valid": False, "errors": str(error), "data": data}

result = (
    Pipeline(form_data)
    .through([
        ValidateRequired(["name", "email"]),
        ValidateEmail(),
        NormalizeData(),
    ])
    .on_failure(on_validation_error)
    .then(lambda data: {"valid": True, "data": data})
)

Architecture

Project Structure

lucid-pipeline/
├── src/
│   └── lucid_pipeline/
│       ├── __init__.py          # Public API exports
│       ├── pipeline.py          # Pipeline class
│       ├── async_pipeline.py    # AsyncPipeline class
│       ├── pipe.py              # Pipe base class
│       ├── async_pipe.py        # AsyncPipe base class
│       └── exceptions.py        # PipelineError, PipelineFlowError
├── tests/
│   ├── __init__.py
│   ├── test_pipeline.py         # Pipeline unit tests
│   ├── test_async_pipeline.py   # AsyncPipeline unit tests
│   ├── test_pipe.py             # Pipe class tests
│   ├── test_conditional.py      # .when() / .unless() tests
│   ├── test_error_handling.py   # .on_failure() tests
│   ├── test_tap.py              # .tap() tests
│   └── test_edge_cases.py       # Empty pipes, short-circuit, etc.
├── pyproject.toml
├── README.md
├── LICENSE                      # MIT License
└── CHANGELOG.md

Implementation Notes

Pipeline execution model:

The pipeline uses a reducer pattern internally. For simple function pipes, each function receives the passable and returns the transformed value. For class-based Pipe instances, the pipeline builds a nested chain of callables (like a middleware onion) in which each pipe's handle() receives a next_pipe callback pointing to the rest of the chain.

# Internal execution for function pipes:
# result = pipe_3(pipe_2(pipe_1(passable)))

# Internal execution for class-based pipes:
# Each handle() wraps the next, forming an onion:
# pipe_1.handle(data, lambda d: pipe_2.handle(d, lambda d: pipe_3.handle(d, identity)))
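
The onion shown in the comments above can be built with functools.reduce by folding the pipe list from back to front. This is a minimal sketch under assumed names (MiniPipe, build_chain, identity), not the package's actual code.

```python
from functools import reduce
from typing import Any, Callable

Next = Callable[[Any], Any]


class MiniPipe:
    """Hypothetical stand-in for a class-based pipe that records when it runs."""

    def __init__(self, label: str) -> None:
        self.label = label

    def handle(self, data: list, next_pipe: Next) -> list:
        data = data + [f"{self.label}:before"]    # runs on the way in
        result = next_pipe(data)                  # runs the rest of the chain
        return result + [f"{self.label}:after"]   # runs on the way out


def identity(data: Any) -> Any:
    return data


def build_chain(pipes: list) -> Next:
    # Fold from the last pipe to the first so the first pipe ends up outermost.
    def wrap(next_pipe: Next, pipe: MiniPipe) -> Next:
        return lambda data: pipe.handle(data, next_pipe)

    return reduce(wrap, reversed(pipes), identity)


chain = build_chain([MiniPipe("a"), MiniPipe("b")])
trace = chain([])
print(trace)  # ['a:before', 'b:before', 'b:after', 'a:after']
```

The trace shows the middleware ordering: the first pipe is entered first and exited last.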

Mixed pipes (functions + classes):

When a pipeline contains both plain functions and Pipe instances, the pipeline wraps plain functions in a compatibility layer that calls next_pipe automatically:

# Internally, a plain function like:
def double(x): return x * 2

# Gets wrapped as if it were:
class _WrappedDouble(Pipe):
    def handle(self, data, next_pipe):
        return next_pipe(double(data))
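
A runnable version of this wrapping idea might look like the following. FunctionAdapter, RejectNegative, link, and run are hypothetical names chosen for the sketch, and detecting class-based pipes with hasattr(pipe, "handle") is an assumption rather than the library's documented check.

```python
from typing import Any, Callable

Next = Callable[[Any], Any]


class FunctionAdapter:
    """Hypothetical adapter that gives a plain function the handle() interface."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def handle(self, data: Any, next_pipe: Next) -> Any:
        # Transform, then always continue: a plain function cannot short-circuit.
        return next_pipe(self.func(data))


class RejectNegative:
    """Class-style pipe that stops the chain for negative numbers."""

    def handle(self, data: Any, next_pipe: Next) -> Any:
        if data < 0:
            return "rejected"
        return next_pipe(data)


def link(pipe: Any, next_pipe: Next) -> Next:
    return lambda data: pipe.handle(data, next_pipe)


def run(passable: Any, pipes: list) -> Any:
    # Normalize every entry to the handle() interface, then build the chain back to front.
    adapted = [p if hasattr(p, "handle") else FunctionAdapter(p) for p in pipes]
    chain: Next = lambda data: data
    for pipe in reversed(adapted):
        chain = link(pipe, chain)
    return chain(passable)


def double(x):
    return x * 2


def add_ten(x):
    return x + 10


print(run(5, [double, RejectNegative(), add_ten]))   # 20
print(run(-1, [double, RejectNegative(), add_ten]))  # rejected
```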

Conditional evaluation:

  • .when(bool_value, pipes) — if bool_value is a plain bool, it's evaluated at pipeline construction time. Those pipes are either included or excluded from the chain before execution.
  • .when(callable, pipes) — if it's a callable, it's evaluated at execution time with the current passable value. This enables dynamic branching based on intermediate results.
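
The two evaluation modes above can be sketched for plain function pipes as follows. The free function when, the helper run, and the sample steps are assumptions made for illustration; the real method lives on the Pipeline class and may be implemented differently.

```python
from typing import Any, Callable, Union

Step = Callable[[Any], Any]


def when(condition: Union[bool, Callable[[Any], bool]], pipes: list) -> list:
    """Return the steps that should be appended to the pipe list (hypothetical helper)."""
    if callable(condition):
        def guarded(data: Any) -> Any:
            # Deferred: decided at execution time from the current passable.
            if condition(data):
                for pipe in pipes:
                    data = pipe(data)
            return data
        return [guarded]
    # Static: decided right now, at construction time.
    return list(pipes) if condition else []


def run(passable: Any, steps: list) -> Any:
    for step in steps:
        passable = step(passable)
    return passable


def add_fee(total):
    return total + 5


def discount(total):
    return total - 20


steps = [add_fee] + when(lambda total: total > 100, [discount])
print(run(98, steps))  # 83: the fee lifts the total to 103, so the discount applies
print(run(50, steps))  # 55: the intermediate total stays below the threshold
```

The first call shows why deferred evaluation matters: the condition is checked against the intermediate value 103, not the original 98.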

Thread safety:

Pipeline instances are NOT thread-safe. Create a new Pipeline() per request/task. The Pipe classes themselves can be shared across pipelines if they hold no mutable state.
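
In practice this means building the pipeline inside the task function instead of sharing one instance across threads. The sketch below uses a hypothetical MiniPipeline stand-in so that it runs without the package installed; only the construction pattern is the point.

```python
from concurrent.futures import ThreadPoolExecutor


class MiniPipeline:
    """Hypothetical, simplified stand-in holding per-run mutable state."""

    def __init__(self, passable):
        self.passable = passable
        self.pipes = []

    def through(self, pipes):
        self.pipes.extend(pipes)  # mutates this instance, so it must not be shared
        return self

    def then_return(self):
        data = self.passable
        for pipe in self.pipes:
            data = pipe(data)
        return data


def double(value):
    return value * 2  # stateless, so safe to reuse in every pipeline


def handle_task(value):
    # A fresh pipeline per task: no state is shared between threads.
    return MiniPipeline(value).through([double]).then_return()


with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(handle_task, range(5)))

print(results)  # [0, 2, 4, 6, 8]
```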

Type hints:

The package should be fully typed with generics where practical:

from lucid_pipeline import Pipeline

# Pipeline[InputType] tracks the passable type
pipeline: Pipeline[dict] = Pipeline({"key": "value"})
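
A generic pipeline class can be sketched with typing.Generic. TypedPipeline below is a hypothetical, simplified stand-in in which every step maps T to T; the real Pipeline may model its types differently.

```python
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class TypedPipeline(Generic[T]):
    """Hypothetical pipeline whose steps all take and return the passable type T."""

    def __init__(self, passable: T) -> None:
        self._passable = passable
        self._pipes: list[Callable[[T], T]] = []

    def through(self, pipes: list[Callable[[T], T]]) -> "TypedPipeline[T]":
        self._pipes.extend(pipes)
        return self

    def then_return(self) -> T:
        data = self._passable
        for pipe in self._pipes:
            data = pipe(data)
        return data


def add_flag(data: dict) -> dict:
    return {**data, "processed": True}


pipeline: TypedPipeline[dict] = TypedPipeline({"key": "value"})
result = pipeline.through([add_flag]).then_return()
print(result)  # {'key': 'value', 'processed': True}
```

With annotations like these, a type checker such as mypy can flag a step whose signature does not match the passable type.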

Public API (what __init__.py exports)

from lucid_pipeline.pipeline import Pipeline
from lucid_pipeline.async_pipeline import AsyncPipeline
from lucid_pipeline.pipe import Pipe
from lucid_pipeline.async_pipe import AsyncPipe
from lucid_pipeline.exceptions import PipelineError, PipelineFlowError

__all__ = [
    "Pipeline",
    "AsyncPipeline",
    "Pipe",
    "AsyncPipe",
    "PipelineError",
    "PipelineFlowError",
]

Exceptions

Exception          When
PipelineError      Base exception for all pipeline errors.
PipelineFlowError  Raised when the pipeline is misconfigured (e.g., no pipes set).
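
Because PipelineError is described as the base exception, callers can catch the whole family with a single except clause. The sketch below redefines the two classes locally so that it runs on its own; making PipelineFlowError a direct subclass and the error message text are assumptions.

```python
class PipelineError(Exception):
    """Base class for every error raised by the pipeline."""


class PipelineFlowError(PipelineError):
    """Raised when the pipeline is misconfigured (assumed to subclass the base)."""


def describe(error_type):
    try:
        raise error_type("pipeline is misconfigured")  # hypothetical message
    except PipelineError as error:  # one clause covers the base and its subclasses
        return f"{type(error).__name__}: {error}"


print(describe(PipelineFlowError))  # PipelineFlowError: pipeline is misconfigured
```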

pyproject.toml Specification

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "lucid-pipeline"
version = "0.1.0"
description = "A clean, expressive pipeline pattern for Python."
readme = "README.md"
license = "MIT"
requires-python = ">=3.10"
authors = [
    { name = "Your Name", email = "your@email.com" },
]
keywords = ["pipeline", "middleware", "chain", "pipe", "data-processing", "workflow"]
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Programming Language :: Python :: 3.13",
    "Topic :: Software Development :: Libraries :: Python Modules",
    "Typing :: Typed",
]

[project.urls]
Homepage = "https://github.com/yourname/lucid-pipeline"
Documentation = "https://github.com/yourname/lucid-pipeline#readme"
Repository = "https://github.com/yourname/lucid-pipeline"
Issues = "https://github.com/yourname/lucid-pipeline/issues"

[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

[tool.mypy]
strict = true

[project.optional-dependencies]
dev = ["pytest>=7.0", "pytest-asyncio>=0.21", "mypy>=1.0", "ruff>=0.1"]

Test Cases to Implement

Core Pipeline

  • Empty pipeline returns passable unchanged
  • Single function pipe transforms data
  • Multiple function pipes execute in order
  • then() passes result to destination function
  • then_return() returns result directly
  • Pipeline is chainable: calling .through() appends to the pipe list and returns the same instance

Class-Based Pipes

  • Pipe subclass handle() is called with data and next_pipe
  • Calling next_pipe continues the pipeline
  • NOT calling next_pipe short-circuits (returns early)
  • Pipe can run logic AFTER next_pipe (middleware pattern)
  • Mixed function + class pipes work together

Conditional Pipes

  • .when(True, pipes) includes the pipes
  • .when(False, pipes) skips the pipes
  • .when(callable, pipes) evaluates callable at execution time
  • .unless(True, pipes) skips the pipes
  • .unless(False, pipes) includes the pipes
  • Callable condition receives current passable value

Tap

  • .tap() receives current passable
  • .tap() return value is ignored (passable unchanged)
  • .tap() exceptions propagate normally

Error Handling

  • Without on_failure, exceptions propagate normally
  • With on_failure, handler receives (passable, exception)
  • Handler return value becomes the pipeline result
  • Handler is called with the original passable, not intermediate state

Async Pipeline

  • Async function pipes are awaited
  • Async Pipe subclass handle() is awaited
  • Sync functions work inside AsyncPipeline
  • .when() / .unless() / .tap() / .on_failure() all work async
  • Async callable conditions in .when() are awaited

Edge Cases

  • Pipeline with no pipes returns passable unchanged
  • Pipeline with None as passable works
  • Pipe that returns None explicitly passes None forward
  • Deeply nested pipelines (100+ pipes) don't stack overflow
  • .pipe() is equivalent to .through([single_pipe])
  • .through() called multiple times appends correctly
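
For the deep-pipeline edge case, one way to stay clear of the recursion limit with plain function pipes is to run them in a loop rather than as nested calls. The sketch below illustrates that idea only; run_iteratively is a hypothetical helper, and it does not cover class-based pipes, whose next_pipe chaining nests calls by design.

```python
import sys


def run_iteratively(passable, pipes):
    # A plain loop keeps the call stack flat no matter how many pipes there are.
    for pipe in pipes:
        passable = pipe(passable)
    return passable


def increment(value):
    return value + 1


# Use more steps than the interpreter allows nested calls.
pipe_count = sys.getrecursionlimit() * 2
result = run_iteratively(0, [increment] * pipe_count)
print(result == pipe_count)  # True
```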

Publishing to PyPI

# Install build tools
pip install build twine

# Build the package
python -m build

# Upload to TestPyPI first
twine upload --repository testpypi dist/*

# Test install from TestPyPI
pip install --index-url https://test.pypi.org/simple/ lucid-pipeline

# Upload to real PyPI
twine upload dist/*

Part of the Lucid Ecosystem

Lucid Pipeline is the first package in the Lucid ecosystem — a collection of tools bringing clarity and convention to Python development.

Coming soon:

  • lucid-container — Dependency injection container with autowiring
  • lucid-cache — Multi-driver cache with a unified API
  • lucid-config — Cascading configuration management
  • lucid-events — Event dispatcher with listeners and subscribers

License

MIT License. See LICENSE for details.
