Skip to main content

A pipeline library for Python

Project description

kpipeline

kpipeline is a simple pipeline library that allows you to write your complex application as a pipeline. This helps code organization, debugging, and testing.

How to use

kpipe is like a DSL language inside Python. It is based on "pipes" which can be combined into pipelines using the pipe primitives provided by the library.

Each pipe represents a function transforming an input into an output. Pipes are stateless and immutable (with the exception of certain async only pipes). Below is an example defining two example pipes.

from kpipeline import Pipe

class AddOnePipe(Pipe[int, int, None]):
    def apply(self, input: int, metadata: None) -> int:
        return input + 1


class MulByTwoPipe(Pipe[int, int, None]):
    def apply(self, input: int, metadata: None) -> int:
        return input * 2

These pipes can be combined using the ChainPipe primitive (aliased into the | operator) two form a pipeline that performs these two pipes sequentially:

pipeline = AddOnePipe() | MulByTwoPipe()

# or

from kpipeline import ChainPipe
pipeline = ChainPipe(AddOnePipe(), MulByTwoPipe())

print(pipeline.apply(2))  # 6

Currently, the library defines these primitives:

Primitive Purpose
ChainPipe Execute two pipes sequentially
ConditionalPipe Execute a pipe if a given condition is true
BranchPipe Execute one of two pipes depending on whether the condition is true or false
SelectPipe Execute one of multiple pipes based on a selector key
ParallelPipe Execute multiple pipes and combine their results (only async implementation is actually parallel)
MetadataWrapperPipe Transform the metadata given to the pipeline for the subpipe
MapPipe Apply a subpipe into a sequence of inputs
FilterPipe Filter a sequence of inputs using a predicate
RetryPipe Run a pipe multiple times in case it fails
FallbackPipe Runs a fallback pipe in case the subpipe fails
AsyncBatchCollectorPipe Waits for a specific number of seconds, accumulating incoming data, and then processes them as a batch (async only, stateful)
AsyncSemaphorePipe Limits the number of concurrent calls to the subpipe (async only, stateful)
AsyncTimeoutPipe Cancels the subpipe after a certain amount of time (async only)

Tests

You can run all tests in this repository with

uv run pytest test
uv run mypy .

License

TBD

AI Use Disclosure

I dislike writing tests so unit tests have been generated by Gemma 4 31B / Claude Opus 4.6 for your convenience. They are provided in the hope that they are better than nothing.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kpipeline-1.2.tar.gz (14.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kpipeline-1.2-py3-none-any.whl (13.8 kB view details)

Uploaded Python 3

File details

Details for the file kpipeline-1.2.tar.gz.

File metadata

  • Download URL: kpipeline-1.2.tar.gz
  • Upload date:
  • Size: 14.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Fedora Linux Asahi Remix","version":"43","id":"","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for kpipeline-1.2.tar.gz
Algorithm Hash digest
SHA256 16690d3346c3e8cbe0db5839dd29b2ca731fba4e3bbc6c56de1f0db6169fc7c4
MD5 077dcb651dc45bac174f79d40308d3f5
BLAKE2b-256 2ff64b3a1e5d11602c6422f3754d339b7d228de9e21789dcba99ae38425ed1fe

See more details on using hashes here.

File details

Details for the file kpipeline-1.2-py3-none-any.whl.

File metadata

  • Download URL: kpipeline-1.2-py3-none-any.whl
  • Upload date:
  • Size: 13.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Fedora Linux Asahi Remix","version":"43","id":"","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for kpipeline-1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 2351d97fb36bf890f377494e83916d4da99e83ba9c9b569d467f22a9bdd37362
MD5 77a6af2979eb257e720b0f7c687878bc
BLAKE2b-256 e6a04ae75e40b6a7e4bae41c66b9af7c8ad28569360a6661825325a83e547b68

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page