Skip to main content

A plumbing syntax for Python

Project description

plumbum

CI PyPI Python 3.12+ License: MIT Code style: ruff

A plumbing syntax for Python that provides a clear distinction between data and operators, enabling composable function pipelines with explicit control flow.

Overview

plumbum is a library for threading data through function calls using intuitive pipe operators. Inspired by Pipe, it offers a redesigned approach with clear separation between operator definition and execution.

This is primarily a syntax library focused on making data transformations more readable and composable. It is not optimized for performance-critical applications, but rather for clarity and expressiveness in your code.

Tutorial Notebook

Walk through synchronous and asynchronous pipelines step-by-step in the Tutorial.

Key Features

  • Clear Operator/Data Distinction: Define pipelines without executing them
  • Composable Operators: Combine operators using | to build reusable pipelines
  • Threading Syntax: Use >> to thread data through operators
  • Partial Application: Build up function arguments incrementally
  • First Parameter Threading: Data is threaded into the first parameter of functions
  • Any Data Type: No assumptions about data types—works with any Python values, not just iterators
  • Future Async Support: Planned async/await compatible version

Why plumbum?

Unlike the original Pipe library, plumbum makes it easy to define operators and compose them before execution:

# In plumbum - operators can be defined and composed without data
op = multiply(2) | add(3)  # Just defining the pipeline
result = 5 >> op  # Now execute: (5 * 2) + 3 = 13

# This is not possible in Pipe - execution starts immediately when applying "|'

This design enables:

  • Defining reusable operator pipelines as values
  • Assigning complex operations to variables
  • Composing operators before knowing what data they'll process
  • Clear distinction between pipeline structure and data flow

Installation

Install using pip:

pip install habemus-papadum-plumbum

Or using uv:

uv pip install habemus-papadum-plumbum

Quick Start

from pdum.plumbum import pb

# Define operators using the @pb decorator
@pb
def add(x, n):
    return x + n

@pb
def multiply(x, n):
    return x * n

@pb
def power(x, n):
    return x ** n

# Thread data through a single operator
result = 5 >> add(3)
# 8

# Combine operators into pipelines with |
result = 5 >> (add(3) | multiply(2))
# (5 + 3) * 2 = 16

# Define reusable operator pipelines
transform = multiply(2) | add(10) | power(2)
result = 3 >> transform
# ((3 * 2) + 10) ** 2 = 256

# Operators are just values - assign and reuse them
double_and_square = multiply(2) | power(2)
5 >> double_and_square  # (5 * 2) ** 2 = 100
10 >> double_and_square  # (10 * 2) ** 2 = 400

Core Concepts

The @pb Decorator

The @pb decorator converts functions into pipe-compatible operators. The first parameter of the function is where threaded data will be inserted.

As a Function Decorator

@pb
def add(x, n):  # x receives the threaded data
    return x + n

@pb
def format_number(value, prefix="Result:", decimals=2):
    return f"{prefix} {value:.{decimals}f}"

# Threading data
5 >> add(10)  # x=5, n=10 -> 15
3.14159 >> format_number(decimals=3)  # value=3.14159 -> "Result: 3.142"

As a One-Off Wrapper

You can use pb() directly to wrap functions inline without decorating them:

# Wrap built-in functions on-the-fly
5 >> pb(print)  # Prints: 5

# Wrap lambdas for one-off operations
"hello" >> pb(lambda s: s.upper()) >> pb(print)  # Prints: HELLO

# Wrap functions with keyword arguments
@pb
def add(x, n):
    return x + n

10 >> add(5) >> pb(print)  # Prints: 15

# Chain multiple one-off wraps
data = [1, 2, 3, 4, 5]
data >> pb(lambda x: [i * 2 for i in x]) >> pb(sum) >> pb(print)  # Prints: 30

# Useful for debugging pipelines
result = (
    10
    >> add(5)
    >> pb(print)  # Debug: prints 15
    >> multiply(2)
    >> pb(print)  # Debug: prints 30
)

With Keyword Arguments

Keyword arguments work seamlessly with partial application:

@pb
def greet(name, greeting="Hello", punctuation="!"):
    return f"{greeting}, {name}{punctuation}"

# Use with keyword arguments
"Alice" >> greet(greeting="Hi")  # "Hi, Alice!"
"Bob" >> greet(punctuation=".")  # "Hello, Bob."
"Charlie" >> greet(greeting="Hey", punctuation="!!!")  # "Hey, Charlie!!!"

# Mix positional and keyword arguments
"Diana" >> greet("Greetings", punctuation="...")  # "Greetings, Diana..."

# Build incrementally with keywords
formal_greet = greet(greeting="Good day", punctuation=".")
"Elizabeth" >> formal_greet  # "Good day, Elizabeth."

