Skip to main content

Typesafe railway-oriented programming (ROP)

Project description

trcks 🚂

trcks is a Python library. It allows railway-oriented programming in two different programming styles:

  1. an object-oriented style based on method chaining and
  2. a functional style based on function composition.

Motivation

The following subsections motivate railway-oriented programming in general and the trcks library in particular.

Why should I use railway-oriented programming?

When writing modular Python code, return type annotations are extremely helpful. They help humans (and maybe LLMs) to understand the purpose of a function. And they allow static type checkers (e.g. mypy or pyright) to check whether functions fit together:

>>> def get_user_id(user_email: str) -> int:
...     if user_email == "erika.mustermann@domain.org":
...         return 1
...     if user_email == "john_doe@provider.com":
...         return 2
...     raise Exception("User does not exist")
...
>>> def get_subscription_id(user_id: int) -> int:
...     if user_id == 1:
...         return 42
...     raise Exception("User does not have a subscription")
...
>>> def get_subscription_fee(subscription_id: int) -> float:
...     return subscription_id * 0.1
...
>>> def get_subscription_fee_by_email(user_email: str) -> float:
...     return get_subscription_fee(get_subscription_id(get_user_id(user_email)))
...
>>> get_subscription_fee_by_email("erika.mustermann@domain.org")
4.2

Unfortunately, conventional return type annotations do not always tell the full story:

>>> get_subscription_id(user_id=2)
Traceback (most recent call last):
    ...
Exception: User does not have a subscription

We can document domain exceptions in the docstring of the function:

>>> def get_subscription_id(user_id: int) -> int:
...     """Look up the subscription ID for a user.
...
...     Raises:
...         Exception: If the user does not have a subscription.
...     """
...     if user_id == 1:
...         return 42
...     raise Exception("User does not have a subscription")
...

While this helps humans (and maybe LLMs), static type checkers usually ignore docstrings. Moreover, it is difficult to document all domain exceptions in the docstring and to keep this documentation up-to-date. Therefore, we should use railway-oriented programming.

How can I use railway-oriented programming?

Instead of raising exceptions (and documenting this behavior in the docstring), we return a Result type:

>>> from typing import Literal
>>> from trcks import Result
>>>
>>> UserDoesNotHaveASubscription = Literal["User does not have a subscription"]
>>>
>>> def get_subscription_id(
...     user_id: int
... ) -> Result[UserDoesNotHaveASubscription, int]:
...     if user_id == 1:
...         return "success", 42
...     return "failure", "User does not have a subscription"
...
>>> get_subscription_id(user_id=1)
('success', 42)
>>> get_subscription_id(user_id=2)
('failure', 'User does not have a subscription')

This return type

  1. describes the success case and the failure case and
  2. is verified by static type checkers.

What do I need for railway-oriented programming?

Combining Result-returning functions with other Result-returning functions or with "regular" functions can be cumbersome. Moreover, it can lead to repetitive code patterns:

>>> from typing import Union
>>>
>>> UserDoesNotExist = Literal["User does not exist"]
>>> FailureDescription = Union[UserDoesNotExist, UserDoesNotHaveASubscription]
>>>
>>> def get_user_id(user_email: str) -> Result[UserDoesNotExist, int]:
...     if user_email == "erika.mustermann@domain.org":
...         return "success", 1
...     if user_email == "john_doe@provider.com":
...         return "success", 2
...     return "failure", "User does not exist"
...
>>> def get_subscription_fee_by_email(
...     user_email: str
... ) -> Result[FailureDescription, float]:
...     # Apply get_user_id:
...     user_id_result = get_user_id(user_email)
...     if user_id_result[0] == "failure":
...         return user_id_result
...     user_id = user_id_result[1]
...     # Apply get_subscription_id:
...     subscription_id_result = get_subscription_id(user_id)
...     if subscription_id_result[0] == "failure":
...         return subscription_id_result
...     subscription_id = subscription_id_result[1]
...     # Apply get_subscription_fee:
...     subscription_fee = get_subscription_fee(subscription_id)
...     # Return result:
...     return "success", subscription_fee
...
>>> get_subscription_fee_by_email("erika.mustermann@domain.org")
('success', 4.2)
>>> get_subscription_fee_by_email("john_doe@provider.com")
('failure', 'User does not have a subscription')
>>> get_subscription_fee_by_email("jane_doe@provider.com")
('failure', 'User does not exist')

Therefore, we need a library that helps us combine functions.

How does the module trcks.oop help with function combination?

The module trcks.oop supports combining functions in an object-oriented style using method chaining:

>>> from trcks.oop import Wrapper
>>>
>>> def get_subscription_fee_by_email(
...     user_email: str
... ) -> Result[FailureDescription, float]:
...     return (
...         Wrapper(core=user_email)
...         .map_to_result(get_user_id)
...         .map_success_to_result(get_subscription_id)
...         .map_success(get_subscription_fee)
...         .core
...     )
...
>>> get_subscription_fee_by_email("erika.mustermann@domain.org")
('success', 4.2)
>>> get_subscription_fee_by_email("john_doe@provider.com")
('failure', 'User does not have a subscription')
>>> get_subscription_fee_by_email("jane_doe@provider.com")
('failure', 'User does not exist')

How does the package trcks.fp help with function combination?

The package trcks.fp supports combining functions in a functional style using function composition:

>>> from trcks.fp.composition import Pipeline3, pipe
>>> from trcks.fp.monads import result as r
>>>
>>> def get_subscription_fee_by_email(
...     user_email: str
... ) -> Result[FailureDescription, float]:
...     # If your static type checker cannot infer
...     # the type of the argument passed to `pipe`,
...     # explicit type assignment can help:
...     pipeline: Pipeline3[
...         str,
...         Result[UserDoesNotExist, int],
...         Result[FailureDescription, int],
...         Result[FailureDescription, float],
...     ] = (
...         user_email,
...         get_user_id,
...         r.map_success_to_result(get_subscription_id),
...         r.map_success(get_subscription_fee),
...     )
...     return pipe(pipeline)
...
>>> get_subscription_fee_by_email("erika.mustermann@domain.org")
('success', 4.2)
>>> get_subscription_fee_by_email("john_doe@provider.com")
('failure', 'User does not have a subscription')
>>> get_subscription_fee_by_email("jane_doe@provider.com")
('failure', 'User does not exist')

