codeupipe
Python pipeline framework — composable Payload → Filter → Pipeline pattern with streaming support. Zero external dependencies.
Experimental successor to codeuchain (Python only).
Core Concepts
| Concept | Role |
|---|---|
| Payload | Immutable data container flowing through the pipeline |
| MutablePayload | Mutable sibling for performance-critical bulk edits |
| Filter | Processing unit — takes a Payload in, returns a transformed Payload out (sync or async) |
| StreamFilter | Streaming processing unit — receives one chunk, yields 0, 1, or N output chunks |
| Pipeline | Orchestrator — .run() for batch, .stream() for streaming |
| Valve | Conditional flow control — gates a Filter with a predicate |
| Tap | Non-modifying observation point — inspect without changing (sync or async) |
| State | Pipeline execution metadata — tracks what ran, what was skipped, errors, chunk counts |
| Hook | Lifecycle hooks — before/after/on_error for pipeline execution (sync or async) |
| RetryFilter | Resilience wrapper — retries a Filter up to N times before giving up |
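The RetryFilter row describes a wrapper that retries a Filter up to N times before giving up. A minimal sketch of that idea in plain Python follows. `SketchRetry`, `FlakyFilter`, and the `max_attempts` parameter are hypothetical names used only for illustration, the payload is a plain dict, and none of this is taken from codeupipe's actual RetryFilter.

```python
import asyncio
import inspect


class SketchRetry:
    """Hypothetical retry wrapper; not codeupipe's RetryFilter."""

    def __init__(self, inner, max_attempts=3):
        # Assumes max_attempts >= 1.
        self.inner = inner
        self.max_attempts = max_attempts
        self.attempts = 0

    async def call(self, payload):
        last_error = None
        for _ in range(self.max_attempts):
            self.attempts += 1
            try:
                result = self.inner.call(payload)
                # Accept both sync and async inner filters.
                if inspect.isawaitable(result):
                    result = await result
                return result
            except Exception as exc:
                last_error = exc
        # Every attempt failed: surface the most recent error.
        raise last_error


class FlakyFilter:
    """Hypothetical filter that fails twice, then succeeds."""

    def __init__(self):
        self.calls = 0

    def call(self, payload):
        self.calls += 1
        if self.calls < 3:
            raise RuntimeError("transient failure")
        return {**payload, "status": "ok"}


retry = SketchRetry(FlakyFilter(), max_attempts=3)
result = asyncio.run(retry.call({"id": 1}))
print(result)          # {'id': 1, 'status': 'ok'}
print(retry.attempts)  # 3
```

This sketch retries immediately and re-raises the last exception. A real wrapper might add backoff or restrict which exception types are retried.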
Install
pip install codeupipe   # released package from PyPI
pip install -e .        # editable install from a source checkout
Quick Start
import asyncio
from codeupipe import Payload, Pipeline

# Filters can be sync or async — both work
class CleanInput:
    def call(self, payload):
        return payload.insert("text", payload.get("text", "").strip())

class Validate:
    def call(self, payload):
        if not payload.get("text"):
            raise ValueError("Empty input")
        return payload

# Build and run
pipeline = Pipeline()
pipeline.add_filter(CleanInput(), name="clean")
pipeline.add_filter(Validate(), name="validate")
result = asyncio.run(pipeline.run(Payload({"text": " hello "})))
print(result.get("text"))  # "hello"
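The filters above return the result of `payload.insert(...)` instead of modifying the payload they received, which matches the "immutable data container" role in the Core Concepts table. The sketch below shows one way such copy-on-insert behavior could work. `SketchPayload` is a hypothetical stand-in written for this example; it mirrors the `get`, `insert`, and `to_dict` calls used in this README but says nothing about how codeupipe implements them.

```python
from types import MappingProxyType


class SketchPayload:
    """Hypothetical immutable payload; not codeupipe's Payload class."""

    def __init__(self, data=None):
        # Copy the input so later changes to the caller's dict do not leak in,
        # then expose it through a read-only view.
        self._data = MappingProxyType(dict(data or {}))

    def get(self, key, default=None):
        return self._data.get(key, default)

    def insert(self, key, value):
        # Build a new payload instead of mutating this one.
        return SketchPayload({**self._data, key: value})

    def to_dict(self):
        return dict(self._data)


original = SketchPayload({"text": " hello "})
cleaned = original.insert("text", original.get("text", "").strip())

print(original.to_dict())  # {'text': ' hello '}
print(cleaned.to_dict())   # {'text': 'hello'}
```

Because each step produces a new object, an earlier payload stays valid for inspection after later filters have run.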
Valve (Conditional Flow)
from codeupipe import Valve

class DiscountFilter:
    def call(self, payload):
        price = payload.get("price", 0)
        return payload.insert("price", price * 0.9)

# Only applies when predicate returns True
pipeline.add_filter(
    Valve("discount", DiscountFilter(), lambda p: p.get("tier") == "premium"),
    name="discount",
)
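The gating logic can be pictured as a small wrapper: evaluate the predicate, run the inner filter only on a match, and otherwise pass the payload through untouched. The following is a sketch under stated assumptions. `SketchValve` and `TenPercentOff` are hypothetical, payloads are plain dicts, and returning a `(payload, ran)` tuple is an illustrative choice, not codeupipe's behavior.

```python
class SketchValve:
    """Hypothetical gate around a filter; not codeupipe's Valve."""

    def __init__(self, name, inner, predicate):
        self.name = name
        self.inner = inner
        self.predicate = predicate

    def call(self, payload):
        """Return (payload, ran) so a caller can record skipped steps."""
        if self.predicate(payload):
            return self.inner.call(payload), True
        # Predicate was False: pass the payload through unchanged.
        return payload, False


class TenPercentOff:
    """Hypothetical discount filter working on integer cents."""

    def call(self, payload):
        # Integer cents keep the arithmetic exact in this sketch.
        return {**payload, "price_cents": payload["price_cents"] * 90 // 100}


valve = SketchValve(
    "discount", TenPercentOff(), lambda p: p.get("tier") == "premium"
)

premium, premium_ran = valve.call({"tier": "premium", "price_cents": 1000})
basic, basic_ran = valve.call({"tier": "basic", "price_cents": 1000})

print(premium["price_cents"], premium_ran)  # 900 True
print(basic["price_cents"], basic_ran)      # 1000 False
```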
Tap (Observation)
class AuditTap:
    async def observe(self, payload):
        print(f"Payload at this point: {payload.to_dict()}")

pipeline.add_tap(AuditTap(), name="audit")
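Taps may be sync or async, so an orchestrator has to handle both kinds of `observe` method. One common way to do that in Python is to call the method and await the result only when it is awaitable. The sketch below illustrates that technique with plain dict payloads. `maybe_await`, `run_taps`, `SyncTap`, and `AsyncTap` are hypothetical helpers for this example and are not part of codeupipe.

```python
import asyncio
import inspect


async def maybe_await(value):
    """Await the value if it is awaitable, otherwise return it unchanged."""
    if inspect.isawaitable(value):
        return await value
    return value


class SyncTap:
    def __init__(self):
        self.seen = []

    def observe(self, payload):
        self.seen.append(dict(payload))


class AsyncTap:
    def __init__(self):
        self.seen = []

    async def observe(self, payload):
        await asyncio.sleep(0)  # stand-in for real async work
        self.seen.append(dict(payload))


async def run_taps(payload, taps):
    for tap in taps:
        # The return value is ignored: a tap observes, it does not transform.
        await maybe_await(tap.observe(payload))
    return payload


sync_tap, async_tap = SyncTap(), AsyncTap()
payload = {"text": "hello"}
result = asyncio.run(run_taps(payload, [sync_tap, async_tap]))

print(result is payload)  # True
print(async_tap.seen)     # [{'text': 'hello'}]
```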
Streaming
Process an async stream of chunks through the same pipeline at constant memory.
import asyncio
from codeupipe import Payload, Pipeline

class UppercaseFilter:
    def call(self, payload):
        return payload.insert("name", payload.get("name", "").upper())

async def names():
    for n in ["alice", "bob", "charlie"]:
        yield Payload({"name": n})

async def main():
    pipeline = Pipeline()
    pipeline.add_filter(UppercaseFilter(), name="upper")
    async for result in pipeline.stream(names()):
        print(result.get("name"))  # ALICE, BOB, CHARLIE

asyncio.run(main())
Use StreamFilter to drop, fan-out, or batch:
from typing import AsyncIterator

class DropEmpty:
    async def stream(self, chunk: Payload) -> AsyncIterator[Payload]:
        if chunk.get("line", "").strip():
            yield chunk

class SplitWords:
    async def stream(self, chunk: Payload) -> AsyncIterator[Payload]:
        for word in chunk.get("text", "").split():
            yield Payload({"word": word})
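To see why a per-chunk step that yields zero, one, or many outputs composes at constant memory, it helps to chain such steps over an async iterator by hand. The sketch below does this with plain async generators and dict chunks. `apply_stage`, `drop_empty`, `split_words`, and `lines` are hypothetical names for illustration; this is not how codeupipe's `Pipeline.stream()` is implemented.

```python
import asyncio


async def drop_empty(chunk):
    # Yields nothing for blank lines, so the chunk is dropped.
    if chunk.get("line", "").strip():
        yield chunk


async def split_words(chunk):
    # Fans one chunk out into one chunk per word.
    for word in chunk.get("line", "").split():
        yield {"word": word}


async def apply_stage(stage, source):
    """Feed every chunk from source through stage, flattening its outputs."""
    async for chunk in source:
        async for out in stage(chunk):
            yield out


async def lines():
    for text in ["hello world", "   ", "bye"]:
        yield {"line": text}


async def main():
    stream = lines()
    # Wrap the source in one lazy layer per stage; nothing runs until consumed.
    for stage in (drop_empty, split_words):
        stream = apply_stage(stage, stream)
    return [chunk["word"] async for chunk in stream]


words = asyncio.run(main())
print(words)  # ['hello', 'world', 'bye']
```

Each chunk travels through every stage before the next one is pulled from the source, so only one chunk per stage is held in memory at a time.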
Execution State
# Run inside a coroutine, or wrap the call in asyncio.run(...)
result = await pipeline.run(payload)
print(pipeline.state.executed) # ['clean', 'validate']
print(pipeline.state.skipped) # ['admin_only']
print(pipeline.state.chunks_processed) # {'upper': 3} (streaming mode)
Docs
| Document | Purpose |
|---|---|
| INDEX.md | Project structure map (verified by cup doc-check) |
| CONCEPTS.md | Full API reference with runnable examples |
| BEST_PRACTICES.md | Project structure, naming, testing strategy |
| SKILL.md | Agent skill reference — types, patterns, conversion |
CLI (cup)
The cup command-line tool scaffolds, lints, and analyzes CUP projects:
cup new filter validate_email src/signup # Scaffold a filter + test
cup new pipeline signup src/signup --steps validate_email hash_password
cup list # Show available component types
cup bundle src/signup # Generate __init__.py re-exports
cup lint src/signup # Check CUP conventions (CUP000–CUP008)
cup coverage src/signup # Map component↔test coverage gaps
cup report src/signup # Health report with scores, orphans, staleness
cup doc-check . # Verify doc freshness (cup:ref markers)
Testing Utilities
codeupipe.testing provides zero-boilerplate test helpers:
from codeupipe.testing import run_filter, assert_payload, mock_filter

def test_my_filter():
    result = run_filter(MyFilter(), {"input": "data"})
    assert_payload(result, output="expected")

def test_with_mock():
    f = mock_filter(status="ok")
    result = run_filter(f, {"x": 1})
    assert f.call_count == 1
Test
pytest # 909 tests
License
Apache 2.0