Python pipeline framework — composable Payload-Filter-Pipeline pattern
codeupipe
Python pipeline framework — composable Payload → Filter → Pipeline pattern with streaming support. Zero external dependencies.
Experimental successor to codeuchain (Python only).
Core Concepts
| Concept | Role |
|---|---|
| Payload | Immutable data container flowing through the pipeline |
| MutablePayload | Mutable sibling for performance-critical bulk edits |
| Filter | Processing unit — takes a Payload in, returns a transformed Payload out (sync or async) |
| StreamFilter | Streaming processing unit — receives one chunk, yields 0, 1, or N output chunks |
| Pipeline | Orchestrator — .run() for batch, .stream() for streaming |
| Valve | Conditional flow control — gates a Filter with a predicate |
| Tap | Non-modifying observation point — inspect without changing (sync or async) |
| State | Pipeline execution metadata — tracks what ran, what was skipped, errors, chunk counts |
| Hook | Lifecycle hooks — before/after/on_error for pipeline execution (sync or async) |
| RetryFilter | Resilience wrapper — retries a Filter up to N times before giving up |
| CircuitOpenError | Raised when a pipeline circuit breaker is open and rejecting calls |
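To make the Payload / MutablePayload split concrete, here is a minimal sketch of the copy-on-write idea. One container is immutable, and its insert() returns a new object. The other edits in place. ToyPayload and ToyMutablePayload are hypothetical illustrations, not codeupipe's classes. Their get, insert and to_dict mirror calls used in the examples on this page. The set and to_immutable methods are invented for the sketch.

```python
from types import MappingProxyType
from typing import Any, Dict, Optional


class ToyPayload:
    """Hypothetical immutable payload: insert() returns a new object."""

    def __init__(self, data: Optional[Dict[str, Any]] = None) -> None:
        # Copy the input and wrap it in a read-only view
        self._data = MappingProxyType(dict(data or {}))

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def insert(self, key: str, value: Any) -> "ToyPayload":
        # Build a fresh dict; this payload is left untouched
        return ToyPayload({**self._data, key: value})

    def to_dict(self) -> Dict[str, Any]:
        return dict(self._data)


class ToyMutablePayload:
    """Hypothetical mutable sibling: edits happen in place, without copying."""

    def __init__(self, data: Optional[Dict[str, Any]] = None) -> None:
        self._data = dict(data or {})

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    def to_immutable(self) -> ToyPayload:
        return ToyPayload(self._data)


original = ToyPayload({"text": " hello "})
cleaned = original.insert("text", original.get("text").strip())
print(repr(original.get("text")))  # ' hello '
print(repr(cleaned.get("text")))   # 'hello'

bulk = ToyMutablePayload({"a": 1})
for i in range(3):
    bulk.set(f"k{i}", i)  # no copy per edit
print(bulk.to_immutable().to_dict())  # {'a': 1, 'k0': 0, 'k1': 1, 'k2': 2}
```

Copying on every insert keeps earlier payloads safe to share between steps. The mutable variant skips that copy when many edits happen in a row, which is presumably why the table calls it the performance-oriented sibling.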
Install
```bash
# Editable install from a source checkout of the repository
pip install -e .
```
Quick Start
```python
import asyncio

from codeupipe import Payload, Pipeline

# Filters can be sync or async; both work
class CleanInput:
    def call(self, payload):
        return payload.insert("text", payload.get("text", "").strip())


class Validate:
    def call(self, payload):
        if not payload.get("text"):
            raise ValueError("Empty input")
        return payload


# Build and run
pipeline = Pipeline()
pipeline.add_filter(CleanInput(), name="clean")
pipeline.add_filter(Validate(), name="validate")

result = asyncio.run(pipeline.run(Payload({"text": " hello "})))
print(result.get("text"))  # "hello"
```
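The comment above says sync and async filters both work. A common way for a runner to support both is to call the filter and await the result only if it is awaitable. The sketch below shows that idea with plain dicts. run_filters and Shout are hypothetical names, and this is not codeupipe's implementation.

```python
import asyncio
import inspect
from typing import Any, Dict, List

Data = Dict[str, Any]


class CleanInput:
    # Sync filter: returns a new dict with stripped text
    def call(self, payload: Data) -> Data:
        return {**payload, "text": payload.get("text", "").strip()}


class Shout:
    # Async filter: could await I/O here; this one just uppercases
    async def call(self, payload: Data) -> Data:
        await asyncio.sleep(0)
        return {**payload, "text": payload["text"].upper()}


async def run_filters(filters: List[Any], payload: Data) -> Data:
    for f in filters:
        result = f.call(payload)
        # Async filters return a coroutine; sync filters return the value directly
        if inspect.isawaitable(result):
            result = await result
        payload = result
    return payload


result = asyncio.run(run_filters([CleanInput(), Shout()], {"text": " hello "}))
print(result["text"])  # HELLO
```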
Valve (Conditional Flow)
```python
from codeupipe import Valve

class DiscountFilter:
    def call(self, payload):
        price = payload.get("price", 0)
        return payload.insert("price", price * 0.9)

# Only applies when the predicate returns True
pipeline.add_filter(
    Valve("discount", DiscountFilter(), lambda p: p.get("tier") == "premium"),
    name="discount",
)
```
Tap (Observation)
```python
class AuditTap:
    async def observe(self, payload):
        print(f"Payload at this point: {payload.to_dict()}")

pipeline.add_tap(AuditTap(), name="audit")
```
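The key property of a tap is that it cannot change what flows downstream. A runner can guarantee this by passing the tap a copy and ignoring its return value, as in the hypothetical sketch below. run_steps and RecordingTap are illustrative names. RecordingTap stores copies instead of printing so that the result can be checked.

```python
import asyncio
from typing import Any, Dict, List, Tuple

Data = Dict[str, Any]


class AddGreeting:
    def call(self, payload: Data) -> Data:
        return {**payload, "greeting": f"hi {payload['name']}"}


class RecordingTap:
    """Observes payloads and keeps copies for later inspection."""

    def __init__(self) -> None:
        self.seen: List[Data] = []

    async def observe(self, payload: Data) -> None:
        self.seen.append(dict(payload))


async def run_steps(steps: List[Tuple[str, Any]], payload: Data) -> Data:
    for kind, step in steps:
        if kind == "tap":
            # Taps get a copy and their return value is discarded,
            # so the payload flowing downstream is unchanged
            await step.observe(dict(payload))
        else:
            payload = step.call(payload)
    return payload


tap = RecordingTap()
steps = [("tap", tap), ("filter", AddGreeting()), ("tap", tap)]
final = asyncio.run(run_steps(steps, {"name": "ada"}))
print(tap.seen)  # [{'name': 'ada'}, {'name': 'ada', 'greeting': 'hi ada'}]
```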
Streaming
Process an async stream of chunks through the same pipeline in constant memory:
```python
import asyncio

from codeupipe import Payload, Pipeline

class UppercaseFilter:
    def call(self, payload):
        return payload.insert("name", payload.get("name", "").upper())

async def names():
    for n in ["alice", "bob", "charlie"]:
        yield Payload({"name": n})

async def main():
    pipeline = Pipeline()
    pipeline.add_filter(UppercaseFilter(), name="upper")
    async for result in pipeline.stream(names()):
        print(result.get("name"))  # ALICE, BOB, CHARLIE

asyncio.run(main())
```
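Streaming keeps memory flat because each chunk is pulled, transformed and handed on before the next one is read. The sketch below records that interleaving using plain async generators. upper_stage is a hypothetical stand-in for a pipeline stage and does not show codeupipe's internals.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List

Data = Dict[str, Any]
events: List[str] = []


async def names() -> AsyncIterator[Data]:
    for n in ["alice", "bob", "charlie"]:
        events.append(f"pull {n}")
        yield {"name": n}


async def upper_stage(source: AsyncIterator[Data]) -> AsyncIterator[Data]:
    # Processes one chunk at a time; nothing is buffered between chunks
    async for chunk in source:
        yield {**chunk, "name": chunk["name"].upper()}


async def main() -> None:
    async for result in upper_stage(names()):
        events.append(f"emit {result['name']}")


asyncio.run(main())
print(events)
# ['pull alice', 'emit ALICE', 'pull bob', 'emit BOB', 'pull charlie', 'emit CHARLIE']
```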
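Use a StreamFilter to drop, fan out, or batch chunks. For example, DropEmpty drops blank lines, and SplitWords fans one chunk out into one chunk per word: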
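```python
from typing import AsyncIterator

from codeupipe import Payload

class DropEmpty:
    # Yields nothing for blank lines, so they are dropped from the stream
    async def stream(self, chunk: Payload) -> AsyncIterator[Payload]:
        if chunk.get("line", "").strip():
            yield chunk

class SplitWords:
    # Fans out: yields one new chunk per word in the input text
    async def stream(self, chunk: Payload) -> AsyncIterator[Payload]:
        for word in chunk.get("text", "").split():
            yield Payload({"word": word})
```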
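StreamFilter stages compose by passing each incoming chunk to stream() and forwarding everything it yields. The sketch below chains a drop stage, a fan-out stage and a batching stage over plain dicts, using a hypothetical apply() driver. Two assumptions apply. First, SplitWords here reads the "line" key rather than "text" so that it chains after DropEmpty. Second, BatchWords emits only full batches, so a trailing partial batch would stay in its buffer unless the framework offers an end-of-stream flush, which this document does not describe.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List

Data = Dict[str, Any]


class DropEmpty:
    async def stream(self, chunk: Data) -> AsyncIterator[Data]:
        if chunk.get("line", "").strip():
            yield chunk  # 1 output for non-blank lines, 0 for blank ones


class SplitWords:
    async def stream(self, chunk: Data) -> AsyncIterator[Data]:
        for word in chunk.get("line", "").split():
            yield {"word": word}  # N outputs: one per word


class BatchWords:
    """Hypothetical batcher: buffers words and emits a list every `size` words."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.buffer: List[str] = []

    async def stream(self, chunk: Data) -> AsyncIterator[Data]:
        self.buffer.append(chunk["word"])
        if len(self.buffer) == self.size:
            yield {"batch": self.buffer}
            self.buffer = []  # start a new list; the yielded one is not reused


async def apply(stage: Any, source: AsyncIterator[Data]) -> AsyncIterator[Data]:
    # Feed each incoming chunk to stage.stream() and forward whatever it yields
    async for chunk in source:
        async for out in stage.stream(chunk):
            yield out


async def lines() -> AsyncIterator[Data]:
    for line in ["a b", "   ", "c d e", "f"]:
        yield {"line": line}


async def main() -> List[Data]:
    stream = apply(BatchWords(2), apply(SplitWords(), apply(DropEmpty(), lines())))
    return [out async for out in stream]


batches = asyncio.run(main())
print(batches)  # [{'batch': ['a', 'b']}, {'batch': ['c', 'd']}, {'batch': ['e', 'f']}]
```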
Synchronous Execution
No manual asyncio.run() is needed; run_sync() handles it:
```python
pipeline = Pipeline()
pipeline.add_filter(CleanInput(), name="clean")
pipeline.add_filter(Validate(), name="validate")
result = pipeline.run_sync(Payload({"text": " hello "}))
print(result.get("text"))  # "hello"
```
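One plausible way to offer a sync entry point over an async run() is to pass the coroutine to asyncio.run(). That function creates an event loop, runs the coroutine to completion and closes the loop. ToyPipeline is a hypothetical sketch, not codeupipe's code. Note that asyncio.run() raises RuntimeError when it is called from code already running inside an event loop (for example, from another coroutine). In async code you would await run() directly.

```python
import asyncio
from typing import Any, Dict, List

Data = Dict[str, Any]


class ToyPipeline:
    """Hypothetical pipeline showing one way to offer a sync entry point."""

    def __init__(self) -> None:
        self.filters: List[Any] = []

    def add_filter(self, f: Any) -> None:
        self.filters.append(f)

    async def run(self, payload: Data) -> Data:
        for f in self.filters:
            payload = f.call(payload)
        return payload

    def run_sync(self, payload: Data) -> Data:
        # asyncio.run creates a fresh event loop, runs the coroutine, and closes it
        return asyncio.run(self.run(payload))


class Strip:
    def call(self, payload: Data) -> Data:
        return {**payload, "text": payload["text"].strip()}


p = ToyPipeline()
p.add_filter(Strip())
out = p.run_sync({"text": " hello "})
print(out)  # {'text': 'hello'}
```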
Parallel Execution (Fan-out / Fan-in)
Run independent filters concurrently; results merge back into the payload:
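```python
# FetchUserFilter, FetchOrdersFilter and FetchRecommendationsFilter are
# placeholders for your own independent filters
pipeline = Pipeline()
pipeline.add_parallel([
    FetchUserFilter(),
    FetchOrdersFilter(),
    FetchRecommendationsFilter(),
], name="fan-out")

result = pipeline.run_sync(Payload({"user_id": 42}))
```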
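Here is a minimal sketch of fan-out / fan-in with asyncio.gather. Every filter receives the same input concurrently, and the outputs are merged back in list order. The merge policy, where later filters win when two write the same key, is an assumption made for this sketch. The document does not say how add_parallel resolves conflicts. FetchUser and FetchOrders are hypothetical stand-ins.

```python
import asyncio
from typing import Any, Dict, List

Data = Dict[str, Any]


class FetchUser:
    async def call(self, payload: Data) -> Data:
        await asyncio.sleep(0.01)  # stands in for an I/O call
        return {**payload, "user": {"id": payload["user_id"]}}


class FetchOrders:
    async def call(self, payload: Data) -> Data:
        await asyncio.sleep(0.01)
        return {**payload, "orders": [101, 102]}


async def run_parallel(filters: List[Any], payload: Data) -> Data:
    # Fan out: every filter receives the same input payload concurrently
    results = await asyncio.gather(*(f.call(payload) for f in filters))
    # Fan in: merge outputs in list order; later filters win on key clashes
    merged = dict(payload)
    for r in results:
        merged.update(r)
    return merged


merged = asyncio.run(run_parallel([FetchUser(), FetchOrders()], {"user_id": 42}))
print(merged)  # {'user_id': 42, 'user': {'id': 42}, 'orders': [101, 102]}
```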
Pipeline Nesting
Compose pipelines from smaller pipelines:
```python
# TransformFilter is a placeholder for your own filter
validation = Pipeline()
validation.add_filter(CleanInput(), name="clean")
validation.add_filter(Validate(), name="validate")

processing = Pipeline()
processing.add_pipeline(validation, name="validation-sub")
processing.add_filter(TransformFilter(), name="transform")

result = processing.run_sync(Payload({"text": " hello "}))
```
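Nesting works naturally when a pipeline exposes the same interface as a filter, because it can then be added as a step of another pipeline (the composite pattern). The sketch below shows the idea with a hypothetical ToyPipeline whose call() runs its steps in order. It illustrates the pattern and does not show how add_pipeline is implemented.

```python
from typing import Any, Dict, List

Data = Dict[str, Any]


class ToyPipeline:
    """Hypothetical pipeline that is itself usable as a filter."""

    def __init__(self) -> None:
        self.steps: List[Any] = []

    def add(self, step: Any) -> "ToyPipeline":
        self.steps.append(step)
        return self

    def call(self, payload: Data) -> Data:
        # Same interface as a filter, so a pipeline can be a step of another one
        for step in self.steps:
            payload = step.call(payload)
        return payload


class Strip:
    def call(self, payload: Data) -> Data:
        return {**payload, "text": payload["text"].strip()}


class Validate:
    def call(self, payload: Data) -> Data:
        if not payload["text"]:
            raise ValueError("Empty input")
        return payload


class Shout:
    def call(self, payload: Data) -> Data:
        return {**payload, "text": payload["text"].upper() + "!"}


validation = ToyPipeline().add(Strip()).add(Validate())
processing = ToyPipeline().add(validation).add(Shout())
out = processing.call({"text": " hello "})
print(out)  # {'text': 'HELLO!'}
```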
Retry & Circuit Breaker
Pipeline-level resilience wrappers:
```python
# Retry the entire pipeline up to 3 times on failure
retrying = pipeline.with_retry(max_retries=3)
result = retrying.run_sync(Payload({"input": "data"}))
```
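Here is a minimal sketch of a whole-pipeline retry wrapper. It assumes max_retries counts retries after the first attempt, which allows up to four calls in total. The document does not say which way codeupipe counts. run_with_retry and flaky_run are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Data = Dict[str, Any]


async def run_with_retry(
    run: Callable[[Data], Awaitable[Data]], payload: Data, max_retries: int = 3
) -> Data:
    # Assumption: one initial attempt plus up to `max_retries` retries
    last_error: Exception = RuntimeError("no attempt was made")
    for _ in range(max_retries + 1):
        try:
            return await run(payload)
        except Exception as exc:
            last_error = exc  # remember the failure and try again
    raise last_error


calls = {"n": 0}


async def flaky_run(payload: Data) -> Data:
    # Fails on the first two calls, succeeds on the third
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return {**payload, "ok": True}


result = asyncio.run(run_with_retry(flaky_run, {"input": "data"}, max_retries=3))
print(result, calls["n"])  # {'input': 'data', 'ok': True} 3
```

Production retry wrappers usually add a delay or backoff between attempts and retry only errors known to be transient. The document does not say whether codeupipe does either.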
```python
# Open the circuit after 5 consecutive failures
from codeupipe import CircuitOpenError

breaker = pipeline.with_circuit_breaker(failure_threshold=5)
try:
    result = breaker.run_sync(Payload({"input": "data"}))
except CircuitOpenError:
    print("Service unavailable — circuit is open")
```
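A circuit breaker stops calling a failing dependency after failure_threshold consecutive failures and fails fast instead. The sketch below counts consecutive failures and raises a toy ToyCircuitOpenError once the threshold is reached. It is a hypothetical, simplified model with no half-open state or reset timeout. The document does not describe how codeupipe's breaker recovers.

```python
from typing import Any, Callable


class ToyCircuitOpenError(Exception):
    """Toy stand-in for an 'open circuit' error."""


class ToyCircuitBreaker:
    """Hypothetical breaker: opens after `failure_threshold` consecutive failures."""

    def __init__(self, func: Callable[..., Any], failure_threshold: int = 5) -> None:
        self.func = func
        self.failure_threshold = failure_threshold
        self.consecutive_failures = 0

    @property
    def is_open(self) -> bool:
        return self.consecutive_failures >= self.failure_threshold

    def call(self, *args: Any) -> Any:
        if self.is_open:
            # Fail fast without touching the struggling dependency
            raise ToyCircuitOpenError("circuit is open")
        try:
            result = self.func(*args)
        except Exception:
            self.consecutive_failures += 1
            raise
        self.consecutive_failures = 0  # any success resets the count
        return result


def always_fails(_: Any) -> Any:
    raise TimeoutError("backend down")


breaker = ToyCircuitBreaker(always_fails, failure_threshold=5)
for _ in range(5):
    try:
        breaker.call("data")
    except TimeoutError:
        pass

rejected = False
try:
    breaker.call("data")
except ToyCircuitOpenError:
    rejected = True
    print("Service unavailable: circuit is open")
```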
Execution State
```python
import asyncio

async def main():
    result = await pipeline.run(payload)
    print(pipeline.state.executed)          # ['clean', 'validate']
    print(pipeline.state.skipped)           # ['admin_only']
    print(pipeline.state.chunks_processed)  # {'upper': 3} (streaming mode)

asyncio.run(main())
```
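Here is a minimal sketch of how a runner might fill in execution metadata as it goes. It records executed steps, steps skipped because a gate predicate was false, and errors. ToyState and run_tracked are hypothetical. Stopping at the first error and recording it in the state is a choice made for this sketch. The real Pipeline may re-raise instead.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple

Data = Dict[str, Any]
Step = Tuple[str, Callable[[Data], Data], Optional[Callable[[Data], bool]]]


@dataclass
class ToyState:
    executed: List[str] = field(default_factory=list)
    skipped: List[str] = field(default_factory=list)
    errors: List[Tuple[str, str]] = field(default_factory=list)


def run_tracked(steps: List[Step], payload: Data) -> Tuple[Data, ToyState]:
    state = ToyState()
    for name, fn, predicate in steps:
        # A step with a predicate is gated; skip it when the predicate is False
        if predicate is not None and not predicate(payload):
            state.skipped.append(name)
            continue
        try:
            payload = fn(payload)
        except Exception as exc:
            # Record the failure and stop; later steps never run
            state.errors.append((name, repr(exc)))
            break
        state.executed.append(name)
    return payload, state


steps: List[Step] = [
    ("clean", lambda p: {**p, "text": p["text"].strip()}, None),
    ("validate", lambda p: p, None),
    ("admin_only", lambda p: {**p, "admin": True}, lambda p: p.get("role") == "admin"),
]
result, state = run_tracked(steps, {"text": " hello "})
print(state.executed)  # ['clean', 'validate']
print(state.skipped)   # ['admin_only']
```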
Docs
| Document | Purpose |
|---|---|
| INDEX.md | Project structure map (verified by cup doc-check) |
| CONCEPTS.md | Full API reference with runnable examples |
| BEST_PRACTICES.md | Project structure, naming, testing strategy |
| SKILL.md | Agent skill reference — types, patterns, conversion |
| ROADMAP.md | Expansion rings — from framework to platform |
CLI (cup)
The cup command-line tool scaffolds, lints, and analyzes CUP projects:
```bash
cup new filter validate_email src/signup  # Scaffold a filter + test
cup new pipeline signup src/signup --steps validate_email hash_password
cup list  # Show available component types
cup bundle src/signup  # Generate __init__.py re-exports
cup lint src/signup  # Check CUP conventions (CUP000–CUP008)
cup coverage src/signup  # Map component↔test coverage gaps
cup report src/signup  # Health report with scores, orphans, staleness
cup doc-check .  # Verify doc freshness (cup:ref markers)
cup run pipeline.json --discover ./filters  # Execute a pipeline from config
cup connect --list  # Show configured connectors
cup connect --health  # Run connector health checks
cup describe pipeline.json  # Inspect pipeline inputs, outputs, steps
cup describe pipeline.json --json  # Machine-readable output (--json works globally)
cup distribute checkpoint cp.json --status  # Manage payload checkpoints
cup distribute remote https://api.example  # Test a remote filter endpoint
cup test  # Smart test runner with markers
cup doctor  # Project health diagnostics
cup graph pipeline.json  # Mermaid pipeline visualization
cup version --bump patch  # Show/bump semver
```
Testing Utilities
codeupipe.testing provides zero-boilerplate test helpers:
```python
from codeupipe.testing import run_filter, assert_payload, mock_filter

# MyFilter is a placeholder for the filter under test
def test_my_filter():
    result = run_filter(MyFilter(), {"input": "data"})
    assert_payload(result, output="expected")

def test_with_mock():
    f = mock_filter(status="ok")
    result = run_filter(f, {"x": 1})
    assert f.call_count == 1
```
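To show the boilerplate these helpers replace, here is a hypothetical minimal version of each, limited to plain dicts and sync filters. The real codeupipe.testing functions may accept more input types and give richer failure messages. These bodies are assumptions, not the library's code.

```python
from typing import Any, Dict

Data = Dict[str, Any]


def run_filter(f: Any, data: Data) -> Data:
    # Run a single sync filter against a copy of a plain dict input
    return f.call(dict(data))


def assert_payload(result: Data, **expected: Any) -> None:
    # Check only the keys the test cares about
    for key, value in expected.items():
        assert result.get(key) == value, f"{key}: expected {value!r}, got {result.get(key)!r}"


class MockFilter:
    """Returns the input merged with fixed outputs and counts its calls."""

    def __init__(self, **outputs: Any) -> None:
        self.outputs = outputs
        self.call_count = 0

    def call(self, payload: Data) -> Data:
        self.call_count += 1
        return {**payload, **self.outputs}


def mock_filter(**outputs: Any) -> MockFilter:
    return MockFilter(**outputs)


f = mock_filter(status="ok")
result = run_filter(f, {"x": 1})
assert_payload(result, x=1, status="ok")
assert f.call_count == 1
```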
Test
```bash
pytest  # 1676 tests
```
License
Apache 2.0
Download files
File details
Details for the file codeupipe-0.10.0.tar.gz.
File metadata
- Download URL: codeupipe-0.10.0.tar.gz
- Upload date:
- Size: 273.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ac91e17064af689f8822cf8cdaa7283fac3633961b10cdc58acd05fc7c28048d |
| MD5 | 90240612d38ac7a8b6b474d7e229d788 |
| BLAKE2b-256 | 988d345c93a2a363f3a8a8e6aea07ba5bb571a0c3179152c45052bdad4c352de |
File details
Details for the file codeupipe-0.10.0-py3-none-any.whl.
File metadata
- Download URL: codeupipe-0.10.0-py3-none-any.whl
- Upload date:
- Size: 195.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 12f56d968a26b75d9b81a9f349865ad4a5b21711f6b4f1b2733c1341e0a8b329 |
| MD5 | b98f2e00c2f4dcfaba90abf1e7819316 |
| BLAKE2b-256 | db985e909837b3e5a7cae4564fdd7da7cdf75fa44327a83605a4840f85550a98 |