Setup

trcks is available on PyPI. Use your favorite package manager (e.g. pip, poetry or uv) to install it.

Usage

The following subsections describe the usage of trcks, trcks.oop and trcks.fp.

Tuple types provided by trcks

The generic type trcks.Failure[F] describes all tuples of length 2 with the string "failure" as the first element and a second element of type F. Usually, the second element is a string, an exception or an enum value:

>>> import enum
>>> from typing import Literal
>>> from trcks import Failure
>>>
>>> UserDoesNotExistLiteral = Literal["User does not exist"]
>>> literal_failure: Failure[UserDoesNotExistLiteral] = (
...     "failure", "User does not exist"
... )
>>>
>>> class UserDoesNotExistException(Exception):
...     pass
...
>>> exception_failure: Failure[UserDoesNotExistException] = ("failure", UserDoesNotExistException())
>>>
>>> class ErrorEnum(enum.Enum):
...     USER_DOES_NOT_EXIST = enum.auto
...
>>> enum_failure: Failure[ErrorEnum] = ("failure", ErrorEnum.USER_DOES_NOT_EXIST)

The generic type trcks.Success[S] describes all tuples of length 2 with the string "success" as the first element and a second element of type S. Here, S can be any type.

>>> from decimal import Decimal
>>> from pathlib import Path
>>> from trcks import Success
>>>
>>> decimal_success: Success[Decimal] = ("success", Decimal("3.14"))
>>> float_list_success: Success[list[float]] = ("success", [1.0, 2.0, 3.0])
>>> int_success: Success[int] = ("success", 42)
>>> path_success: Success[Path] = ("success", Path("/tmp/my-file.txt"))
>>> str_success: Success[str] = ("success", "foo")

The generic type trcks.Result[F, S] is the union of trcks.Failure[F] and trcks.Success[S]. It is primarily used as a return type for functions:

>>> from typing import Literal
>>> from trcks import Result
>>>
>>> UserDoesNotHaveASubscription = Literal["User does not have a subscription"]
>>>
>>> def get_subscription_id(
...     user_id: int
... ) -> Result[UserDoesNotHaveASubscription, int]:
...     if user_id == 1:
...         return "success", 42
...     return "failure", "User does not have a subscription"
...
>>> get_subscription_id(user_id=1)
('success', 42)
>>> get_subscription_id(user_id=2)
('failure', 'User does not have a subscription')

Railway-oriented programming with trcks.oop

The following subsections describe how to use trcks.oop for railway-oriented programming. Single-track and double-track code are both discussed. So are synchronous and asynchronous code.

Synchronous single-track code with trcks.oop.Wrapper

The generic class trcks.oop.Wrapper[T] allows us to chain functions:

>>> from trcks.oop import Wrapper
>>>
>>> def to_length_string(s: str) -> str:
...     return Wrapper(core=s).map(len).map(lambda n: f"Length: {n}").core
...
>>> to_length_string("Hello, world!")
'Length: 13'

To understand what is going on here, let us have a look at the individual steps of the chain:

>>> # 1. Wrap the input string:
>>> wrapped: Wrapper[str] = Wrapper(core="Hello, world!")
>>> wrapped
Wrapper(core='Hello, world!')
>>> # 2. Apply the builtin function len:
>>> mapped: Wrapper[int] = wrapped.map(len)
>>> mapped
Wrapper(core=13)
>>> # 3. Apply a lambda function:
>>> mapped_again: Wrapper[str] = mapped.map(lambda n: f"Length: {n}")
>>> mapped_again
Wrapper(core='Length: 13')
>>> # 4. Unwrap the output string:
>>> unwrapped: str = mapped_again.core
>>> unwrapped
'Length: 13'

Note: Instead of the default constructor trcks.oop.Wrapper(core="Hello, world!"), we can also use the static method trcks.oop.Wrapper.construct("Hello, world!").

By following the pattern of wrapping, mapping and unwrapping, we can write code that resembles a single-track railway (or maybe a single-pipe pipeline).

Side effects like logging or writing to a file tend to "consume" their input and return None instead. To avoid this, we can use the tap method available in the Wrapper class. This method allows executing side effects while preserving the original value:

>>> def to_length_string(s: str) -> str:
...     return (
...         Wrapper(core=s)
...         .tap(lambda o: print(f"LOG: Received '{o}'."))
...         .map(len)
...         .map(lambda n: f"Length: {n}")
...         .tap(lambda o: print(f"LOG: Returning '{o}'."))
...         .core
...     )
...
>>> output = to_length_string("Hello, world!")
LOG: Received 'Hello, world!'.
LOG: Returning 'Length: 13'.
>>> output
'Length: 13'

Synchronous double-track code with trcks.Result and trcks.oop.ResultWrapper

Whenever we encounter something exceptional in conventional Python programming (e.g. something not working as expected or some edge case in our business logic), we usually jump (via raise and try ... except) to a completely different place in our codebase that (hopefully) handles our exception.

In railway-oriented programming, however, we tend to have two parallel code tracks:

  1. a failure track and
  2. a success track (a.k.a. "happy path").

This can be achieved by using the generic type trcks.Result[F, S] that contains either

  1. a failure value of type F or
  2. a success value of type S.

The generic class trcks.oop.ResultWrapper[F, S] simplifies the implementation of the parallel code tracks.

>>> def get_subscription_fee_by_email(
...     user_email: str
... ) -> Result[FailureDescription, float]:
...     return (
...         Wrapper(core=user_email)
...         .map_to_result(get_user_id)
...         .map_success_to_result(get_subscription_id)
...         .map_success(get_subscription_fee)
...         .core
...     )
...
>>> get_subscription_fee_by_email("erika.mustermann@domain.org")
('success', 4.2)
>>> get_subscription_fee_by_email("john_doe@provider.com")
('failure', 'User does not have a subscription')
>>> get_subscription_fee_by_email("jane_doe@provider.com")
('failure', 'User does not exist')

To understand what is going on here, let us have a look at the individual steps of the chain:

