A clean, expressive pipeline pattern for Python.
Lucid Pipeline
A clean, expressive pipeline pattern for Python — pass data through a series of steps with elegance.
Stop writing deeply nested function calls and messy for loops for multi-step data processing. Lucid Pipeline gives Python a clean, expressive way to pass data through a series of steps — readable, testable, and beautiful.
Before & After
Without Lucid Pipeline:
```python
def handle_request(user):
    data = get_request_data()
    data = strip_whitespace(data)
    data = validate_fields(data)
    result = normalize_emails(data)
    if user.is_premium:
        result = apply_premium_features(result)
    try:
        return save_to_database(result)
    except Exception:
        return handle_error(result)
```
With Lucid Pipeline:
```python
from lucid_pipeline import Pipeline

result = (
    Pipeline(get_request_data())
    .through([strip_whitespace, validate_fields, normalize_emails])
    .when(user.is_premium, [apply_premium_features])
    .on_failure(handle_error)
    .then(save_to_database)
)
```
Installation
```shell
pip install lucid-pipeline
```
Requires Python 3.10 or higher. No dependencies.
Quick Start
Simple function pipeline
```python
from lucid_pipeline import Pipeline


def double(value):
    return value * 2


def add_ten(value):
    return value + 10


def to_string(value):
    return f"Result: {value}"


result = Pipeline(5).through([double, add_ten, to_string]).then_return()
# result = "Result: 20"
```
With a final destination
```python
result = Pipeline(5).through([double, add_ten]).then(to_string)
# result = "Result: 20"
```
Class-based pipes
```python
from lucid_pipeline import Pipe, Pipeline


class TrimStrings(Pipe):
    def handle(self, data, next_pipe):
        if isinstance(data, dict):
            data = {k: v.strip() if isinstance(v, str) else v for k, v in data.items()}
        return next_pipe(data)


class ConvertEmptyStringsToNone(Pipe):
    def handle(self, data, next_pipe):
        if isinstance(data, dict):
            data = {k: (None if v == "" else v) for k, v in data.items()}
        return next_pipe(data)


result = (
    Pipeline({"name": " John ", "age": 30, "bio": ""})
    .through([TrimStrings(), ConvertEmptyStringsToNone()])
    .then_return()
)
# result = {"name": "John", "age": 30, "bio": None}
```
Full API Reference
Pipeline(passable)
Creates a new pipeline with the data to be passed through.
| Parameter | Type | Description |
|---|---|---|
| passable | Any | The data that will flow through the pipeline. |
```python
pipeline = Pipeline({"email": "JOHN@EXAMPLE.COM"})
```
.through(pipes)
Adds pipes to the list the data will travel through. It can be called multiple times: each call appends to the existing pipe list rather than replacing it.
| Parameter | Type | Description |
|---|---|---|
| pipes | list[Callable \| Pipe] | List of functions or Pipe instances. |
A pipe can be any of:
- A plain function: receives the passable and returns the transformed passable.
- A Pipe subclass instance: must implement handle(self, data, next_pipe).
- A lambda: for inline transforms.
```python
Pipeline(data).through([step_one, step_two]).through([step_three])
# All three steps run in order: step_one → step_two → step_three
```
.pipe(single_pipe)
Appends a single pipe. Syntactic sugar for .through([single_pipe]).
```python
Pipeline(data).pipe(validate).pipe(transform).pipe(save).then_return()
```
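The chaining rules above (each `.through()` appends, `.pipe()` is shorthand for a one-item `.through()`, and `.then()` hands the final value to a destination) can be illustrated with a tiny stand-in class. `MiniPipeline` is a hypothetical sketch that handles plain functions only; it is not the library's source.

```python
from typing import Any, Callable, List

Fn = Callable[[Any], Any]


class MiniPipeline:
    """Hypothetical stand-in illustrating append-only chaining (plain functions only)."""

    def __init__(self, passable: Any) -> None:
        self.passable = passable
        self.pipes: List[Fn] = []

    def through(self, pipes: List[Fn]) -> "MiniPipeline":
        # Append rather than replace, then return self so calls can be chained.
        self.pipes.extend(pipes)
        return self

    def pipe(self, single_pipe: Fn) -> "MiniPipeline":
        # Sugar for .through([single_pipe]).
        return self.through([single_pipe])

    def then_return(self) -> Any:
        # Feed the passable through every pipe in insertion order.
        value = self.passable
        for fn in self.pipes:
            value = fn(value)
        return value

    def then(self, destination: Fn) -> Any:
        # Run the pipes, then hand the final value to the destination.
        return destination(self.then_return())


result = (
    MiniPipeline(5)
    .through([lambda x: x * 2])
    .through([lambda x: x + 10])
    .pipe(lambda x: f"Result: {x}")
    .then_return()
)
```

With no pipes registered, `then_return()` simply hands back the original passable, which matches the "empty pipeline returns passable unchanged" behavior described in the test list.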
.when(condition, pipes)
Conditionally adds pipes. The pipes only run if condition is truthy. If condition is a callable, it receives the current passable and is evaluated at execution time.
| Parameter | Type | Description |
|---|---|---|
| condition | bool \| Callable[[Any], bool] | Static value or callable returning bool. |
| pipes | list[Callable \| Pipe] | Pipes to add if condition is met. |
```python
# Static condition (evaluated immediately)
Pipeline(order).when(order.has_coupon, [apply_discount]).then_return()

# Dynamic condition (evaluated at pipeline execution time)
Pipeline(order).when(lambda data: data.total > 100, [apply_bulk_discount]).then_return()
```
.unless(condition, pipes)
Inverse of .when() — pipes run only if the condition is falsy.
```python
Pipeline(user).unless(user.is_verified, [send_verification_prompt]).then_return()
```
.tap(callback)
Runs a side effect without modifying the passable. Useful for logging, debugging, event emission.
| Parameter | Type | Description |
|---|---|---|
| callback | Callable[[Any], None] | Called with the current passable. Return value is ignored. |
```python
result = (
    Pipeline(request)
    .through([authenticate])
    .tap(lambda data: logger.info(f"Authenticated: {data.user_id}"))
    .through([authorize, process])
    .then_return()
)
```
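One way to get `.tap()` semantics is to wrap the callback in a pass-through step that calls it and then returns the incoming value unchanged. The sketch below uses hypothetical helpers `make_tap` and `run`; it illustrates the idea and is not how the library is necessarily implemented.

```python
from typing import Any, Callable, List


def make_tap(callback: Callable[[Any], None]) -> Callable[[Any], Any]:
    """Hypothetical helper: turn a side-effect callback into a pass-through pipe."""

    def tap_pipe(data: Any) -> Any:
        callback(data)  # return value deliberately discarded
        return data  # the passable continues unchanged

    return tap_pipe


def run(passable: Any, pipes: List[Callable[[Any], Any]]) -> Any:
    # Minimal sequential runner used only for this illustration.
    for fn in pipes:
        passable = fn(passable)
    return passable


seen: List[int] = []
result = run(3, [lambda x: x + 1, make_tap(seen.append), lambda x: x * 10])
```

Because `tap_pipe` does not catch anything, an exception raised inside the callback propagates like any other pipe error.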
.on_failure(handler)
Registers an exception handler. If any pipe raises an exception, the handler is called with (passable, exception) instead of the pipeline crashing.
| Parameter | Type | Description |
|---|---|---|
| handler | Callable[[Any, Exception], Any] | Receives the passable and the exception. |
```python
def handle_error(data, error):
    log_error(error)
    return {"success": False, "error": str(error)}


result = (
    Pipeline(data)
    .through([risky_step_one, risky_step_two])
    .on_failure(handle_error)
    .then_return()
)
```
If no failure handler is set, exceptions propagate normally.
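The failure contract (handler receives the original passable and the exception, its return value becomes the result, and with no handler the exception propagates) can be sketched as a single `try` around the whole run. `run_with_failure`, `risky` and the sample data are hypothetical and serve only as an illustration.

```python
from typing import Any, Callable, List, Optional


def run_with_failure(
    passable: Any,
    pipes: List[Callable[[Any], Any]],
    on_failure: Optional[Callable[[Any, Exception], Any]] = None,
) -> Any:
    value = passable
    try:
        for fn in pipes:
            value = fn(value)
        return value
    except Exception as exc:
        if on_failure is None:
            raise  # no handler registered: propagate unchanged
        # The handler sees the ORIGINAL passable, not the partially transformed value.
        return on_failure(passable, exc)


def risky(data: Any) -> Any:
    raise ValueError("boom")


def handle_error(data: Any, error: Exception) -> dict:
    return {"success": False, "error": str(error), "input": data}


result = run_with_failure({"id": 1}, [lambda d: {**d, "step": 1}, risky], handle_error)
```

Note that "original passable" means the original object: a pipe that mutates a dict in place instead of returning a copy also changes what the handler receives, so copy first if that distinction matters.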
.then(destination)
Executes the pipeline and passes the final result to a destination function.
| Parameter | Type | Description |
|---|---|---|
| destination | Callable[[Any], Any] | Final function that receives the result. |
```python
result = Pipeline(data).through([validate, transform]).then(save_to_database)
```
.then_return()
Executes the pipeline and returns the result directly without a destination.
```python
result = Pipeline(data).through([validate, transform]).then_return()
```
Pipe (Base Class)
Abstract base class for class-based pipes.
```python
from lucid_pipeline import Pipe


class MyPipe(Pipe):
    def handle(self, data, next_pipe):
        # Transform data
        data["processed"] = True
        # MUST call next_pipe to continue the pipeline
        return next_pipe(data)
```
The next_pipe callback:
Class-based pipes receive a next_pipe callable. This is what makes them powerful — you can run logic before AND after the rest of the pipeline (like middleware):
```python
import time

from lucid_pipeline import Pipe


class TimingPipe(Pipe):
    def handle(self, data, next_pipe):
        start = time.perf_counter()
        result = next_pipe(data)  # Run remaining pipes
        elapsed = time.perf_counter() - start  # This runs AFTER downstream pipes
        print(f"Pipeline took {elapsed:.2f}s")
        return result
```
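The before/after ordering that makes class-based pipes behave like middleware can be seen by wiring two recording pipes by hand. `Recorder` is a hypothetical class with the same `handle(data, next_pipe)` shape as a Pipe subclass; it does not inherit from the real base class so the example stays self-contained.

```python
from typing import Any, Callable, List

trace: List[str] = []


class Recorder:
    """Hypothetical pipe that logs before and after calling next_pipe."""

    def __init__(self, name: str) -> None:
        self.name = name

    def handle(self, data: Any, next_pipe: Callable[[Any], Any]) -> Any:
        trace.append(f"{self.name}:before")
        result = next_pipe(data)  # everything downstream runs here
        trace.append(f"{self.name}:after")
        return result


outer, inner = Recorder("outer"), Recorder("inner")
# Wire the chain by hand: outer wraps inner, inner wraps an identity "core".
result = outer.handle(1, lambda d: inner.handle(d, lambda x: x))
```

The "after" entries come out in reverse order of the "before" entries, which is the onion shape: the first pipe in the list is the first to start and the last to finish.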
Short-circuiting: Returning without calling next_pipe stops the pipeline early.
```python
class AuthorizePipe(Pipe):
    def handle(self, data, next_pipe):
        if not data.get("is_authorized"):
            return {"error": "Unauthorized"}  # Pipeline stops here
        return next_pipe(data)
```
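To confirm that downstream steps really are skipped, the same authorization logic can be written as a plain function with the `handle(data, next_pipe)` shape and a recording downstream step. `authorize`, `process` and `calls` are hypothetical names used only in this sketch.

```python
from typing import Any, Callable, List

calls: List[str] = []


def authorize(data: dict, next_pipe: Callable[[dict], Any]) -> Any:
    if not data.get("is_authorized"):
        return {"error": "Unauthorized"}  # next_pipe is never called
    return next_pipe(data)


def process(data: dict) -> dict:
    calls.append("process")  # records that the downstream step ran
    return {**data, "processed": True}


denied = authorize({"is_authorized": False}, process)
allowed = authorize({"is_authorized": True}, process)
```

Only the authorized call reaches `process`; the denied call returns the early value directly, and that value becomes the pipeline's result.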
Async Support
All pipeline methods have async equivalents. Use AsyncPipeline for async pipes.
```python
import asyncio

from lucid_pipeline import AsyncPipeline, AsyncPipe

# database, external_api and cache stand in for your own async clients.


async def fetch_user(data):
    data["user"] = await database.get_user(data["user_id"])
    return data


async def enrich_profile(data):
    data["profile"] = await external_api.get_profile(data["user"])
    return data


class AsyncCachePipe(AsyncPipe):
    async def handle(self, data, next_pipe):
        key = f"user:{data['user_id']}"  # derive the cache key from the passable
        cached = await cache.get(key)
        if cached is not None:
            return cached
        result = await next_pipe(data)
        await cache.set(key, result, ttl=300)
        return result


async def main():
    return await (
        AsyncPipeline({"user_id": 42})
        .through([AsyncCachePipe(), fetch_user, enrich_profile])
        .then_return()
    )


result = asyncio.run(main())
```
The AsyncPipeline accepts both sync and async callables — sync pipes are executed normally within the async pipeline.
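A common way to let one async runner accept both kinds of callables is to call each pipe and await the result only when it is awaitable. The sketch below assumes that approach; `run_async`, `fetch_user` and `add_flag` are hypothetical, and the library may detect coroutines differently.

```python
import asyncio
import inspect
from typing import Any, Callable, List


async def run_async(passable: Any, pipes: List[Callable[[Any], Any]]) -> Any:
    value = passable
    for fn in pipes:
        out = fn(value)
        # Await coroutine results; plain (sync) results pass straight through.
        value = await out if inspect.isawaitable(out) else out
    return value


async def fetch_user(data: dict) -> dict:
    await asyncio.sleep(0)  # stand-in for real I/O
    return {**data, "user": f"user-{data['user_id']}"}


def add_flag(data: dict) -> dict:  # a synchronous pipe
    return {**data, "flag": True}


result = asyncio.run(run_async({"user_id": 42}, [fetch_user, add_flag]))
```

Checking the returned value with `inspect.isawaitable` rather than the function with `inspect.iscoroutinefunction` also covers sync callables that happen to return an awaitable.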
Real-World Examples
HTTP Request Middleware
```python
class CORSMiddleware(Pipe):
    def handle(self, request, next_pipe):
        response = next_pipe(request)
        response.headers["Access-Control-Allow-Origin"] = "*"
        return response


class RateLimitMiddleware(Pipe):
    def __init__(self, max_requests: int = 100):
        self.max_requests = max_requests

    def handle(self, request, next_pipe):
        if get_request_count(request.ip) > self.max_requests:
            return Response(status=429)
        return next_pipe(request)


response = (
    Pipeline(incoming_request)
    .through([
        CORSMiddleware(),
        RateLimitMiddleware(max_requests=60),
        AuthenticationMiddleware(),
    ])
    .then(route_to_controller)
)
```
ETL / Data Processing
```python
result = (
    Pipeline(raw_csv_rows)
    .through([
        remove_empty_rows,
        parse_dates,
        normalize_currencies,
    ])
    .when(config.remove_outliers, [filter_outliers])
    .tap(lambda data: print(f"Processing {len(data)} rows"))
    .through([aggregate_by_region])
    .then(write_to_parquet)
)
```
Form Validation
```python
import re

from lucid_pipeline import Pipe, Pipeline


class ValidationError(Exception):
    """Application-level validation error (define your own or import one)."""


class ValidateRequired(Pipe):
    def __init__(self, fields: list[str]):
        self.fields = fields

    def handle(self, data, next_pipe):
        missing = [f for f in self.fields if not data.get(f)]
        if missing:
            raise ValidationError(f"Missing required fields: {missing}")
        return next_pipe(data)


class ValidateEmail(Pipe):
    EMAIL_RE = re.compile(r"^[^@]+@[^@]+\.[^@]+$")

    def handle(self, data, next_pipe):
        email = data.get("email", "")
        if not self.EMAIL_RE.match(email):
            raise ValidationError("Invalid email address")
        return next_pipe(data)


class NormalizeData(Pipe):
    def handle(self, data, next_pipe):
        data["email"] = data["email"].lower().strip()
        data["name"] = data["name"].strip().title()
        return next_pipe(data)


def on_validation_error(data, error):
    return {"valid": False, "errors": str(error), "data": data}


result = (
    Pipeline(form_data)
    .through([
        ValidateRequired(["name", "email"]),
        ValidateEmail(),
        NormalizeData(),
    ])
    .on_failure(on_validation_error)
    .then(lambda data: {"valid": True, "data": data})
)
```
Architecture
Project Structure
```text
lucid-pipeline/
├── src/
│   └── lucid_pipeline/
│       ├── __init__.py            # Public API exports
│       ├── pipeline.py            # Pipeline class
│       ├── async_pipeline.py      # AsyncPipeline class
│       ├── pipe.py                # Pipe base class
│       ├── async_pipe.py          # AsyncPipe base class
│       └── exceptions.py          # PipelineError, PipelineFlowError
├── tests/
│   ├── __init__.py
│   ├── test_pipeline.py           # Pipeline unit tests
│   ├── test_async_pipeline.py     # AsyncPipeline unit tests
│   ├── test_pipe.py               # Pipe class tests
│   ├── test_conditional.py        # .when() / .unless() tests
│   ├── test_error_handling.py     # .on_failure() tests
│   ├── test_tap.py                # .tap() tests
│   └── test_edge_cases.py         # Empty pipes, short-circuit, etc.
├── pyproject.toml
├── README.md
├── LICENSE                        # MIT License
└── CHANGELOG.md
```
Implementation Notes
Pipeline execution model:
The pipeline uses a reducer pattern internally. For simple function pipes, each function receives the passable and returns the transformed value. For class-based Pipe instances, the pipeline builds a nested callable chain (like middleware onion) where each pipe's handle() receives a next_pipe callback pointing to the rest of the chain.
```python
# Internal execution for function pipes:
# result = pipe_3(pipe_2(pipe_1(passable)))

# Internal execution for class-based pipes:
# Each handle() wraps the next, forming an onion:
# pipe_1.handle(data, lambda d: pipe_2.handle(d, lambda d: pipe_3.handle(d, identity)))
```
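The onion can be assembled mechanically by folding over the pipes from last to first, so each pipe's `next_pipe` is the already-built remainder of the chain. This is a sketch under that assumption; `AddSuffix` and `build_chain` are hypothetical names, not part of the package.

```python
from functools import reduce
from typing import Any, Callable, List

Next = Callable[[Any], Any]


class AddSuffix:
    """Hypothetical class-based pipe: appends a suffix, then continues."""

    def __init__(self, suffix: str) -> None:
        self.suffix = suffix

    def handle(self, data: str, next_pipe: Next) -> Any:
        return next_pipe(data + self.suffix)


def build_chain(pipes: List[AddSuffix], destination: Next) -> Next:
    # Fold from the innermost pipe outwards so pipes[0] ends up outermost.
    def wrap(next_pipe: Next, pipe: AddSuffix) -> Next:
        return lambda data: pipe.handle(data, next_pipe)

    return reduce(wrap, reversed(pipes), destination)


chain = build_chain([AddSuffix("1"), AddSuffix("2"), AddSuffix("3")], lambda d: d)
result = chain("x")
```

Each call to `wrap` gets its own scope, so every lambda captures the correct `pipe` and `next_pipe`; with an empty list, `reduce` returns the destination itself, so the passable comes back unchanged.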
Mixed pipes (functions + classes):
When a pipeline contains both plain functions and Pipe instances, the pipeline wraps plain functions in a compatibility layer that calls next_pipe automatically:
```python
# Internally, a plain function like:
def double(x):
    return x * 2


# Gets wrapped as if it were:
class _WrappedDouble(Pipe):
    def handle(self, data, next_pipe):
        return next_pipe(double(data))
```
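A generic version of that compatibility layer is a single adapter class plus a normalizing helper applied to every entry before the chain is built. Everything below (`Pipe` stand-in, `_FunctionPipe`, `as_pipe`, `Negate`, `run`) is a hypothetical sketch of the idea, not the package's code.

```python
from typing import Any, Callable, List

Next = Callable[[Any], Any]


class Pipe:
    """Minimal stand-in for the Pipe base class (hypothetical)."""

    def handle(self, data: Any, next_pipe: Next) -> Any:
        raise NotImplementedError


class _FunctionPipe(Pipe):
    """Adapter: runs a plain function, then calls next_pipe automatically."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def handle(self, data: Any, next_pipe: Next) -> Any:
        return next_pipe(self.fn(data))


def as_pipe(item: Any) -> Pipe:
    # Pipe instances pass through untouched; any other callable gets wrapped.
    if isinstance(item, Pipe):
        return item
    if callable(item):
        return _FunctionPipe(item)
    raise TypeError(f"Not a pipe: {item!r}")


class Negate(Pipe):
    def handle(self, data: Any, next_pipe: Next) -> Any:
        return next_pipe(-data)


def run(passable: Any, items: List[Any]) -> Any:
    pipes = [as_pipe(i) for i in items]

    def step(index: int, data: Any) -> Any:
        if index == len(pipes):
            return data  # end of the chain
        return pipes[index].handle(data, lambda d: step(index + 1, d))

    return step(0, passable)


result = run(5, [lambda x: x * 2, Negate(), lambda x: x + 1])
```

The `isinstance` check runs before `callable` so that a Pipe subclass that also defines `__call__` is still treated as a class-based pipe.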
Conditional evaluation:
- .when(bool_value, pipes): if bool_value is a plain bool, it's evaluated at pipeline construction time. Those pipes are either included or excluded from the chain before execution.
- .when(callable, pipes): if it's a callable, it's evaluated at execution time with the current passable value. This enables dynamic branching based on intermediate results.
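The two evaluation times can be sketched with one helper: a static value decides immediately which pipes are spliced in, while a callable is deferred into a gate step that inspects the value flowing through at that point. `conditional`, `run` and the sample steps are hypothetical; `negate=True` models `.unless()`.

```python
from typing import Any, Callable, List, Union

Fn = Callable[[Any], Any]
Condition = Union[bool, Callable[[Any], bool]]


def conditional(condition: Condition, pipes: List[Fn], negate: bool = False) -> List[Fn]:
    """Hypothetical helper: negate=False acts like .when(), negate=True like .unless()."""
    if not callable(condition):
        # Static value: the decision is made now, at construction time.
        include = bool(condition) != negate
        return list(pipes) if include else []

    # Callable: defer the decision to execution time and test the current passable.
    def gated(data: Any) -> Any:
        if bool(condition(data)) == negate:
            return data  # condition not met: pass the value through untouched
        for fn in pipes:
            data = fn(data)
        return data

    return [gated]


def run(passable: Any, pipes: List[Fn]) -> Any:
    for fn in pipes:
        passable = fn(passable)
    return passable


def add_sixty(x: int) -> int:
    return x + 60


def double(x: int) -> int:
    return x * 2


# The callable sees the value AFTER add_sixty, not the original input.
steps = [add_sixty] + conditional(lambda x: x > 100, [double])
small = run(10, steps)  # 70 is not > 100, so double is skipped
big = run(50, steps)  # 110 > 100, so double runs
```

The example shows why the callable form matters: the condition is tested against the intermediate value 110, even though the input was 50.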
Thread safety:
Pipeline instances are NOT thread-safe. Create a new Pipeline() per request/task. The Pipe classes themselves can be shared across pipelines if they hold no mutable state.
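The recommended pattern (a fresh pipeline per task, with stateless pipe instances shared freely) looks roughly like this. `Uppercase`, `PerRequestPipeline` and `handle_request` are hypothetical stand-ins; the point is that per-run state lives on the short-lived pipeline object, never on the shared pipe.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


class Uppercase:
    """Stateless class-based pipe: safe to share because it holds no mutable state."""

    def handle(self, data: str, next_pipe: Callable[[str], Any]) -> Any:
        return next_pipe(data.upper())


SHARED_PIPE = Uppercase()  # one instance reused by every thread


class PerRequestPipeline:
    """Hypothetical minimal pipeline; it stores per-run state, so build one per task."""

    def __init__(self, passable: str) -> None:
        self.passable = passable
        self.pipes: List[Uppercase] = []

    def through(self, pipes: List[Uppercase]) -> "PerRequestPipeline":
        self.pipes.extend(pipes)
        return self

    def then_return(self) -> Any:
        def step(i: int, data: Any) -> Any:
            if i == len(self.pipes):
                return data
            return self.pipes[i].handle(data, lambda d: step(i + 1, d))

        return step(0, self.passable)


def handle_request(name: str) -> str:
    # A fresh pipeline per request; only the stateless pipe is shared.
    return PerRequestPipeline(name).through([SHARED_PIPE]).then_return()


with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(handle_request, ["a", "b", "c"]))
```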
Type hints:
The package should be fully typed with generics where practical:
```python
from lucid_pipeline import Pipeline

# Pipeline[InputType] tracks the passable type
pipeline: Pipeline[dict] = Pipeline({"key": "value"})
```
Public API (what __init__.py exports)
```python
from lucid_pipeline.pipeline import Pipeline
from lucid_pipeline.async_pipeline import AsyncPipeline
from lucid_pipeline.pipe import Pipe
from lucid_pipeline.async_pipe import AsyncPipe
from lucid_pipeline.exceptions import PipelineError, PipelineFlowError

__all__ = [
    "Pipeline",
    "AsyncPipeline",
    "Pipe",
    "AsyncPipe",
    "PipelineError",
    "PipelineFlowError",
]
```
Exceptions
| Exception | When |
|---|---|
| PipelineError | Base exception for all pipeline errors. |
| PipelineFlowError | Raised when the pipeline is misconfigured (e.g., no pipes set). |
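The two-level hierarchy lets callers catch every pipeline-specific error through the base class. The sketch below redefines both classes locally and uses a hypothetical `validate_pipes` check (rejecting entries that are neither callable nor expose `handle`) as an example of misconfiguration; the package's own checks may differ.

```python
from typing import Any, List, Optional


class PipelineError(Exception):
    """Base class for all pipeline errors."""


class PipelineFlowError(PipelineError):
    """Raised when a pipeline is misconfigured."""


def validate_pipes(pipes: List[Any]) -> None:
    # Hypothetical configuration check: every entry must be callable or expose handle().
    for item in pipes:
        if not (callable(item) or hasattr(item, "handle")):
            raise PipelineFlowError(f"Invalid pipe: {item!r}")


caught: Optional[PipelineError] = None
try:
    validate_pipes([str.upper, 42])
except PipelineError as exc:  # the base class catches every pipeline error
    caught = exc
```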
pyproject.toml Specification
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "lucid-pipeline"
version = "0.1.0"
description = "A clean, expressive pipeline pattern for Python."
readme = "README.md"
license = "MIT"
requires-python = ">=3.10"
authors = [
    { name = "Your Name", email = "your@email.com" },
]
keywords = ["pipeline", "middleware", "chain", "pipe", "data-processing", "workflow"]
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Programming Language :: Python :: 3.13",
    "Topic :: Software Development :: Libraries :: Python Modules",
    "Typing :: Typed",
]

[project.optional-dependencies]
dev = ["pytest>=7.0", "pytest-asyncio>=0.21", "mypy>=1.0", "ruff>=0.1"]

[project.urls]
Homepage = "https://github.com/yourname/lucid-pipeline"
Documentation = "https://github.com/yourname/lucid-pipeline#readme"
Repository = "https://github.com/yourname/lucid-pipeline"
Issues = "https://github.com/yourname/lucid-pipeline/issues"

[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

[tool.mypy]
strict = true
```
Test Cases to Implement
Core Pipeline
- Empty pipeline returns passable unchanged
- Single function pipe transforms data
- Multiple function pipes execute in order
- then() passes result to destination function
- then_return() returns result directly
- Pipeline is chainable: calling .through() appends to the pipe list and returns the same instance
Class-Based Pipes
- Pipe subclass handle() is called with data and next_pipe
- Calling next_pipe continues the pipeline
- NOT calling next_pipe short-circuits (returns early)
- Pipe can run logic AFTER next_pipe (middleware pattern)
- Mixed function + class pipes work together
Conditional Pipes
- .when(True, pipes) includes the pipes
- .when(False, pipes) skips the pipes
- .when(callable, pipes) evaluates callable at execution time
- .unless(True, pipes) skips the pipes
- .unless(False, pipes) includes the pipes
- Callable condition receives current passable value
Tap
- .tap() receives current passable
- .tap() return value is ignored (passable unchanged)
- .tap() exceptions propagate normally
Error Handling
- Without on_failure, exceptions propagate normally
- With on_failure, handler receives (passable, exception)
- Handler return value becomes the pipeline result
- Handler is called with the original passable, not intermediate state
Async Pipeline
- Async function pipes are awaited
- Async Pipe subclass handle() is awaited
- Sync functions work inside AsyncPipeline
- .when() / .unless() / .tap() / .on_failure() all work async
- Async callable conditions in .when() are awaited
Edge Cases
- Pipeline with no pipes returns passable unchanged
- Pipeline with None as passable works
- Pipe that returns None explicitly passes None forward
- Deeply nested pipelines (100+ pipes) don't stack overflow
- .pipe() is equivalent to .through([single_pipe])
- .through() called multiple times appends correctly
Publishing to PyPI
```shell
# Install build tools
pip install build twine

# Build the package
python -m build

# Upload to TestPyPI first
twine upload --repository testpypi dist/*

# Test install from TestPyPI
pip install --index-url https://test.pypi.org/simple/ lucid-pipeline

# Upload to real PyPI
twine upload dist/*
```
Part of the Lucid Ecosystem
Lucid Pipeline is the first package in the Lucid ecosystem — a collection of tools bringing clarity and convention to Python development.
Coming soon:
- lucid-container: Dependency injection container with autowiring
- lucid-cache: Multi-driver cache with a unified API
- lucid-config: Cascading configuration management
- lucid-events: Event dispatcher with listeners and subscribers
License
MIT License. See LICENSE for details.