Operators: | (Pipe) and >> (Thread)

  • | (pipe): Combines operators into a pipeline without executing
  • >> (thread): Threads data through an operator or pipeline
# | creates pipelines (no execution yet)
pipeline = add(1) | multiply(2) | add(3)

# >> executes the pipeline with data
result = 5 >> pipeline  # ((5 + 1) * 2) + 3 = 15

Partial Application

Call operators with arguments to partially apply them:

@pb
def add_three(x, a, b, c):
    return x + a + b + c

# Build up arguments incrementally
op = add_three(1)      # x will be threaded, a=1
op = op(2)             # Add b=2
op = op(3)             # Add c=3
result = 10 >> op      # 10 + 1 + 2 + 3 = 16

Iterable Helpers (select and where)

Use the built-in iterable operators to transform and filter collections without dropping out of pipeline composition:

from pdum.plumbum import pb, select, where

# Transform every item in an iterable
double_values = select(lambda value: value * 2)
list([1, 2, 3] >> double_values)
# [2, 4, 6]

# Keep only values that satisfy a predicate
only_evens = where(lambda value: value % 2 == 0)
list([1, 2, 3, 4, 5] >> only_evens)
# [2, 4]

# Compose transformations and filters
normalize = select(lambda value: value + 1) | where(lambda value: value % 2 == 0)
list([1, 2, 3, 4] >> normalize)
# [2, 4]

# Embed a pipeline as a function using to_function()
@pb
def add_one(value: int) -> int:
    return value + 1

@pb
def mul_two(value: int) -> int:
    return value * 2

combine = select(add_one | mul_two) | where(lambda value: value % 2 == 0)
list([1, 2, 3, 4] >> combine)
# [4, 6, 8, 10]

Async Iterable Helpers (aiter, aselect, and awhere)

The async counterparts mirror the synchronous helpers and work with both sync and async callables:

import asyncio

from pdum.plumbum import apb, pb
from pdum.plumbum.aiterops import aiter, aselect, awhere

@pb
def inc(value: int) -> int:
    return value + 1

@apb
async def async_double(value: int) -> int:
    await asyncio.sleep(0)
    return value * 2

async def main() -> list[int]:
    pipeline = (
        aiter
        | aselect(inc)  # sync mapper
        | awhere(lambda value: value % 2 == 0)  # sync predicate
        | aselect(async_double)  # async mapper
    )
    iterator = await ([1, 2, 3, 4] >> pipeline)
    return [item async for item in iterator]

asyncio.run(main())
# [4, 6]

Plain Functions as Operators

Functions are automatically wrapped when used in pipelines:

def plain_increment(x):
    return x + 3

def plain_add(x, n):
    return x + n

# Automatically wrapped in PbFunc when used with |
pipeline = multiply(2) | plain_increment  # plain_increment gets wrapped
5 >> pipeline  # (5 * 2) + 3 = 13

# Combine with functools.partial to supply extra arguments
from functools import partial

pipeline_with_extra = multiply(2) | partial(plain_add, n=3)
5 >> pipeline_with_extra  # (5 * 2) + 3 = 13

Data Type Flexibility

plumbum makes no assumptions about the data you thread through operators. Unlike libraries focused on iterators or streams, you can use any Python type:

# Works with numbers
5 >> add(3) >> multiply(2)

# Works with strings
"hello" >> pb(str.upper) >> pb(lambda s: s + "!")

# Works with dictionaries
{"a": 1, "b": 2} >> pb(lambda d: d.copy()) >> pb(lambda d: {**d, "c": 3})

# Works with custom objects
class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y

@pb
def translate(point, dx, dy):
    return Point(point.x + dx, point.y + dy)

Point(1, 2) >> translate(5, 3)  # Point(6, 5)

# Works with lists (but no special iterator handling)
[1, 2, 3] >> pb(lambda lst: [x * 2 for x in lst])  # [2, 4, 6]

The data simply flows through your functions—plumbum is purely a syntax wrapper around normal function calls.

Advanced Examples

Data Processing Pipeline

from pdum.plumbum import pb

@pb
def filter_positive(numbers):
    """Generator that yields only positive numbers"""
    for n in numbers:
        if n > 0:
            yield n

@pb
def square_all(numbers):
    """Generator that yields squared values"""
    for n in numbers:
        yield n ** 2

# Define a reusable data processing pipeline
# Note: Decorated functions can be used directly (no parentheses needed)
# sum is a plain function, automatically wrapped by PbPair
process = filter_positive | square_all | sum

# Apply to different datasets
[-2, 3, -1, 4, 5] >> process  # 3² + 4² + 5² = 50
[-10, 2, -5, 6] >> process    # 2² + 6² = 40

String Processing

@pb
def strip(s):
    return s.strip()