>>> from trcks.oop import ResultWrapper
>>>
>>> # 1. Wrap the input string:
>>> wrapped: Wrapper[str] = Wrapper(core="erika.mustermann@domain.org")
>>> wrapped
Wrapper(core='erika.mustermann@domain.org')
>>> # 2. Apply the Result function get_user_id:
>>> mapped_once: ResultWrapper[UserDoesNotExist, int] = wrapped.map_to_result(
...     get_user_id
... )
>>> mapped_once
ResultWrapper(core=('success', 1))
>>> # 3. Apply the Result function get_subscription_id in the success case:
>>> mapped_twice: ResultWrapper[
...     FailureDescription, int
... ] = mapped_once.map_success_to_result(get_subscription_id)
>>> mapped_twice
ResultWrapper(core=('success', 42))
>>> # 4. Apply the function get_subscription_fee in the success case:
>>> mapped_thrice: ResultWrapper[
...     FailureDescription, float
... ] = mapped_twice.map_success(get_subscription_fee)
>>> mapped_thrice
ResultWrapper(core=('success', 4.2))
>>> # 5. Unwrap the output result:
>>> unwrapped: Result[FailureDescription, float] = mapped_thrice.core
>>> unwrapped
('success', 4.2)

Note: The method trcks.oop.Wrapper.map_to_result returns a trcks.oop.ResultWrapper object. The corresponding class trcks.oop.ResultWrapper has a map_failure* and a map_success* method for each map* method of the class trcks.oop.Wrapper.

The tap_success and tap_failure methods allow us to execute side effects in the success case or in the failure case, respectively:

>>> def get_subscription_fee_by_email(
...     user_email: str
... ) -> Result[FailureDescription, float]:
...     return (
...         Wrapper(core=user_email)
...         .map_to_result(get_user_id)
...         .tap_success(lambda n: print(f"LOG: User ID: {n}."))
...         .map_success_to_result(get_subscription_id)
...         .map_success(get_subscription_fee)
...         .tap_success(lambda x: print(f"LOG: Subscription fee: {x}."))
...         .tap_failure(lambda fd: print(f"LOG: Failure description: {fd}."))
...         .core
...     )
...
>>> fee_erika = get_subscription_fee_by_email("erika.mustermann@domain.org")
LOG: User ID: 1.
LOG: Subscription fee: 4.2.
>>> fee_erika
('success', 4.2)
>>> fee_john = get_subscription_fee_by_email("john_doe@provider.com")
LOG: User ID: 2.
LOG: Failure description: User does not have a subscription.
>>> fee_john
('failure', 'User does not have a subscription')
>>> fee_jane = get_subscription_fee_by_email("jane_doe@provider.com")
LOG: Failure description: User does not exist.
>>> fee_jane
('failure', 'User does not exist')

Sometimes, side effects themselves can fail and need to return a Result type. The tap_success_to_result method allows us to execute such side effects in the success case. If the side effect returns a Failure, that failure is propagated. If the side effect returns a Success, the original success value is preserved.

>>> OutOfDiskSpace = Literal["Out of disk space"]
>>> def write_to_disk(n: int) -> Result[OutOfDiskSpace, None]:
...     if n > 1:
...         return "failure", "Out of disk space"
...     return "success", print(f"LOG: Wrote {n} to disk.")
...
>>> def get_and_persist_user_id(
...     user_email: str
... ) -> Result[Union[UserDoesNotExist, OutOfDiskSpace], int]:
...     return (
...         Wrapper(core=user_email)
...         .map_to_result(get_user_id)
...         .tap_success_to_result(write_to_disk)
...         .core
...     )
...
>>> id_erika = get_and_persist_user_id("erika.mustermann@domain.org")
LOG: Wrote 1 to disk.
>>> id_erika
('success', 1)
>>> id_john = get_and_persist_user_id("john_doe@provider.com")
>>> id_john
('failure', 'Out of disk space')
>>> id_jane = get_and_persist_user_id("jane_doe@provider.com")
>>> id_jane
('failure', 'User does not exist')

Asynchronous single-track code with collections.abc.Awaitable and trcks.oop.AwaitableWrapper

While the class trcks.oop.Wrapper and its method map allow the chaining of synchronous functions, they cannot chain asynchronous functions. To understand why, we first need to understand the return type of asynchronous functions:

>>> import asyncio
>>> from collections.abc import Awaitable, Coroutine
>>> async def read_from_disk(path: str) -> str:
...     await asyncio.sleep(0.001)
...     s = "Hello, world!"
...     print(f"Read '{s}' from file {path}.")
...     return s
...
>>> # Examine the return value of read_from_disk:
>>> return_value = read_from_disk("input.txt")
>>> return_value
<coroutine object read_from_disk at ...>
>>> asyncio.run(return_value)
Read 'Hello, world!' from file input.txt.
'Hello, world!'
>>> # Examine the type of the return value:
>>> return_type = type(return_value)
>>> return_type
<class 'coroutine'>
>>> issubclass(return_type, Coroutine)
True
>>> issubclass(Coroutine, Awaitable)
True

So, whenever we define a function using the async def ... -> T syntax, we actually get a function with the return type collections.abc.Awaitable[T]. The method trcks.oop.Wrapper.map_to_awaitable and the class trcks.oop.AwaitableWrapper allow us to combine collections.abc.Awaitable-returning functions with other collections.abc.Awaitable-returning functions or with "regular" functions:

>>> def transform(s: str) -> str:
...     return f"Length: {len(s)}"
...
>>> async def write_to_disk(s: str, path: str) -> None:
...     await asyncio.sleep(0.001)
...     print(f"Wrote '{s}' to file {path}.")
...
>>> async def read_and_transform_and_write(
...     input_path: str, output_path: str
... ) -> None:
...     return await (
...         Wrapper(core=input_path)
...         .map_to_awaitable(read_from_disk)
...         .map(transform)
...         .map_to_awaitable(lambda s: write_to_disk(s, output_path))
...         .core
...     )
...
>>> asyncio.run(read_and_transform_and_write("input.txt", "output.txt"))
Read 'Hello, world!' from file input.txt.
Wrote 'Length: 13' to file output.txt.

To understand what is going on here, let us have a look at the individual steps of the chain:

