Skip to main content

Python pipeline framework — composable Payload-Filter-Pipeline pattern

Project description

codeupipe

Python pipeline framework — composable Payload → Filter → Pipeline pattern with streaming support. Zero external dependencies.

Experimental successor to codeuchain (Python only).

Core Concepts

Concept Role
Payload Immutable data container flowing through the pipeline
MutablePayload Mutable sibling for performance-critical bulk edits
Filter Processing unit — takes a Payload in, returns a transformed Payload out (sync or async)
StreamFilter Streaming processing unit — receives one chunk, yields 0, 1, or N output chunks
Pipeline Orchestrator — .run() for batch, .stream() for streaming
Valve Conditional flow control — gates a Filter with a predicate
Tap Non-modifying observation point — inspect without changing (sync or async)
State Pipeline execution metadata — tracks what ran, what was skipped, errors, chunk counts
Hook Lifecycle hooks — before/after/on_error for pipeline execution (sync or async)
RetryFilter Resilience wrapper — retries a Filter up to N times before giving up

Install

pip install -e .

Quick Start

import asyncio
from codeupipe import Payload, Pipeline

# Filters can be sync or async — both work
class CleanInput:
    def call(self, payload):
        return payload.insert("text", payload.get("text", "").strip())

class Validate:
    def call(self, payload):
        if not payload.get("text"):
            raise ValueError("Empty input")
        return payload

# Build and run
pipeline = Pipeline()
pipeline.add_filter(CleanInput(), name="clean")
pipeline.add_filter(Validate(), name="validate")

result = asyncio.run(pipeline.run(Payload({"text": "  hello  "})))
print(result.get("text"))  # "hello"

Valve (Conditional Flow)

from codeupipe import Valve

class DiscountFilter:
    def call(self, payload):
        price = payload.get("price", 0)
        return payload.insert("price", price * 0.9)

# Only applies when predicate returns True
pipeline.add_filter(
    Valve("discount", DiscountFilter(), lambda p: p.get("tier") == "premium"),
    name="discount",
)

Tap (Observation)

class AuditTap:
    async def observe(self, payload):
        print(f"Payload at this point: {payload.to_dict()}")

pipeline.add_tap(AuditTap(), name="audit")

Streaming

Process an async stream of chunks through the same pipeline at constant memory.

from codeupipe import Payload, Pipeline

class UppercaseFilter:
    def call(self, payload):
        return payload.insert("name", payload.get("name", "").upper())

async def names():
    for n in ["alice", "bob", "charlie"]:
        yield Payload({"name": n})

async def main():
    pipeline = Pipeline()
    pipeline.add_filter(UppercaseFilter(), name="upper")

    async for result in pipeline.stream(names()):
        print(result.get("name"))  # ALICE, BOB, CHARLIE

asyncio.run(main())

Use StreamFilter to drop, fan-out, or batch:

from typing import AsyncIterator

class DropEmpty:
    async def stream(self, chunk: Payload) -> AsyncIterator[Payload]:
        if chunk.get("line", "").strip():
            yield chunk

class SplitWords:
    async def stream(self, chunk: Payload) -> AsyncIterator[Payload]:
        for word in chunk.get("text", "").split():
            yield Payload({"word": word})

Execution State

result = await pipeline.run(payload)
print(pipeline.state.executed)           # ['clean', 'validate']
print(pipeline.state.skipped)            # ['admin_only']
print(pipeline.state.chunks_processed)   # {'upper': 3}  (streaming mode)

Docs

Document Purpose
INDEX.md Project structure map (verified by cup doc-check)
CONCEPTS.md Full API reference with runnable examples
BEST_PRACTICES.md Project structure, naming, testing strategy
SKILL.md Agent skill reference — types, patterns, conversion

CLI (cup)

The cup command-line tool scaffolds, lints, and analyzes CUP projects:

cup new filter validate_email src/signup   # Scaffold a filter + test
cup new pipeline signup src/signup --steps validate_email hash_password
cup list                                   # Show available component types
cup bundle src/signup                      # Generate __init__.py re-exports
cup lint src/signup                        # Check CUP conventions (CUP000–CUP008)
cup coverage src/signup                    # Map component↔test coverage gaps
cup report src/signup                      # Health report with scores, orphans, staleness
cup doc-check .                            # Verify doc freshness (cup:ref markers)

Testing Utilities

codeupipe.testing provides zero-boilerplate test helpers:

from codeupipe.testing import run_filter, assert_payload, mock_filter

def test_my_filter():
    result = run_filter(MyFilter(), {"input": "data"})
    assert_payload(result, output="expected")

def test_with_mock():
    f = mock_filter(status="ok")
    result = run_filter(f, {"x": 1})
    assert f.call_count == 1

Test

pytest  # 909 tests

License

Apache 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

codeupipe-0.1.0.tar.gz (123.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

codeupipe-0.1.0-py3-none-any.whl (69.7 kB view details)

Uploaded Python 3

File details

Details for the file codeupipe-0.1.0.tar.gz.

File metadata

  • Download URL: codeupipe-0.1.0.tar.gz
  • Upload date:
  • Size: 123.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for codeupipe-0.1.0.tar.gz
Algorithm Hash digest
SHA256 518685c1249eac6fc2712001c14fc05553be80fa9fcda363de0a758bc5af7ba0
MD5 01487a31da30dcc87d95e987185a5320
BLAKE2b-256 486c3474cf56f4e0dbd26249549813b726e1880567fcfedb3a5a10bdaff6e64d

See more details on using hashes here.

File details

Details for the file codeupipe-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: codeupipe-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 69.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for codeupipe-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0d1d844f824b29073eb4934a8d2f9a44c5fa8d1c75e85f0915dedd00b45f17bd
MD5 b8f498bd9a666532d46e47bf09a112b7
BLAKE2b-256 719a9e00068972a8fd39baaf19ef39f4e76d42f44e4d68f980b8822912f956f5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page