@pb
def uppercase(s):
    return s.upper()

@pb
def replace(s, old, new):
    return s.replace(old, new)

# Build text transformation pipeline
clean_text = strip() | replace(" ", "_") | uppercase()

"  hello world  " >> clean_text  # "HELLO_WORLD"

Chaining with Built-ins

# Wrap built-in functions and compose them
pipeline = pb(str.strip) | pb(str.upper) | pb(print)
"  test  " >> pipeline  # Prints: TEST

Design Philosophy

Syntax Over Performance

plumbum prioritizes readability and composability over raw performance. Each >> operation involves some overhead from operator dispatch and wrapper objects. For performance-critical code paths:

  • Use traditional function composition
  • Consider profiling before adopting plumbum in hot loops
  • plumbum shines in data transformation scripts, exploratory code, and ETL pipelines where clarity matters more than microseconds

General Purpose Data Threading

plumbum is not an iterator/stream library. It's a general-purpose syntax for threading any data through any functions. Whether you're working with:

  • Single values (numbers, strings, objects)
  • Collections (lists, dicts, sets)
  • Custom classes
  • Mixed types in a pipeline

The library imposes zero constraints on your data types. It simply provides a cleaner syntax for f(g(h(x)))x >> h() >> g() >> f().

Roadmap

  • ✅ Core pipe operations (| and >>)
  • ✅ Partial application
  • ✅ Automatic function wrapping
  • ✅ Async/await support
  • 🔜 Additional utility operators

API Reference

@pb Decorator

Converts a function into a PbFunc operator.

Pb (Abstract Base Class)

Base class for all pipe operators. Implements | and >> operators.

PbFunc

Wraps a function for use in pipelines. Supports partial application and threading.

PbPair

Represents two operators combined with |. Threads data sequentially through both.

Development

This project uses UV for dependency management.

Setup

# Install UV if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Clone the repository
git clone https://github.com/habemus-papadum/pdum_plumbum.git
cd pdum_plumbum

# Provision the entire toolchain (uv sync, pnpm install, widget build, pre-commit hooks)
./scripts/setup.sh

Important for Development:

  • ./scripts/setup.sh is idempotent—rerun it after pulling dependency changes
  • Use uv sync --frozen to ensure the lockfile is respected when installing Python deps

Running Tests

# Run all tests
uv run pytest

# Run a specific test file
uv run pytest tests/test_example.py

# Run a specific test function
uv run pytest tests/test_example.py::test_version

# Run tests with coverage
uv run pytest --cov=src/pdum/plumbum --cov-report=xml --cov-report=term

Code Quality

# Check code with ruff
uv run ruff check .

# Format code with ruff
uv run ruff format .

# Fix auto-fixable issues
uv run ruff check --fix .

Building

# Build Python + TypeScript artifacts
./scripts/build.sh

# Or build just the Python distribution artifacts
uv build

Publishing

# Build and publish to PyPI (requires credentials)
./scripts/publish.sh

Automation scripts

  • ./scripts/setup.sh – bootstrap uv, pnpm, widget bundle, and pre-commit hooks
  • ./scripts/build.sh – reproduce the release build locally
  • ./scripts/pre-release.sh – run the full battery of quality checks
  • ./scripts/release.sh – orchestrate the release (creates tags, publishes to PyPI/GitHub)
  • ./scripts/test_notebooks.sh – execute demo notebooks (uses ./scripts/nb.sh under the hood)
  • ./scripts/setup-visual-tests.sh – install Playwright browsers for visual tests

License

MIT License - see LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

habemus_papadum_plumbum-0.3.0.tar.gz (131.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

habemus_papadum_plumbum-0.3.0-py3-none-any.whl (13.6 kB view details)

Uploaded Python 3

File details

Details for the file habemus_papadum_plumbum-0.3.0.tar.gz.

File metadata

File hashes

Hashes for habemus_papadum_plumbum-0.3.0.tar.gz
Algorithm Hash digest
SHA256 87617fbdfb13fa4cb9c84351aff1be017024f761ef645227c051ae63d33669d2
MD5 f24c52b17cfde2a00dcd8740fe6d37e0
BLAKE2b-256 d9008614a423bd23da8725abd26318e9f2b16f1ef3d7d3fe516c1b722696f43a

See more details on using hashes here.

File details

Details for the file habemus_papadum_plumbum-0.3.0-py3-none-any.whl.

File metadata

File hashes

Hashes for habemus_papadum_plumbum-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 fd17c02b7ffb9e29b86e539d780de318d556db311d6bf8e864877861c7ce535c
MD5 aaa6a07e7f10581277f172a672488156
BLAKE2b-256 576ffe879cf5951dc277c3684e423777a0227c5b673ff8ac633729738d282125

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page