>>> from typing import Any
>>> from trcks.oop import AwaitableWrapper
>>> # 1. Wrap the input string:
>>> wrapped: Wrapper[str] = Wrapper(core="input.txt")
>>> wrapped
Wrapper(core='input.txt')
>>> # 2. Apply the Awaitable function read_from_disk:
>>> mapped_once: AwaitableWrapper[str] = wrapped.map_to_awaitable(read_from_disk)
>>> mapped_once
AwaitableWrapper(core=<coroutine object ...>)
>>> # 3. Apply the function transform:
>>> mapped_twice: AwaitableWrapper[str] = mapped_once.map(transform)
>>> mapped_twice
AwaitableWrapper(core=<coroutine object ...>)
>>> # 4. Apply the Awaitable function write_to_disk:
>>> mapped_thrice: AwaitableWrapper[None] = mapped_twice.map_to_awaitable(
...     lambda s: write_to_disk(s, "output.txt")
... )
>>> mapped_thrice
AwaitableWrapper(core=<coroutine object ...>)
>>> # 5. Unwrap the output coroutine:
>>> unwrapped: Coroutine[Any, Any, None] = mapped_thrice.core_as_coroutine
>>> unwrapped
<coroutine object ...>
>>> # 6. Run the output coroutine:
>>> asyncio.run(unwrapped)
Read 'Hello, world!' from file input.txt.
Wrote 'Length: 13' to file output.txt.

Note: The property core of the class trcks.oop.AwaitableWrapper has type collections.abc.Awaitable. Since asyncio.run expects a collections.abc.Coroutine object, we need to use the property core_as_coroutine instead.

The method AwaitableWrapper.tap allows us to execute synchronous side effects. Similarly, he method AwaitableWrapper.tap_to_awaitable allows us to execute asynchronous side effects.

>>> async def read_from_disk(path: str) -> str:
...     await asyncio.sleep(0.001)
...     return "Hello, world!"
...
>>> async def write_to_disk(s: str, path: str) -> None:
...     await asyncio.sleep(0.001)
...
>>> async def read_and_transform_and_write(input_path: str, output_path: str) -> str:
...     return await (
...         Wrapper(core=input_path)
...         .map_to_awaitable(read_from_disk)
...         .tap(lambda s: print(f"Read '{s}' from disk."))
...         .map(transform)
...         .tap_to_awaitable(lambda s: write_to_disk(s, output_path))
...         .tap(lambda s: print(f"Wrote '{s}' to disk."))
...         .core
...     )
...
>>> return_value = asyncio.run(read_and_transform_and_write("input.txt", "output.txt"))
Read 'Hello, world!' from disk.
Wrote 'Length: 13' to disk.
>>> return_value
'Length: 13'

Asynchronous double-track code with trcks.AwaitableResult and trcks.oop.AwaitableResultWrapper

Whenever we define a function using the async def ... -> Result[F, S] syntax, we actually get a function with the return type collections.abc.Awaitable[trcks.Result[F, S]]. The module trcks.oop provides the type alias trcks.oop.AwaitableResult[F, S] for this type. Moreover, the method trcks.oop.Wrapper.map_to_awaitable_result and the class trcks.oop.AwaitableResultWrapper allow us to combine trcks.oop.AwaitableResult-returning functions with other trcks.oop.AwaitableResult-returning functions or with "regular" functions:

>>> ReadErrorLiteral = Literal["read error"]
>>> WriteErrorLiteral = Literal["write error"]
>>> async def read_from_disk(path: str) -> Result[ReadErrorLiteral, str]:
...     if path != "input.txt":
...         return "failure", "read error"
...     await asyncio.sleep(0.001)
...     s = "Hello, world!"
...     print(f"Read '{s}' from file {path}.")
...     return "success", s
...
>>> def transform(s: str) -> str:
...     return f"Length: {len(s)}"
...
>>> async def write_to_disk(s: str, path: str) -> Result[WriteErrorLiteral, None]:
...     if path != "output.txt":
...         return "failure", "write error"
...     await asyncio.sleep(0.001)
...     print(f"Wrote '{s}' to file {path}.")
...     return "success", None
...
>>>
>>> async def read_and_transform_and_write(
...     input_path: str, output_path: str
... ) -> Result[Union[ReadErrorLiteral, WriteErrorLiteral], None]:
...     return await (
...         Wrapper(core=input_path)
...         .map_to_awaitable_result(read_from_disk)
...         .map_success(transform)
...         .map_success_to_awaitable_result(lambda s: write_to_disk(s, output_path))
...         .core
...     )
...
>>> asyncio.run(read_and_transform_and_write("input.txt", "output.txt"))
Read 'Hello, world!' from file input.txt.
Wrote 'Length: 13' to file output.txt.
('success', None)

To understand what is going on here, let us have a look at the individual steps of the chain:

>>> from trcks.oop import AwaitableResultWrapper
>>> # 1. Wrap the input string:
>>> wrapped: Wrapper[str] = Wrapper(core="input.txt")
>>> wrapped
Wrapper(core='input.txt')
>>> # 2. Apply the AwaitableResult function read_from_disk:
>>> mapped_once: AwaitableResultWrapper[ReadErrorLiteral, str] = (
...     wrapped.map_to_awaitable_result(read_from_disk)
... )
>>> mapped_once
AwaitableResultWrapper(core=<coroutine object ...>)
>>> # 3. Apply the function transform in the success case:
>>> mapped_twice: AwaitableResultWrapper[ReadErrorLiteral, str] = mapped_once.map_success(
...     transform
... )
>>> mapped_twice
AwaitableResultWrapper(core=<coroutine object ...>)
>>> # 4. Apply the AwaitableResult function write_to_disk in the success case:
>>> mapped_thrice: AwaitableResultWrapper[
...     Union[ReadErrorLiteral, WriteErrorLiteral], None
... ] = mapped_twice.map_success_to_awaitable_result(
...     lambda s: write_to_disk(s, "output.txt")
... )
>>> mapped_thrice
AwaitableResultWrapper(core=<coroutine object ...>)
>>> # 5. Unwrap the output coroutine:
>>> unwrapped: Coroutine[
...     Any, Any, Result[Union[ReadErrorLiteral, WriteErrorLiteral], None]
... ] = mapped_thrice.core_as_coroutine
>>> unwrapped
<coroutine object ...>
>>> # 6. Run the output coroutine:
>>> asyncio.run(unwrapped)
Read 'Hello, world!' from file input.txt.
Wrote 'Length: 13' to file output.txt.
('success', None)

