A pipeline library for Python
kpipeline
kpipeline is a simple pipeline library that lets you write a complex application as a pipeline. This helps with code organization, debugging, and testing.
How to use
kpipeline works like a DSL embedded in Python. It is based on "pipes", which can be combined into pipelines using the pipe primitives provided by the library.
Each pipe represents a function transforming an input into an output. Pipes are stateless and immutable (with the exception of certain async-only pipes). The example below defines two pipes.
from kpipeline import Pipe


class AddOnePipe(Pipe[int, int, None]):
    def apply(self, input: int, metadata: None) -> int:
        return input + 1


class MulByTwoPipe(Pipe[int, int, None]):
    def apply(self, input: int, metadata: None) -> int:
        return input * 2
These pipes can be combined using the ChainPipe primitive (aliased to the | operator) to form a pipeline that runs the two pipes sequentially:
pipeline = AddOnePipe() | MulByTwoPipe()
# or
from kpipeline import ChainPipe
pipeline = ChainPipe(AddOnePipe(), MulByTwoPipe())
print(pipeline.apply(2)) # 6
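To illustrate how a `|` alias for chaining can work in general, here is a minimal self-contained sketch. It does not import kpipeline and is not the library's implementation: `SketchPipe`, `SketchChain`, `AddOne`, and `MulByTwo` are hypothetical names, and the single-argument `apply` is a simplification that leaves out the metadata parameter.

```python
from __future__ import annotations

from typing import Any


class SketchPipe:
    """Hypothetical stand-in for a pipe; not kpipeline's actual base class."""

    def apply(self, value: Any) -> Any:
        raise NotImplementedError

    def __or__(self, other: SketchPipe) -> SketchChain:
        # `a | b` builds a new pipe that runs a first, then b.
        return SketchChain(self, other)


class SketchChain(SketchPipe):
    """Runs two pipes in sequence, feeding the first output into the second."""

    def __init__(self, first: SketchPipe, second: SketchPipe) -> None:
        self.first = first
        self.second = second

    def apply(self, value: Any) -> Any:
        return self.second.apply(self.first.apply(value))


class AddOne(SketchPipe):
    def apply(self, value: int) -> int:
        return value + 1


class MulByTwo(SketchPipe):
    def apply(self, value: int) -> int:
        return value * 2


pipeline = AddOne() | MulByTwo()
result = pipeline.apply(2)  # (2 + 1) * 2
print(result)
```

Because `__or__` returns another pipe, chains of any length compose the same way, and the order of operands determines the order of execution.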
Currently, the library defines these primitives:
| Primitive | Purpose |
|---|---|
| ChainPipe | Execute two pipes sequentially |
| ConditionalPipe | Execute a pipe if a given condition is true |
| BranchPipe | Execute one of two pipes depending on whether the condition is true or false |
| SelectPipe | Execute one of multiple pipes based on a selector key |
| ParallelPipe | Execute multiple pipes and combine their results (only the async implementation is actually parallel) |
| MetadataWrapperPipe | Transform the metadata given to the pipeline for the subpipe |
| MapPipe | Apply a subpipe to each item in a sequence of inputs |
| FilterPipe | Filter a sequence of inputs using a predicate |
| RetryPipe | Run a pipe multiple times in case it fails |
| FallbackPipe | Run a fallback pipe in case the subpipe fails |
| AsyncBatchCollectorPipe | Wait for a specific number of seconds, accumulating incoming data, and then process it as a batch (async only, stateful) |
| AsyncSemaphorePipe | Limit the number of concurrent calls to the subpipe (async only, stateful) |
| AsyncTimeoutPipe | Cancel the subpipe after a certain amount of time (async only) |
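The constructor signatures of these primitives are not shown here, so the following is only a rough sketch of the behavior a few of them describe, written as plain function combinators. It does not use kpipeline; `branch`, `map_each`, `retry`, and `fallback` are hypothetical helpers that loosely mirror BranchPipe, MapPipe, RetryPipe, and FallbackPipe, and the real primitives may differ in details such as which exceptions they catch.

```python
from typing import Any, Callable, List, Sequence

Step = Callable[[Any], Any]  # hypothetical alias: a one-argument transformation


def branch(cond: Callable[[Any], bool], if_true: Step, if_false: Step) -> Step:
    """Run one of two steps depending on the condition (cf. BranchPipe)."""
    def run(value: Any) -> Any:
        return if_true(value) if cond(value) else if_false(value)
    return run


def map_each(step: Step) -> Callable[[Sequence[Any]], List[Any]]:
    """Apply a step to every item of a sequence (cf. MapPipe)."""
    def run(values: Sequence[Any]) -> List[Any]:
        return [step(v) for v in values]
    return run


def retry(step: Step, attempts: int) -> Step:
    """Call a step up to `attempts` times, re-raising the last error (cf. RetryPipe)."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    def run(value: Any) -> Any:
        for attempt in range(attempts):
            try:
                return step(value)
            except Exception:
                if attempt == attempts - 1:
                    raise
    return run


def fallback(step: Step, backup: Step) -> Step:
    """Run a backup step if the main step raises (cf. FallbackPipe)."""
    def run(value: Any) -> Any:
        try:
            return step(value)
        except Exception:
            return backup(value)
    return run


calls = {"count": 0}


def flaky_double(x: int) -> int:
    # Fails on the first two calls, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return x * 2


def always_fails(x: int) -> int:
    raise RuntimeError("permanent failure")


abs_value = branch(lambda x: x >= 0, lambda x: x, lambda x: -x)
print(map_each(abs_value)([-2, 3]))
print(retry(flaky_double, attempts=3)(5))
print(fallback(always_fails, lambda x: 0)(5))
```

Each combinator takes steps and returns a new step, which is the same shape that lets pipe primitives nest inside one another.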
Tests
You can run all tests and type checks in this repository with:
uv run pytest test
uv run mypy .
License
TBD
AI Use Disclosure
I dislike writing tests, so the unit tests have been generated by Gemma 4 31B / Claude Opus 4.6 for your convenience. They are provided in the hope that they are better than nothing.