A pipeline library for Python

Project description

kpipe

kpipe is a simple pipeline library that lets you structure a complex application as a pipeline. This helps with code organization, debugging, and testing.

How to use

kpipe works like a DSL embedded in Python. It is based on "pipes", which can be combined into pipelines using the pipe primitives provided by the library.

Each pipe represents a function transforming an input into an output. Pipes are stateless and immutable. The example below defines two pipes.

from kpipeline import Pipe

class AddOnePipe(Pipe[int, int, None]):
    def apply(self, input: int, metadata: None) -> int:
        return input + 1


class MulByTwoPipe(Pipe[int, int, None]):
    def apply(self, input: int, metadata: None) -> int:
        return input * 2

These pipes can be combined using the ChainPipe primitive (aliased to the | operator) to form a pipeline that runs the two pipes sequentially:

pipeline = AddOnePipe() | MulByTwoPipe()

# or

from kpipeline import ChainPipe
pipeline = ChainPipe(AddOnePipe(), MulByTwoPipe())

print(pipeline.apply(2))  # 6
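
To illustrate how an operator alias like this can work, here is a minimal, self-contained sketch. It does not import kpipeline: SketchPipe and SketchChain are hypothetical stand-ins, and the library's real internals may differ. It assumes only that a pipe exposes an apply method taking an input and a metadata value, as in the pipe definitions above; the default of None for metadata is also an assumption.

```python
from typing import Any


class SketchPipe:
    """Hypothetical stand-in for a pipe; not kpipeline's actual base class."""

    def apply(self, input: Any, metadata: Any = None) -> Any:
        raise NotImplementedError

    def __or__(self, other: "SketchPipe") -> "SketchPipe":
        # `a | b` builds a chain that runs a first, then b.
        return SketchChain(self, other)


class SketchChain(SketchPipe):
    """Runs two pipes in sequence, feeding the first output into the second."""

    def __init__(self, first: SketchPipe, second: SketchPipe) -> None:
        self.first = first
        self.second = second

    def apply(self, input: Any, metadata: Any = None) -> Any:
        intermediate = self.first.apply(input, metadata)
        return self.second.apply(intermediate, metadata)


class AddOne(SketchPipe):
    def apply(self, input: int, metadata: Any = None) -> int:
        return input + 1


class MulByTwo(SketchPipe):
    def apply(self, input: int, metadata: Any = None) -> int:
        return input * 2


sketch_pipeline = AddOne() | MulByTwo()
result = sketch_pipeline.apply(2)  # computes (2 + 1) * 2
```

Because the chain is itself a pipe in this sketch, chains nest naturally: `AddOne() | MulByTwo() | AddOne()` is a chain whose first element is another chain.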

Currently, the library defines these primitives:

Primitive                Purpose
ChainPipe                Execute two pipes sequentially
ConditionalPipe          Execute a pipe if a given condition is true
BranchPipe               Execute one of two pipes depending on whether the condition is true or false
SelectPipe               Execute one of multiple pipes based on a selector key
ParallelPipe             Execute multiple pipes and combine their results (only the async implementation actually runs in parallel)
MetadataWrapperPipe      Transform the metadata given to the pipeline before passing it to the subpipe
MapPipe                  Apply a subpipe to each element of a sequence of inputs
FilterPipe               Filter a sequence of inputs using a predicate
RetryPipe                Run a pipe multiple times in case it fails
AsyncBatchCollectorPipe  Wait for a specific number of seconds, accumulating incoming data, and then process it as a batch (async only)
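
The behavior described for a few of these primitives can be sketched with plain functions. The helpers below (branch, map_each, keep_if, retry) are hypothetical and say nothing about kpipeline's actual constructors or signatures, which the table does not specify. A "step" here is just a callable taking one input, and metadata is left out for brevity.

```python
from typing import Callable, List, TypeVar

A = TypeVar("A")
B = TypeVar("B")


def branch(
    condition: Callable[[A], bool],
    if_true: Callable[[A], B],
    if_false: Callable[[A], B],
) -> Callable[[A], B]:
    """Run one of two steps depending on the condition (compare BranchPipe)."""
    def run(value: A) -> B:
        return if_true(value) if condition(value) else if_false(value)
    return run


def map_each(step: Callable[[A], B]) -> Callable[[List[A]], List[B]]:
    """Apply a step to every element of a sequence (compare MapPipe)."""
    def run(values: List[A]) -> List[B]:
        return [step(v) for v in values]
    return run


def keep_if(predicate: Callable[[A], bool]) -> Callable[[List[A]], List[A]]:
    """Keep only the elements matching a predicate (compare FilterPipe)."""
    def run(values: List[A]) -> List[A]:
        return [v for v in values if predicate(v)]
    return run


def retry(step: Callable[[A], B], attempts: int) -> Callable[[A], B]:
    """Re-run a step until it succeeds or attempts run out (compare RetryPipe)."""
    def run(value: A) -> B:
        for attempt in range(attempts):
            try:
                return step(value)
            except Exception:
                if attempt == attempts - 1:
                    raise  # last attempt failed: propagate the error
        raise ValueError("attempts must be at least 1")
    return run


# Keep the even numbers, then halve each one (the odd branch is never taken here).
halve_or_triple = branch(lambda n: n % 2 == 0, lambda n: n // 2, lambda n: n * 3)
evens = keep_if(lambda n: n % 2 == 0)
processed = map_each(halve_or_triple)(evens([1, 2, 3, 4]))

# A step that fails twice before succeeding on the third call.
calls = {"count": 0}


def flaky(n: int) -> int:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return n + 1


retried = retry(flaky, attempts=3)(10)
```

Expressing each primitive as a function that returns another step keeps the pieces composable, which is the same idea the pipe primitives apply to Pipe objects.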

Tests

You can run the tests and type checks in this repository with

uv run pytest test
uv run mypy .

License

TBD

AI Use Disclosure

I hate writing tests, so the unit tests have been generated by Gemma 4 31B for your convenience. They are provided in the hope that they are better than nothing.

Download files

Source Distribution

kpipeline-1.1.1.tar.gz (10.5 kB)

Built Distribution

kpipeline-1.1.1-py3-none-any.whl (11.3 kB)

File details

kpipeline-1.1.1.tar.gz

  • Size: 10.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6
  • SHA256: 7bd6f199987a88fffc14d92733d26f96f4f2ee9da61449b9d9bc46ec41748da8
  • MD5: 1ed08dab0e3af7a1b208dc1701cd8272
  • BLAKE2b-256: c6fd43f76d5e17d81c4279aebf6f676af9abcba4ffe34790551d333b1188b307

kpipeline-1.1.1-py3-none-any.whl

  • Size: 11.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6
  • SHA256: ca5e41b90db2f786538934b5dcf5e066f61c7b48874b22ff7676cda52598b9b8
  • MD5: 83a438af56ef6d015d42beba6d66c09d
  • BLAKE2b-256: 3dcdfd4e991b59a4fefb20104b53d6c91622f44c3bbb7ee06aa48d592dbb329b