The methods AwaitableResultWrapper.tap_failure and AwaitableResultWrapper.tap_success allow us to execute synchronous side effects in the failure case or in the success case, respectively:

>>> async def read_from_disk(path: str) -> Result[ReadErrorLiteral, str]:
...     if path != "input.txt":
...         return "failure", "read error"
...     await asyncio.sleep(0.001)
...     return "success", "Hello, world!"
...
>>> async def write_to_disk(s: str, path: str) -> Result[WriteErrorLiteral, None]:
...     if path != "output.txt":
...         return "failure", "write error"
...     await asyncio.sleep(0.001)
...     return "success", None
...
>>> async def read_and_transform_and_write(
...     input_path: str, output_path: str
... ) -> Result[Union[ReadErrorLiteral, WriteErrorLiteral], None]:
...     return await (
...         Wrapper(core=input_path)
...         .map_to_awaitable_result(read_from_disk)
...         .tap_success(lambda s: print(f"LOG: Read '{s}' from disk."))
...         .map_success(transform)
...         .map_success_to_awaitable_result(lambda s: write_to_disk(s, output_path))
...         .tap_success(lambda _: print(f"LOG: Successfully wrote to disk."))
...         .tap_failure(lambda err: print(f"LOG: Failed with error: {err}"))
...         .core
...     )
...
>>> result_1 = asyncio.run(read_and_transform_and_write("input.txt", "output.txt"))
LOG: Read 'Hello, world!' from disk.
LOG: Successfully wrote to disk.
>>> result_1
('success', None)
>>> result_2 = asyncio.run(read_and_transform_and_write("missing.txt", "output.txt"))
LOG: Failed with error: read error
>>> result_2
('failure', 'read error')

Sometimes, side effects themselves can fail and need to return an AwaitableResult type. The method AwaitableResultWrapper.tap_success_to_awaitable_result allows us to execute such asynchronous side effects in the success case. If the side effect returns an AwaitableFailure, that failure is propagated. If the side effect returns an AwaitableSuccess, the original success value is preserved:

>>> async def write_to_disk(s: str) -> Result[OutOfDiskSpace, None]:
...     await asyncio.sleep(0.001)
...     if len(s) > 10:
...         return "failure", "Out of disk space"
...     return "success", None
...
>>> async def read_and_persist(input_path: str) -> Result[Union[ReadErrorLiteral, OutOfDiskSpace], str]:
...     return await (
...         Wrapper(core=input_path)
...         .map_to_awaitable_result(read_from_disk)
...         .tap_success(lambda s: print(f"LOG: Read '{s}' from disk."))
...         .tap_success(lambda s: print(f"LOG: Persisting '{s}'."))
...         .tap_success_to_awaitable_result(write_to_disk)
...         .core
...     )
...
>>> result = asyncio.run(read_and_persist("input.txt"))
LOG: Read 'Hello, world!' from disk.
LOG: Persisting 'Hello, world!'.
>>> result
('failure', 'Out of disk space')

Railway-oriented programming with trcks.fp

The following subsections describe how to use trcks.fp for railway-oriented programming. Single-track and double-track code are both discussed. So are synchronous and asynchronous code.

Synchronous single-track code with trcks.fp.composition

The function trcks.fp.composition.pipe allows us to chain functions:

>>> from trcks.fp.composition import pipe
>>> def to_length_string(s: str) -> str:
...     return pipe((s, len, lambda n: f"Length: {n}"))
...
>>> to_length_string("Hello, world!")
'Length: 13'

To understand what is going on here, let us have a look at the individual steps of the chain:

>>> pipe(("Hello, world!",))
'Hello, world!'
>>> pipe(("Hello, world!", len))
13
>>> pipe(("Hello, world!", len, lambda n: f"Length: {n}"))
'Length: 13'

Note: The function trcks.fp.composition.pipe expects a trcks.fp.composition.Pipeline, i.e. a tuple consisting of a start value followed by up to seven compatible functions.

Side effects like logging or writing to a file tend to "consume" their input and return None instead. To avoid this, we can use the higher-order function trcks.fp.monads.identity.tap. This higher-order function turns each function into a function that behaves like the original function but returns the input value.

>>> from trcks.fp.monads import identity as i
>>> def to_length_string(s: str) -> str:
...     return pipe(
...         (
...             s,
...             i.tap(lambda o: print(f"LOG: Received '{o}'.")),
...             len,
...             lambda n: f"Length: {n}",
...             i.tap(lambda o: print(f"LOG: Returning '{o}'.")),
...         ),
...     )
...
>>> output = to_length_string("Hello, world!")
LOG: Received 'Hello, world!'.
LOG: Returning 'Length: 13'.
>>> output
'Length: 13'

Synchronous double-track code with trcks.fp.composition and trcks.fp.monads.result

If one of the functions in a trcks.fp.composition.Pipeline returns a trcks.Result[F, S] type, the following function must accept this trcks.Result[F, S] type as its input. However, functions with input type trcks.Result[F, S] tend to violate the "do one thing and do it well" principle. Therefore, the module trcks.fp.monads.result provides some higher-order functions named map_* that turn functions with input type F and functions with input type S into functions with input type trcks.Result[F, S].

>>> def get_subscription_fee_by_email(
...     user_email: str
... ) -> Result[FailureDescription, float]:
...     # If your static type checker cannot infer
...     # the type of the argument passed to `pipe`,
...     # explicit type assignment can help:
...     pipeline: Pipeline3[
...         str,
...         Result[UserDoesNotExist, int],
...         Result[FailureDescription, int],
...         Result[FailureDescription, float],
...     ] = (
...         user_email,
...         get_user_id,
...         r.map_success_to_result(get_subscription_id),
...         r.map_success(get_subscription_fee),
...     )
...     return pipe(pipeline)
...
>>> get_subscription_fee_by_email("erika.mustermann@domain.org")
('success', 4.2)
>>> get_subscription_fee_by_email("john_doe@provider.com")
('failure', 'User does not have a subscription')
>>> get_subscription_fee_by_email("jane_doe@provider.com")
('failure', 'User does not exist')

To understand what is going on here, let us have a look at the individual steps of the chain:

