checkpipe

To bring functional programming data pipelines with robust validation to python and mypy

License: MIT

Built with ❤︎ by Mohammed Alzakariya

Why checkpipe?

One recurring problem is expressing Python functions as dataflows. Think of a function that progresses in stages like the following:

source_input -> filter -> transform -> filter -> sum

Dataflows can be more naturally represented with infix notation, with the preceding stage leading to the following stage through chaining. But in python we would find ourselves writing

sum(filter(transform(filter(source_input))))

which is not very handy. Another approach would be creating placeholder variables to store each stage, but this also introduces unnecessary state. If this state is mutable, it goes against the principle of purity in functional programming and the function does not clearly denote a mathematical function.
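
To make the contrast concrete, here is a plain-Python sketch of the same five-stage flow written both ways. The stage functions (is_positive, double, is_small) and the sample input are hypothetical, chosen only for illustration.

```python
from typing import List

source_input: List[int] = [-1, 2, 5, 8]

# Hypothetical stage functions, used only for this illustration.
def is_positive(n: int) -> bool:
    return n > 0

def double(n: int) -> int:
    return n * 2

def is_small(n: int) -> bool:
    return n < 12

# Nested form: the stages read inside-out, the reverse of the dataflow.
nested_total = sum(filter(is_small, map(double, filter(is_positive, source_input))))

# Placeholder form: the stages read top-down, but each one needs a name.
positives = filter(is_positive, source_input)
doubled = map(double, positives)
small = filter(is_small, doubled)
staged_total = sum(small)

print(nested_total, staged_total)
```

Both forms compute the same total; they differ only in how easy the stage order is to read and in how much intermediate state they introduce.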

In data analysis and ETL contexts, we may have to build large dataflows so a better approach is necessary.

A major inspiration for this project, and one that solves the above problem, is pipe by Julien Palard. It allows for infix notation and provides a simple @Pipe decorator to extend this to any functionality the user needs.
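
As a rough idea of how infix chaining with | can work in Python, the sketch below overloads the reflected-or operator. It is not the code of pipe or checkpipe; the Stage class and the helper functions are hypothetical names used only for this illustration.

```python
from typing import Callable, Generic, Iterable, List, TypeVar

A = TypeVar('A')
B = TypeVar('B')

class Stage(Generic[A, B]):
    """Hypothetical wrapper: `value | Stage(fn)` evaluates `fn(value)`."""

    def __init__(self, fn: Callable[[A], B]) -> None:
        self.fn = fn

    def __ror__(self, source: A) -> B:
        # Python tries the right operand's __ror__ when the left
        # operand does not know how to handle `|` for this pair.
        return self.fn(source)

def keep_odd(source: Iterable[int]) -> Iterable[int]:
    return (n for n in source if n % 2 == 1)

def to_list(source: Iterable[int]) -> List[int]:
    return list(source)

# Stages now read left to right, in dataflow order.
odds = [1, 2, 3, 4, 5] | Stage(keep_odd) | Stage(to_list)
print(odds)
```

Because the wrapper sits on the right-hand side of |, any ordinary Python value can act as the source without being wrapped first.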

This project aims to build on Julien Palard's project, but with new design considerations:

  • Intellisense and mypy friendly: No untyped lambdas, get full autocomplete ability and type checking by mypy.
  • Extended built-in support for error-checking that is integrated into the dataflow. This builds on the rustedpy/result library, which brings a Rust-like Result[T, E] to Python.

The project aims to make it easier to write pure python functions with robust error-checking and all the benefits of static analysis tools like mypy.

Install

pip install checkpipe

Use Cases

Basic filtering and mapping

import checkpipe as pipe

print(
    [1, 2, 3]
        | pipe.OfIter[int].map(lambda n: 
            n * 2
        )
        | pipe.OfIter[int].filter(lambda n: 
            n != 4
        )
        | pipe.OfIter[int].to_list()
)
[2, 6]

The above example takes a source input [1, 2, 3], transforms it by multiplying each value by 2, keeps only the results that aren't 4, and finally consumes this lazy iterator chain into a list.

When using checkpipe, we specify the type of the source so that our lambdas are typed. [1, 2, 3] is a List[int] and can also be iterated through as an Iterable[int], so for this source we use pipe.OfIter[int]. This uses generics to set expectations on the signatures of the higher-order functions passed to functions like .map and .filter. mypy can check these expectations automatically, and an editor such as VS Code can infer that n is an integer in the lambdas.
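
The idea of carrying the element type on a subscripted class can be sketched in a few lines. This is only an illustration of the pattern, assuming the type checker binds T through the subscript as described above; TypedIter is a hypothetical stand-in and not checkpipe's OfIter.

```python
from typing import Callable, Generic, Iterable, Iterator, List, TypeVar

T = TypeVar('T')
U = TypeVar('U')

class TypedIter(Generic[T]):
    """Hypothetical stand-in: the subscript names the element type."""

    @staticmethod
    def map(fn: Callable[[T], U]) -> Callable[[Iterable[T]], Iterator[U]]:
        def inner(source: Iterable[T]) -> Iterator[U]:
            # Lazily apply fn to every element of the source.
            return (fn(item) for item in source)
        return inner

# With TypedIter[int], the lambda parameter n is meant to be read as int.
double = TypedIter[int].map(lambda n: n * 2)
doubled: List[int] = list(double([1, 2, 3]))
print(doubled)
```

At runtime the subscript has no effect on behaviour; it exists so that static tools have an explicit type to work from.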

Direct transformations outside iterators

import checkpipe as pipe

print(
    3
        | pipe.Of[int].to(lambda n: 
            n+1
        )
)
4

checkpipe does not only work with iterators. It also works directly with plain values and allows transformations of the source object itself. In this case, no consumption of an iterator is necessary: .to(...) returns the transformed source directly.

Basic validation in dataflows

import checkpipe as pipe
from result import Result

print(
    [1, 2, 3]
        | pipe.OfIter[int].map(lambda n: 
            n * 2
        )
        | pipe.OfIter[int].check(lambda n: 
            n != 4
        )
        | pipe.OfIter[Result[int, int]].to_list()
)
[Ok(2), Err(4), Ok(6)]

Here, we use .OfIter[int].check to tag every value in the source: Ok[int] when it passes the check n != 4, otherwise Err[int]. This allows us to propagate and handle errors in the pipeline itself. Note that when we consume the iterator pipeline with .to_list(), we refer to a new source type, Iterator[Result[int, int]], to reflect the Ok/Err tagging.
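
The tagging step can be pictured with a small stand-alone sketch. It does not use checkpipe or the result package: the Ok and Err dataclasses and the check function below are hypothetical stand-ins that only mimic the behaviour described above.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, TypeVar, Union

T = TypeVar('T')
E = TypeVar('E')

@dataclass
class Ok(Generic[T]):
    value: T

@dataclass
class Err(Generic[E]):
    error: E

def check(
    source: Iterable[T], predicate: Callable[[T], bool]
) -> Iterator[Union[Ok[T], Err[T]]]:
    # Tag each value instead of dropping it, so failures stay in the flow.
    for item in source:
        yield Ok(item) if predicate(item) else Err(item)

tagged = list(check([2, 4, 6], lambda n: n != 4))
print(tagged)
```

Unlike filter, nothing is removed from the stream; later stages decide what to do with each tagged value.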

We can now proceed to perform more computations on the Ok[int] results only:

import checkpipe as pipe
from result import Result

print(
    [1, 2, 3]
        | pipe.OfIter[int].map(lambda n:
            n * 2
        )
        | pipe.OfIter[int].check(lambda n: 
            n != 4
        )
        | pipe.OfResultIter[int, int].on_ok(lambda n: 
            n + 1
        )
        | pipe.OfIter[Result[int, int]].to_list()
)
[Ok(3), Err(4), Ok(7)]

Here, .OfResultIter[int, int] works with an iterable of Results as a source and performs the computation n + 1 only on the Ok values. So we can see that Ok(2) became Ok(3) and Ok(6) became Ok(7), but Err(4) remains untouched.
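
A minimal sketch of this "transform only the Ok values" behaviour is shown below. It is not checkpipe's implementation: Ok, Err and on_ok are hypothetical stand-ins written with the standard library only.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, TypeVar, Union

T = TypeVar('T')
U = TypeVar('U')
E = TypeVar('E')

@dataclass
class Ok(Generic[T]):
    value: T

@dataclass
class Err(Generic[E]):
    error: E

def on_ok(
    source: Iterable[Union[Ok[T], Err[E]]], fn: Callable[[T], U]
) -> Iterator[Union[Ok[U], Err[E]]]:
    for item in source:
        if isinstance(item, Ok):
            # Apply the computation to the wrapped value and re-wrap it.
            yield Ok(fn(item.value))
        else:
            # Errors pass through unchanged.
            yield item

bumped = list(on_ok([Ok(2), Err(4), Ok(6)], lambda n: n + 1))
print(bumped)
```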

We can also use a different type for the error:

import checkpipe as pipe
from result import Result

print(
    [1, 2, 3, 4]
        | pipe.OfIter[int].map(lambda n: 
            n + 2
        )
        | pipe.OfResultIter[int, str].check(
            lambda n: n % 2 != 0,
            lambda n: f'Evens like {n} are not allowed!')
        | pipe.OfIter[Result[int, str]].to_list()
)
[Ok(3), Err('Evens like 4 are not allowed!'), Ok(5), Err('Evens like 6 are not allowed!')]

Here OfResultIter[int, str] specifies that errors are of type str and Ok values are of type int. Its check takes two functions: a predicate that decides whether the int is okay, and a function that maps a failing int to an error message. We can then continue processing just the Ok[int] results with .on_ok(...), as before:

import checkpipe as pipe
from result import Result

print(
    [1, 2, 3, 4]
        | pipe.OfIter[int].map(lambda n: 
            n + 2
        )
        | pipe.OfResultIter[int, str].check(
            lambda n: n % 2 != 0,
            lambda n: f'Evens like {n} are not allowed!')
        | pipe.OfResultIter[int, str].on_ok(lambda n: 
            n * 10
        )
        | pipe.OfIter[Result[int, str]].to_list()
)
[Ok(30), Err('Evens like 4 are not allowed!'), Ok(50), Err('Evens like 6 are not allowed!')]

We can also chain multiple checks in a row, keeping in mind that checks on Result[T, E] use the then_check variants while checks on T use check.

import checkpipe as pipe
from result import Result

print(
    [1, 2, 3, 4]
        | pipe.OfIter[int].map(lambda n: 
            n + 2
        )
        | pipe.OfResultIter[int, str].check(
            lambda n: n % 2 != 0,
            lambda n: f'Evens like {n} are not allowed!')
        | pipe.OfResultIter[int, str].then_check(
            lambda n: n != 3,
            lambda _: 'The number 3 is specifically not welcome!')
        | pipe.OfResultIter[int, str].on_ok(lambda n: 
            n * 10
        )
        | pipe.OfIter[Result[int, str]].to_list()
)
[Err('The number 3 is specifically not welcome!'), Err('Evens like 4 are not allowed!'), Ok(50), Err('Evens like 6 are not allowed!')]
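
The difference between a first check on plain values and a follow-up check on already tagged values can be sketched as follows. This is a stand-alone illustration, not checkpipe's code: Ok, Err, check and then_check are hypothetical stand-ins, and the error messages are shortened.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, TypeVar, Union

T = TypeVar('T')
E = TypeVar('E')

@dataclass
class Ok(Generic[T]):
    value: T

@dataclass
class Err(Generic[E]):
    error: E

def check(
    source: Iterable[T],
    predicate: Callable[[T], bool],
    make_error: Callable[[T], E],
) -> Iterator[Union[Ok[T], Err[E]]]:
    # First check: the source holds plain values, so every item is tagged.
    for item in source:
        yield Ok(item) if predicate(item) else Err(make_error(item))

def then_check(
    source: Iterable[Union[Ok[T], Err[E]]],
    predicate: Callable[[T], bool],
    make_error: Callable[[T], E],
) -> Iterator[Union[Ok[T], Err[E]]]:
    # Follow-up check: only Ok items are re-examined; earlier errors are kept.
    for item in source:
        if isinstance(item, Ok) and not predicate(item.value):
            yield Err(make_error(item.value))
        else:
            yield item

first = check([3, 4, 5, 6], lambda n: n % 2 != 0, lambda n: f'even: {n}')
second = list(then_check(first, lambda n: n != 3, lambda _: 'three'))
print(second)
```

An item that already failed keeps its original error, which is why 4 and 6 still report the first check's message in the example above.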

Sometimes a check needs to locate the problematic part of the source object. For this, we use the check_using functions, which take a finder callback. If the finder returns None, nothing problematic was found and the source is tagged Ok. Otherwise, the object the finder returns is used to create the Err.

import checkpipe as pipe
from result import Result
from typing import Optional

def find_capitalized_word(s: str) -> Optional[str]:
    # Return the first fully uppercase word, or None if there is none.
    for word in s.split(' '):
        if word.isupper():
            return word

    return None

print(
    [ 
        'this string contains no CAPITALIZED words!',
        'this one is all good!'
    ]
    | pipe.OfResultIter[str, str].check_using(
        find_capitalized_word,
        lambda cap_word: f'Bad! You used a capitalized word: {cap_word}')
    | pipe.OfIter[Result[str, str]].to_list()
)
[Err('Bad! You used a capitalized word: CAPITALIZED'), Ok('this one is all good!')]
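
The finder-based variant can be sketched in the same stand-alone style. Again, this is not checkpipe's implementation: Ok, Err, check_using and find_negative are hypothetical names used only to illustrate the None-means-Ok convention described above.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, List, Optional, TypeVar, Union

T = TypeVar('T')
E = TypeVar('E')
P = TypeVar('P')

@dataclass
class Ok(Generic[T]):
    value: T

@dataclass
class Err(Generic[E]):
    error: E

def check_using(
    source: Iterable[T],
    finder: Callable[[T], Optional[P]],
    make_error: Callable[[P], E],
) -> Iterator[Union[Ok[T], Err[E]]]:
    for item in source:
        problem = finder(item)
        if problem is None:
            # Nothing problematic found: the whole item is tagged Ok.
            yield Ok(item)
        else:
            # The error is built from the problematic part, not the whole item.
            yield Err(make_error(problem))

def find_negative(numbers: List[int]) -> Optional[int]:
    # Return the first negative number, or None if all are non-negative.
    return next((n for n in numbers if n < 0), None)

checked = list(
    check_using([[1, -2, 3], [4, 5]], find_negative, lambda n: f'negative value: {n}')
)
print(checked)
```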

Unpacking tuples

checkpipe comes with support for unpacking tuples of limited size while specifying the types of each element:

import checkpipe as pipe

print(
    (4, 2, 'Hello ')
        | pipe.OfUnpack3[int, int, str].unpack(
              lambda num_spaces, repeat, text: 
                  '"' + ' ' * num_spaces + repeat * text + '"'
        )
)
"    Hello Hello "

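For comparison, the same unpacking can be written in plain Python with a small typed helper. The unpack3 function below is a hypothetical sketch of the idea, not checkpipe's OfUnpack3.

```python
from typing import Callable, Tuple, TypeVar

A = TypeVar('A')
B = TypeVar('B')
C = TypeVar('C')
R = TypeVar('R')

def unpack3(fn: Callable[[A, B, C], R]) -> Callable[[Tuple[A, B, C]], R]:
    def inner(source: Tuple[A, B, C]) -> R:
        # Spread the three tuple elements over the callback's parameters.
        first, second, third = source
        return fn(first, second, third)
    return inner

render = unpack3(
    lambda num_spaces, repeat, text: '"' + ' ' * num_spaces + repeat * text + '"'
)
rendered = render((4, 2, 'Hello '))
print(rendered)
```

Fixing the arity in the helper's signature is what lets each tuple position carry its own type.
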
Creating a new Pipe function

import checkpipe as pipe
from checkpipe import Pipe
from typing import Callable, Iterable

@Pipe
def multiply_by_num(num: int) -> Callable[[Iterable[int]], Iterable[int]]:
    def inner(source: Iterable[int]) -> Iterable[int]:
        return map(lambda n: n * num, source)
    return inner

print(
    [1, 2, 3]
        | multiply_by_num(3)
        | pipe.OfIter[int].to_list()
)
[3, 6, 9]

Here we create a new function, multiply_by_num, that can be used with the pipe operator |. It defines an inner function that takes a source, Iterable[int], and maps it to another Iterable[int] via the builtin map function.

If we want to use generics to create a more type-general pipe function, we can use TypeVars to infer types from the arguments passed into the function. If we want to tell the function about a more generic source type, we can wrap it in a class and then specify the expected source type through the class, like this:

import checkpipe as pipe
from checkpipe import Pipe
from typing import Generic, TypeVar, Callable, Iterable

T = TypeVar('T')

class Repeat(Generic[T]):
    @Pipe
    @staticmethod
    def repeat(n: int) -> Callable[[Iterable[T]], Iterable[T]]:
        def inner(source: Iterable[T]) -> Iterable[T]:
            for item in source:
                for _ in range(n):
                    yield item
        return inner

print(
    ['a', 'b', 'c']
        | Repeat[str].repeat(3)
        | pipe.OfIter[str].to_list()
)
['a', 'a', 'a', 'b', 'b', 'b', 'c', 'c', 'c']

The pipes are type-safe and can be checked by mypy. checkpipe cannot automatically infer the source type from the left of the |. By specifying Repeat[str], we tell mypy that the source ['a', 'b', 'c'] piped into Repeat must be an Iterable[str]; otherwise mypy reports an error.

Todo

  • Implement similar default pipes to Julien Palard's project to facilitate transition
  • Implement unit testing for all functions of this module

Sponsorship

If this project brings value to you, please consider supporting me with a monthly sponsorship or buying me a coffee

🎉 Credits

  • Thanks to Julien Palard for the pipe library which was a major inspiration for this project.
  • Thanks to jeffreystse for the README style.

Contributing

All contributions are welcome! I would appreciate feedback on improving the library and optimizing for use cases I haven't thought of yet! Please feel free to contact me by opening an issue ticket or emailing lanhikarixx@gmail.com if you want to chat.

License

This project is licensed under the MIT license © Mohammed Alzakariya.
