
Python pipeline framework — composable Payload-Filter-Pipeline pattern


codeupipe

Python pipeline framework — composable Payload → Filter → Pipeline pattern with streaming support. Zero external dependencies.

Experimental successor to codeuchain (Python only).

Core Concepts

| Concept | Role |
|---|---|
| Payload | Immutable data container flowing through the pipeline |
| MutablePayload | Mutable sibling for performance-critical bulk edits |
| Filter | Processing unit: takes a Payload in, returns a transformed Payload out (sync or async) |
| StreamFilter | Streaming processing unit: receives one chunk, yields 0, 1, or N output chunks |
| Pipeline | Orchestrator: .run() for batch, .stream() for streaming |
| Valve | Conditional flow control: gates a Filter with a predicate |
| Tap | Non-modifying observation point: inspect without changing (sync or async) |
| State | Pipeline execution metadata: tracks what ran, what was skipped, errors, chunk counts |
| Hook | Lifecycle hooks: before/after/on_error for pipeline execution (sync or async) |
| RetryFilter | Resilience wrapper: retries a Filter up to N times before giving up |
| CircuitOpenError | Raised when a pipeline circuit breaker is open and rejecting calls |
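The table calls Payload immutable, and the Quick Start's filters return payload.insert(...) instead of editing the payload in place. The sketch below illustrates that contract with a hypothetical stand-in class, SketchPayload, built on a read-only mapping. It assumes insert returns a new object and is not codeupipe's actual implementation.

```python
from types import MappingProxyType
from typing import Any, Dict, Optional


class SketchPayload:
    """Hypothetical stand-in for an immutable payload (not codeupipe's class)."""

    def __init__(self, data: Optional[Dict[str, Any]] = None) -> None:
        # Copy the input and expose it only through a read-only view.
        self._data = MappingProxyType(dict(data or {}))

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def insert(self, key: str, value: Any) -> "SketchPayload":
        # Never mutate: build a new payload that carries the extra key.
        return SketchPayload({**self._data, key: value})

    def to_dict(self) -> Dict[str, Any]:
        return dict(self._data)


original = SketchPayload({"text": "  hello  "})
updated = original.insert("text", "hello")
```

Because every edit yields a new object, an earlier stage's view of the data can never be changed by a later stage. A mutable sibling such as MutablePayload trades that guarantee for fewer copies during bulk edits.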

Install

pip install codeupipe

Or, from a source checkout:

pip install -e .

Quick Start

import asyncio
from codeupipe import Payload, Pipeline

# Filters can be sync or async — both work
class CleanInput:
    def call(self, payload):
        return payload.insert("text", payload.get("text", "").strip())

class Validate:
    def call(self, payload):
        if not payload.get("text"):
            raise ValueError("Empty input")
        return payload

# Build and run
pipeline = Pipeline()
pipeline.add_filter(CleanInput(), name="clean")
pipeline.add_filter(Validate(), name="validate")

result = asyncio.run(pipeline.run(Payload({"text": "  hello  "})))
print(result.get("text"))  # "hello"
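Filters may define call() as either a plain method or a coroutine, so a runner has to handle both. A common approach is to call the filter and await the result only if it is awaitable. The sketch below shows this with a hypothetical SketchPipeline, using plain dicts in place of Payload. It illustrates the idea under those assumptions and does not reproduce codeupipe's internals.

```python
import asyncio
import inspect


class SketchPipeline:
    """Hypothetical minimal runner, not codeupipe's Pipeline."""

    def __init__(self):
        self._filters = []  # (name, filter) pairs, run in insertion order

    def add_filter(self, flt, name):
        self._filters.append((name, flt))

    async def run(self, payload):
        for _name, flt in self._filters:
            result = flt.call(payload)
            # Async filters return a coroutine; sync filters return the value.
            if inspect.isawaitable(result):
                result = await result
            payload = result
        return payload


class Strip:
    def call(self, payload):
        return {**payload, "text": payload["text"].strip()}


class Shout:
    async def call(self, payload):
        await asyncio.sleep(0)  # stands in for real async I/O
        return {**payload, "text": payload["text"].upper()}


pipe = SketchPipeline()
pipe.add_filter(Strip(), name="strip")
pipe.add_filter(Shout(), name="shout")
result = asyncio.run(pipe.run({"text": "  hello  "}))
```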

Valve (Conditional Flow)

from codeupipe import Valve

class DiscountFilter:
    def call(self, payload):
        price = payload.get("price", 0)
        return payload.insert("price", price * 0.9)

# Only applies when predicate returns True
pipeline.add_filter(
    Valve("discount", DiscountFilter(), lambda p: p.get("tier") == "premium"),
    name="discount",
)
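The Valve contract can be sketched as follows: run the wrapped filter only when the predicate holds, and otherwise pass the payload through unchanged. SketchValve and the dict-based payloads are hypothetical stand-ins. The real Valve constructor is the one used in the example above.

```python
from typing import Any, Callable, Dict

Payload = Dict[str, Any]  # plain dict standing in for codeupipe's Payload


class SketchValve:
    """Hypothetical gate: runs the inner filter only when predicate(payload) is True."""

    def __init__(self, name: str, inner, predicate: Callable[[Payload], bool]):
        self.name = name
        self.inner = inner
        self.predicate = predicate

    def call(self, payload: Payload) -> Payload:
        if self.predicate(payload):
            return self.inner.call(payload)
        return payload  # predicate is False: pass through untouched


class Discount:
    def call(self, payload: Payload) -> Payload:
        return {**payload, "price": payload.get("price", 0) * 0.9}


valve = SketchValve("discount", Discount(), lambda p: p.get("tier") == "premium")
premium = valve.call({"tier": "premium", "price": 100})
basic = valve.call({"tier": "basic", "price": 100})
```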

Tap (Observation)

class AuditTap:
    async def observe(self, payload):
        print(f"Payload at this point: {payload.to_dict()}")

pipeline.add_tap(AuditTap(), name="audit")
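A tap differs from a filter because the pipeline ignores whatever the tap returns and hands the same payload to the next step. The hypothetical run_with_taps runner below sketches that behavior with dict payloads. It awaits observe() when the method is a coroutine, as AuditTap's is. This is not codeupipe's runner.

```python
import asyncio
import inspect


async def run_with_taps(steps, payload):
    """Hypothetical runner: steps are ("filter", obj) or ("tap", obj) pairs."""
    for kind, step in steps:
        if kind == "tap":
            # Taps only observe; their return value is discarded.
            outcome = step.observe(payload)
            if inspect.isawaitable(outcome):
                await outcome
            continue
        payload = step.call(payload)
    return payload


class RecordingTap:
    def __init__(self):
        self.seen = []

    async def observe(self, payload):
        self.seen.append(dict(payload))  # snapshot for later inspection


class AddOne:
    def call(self, payload):
        return {**payload, "n": payload["n"] + 1}


tap = RecordingTap()
steps = [("tap", tap), ("filter", AddOne()), ("tap", tap)]
final = asyncio.run(run_with_taps(steps, {"n": 1}))
```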

Streaming

Process an async stream of chunks through the same pipeline at constant memory.

import asyncio
from codeupipe import Payload, Pipeline

class UppercaseFilter:
    def call(self, payload):
        return payload.insert("name", payload.get("name", "").upper())

async def names():
    for n in ["alice", "bob", "charlie"]:
        yield Payload({"name": n})

async def main():
    pipeline = Pipeline()
    pipeline.add_filter(UppercaseFilter(), name="upper")

    async for result in pipeline.stream(names()):
        print(result.get("name"))  # ALICE, BOB, CHARLIE

asyncio.run(main())

Use a StreamFilter to drop chunks, fan them out, or batch them:

from typing import AsyncIterator

class DropEmpty:
    async def stream(self, chunk: Payload) -> AsyncIterator[Payload]:
        if chunk.get("line", "").strip():
            yield chunk

class SplitWords:
    async def stream(self, chunk: Payload) -> AsyncIterator[Payload]:
        for word in chunk.get("text", "").split():
            yield Payload({"word": word})
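One way to picture a streaming pipeline is that each StreamFilter wraps the async iterator produced by the previous stage. The sketch below shows that composition with a hypothetical helper, apply_stream_filter, and simplified dict-based versions of DropEmpty and SplitWords. Both versions read the "text" key here. The sketch illustrates the 0/1/N chunk semantics and is not codeupipe's implementation.

```python
import asyncio
from typing import Any, AsyncIterator, Dict

Chunk = Dict[str, Any]  # plain dict standing in for Payload


async def apply_stream_filter(sf, source: AsyncIterator[Chunk]) -> AsyncIterator[Chunk]:
    """Feed each upstream chunk to sf.stream() and re-yield whatever it emits."""
    async for chunk in source:
        async for out in sf.stream(chunk):
            yield out


class DropEmpty:
    async def stream(self, chunk: Chunk) -> AsyncIterator[Chunk]:
        # 1 chunk in -> 1 out, or 0 out for blank text
        if chunk.get("text", "").strip():
            yield chunk


class SplitWords:
    async def stream(self, chunk: Chunk) -> AsyncIterator[Chunk]:
        # 1 chunk in -> N chunks out, one per word
        for word in chunk.get("text", "").split():
            yield {"word": word}


async def lines() -> AsyncIterator[Chunk]:
    for text in ["hello world", "   ", "codeupipe streams"]:
        yield {"text": text}


async def collect():
    stage1 = apply_stream_filter(DropEmpty(), lines())
    stage2 = apply_stream_filter(SplitWords(), stage1)
    return [c["word"] async for c in stage2]


words = asyncio.run(collect())
```

Each stage pulls one chunk at a time from the stage before it, which is why memory stays bounded by the work in flight rather than by the length of the stream.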

Synchronous Execution

No manual asyncio.run() needed — run_sync() handles it:

pipeline = Pipeline()
pipeline.add_filter(CleanInput(), name="clean")
pipeline.add_filter(Validate(), name="validate")

result = pipeline.run_sync(Payload({"text": "  hello  "}))
print(result.get("text"))  # "hello"
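A synchronous facade like run_sync() is commonly built on asyncio.run(). That function refuses to start when an event loop is already running in the current thread, for example inside Jupyter or an async web handler. The hypothetical run_sync helper below makes that limit explicit. This README does not say whether codeupipe's run_sync() handles the running-loop case differently, so inside async code, awaiting pipeline.run() is the safe choice.

```python
import asyncio


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


def run_sync(coro_fn, *args):
    """Hypothetical sync facade: drive the coroutine on a fresh event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe here.
        return asyncio.run(coro_fn(*args))
    raise RuntimeError("run_sync() called inside a running event loop; await the async API instead")


answer = run_sync(double, 21)
```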

Parallel Execution (Fan-out / Fan-in)

Run independent filters concurrently; results merge back into the payload:

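# FetchUserFilter, FetchOrdersFilter and FetchRecommendationsFilter are your own independent filters
pipeline = Pipeline()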
pipeline.add_parallel([
    FetchUserFilter(),
    FetchOrdersFilter(),
    FetchRecommendationsFilter(),
], name="fan-out")

result = pipeline.run_sync(Payload({"user_id": 42}))
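Fan-out/fan-in maps naturally onto asyncio.gather. Every branch receives the same input payload, the branches run concurrently, and their outputs are merged. The sketch below assumes a simple merge policy in which later filters win on key conflicts. add_parallel may resolve conflicts differently. FetchUser, FetchOrders and run_parallel are hypothetical.

```python
import asyncio
from typing import Any, Dict, List

Payload = Dict[str, Any]  # plain dict standing in for codeupipe's Payload


class FetchUser:
    async def call(self, payload: Payload) -> Payload:
        await asyncio.sleep(0.01)  # simulated I/O
        return {**payload, "user": {"id": payload["user_id"]}}


class FetchOrders:
    async def call(self, payload: Payload) -> Payload:
        await asyncio.sleep(0.01)  # simulated I/O
        return {**payload, "orders": [101, 102]}


async def run_parallel(filters: List[Any], payload: Payload) -> Payload:
    # Every filter sees the same input; gather runs them concurrently
    # and returns results in the order the filters were given.
    results = await asyncio.gather(*(f.call(payload) for f in filters))
    merged = dict(payload)
    for result in results:
        merged.update(result)  # assumed policy: later filters win on conflicts
    return merged


merged = asyncio.run(run_parallel([FetchUser(), FetchOrders()], {"user_id": 42}))
```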

Pipeline Nesting

Compose pipelines from smaller pipelines:

validation = Pipeline()
validation.add_filter(CleanInput(), name="clean")
validation.add_filter(Validate(), name="validate")

processing = Pipeline()
processing.add_pipeline(validation, name="validation-sub")
processing.add_filter(TransformFilter(), name="transform")  # TransformFilter: any filter of your own

result = processing.run_sync(Payload({"text": "  hello  "}))
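Nesting is straightforward when a pipeline exposes the same call() interface as a filter (the composite pattern). An outer pipeline can then run an inner one as just another step. The synchronous NestablePipeline below is a hypothetical illustration of that idea. It makes no claim about how add_pipeline is implemented.

```python
class NestablePipeline:
    """Hypothetical synchronous pipeline that is itself a filter (it has call())."""

    def __init__(self):
        self.steps = []

    def add_filter(self, flt):
        self.steps.append(flt)
        return self  # allow chaining

    def call(self, payload):
        for step in self.steps:
            payload = step.call(payload)
        return payload


class Strip:
    def call(self, p):
        return {**p, "text": p["text"].strip()}


class Upper:
    def call(self, p):
        return {**p, "text": p["text"].upper()}


validation = NestablePipeline().add_filter(Strip())
processing = NestablePipeline().add_filter(validation).add_filter(Upper())
out = processing.call({"text": "  hello  "})
```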

Retry & Circuit Breaker

Pipeline-level resilience wrappers:

# Retry the entire pipeline up to 3 times on failure
retrying = pipeline.with_retry(max_retries=3)
result = retrying.run_sync(Payload({"input": "data"}))

# Open the circuit after 5 consecutive failures
from codeupipe import CircuitOpenError

breaker = pipeline.with_circuit_breaker(failure_threshold=5)
try:
    result = breaker.run_sync(Payload({"input": "data"}))
except CircuitOpenError:
    print("Service unavailable — circuit is open")
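The two wrappers react to different things. Retry re-runs after a failure. A circuit breaker counts consecutive failures and, once the threshold is reached, rejects calls immediately instead of running them. The hypothetical with_retry and CircuitBreaker below sketch both. In this sketch, max_retries counts retries after the first attempt, so max_retries=3 allows up to four attempts. That counting is an assumption and may not match codeupipe's.

```python
class CircuitOpen(Exception):
    """Hypothetical stand-in for codeupipe's CircuitOpenError."""


def with_retry(fn, max_retries):
    """Call fn(); on an exception, retry up to max_retries more times."""
    def wrapped(*args):
        for attempt in range(max_retries + 1):
            try:
                return fn(*args)
            except Exception:
                if attempt == max_retries:
                    raise  # out of retries: surface the last error
    return wrapped


class CircuitBreaker:
    """Opens after failure_threshold consecutive failures, then rejects calls."""

    def __init__(self, fn, failure_threshold):
        self.fn = fn
        self.failure_threshold = failure_threshold
        self.failures = 0

    def __call__(self, *args):
        if self.failures >= self.failure_threshold:
            raise CircuitOpen("circuit is open")
        try:
            result = self.fn(*args)
        except Exception:
            self.failures += 1
            raise
        self.failures = 0  # a success resets the consecutive-failure count
        return result
```

Production circuit breakers usually add a half-open state that lets a trial call through after a cool-down. This sketch omits it for brevity.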

Execution State

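result = pipeline.run_sync(payload)      # or, inside async code: await pipeline.run(payload)
# Example values; the actual names depend on the filters you registered
print(pipeline.state.executed)           # ['clean', 'validate']
print(pipeline.state.skipped)            # ['admin_only']
print(pipeline.state.chunks_processed)   # {'upper': 3}  (streaming mode)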
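A runner can produce the executed and skipped lists by recording each step's name as it goes. The hypothetical run_tracked function and SketchState dataclass below illustrate this with gated steps. The field names mirror the attributes shown above, but the recording logic is an assumption, not codeupipe's.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class SketchState:
    """Hypothetical record of one run, mirroring executed/skipped/errors."""
    executed: List[str] = field(default_factory=list)
    skipped: List[str] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)


def run_tracked(steps, payload: Dict[str, Any], state: SketchState) -> Dict[str, Any]:
    # steps: (name, fn, predicate-or-None); a False predicate means "skip".
    for name, fn, predicate in steps:
        if predicate is not None and not predicate(payload):
            state.skipped.append(name)
            continue
        try:
            payload = fn(payload)
        except Exception as exc:
            state.errors.append(f"{name}: {exc}")
            raise
        state.executed.append(name)
    return payload


state = SketchState()
steps = [
    ("clean", lambda p: {**p, "text": p["text"].strip()}, None),
    ("admin_only", lambda p: {**p, "admin": True}, lambda p: p.get("role") == "admin"),
    ("validate", lambda p: p, None),
]
out = run_tracked(steps, {"text": " hi "}, state)
```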

Docs

| Document | Purpose |
|---|---|
| INDEX.md | Project structure map (verified by cup doc-check) |
| CONCEPTS.md | Full API reference with runnable examples |
| BEST_PRACTICES.md | Project structure, naming, testing strategy |
| SKILL.md | Agent skill reference: types, patterns, conversion |
| ROADMAP.md | Expansion rings, from framework to platform |

CLI (cup)

The cup command-line tool scaffolds, lints, and analyzes CUP projects:

cup new filter validate_email src/signup   # Scaffold a filter + test
cup new pipeline signup src/signup --steps validate_email hash_password
cup list                                   # Show available component types
cup bundle src/signup                      # Generate __init__.py re-exports
cup lint src/signup                        # Check CUP conventions (CUP000–CUP008)
cup coverage src/signup                    # Map component↔test coverage gaps
cup report src/signup                      # Health report with scores, orphans, staleness
cup doc-check .                            # Verify doc freshness (cup:ref markers)
cup run pipeline.json --discover ./filters # Execute a pipeline from config
cup connect --list                         # Show configured connectors
cup connect --health                       # Run connector health checks
cup describe pipeline.json                 # Inspect pipeline inputs, outputs, steps
cup describe pipeline.json --json          # Machine-readable output (--json works globally)
cup distribute checkpoint cp.json --status # Manage payload checkpoints
cup distribute remote https://api.example  # Test a remote filter endpoint
cup test                                   # Smart test runner with markers
cup doctor                                 # Project health diagnostics
cup graph pipeline.json                    # Mermaid pipeline visualization
cup version --bump patch                   # Show/bump semver

Testing Utilities

codeupipe.testing provides zero-boilerplate test helpers:

from codeupipe.testing import run_filter, assert_payload, mock_filter

def test_my_filter():
    result = run_filter(MyFilter(), {"input": "data"})
    assert_payload(result, output="expected")

def test_with_mock():
    f = mock_filter(status="ok")
    result = run_filter(f, {"x": 1})
    assert f.call_count == 1
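To show what a mock filter of this kind does, here is a hypothetical SketchMockFilter. It counts its calls and merges the keyword arguments it was built with into the payload. The merge behavior is an assumption based on mock_filter(status="ok"), and run_filter_sketch is a hypothetical helper. The codeupipe.testing documentation describes the real semantics.

```python
from typing import Any, Dict


class SketchMockFilter:
    """Hypothetical mock: merges fixed fields into the payload and counts calls."""

    def __init__(self, **fields: Any) -> None:
        self.fields = fields
        self.call_count = 0
        self.calls = []  # copies of every payload received

    def call(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        self.call_count += 1
        self.calls.append(dict(payload))
        return {**payload, **self.fields}


def run_filter_sketch(flt, data: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical helper: feed a plain dict to the filter and return its output.
    return flt.call(dict(data))


f = SketchMockFilter(status="ok")
result = run_filter_sketch(f, {"x": 1})
```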

Test

pytest  # 1676 tests

License

Apache 2.0



Download files


Source Distribution

codeupipe-0.10.0.tar.gz (273.1 kB)

Built Distribution


codeupipe-0.10.0-py3-none-any.whl (195.5 kB)

File details

Details for the file codeupipe-0.10.0.tar.gz.

File metadata

  • Download URL: codeupipe-0.10.0.tar.gz
  • Size: 273.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for codeupipe-0.10.0.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | ac91e17064af689f8822cf8cdaa7283fac3633961b10cdc58acd05fc7c28048d |
| MD5 | 90240612d38ac7a8b6b474d7e229d788 |
| BLAKE2b-256 | 988d345c93a2a363f3a8a8e6aea07ba5bb571a0c3179152c45052bdad4c352de |


File details

Details for the file codeupipe-0.10.0-py3-none-any.whl.

File metadata

  • Download URL: codeupipe-0.10.0-py3-none-any.whl
  • Size: 195.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for codeupipe-0.10.0-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 12f56d968a26b75d9b81a9f349865ad4a5b21711f6b4f1b2733c1341e0a8b329 |
| MD5 | b98f2e00c2f4dcfaba90abf1e7819316 |
| BLAKE2b-256 | db985e909837b3e5a7cae4564fdd7da7cdf75fa44327a83605a4840f85550a98 |