>>> from trcks.fp.composition import (
...     Pipeline0, Pipeline1, Pipeline2, Pipeline3, pipe
... )
>>> p0: Pipeline0[str] = ("erika.mustermann@domain.org",)
>>> pipe(p0)
'erika.mustermann@domain.org'
>>> p1: Pipeline1[str, Result[UserDoesNotExist, int]] = (
...     "erika.mustermann@domain.org",
...     get_user_id,
... )
>>> pipe(p1)
('success', 1)
>>> p2: Pipeline2[
...     str, Result[UserDoesNotExist, int], Result[FailureDescription, int]
... ] = (
...     "erika.mustermann@domain.org",
...     get_user_id,
...     r.map_success_to_result(get_subscription_id),
... )
>>> pipe(p2)
('success', 42)
>>> p3: Pipeline3[
...     str,
...     Result[UserDoesNotExist, int],
...     Result[FailureDescription, int],
...     Result[FailureDescription, float],
... ] = (
...     "erika.mustermann@domain.org",
...     get_user_id,
...     r.map_success_to_result(get_subscription_id),
...     r.map_success(get_subscription_fee),
... )
>>> pipe(p3)
('success', 4.2)

While trcks.fp.monads.result.map_failure and trcks.fp.monads.result.map_success allow us to apply functions in the failure case or in the success case, respectively, the higher-order functions trcks.fp.monads.result.tap_failure and trcks.fp.monads.result.tap_success allow us to execute side effects in the failure case or in the success case, respectively.

>>> from trcks.fp.composition import Pipeline6
>>> def get_subscription_fee_by_email(user_email: str) -> Result[FailureDescription, float]:
...     pipeline: Pipeline6[
...         str,
...         Result[UserDoesNotExist, int],
...         Result[UserDoesNotExist, int],
...         Result[FailureDescription, int],
...         Result[FailureDescription, float],
...         Result[FailureDescription, float],
...         Result[FailureDescription, float],
...     ] = (
...         user_email,
...         get_user_id,
...         r.tap_success(lambda n: print(f"LOG: User ID: {n}.")),
...         r.map_success_to_result(get_subscription_id),
...         r.map_success(get_subscription_fee),
...         r.tap_success(lambda x: print(f"LOG: Subscription fee: {x}.")),
...         r.tap_failure(lambda fd: print(f"LOG: Failure description: {fd}.")),
...     )
...     return pipe(pipeline)
...
>>> fee_erika = get_subscription_fee_by_email("erika.mustermann@domain.org")
LOG: User ID: 1.
LOG: Subscription fee: 4.2.
>>> fee_erika
('success', 4.2)
>>> fee_john = get_subscription_fee_by_email("john_doe@provider.com")
LOG: User ID: 2.
LOG: Failure description: User does not have a subscription.
>>> fee_john
('failure', 'User does not have a subscription')
>>> fee_jane = get_subscription_fee_by_email("jane_doe@provider.com")
LOG: Failure description: User does not exist.
>>> fee_jane
('failure', 'User does not exist')

Sometimes, side effects themselves can fail and need to return a Result type. The higher-order function trcks.fp.monads.result.tap_success_to_result allows us to execute such side effects in the success case. If the side effect returns a Failure, that failure is propagated. If the side effect returns a Success, the original success value is preserved.

>>> def write_to_disk(n: int) -> Result[OutOfDiskSpace, None]:
...     if n > 1:
...         return "failure", "Out of disk space"
...     return "success", print(f"LOG: Wrote {n} to disk.")
...
>>> def get_and_persist_user_id(
...     user_email: str
... ) -> Result[Union[UserDoesNotExist, OutOfDiskSpace], int]:
...     pipeline: Pipeline2[
...         str,
...         Result[UserDoesNotExist, int],
...         Result[Union[UserDoesNotExist, OutOfDiskSpace], int],
...     ] = (
...         user_email,
...         get_user_id,
...         r.tap_success_to_result(write_to_disk),
...     )
...     return pipe(pipeline)
...
>>> id_erika = get_and_persist_user_id("erika.mustermann@domain.org")
LOG: Wrote 1 to disk.
>>> id_erika
('success', 1)
>>> id_john = get_and_persist_user_id("john_doe@provider.com")
>>> id_john
('failure', 'Out of disk space')
>>> id_jane = get_and_persist_user_id("jane_doe@provider.com")
>>> id_jane
('failure', 'User does not exist')

Asynchronous single-track code with trcks.fp.composition and trcks.fp.monads.awaitable

If one of the functions in a trcks.fp.composition.Pipeline returns a collections.abc.Awaitable[T] type, the following function must accept this collections.abc.Awaitable[T] type as its input. However, functions with input type collections.abc.Awaitable[T] tend to contain unnecessary await statements. Therefore, the module trcks.fp.monads.awaitable provides some higher-order functions named map_* that turn functions with input type T into functions with input type collections.abc.Awaitable[T].

>>> from trcks.fp.monads import awaitable as a
>>> async def read_from_disk(path: str) -> str:
...     await asyncio.sleep(0.001)
...     s = "Hello, world!"
...     print(f"Read '{s}' from file {path}.")
...     return s
...
>>> def transform(s: str) -> str:
...     return f"Length: {len(s)}"
...
>>> async def write_to_disk(s: str, path: str) -> None:
...     await asyncio.sleep(0.001)
...     print(f"Wrote '{s}' to file {path}.")
...
>>> async def read_and_transform_and_write(
...     input_path: str, output_path: str
... ) -> None:
...     p: Pipeline3[str, Awaitable[str], Awaitable[str], Awaitable[None]] = (
...         input_path,
...         read_from_disk,
...         a.map_(transform),
...         a.map_to_awaitable(lambda s: write_to_disk(s, output_path)),
...     )
...     return await pipe(p)
...
>>> asyncio.run(read_and_transform_and_write("input.txt", "output.txt"))
Read 'Hello, world!' from file input.txt.
Wrote 'Length: 13' to file output.txt.

To understand what is going on here, let us have a look at the individual steps of the chain:

>>> p1: Pipeline1[str, Awaitable[str]] = (
...     "input.txt",
...     read_from_disk,
... )
>>> asyncio.run(a.to_coroutine(pipe(p1)))
Read 'Hello, world!' from file input.txt.
'Hello, world!'
>>> p2: Pipeline2[str, Awaitable[str], Awaitable[str]] = (
...     "input.txt",
...     read_from_disk,
...     a.map_(transform),
... )
>>> asyncio.run(a.to_coroutine(pipe(p2)))
Read 'Hello, world!' from file input.txt.
'Length: 13'
>>> p3: Pipeline3[str, Awaitable[str], Awaitable[str], Awaitable[None]] = (
...     "input.txt",
...     read_from_disk,
...     a.map_(transform),
...     a.map_to_awaitable(lambda s: write_to_disk(s, "output.txt")),
... )
>>> asyncio.run(a.to_coroutine(pipe(p3)))
Read 'Hello, world!' from file input.txt.
Wrote 'Length: 13' to file output.txt.

Note: The values pipe(p1), pipe(p2) and pipe(p3) are all of type collections.abc.Awaitable. Since asyncio.run expects the input type collections.abc.Coroutine, we use the function trcks.fp.monads.awaitable.to_coroutine to convert the collections.abc.Awaitables to collections.abc.Coroutines.

The higher-order function trcks.fp.monads.awaitable.tap allows us to execute synchronous side effects. Similarly, the higher-order function trcks.fp.monads.awaitable.tap_to_awaitable allows us to execute asynchronous side effects.

>>> async def read_from_disk(path: str) -> str:
...     await asyncio.sleep(0.001)
...     return "Hello, world!"
...
>>> async def write_to_disk(s: str, path: str) -> None:
...     await asyncio.sleep(0.001)
...
>>> async def read_and_transform_and_write(input_path: str, output_path: str) -> str:
...     p: Pipeline5[
...         str,
...         Awaitable[str],
...         Awaitable[str],
...         Awaitable[str],
...         Awaitable[str],
...         Awaitable[str],
...     ] = (
...         input_path,
...         read_from_disk,
...         a.tap(lambda s: print(f"Read '{s}' from disk.")),
...         a.map_(transform),
...         a.tap_to_awaitable(lambda s: write_to_disk(s, output_path)),
...         a.tap(lambda s: print(f"Wrote '{s}' to disk.")),
...     )
...     return await pipe(p)
...
>>> asyncio.run(read_and_transform_and_write("input.txt", "output.txt"))
Read 'Hello, world!' from disk.
Wrote 'Length: 13' to disk.
'Length: 13'

Asynchronous double-track code with trcks.fp.composition and trcks.fp.monads.awaitable_result

If one of the functions in a trcks.fp.composition.Pipeline returns a trcks.AwaitableResult[F, S] type, the following function must accept this trcks.AwaitableResult[F, S] type as its input. However, functions with input type trcks.AwaitableResult[F, S] tend to contain unnecessary await statements and violate the "do one thing and do it well" principle. Therefore, the module trcks.fp.monads.awaitable_result provides some higher-order functions named map_* that turn functions with input type F and functions with input type S into functions with input type trcks.AwaitableResult[F, S].

>>> from trcks.fp.monads import awaitable_result as ar
>>> ReadErrorLiteral = Literal["read error"]
>>> WriteErrorLiteral = Literal["write error"]
>>> async def read_from_disk(path: str) -> Result[ReadErrorLiteral, str]:
...     if path != "input.txt":
...         return "failure", "read error"
...     await asyncio.sleep(0.001)
...     s = "Hello, world!"
...     print(f"Read '{s}' from file {path}.")
...     return "success", s
...
>>> def transform(s: str) -> str:
...     return f"Length: {len(s)}"
...
>>> async def write_to_disk(s: str, path: str) -> Result[WriteErrorLiteral, None]:
...     if path != "output.txt":
...         return "failure", "write error"
...     await asyncio.sleep(0.001)
...     print(f"Wrote '{s}' to file {path}.")
...     return "success", None
...
>>> async def read_and_transform_and_write(
...     input_path: str, output_path: str
... ) -> Result[Union[ReadErrorLiteral, WriteErrorLiteral], None]:
...     p: Pipeline3[
...         str,
...         AwaitableResult[ReadErrorLiteral, str],
...         AwaitableResult[ReadErrorLiteral, str],
...         AwaitableResult[Union[ReadErrorLiteral, WriteErrorLiteral], None],
...     ] = (
...         input_path,
...         read_from_disk,
...         ar.map_success(transform),
...         ar.map_success_to_awaitable_result(lambda s: write_to_disk(s, output_path)),
...     )
...     return await pipe(p)
...
>>> asyncio.run(read_and_transform_and_write("input.txt", "output.txt"))
Read 'Hello, world!' from file input.txt.
Wrote 'Length: 13' to file output.txt.
('success', None)

To understand what is going on here, let us have a look at the individual steps of the chain:

>>> from trcks import AwaitableResult, Result
>>> p1: Pipeline1[str, AwaitableResult[ReadErrorLiteral, str]] = (
...     "input.txt",
...     read_from_disk,
... )
>>> asyncio.run(ar.to_coroutine_result(pipe(p1)))
Read 'Hello, world!' from file input.txt.
('success', 'Hello, world!')
>>> p2: Pipeline2[
...     str,
...     AwaitableResult[ReadErrorLiteral, str],
...     AwaitableResult[ReadErrorLiteral, str],
... ] = (
...     "input.txt",
...     read_from_disk,
...     ar.map_success(transform),
... )
>>> asyncio.run(ar.to_coroutine_result(pipe(p2)))
Read 'Hello, world!' from file input.txt.
('success', 'Length: 13')
>>> p3: Pipeline3[
...     str,
...     AwaitableResult[ReadErrorLiteral, str],
...     AwaitableResult[ReadErrorLiteral, str],
...     AwaitableResult[Union[ReadErrorLiteral, WriteErrorLiteral], None],
... ] = (
...     "input.txt",
...     read_from_disk,
...     ar.map_success(transform),
...     ar.map_success_to_awaitable_result(lambda s: write_to_disk(s, "output.txt")),
... )
>>> asyncio.run(ar.to_coroutine_result(pipe(p3)))
Read 'Hello, world!' from file input.txt.
Wrote 'Length: 13' to file output.txt.
('success', None)

Note: The values pipe(p1), pipe(p2) and pipe(p3) are all of type trcks.AwaitableResult. Since asyncio.run expects the input type collections.abc.Coroutine, we use the function trcks.fp.monads.awaitable_result.to_coroutine to convert the trcks.AwaitableResults to collections.abc.Coroutines.

The higher-order functions trcks.fp.monads.awaitable_result.tap_failure and trcks.fp.monads.awaitable_result.tap_success allow us to execute synchronous side effects in the failure case or in the success case, respectively:

>>> async def read_from_disk(path: str) -> Result[ReadErrorLiteral, str]:
...     if path != "input.txt":
...         return "failure", "read error"
...     await asyncio.sleep(0.001)
...     return "success", "Hello, world!"
...
>>> async def write_to_disk(s: str, path: str) -> Result[WriteErrorLiteral, None]:
...     if path != "output.txt":
...         return "failure", "write error"
...     await asyncio.sleep(0.001)
...     return "success", None
...
>>> async def read_and_transform_and_write(
...     input_path: str, output_path: str
... ) -> Result[Union[ReadErrorLiteral, WriteErrorLiteral], None]:
...     pipeline: Pipeline6[
...         str,
...         AwaitableResult[ReadErrorLiteral, str],
...         AwaitableResult[ReadErrorLiteral, str],
...         AwaitableResult[ReadErrorLiteral, str],
...         AwaitableResult[Union[ReadErrorLiteral, WriteErrorLiteral], None],
...         AwaitableResult[Union[ReadErrorLiteral, WriteErrorLiteral], None],
...         AwaitableResult[Union[ReadErrorLiteral, WriteErrorLiteral], None],
...     ] = (
...         input_path,
...         read_from_disk,
...         ar.tap_success(lambda s: print(f"LOG: Read '{s}' from disk.")),
...         ar.map_success(transform),
...         ar.map_success_to_awaitable_result(lambda s: write_to_disk(s, output_path)),
...         ar.tap_success(lambda _: print(f"LOG: Successfully wrote to disk.")),
...         ar.tap_failure(lambda err: print(f"LOG: Failed with error: {err}")),
...     )
...     return await pipe(pipeline)
...
>>> result_1 = asyncio.run(read_and_transform_and_write("input.txt", "output.txt"))
LOG: Read 'Hello, world!' from disk.
LOG: Successfully wrote to disk.
>>> result_1
('success', None)
>>> result_2 = asyncio.run(read_and_transform_and_write("missing.txt", "output.txt"))
LOG: Failed with error: read error
>>> result_2
('failure', 'read error')

Sometimes, side effects themselves can fail and need to return an AwaitableResult type. The higher-order function trcks.fp.monads.awaitable_result.tap_success_to_awaitable_result allows us to execute such asynchronous side effects in the success case. If the side effect returns an AwaitableFailure, that failure is propagated. If the side effect returns an AwaitableSuccess, the original success value is preserved:

>>> async def write_to_disk(s: str) -> Result[OutOfDiskSpace, None]:
...     await asyncio.sleep(0.001)
...     if len(s) > 10:
...         return "failure", "Out of disk space"
...     return "success", None
...
>>> async def read_and_persist(input_path: str) -> Result[Union[ReadErrorLiteral, OutOfDiskSpace], str]:
...     pipeline: Pipeline3[
...         str,
...         AwaitableResult[ReadErrorLiteral, str],
...         AwaitableResult[ReadErrorLiteral, str],
...         AwaitableResult[Union[ReadErrorLiteral, OutOfDiskSpace], str],
...     ] = (
...         input_path,
...         read_from_disk,
...         ar.tap_success(lambda s: print(f"LOG: Persisting '{s}'.")),
...         ar.tap_success_to_awaitable_result(write_to_disk),
...     )
...     return await pipe(pipeline)
...
>>> result = asyncio.run(read_and_persist("input.txt"))
LOG: Persisting 'Hello, world!'.
>>> result
('failure', 'Out of disk space')

Frequently asked questions (FAQs)

This section answers some questions that might come to your mind.

Where can I learn more about railway-oriented programming?

Scott Wlaschin's blog post Railway oriented programming comes with lots of examples and illustrations as well as videos and slides from his talks.

Should I replace all raised exceptions with trcks.Result?

No, you should not. Scott Wlaschin's blog post Against Railway-Oriented Programming lists eight scenarios where raising or not catching an exception is the better choice.

Which static type checkers does trcks support?

trcks is compatible with current versions of mypy and pyright. Other type checkers may work as well.

Which alternatives to trcks are there?

returns supports object-oriented style and functional style (like trcks). It provides a Result container (and multiple other containers) for synchronous code and a Future and a FutureResult container for asynchronous code. Whereas the Result container is pretty similar to trcks.Result, the Future container and the FutureResult container deviate from collections.abc.Awaitable and trcks.AwaitableResult. Other major differences are:

Expression supports object-oriented style ("fluent syntax") and functional style (like trcks). It provides a Result class (and multiple other container classes) for synchronous code. The Result class is pretty similar to trcks.Result and trcks.oop.ResultWrapper. An AsyncResult type based on collections.abc.AsyncGenerator will be added in a future version.

Which libraries inspired trcks?

trcks is mostly inspired by the Python libraries mentioned in the previous section and by the TypeScript library fp-ts.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

trcks-0.3.0.tar.gz (36.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

trcks-0.3.0-py3-none-any.whl (31.2 kB view details)

Uploaded Python 3

File details

Details for the file trcks-0.3.0.tar.gz.

File metadata

  • Download URL: trcks-0.3.0.tar.gz
  • Upload date:
  • Size: 36.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.8.3

File hashes

Hashes for trcks-0.3.0.tar.gz
Algorithm Hash digest
SHA256 cd7ddc68e862914280b5fc69d384d7450772f64452543b46b0520adfd46ffe4f
MD5 bb062cb6f0f34b161a06318f2df452a9
BLAKE2b-256 6f73e9398ce8845bae4e970bedf8182adec2f95d499594dd1689a27710c2c43c

See more details on using hashes here.

File details

Details for the file trcks-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: trcks-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 31.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.8.3

File hashes

Hashes for trcks-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 bac2819effb9ef4af22e3fb74e314fd643b9743c74b85ec7d5c7dbec21a19f03
MD5 6d6c8501c82b025f512f58d4b2f15b89
BLAKE2b-256 d7ba5d4d5fa695681d497a2ebe1f3b33222d04709eb36f2dfe50fcda1557fc5d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page