Skip to main content

Functional streams and monads

Project description

danom

PyPI Downloads coverage

API Reference

Ok

Frozen instance of an Ok monad used to wrap successful operations.

Ok.and_then

Ok.and_then(self, func: collections.abc.Callable[[~T], danom._result.Result], **kwargs: dict) -> danom._result.Result

Pipe another function that returns a monad.

>>> Ok(1).and_then(add_one) == Ok(2)
>>> Ok(1).and_then(raise_err) == Err(error=TypeError())

Ok.is_ok

Ok.is_ok(self) -> Literal[True]

Returns True if the result type is Ok.

>>> Ok().is_ok() == True

Ok.match

Ok.match(self, if_ok_func: collections.abc.Callable[[~T], danom._result.Result], _if_err_func: collections.abc.Callable[[~T], danom._result.Result]) -> danom._result.Result

Map Ok func to Ok and Err func to Err

>>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2)
>>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok')
>>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError')

Ok.unwrap

Ok.unwrap(self) -> ~T

Unwrap the Ok monad and get the inner value.

>>> Ok().unwrap() == None
>>> Ok(1).unwrap() == 1
>>> Ok("ok").unwrap() == 'ok'

Err

Frozen instance of an Err monad used to wrap failed operations.

Err.and_then

Err.and_then(self, _: 'Callable[[T], Result]', **_kwargs: 'dict') -> 'Self'

Pipe another function that returns a monad. For Err will return original error.

>>> Err(error=TypeError()).and_then(add_one) == Err(error=TypeError())
>>> Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError())

Err.is_ok

Err.is_ok(self) -> 'Literal[False]'

Returns False if the result type is Err.

Err().is_ok() == False

Err.match

Err.match(self, _if_ok_func: 'Callable[[T], Result]', if_err_func: 'Callable[[T], Result]') -> 'Result'

Map Ok func to Ok and Err func to Err

>>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2)
>>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok')
>>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError')

Err.unwrap

Err.unwrap(self) -> 'None'

Unwrap the Err monad will raise the inner error.

>>> Err(error=TypeError()).unwrap() raise TypeError(...)

Stream

A lazy iterator with functional operations.

Stream.collect

Stream.collect(self) -> 'tuple'

Materialise the sequence from the Stream.

>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.collect() == (1, 2, 3, 4)

Stream.filter

Stream.filter(self, *fns: 'Callable[[T], bool]') -> 'Self'

Filter the stream based on a predicate. Will return a new Stream with the modified sequence.

>>> Stream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2)

Simple functions can be passed in sequence to compose more complex filters

>>> Stream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)

Stream.fold

Stream.fold(self, initial: 'T', fn: 'Callable[[T], U]', *, workers: 'int' = 1, use_threads: 'bool' = False) -> 'U'

Fold the results into a single value. fold triggers an action so will incur a collect.

>>> Stream.from_iterable([1, 2, 3, 4]).fold(0, lambda a, b: a + b) == 10
>>> Stream.from_iterable([[1], [2], [3], [4]]).fold([0], lambda a, b: a + b) == [0, 1, 2, 3, 4]
>>> Stream.from_iterable([1, 2, 3, 4]).fold(1, lambda a, b: a * b) == 24

As fold triggers an action, the parameters will be forwarded to the par_collect call if the workers are greater than 1. This will only effect the collect that is used to create the iterable to reduce, not the fold operation itself.

>>> Stream.from_iterable([1, 2, 3, 4]).map(some_expensive_fn).fold(0, add, workers=4, use_threads=False)

Stream.from_iterable

Stream.from_iterable(it: 'Iterable') -> 'Self'

This is the recommended way of creating a Stream object.

>>> Stream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3)

Stream.map

Stream.map(self, *fns: 'Callable[[T], U]') -> 'Self'

Map a function to the elements in the Stream. Will return a new Stream with the modified sequence.

>>> Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (1, 2, 3, 4)

This can also be mixed with safe functions:

>>> Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (Ok(inner=1), Ok(inner=2), Ok(inner=3), Ok(inner=4))

>>> @safe
... def two_div_value(x: float) -> float:
...     return 2 / x

>>> Stream.from_iterable([0, 1, 2, 4]).map(two_div_value).collect() == (Err(error=ZeroDivisionError('division by zero')), Ok(inner=2.0), Ok(inner=1.0), Ok(inner=0.5))

Simple functions can be passed in sequence to compose more complex transformations

>>> Stream.from_iterable(range(5)).map(mul_two, add_one).collect() == (1, 3, 5, 7, 9)

Stream.par_collect

Stream.par_collect(self, workers: 'int' = 4, *, use_threads: 'bool' = False) -> 'tuple'

Materialise the sequence from the Stream in parallel.

>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.par_collect() == (1, 2, 3, 4)

Use the workers arg to select the number of workers to use. Use -1 to use all available processors (except 1). Defaults to 4.

>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.par_collect(workers=-1) == (1, 2, 3, 4)

For smaller I/O bound tasks use the use_threads flag as True. If False the processing will use ProcessPoolExecutor else it will use ThreadPoolExecutor.

>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.par_collect(use_threads=True) == (1, 2, 3, 4)

Stream.partition

Stream.partition(self, fn: 'Callable[[T], bool]', *, workers: 'int' = 1, use_threads: 'bool' = False) -> 'tuple[Self, Self]'

Similar to filter except splits the True and False values. Will return a two new Stream with the partitioned sequences.

Each partition is independently replayable.

>>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0)
>>> part1.collect() == (0, 2)
>>> part2.collect() == (1, 3)

As partition triggers an action, the parameters will be forwarded to the par_collect call if the workers are greater than 1.

>>> Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4)
>>> part1.map(add_one).par_collect() == (4, 7, 10)
>>> part2.collect() == (2, 4, 5, 7, 8, 10, 11)

safe

safe

safe(func: collections.abc.Callable[~P, ~T]) -> collections.abc.Callable[~P, danom._result.Result]

Decorator for functions that wraps the function in a try except returns Ok on success else Err.

>>> @safe
... def add_one(a: int) -> int:
...     return a + 1

>>> add_one(1) == Ok(inner=2)

safe_method

safe_method

safe_method(func: collections.abc.Callable[~P, ~T]) -> collections.abc.Callable[~P, danom._result.Result]

The same as safe except it forwards on the self of the class instance to the wrapped function.

>>> class Adder:
...     def __init__(self, result: int = 0) -> None:
...         self.result = result
...
...     @safe_method
...     def add_one(self, a: int) -> int:
...         return self.result + 1

>>> Adder.add_one(1) == Ok(inner=1)

compose

compose

compose(*fns: collections.abc.Callable[[T], U]) -> collections.abc.Callable[[T], U]

Compose multiple functions into one.

The functions will be called in sequence with the result of one being used as the input for the next.

>>> add_two = compose(add_one, add_one)
>>> add_two(0) == 2
>>> add_two = compose(add_one, add_one, is_even)
>>> add_two(0) == True

all_of

all_of

all_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool]

True if all of the given functions return True.

>>> is_valid_user = all_of(is_subscribed, is_active, has_2fa)
>>> is_valid_user(user) == True

any_of

any_of

any_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool]

True if any of the given functions return True.

>>> is_eligible = any_of(has_coupon, is_vip, is_staff)
>>> is_eligible(user) == True

identity

identity

identity(x: T) -> T

Basic identity function.

>>> identity("abc") == "abc"
>>> identity(1) == 1
>>> identity(ComplexDataType(a=1, b=2, c=3)) == ComplexDataType(a=1, b=2, c=3)

invert

invert

invert(func: collections.abc.Callable[~P, bool]) -> collections.abc.Callable[~P, bool]

Invert a boolean function so it returns False where it would've returned True.

>>> invert(has_len)("abc") == False
>>> invert(has_len)("") == True

new_type

new_type

new_type(name: 'str', base_type: 'type', validators: 'Callable | Sequence[Callable] | None' = None, converters: 'Callable | Sequence[Callable] | None' = None, *, frozen: 'bool' = True)

Create a NewType based on another type.

>>> def is_positive(value):
...     return value >= 0

>>> ValidBalance = new_type("ValidBalance", float, validators=[is_positive])
>>> ValidBalance("20") == ValidBalance(inner=20.0)

Unlike an inherited class, the type will not return True for an isinstance check.

>>> isinstance(ValidBalance(20.0), ValidBalance) == True
>>> isinstance(ValidBalance(20.0), float) == False

The methods of the given base_type will be forwarded to the specialised type. Alternatively the map method can be used to return a new type instance with the transformation.

>>> def has_len(email: str) -> bool:
... return len(email) > 0

>>> Email = new_type("Email", str, validators=[has_len])
>>> Email("some_email@domain.com").upper() == "SOME_EMAIL@DOMAIN.COM"
>>> Email("some_email@domain.com").map(str.upper) == Email(inner='SOME_EMAIL@DOMAIN.COM')

::

Repo map

├── .github
│   └── workflows
│       ├── ci_tests.yaml
│       └── publish.yaml
├── dev_tools
│   ├── __init__.py
│   ├── update_cov.py
│   └── update_readme.py
├── src
│   └── danom
│       ├── __init__.py
│       ├── _err.py
│       ├── _new_type.py
│       ├── _ok.py
│       ├── _result.py
│       ├── _safe.py
│       ├── _stream.py
│       └── _utils.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_api.py
│   ├── test_err.py
│   ├── test_new_type.py
│   ├── test_ok.py
│   ├── test_result.py
│   ├── test_safe.py
│   ├── test_stream.py
│   └── test_utils.py
├── .pre-commit-config.yaml
├── README.md
├── pyproject.toml
├── ruff.toml
└── uv.lock
::

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

danom-0.6.0.tar.gz (8.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

danom-0.6.0-py3-none-any.whl (13.8 kB view details)

Uploaded Python 3

File details

Details for the file danom-0.6.0.tar.gz.

File metadata

  • Download URL: danom-0.6.0.tar.gz
  • Upload date:
  • Size: 8.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for danom-0.6.0.tar.gz
Algorithm Hash digest
SHA256 824085c16f3f5e2fd91d793e5b90f39c0c60b72031da77eb34c7e0fd9b670f32
MD5 85f703f26d8a5a819cbfe9bb36471781
BLAKE2b-256 611b55daa4458e1a44cd682f1fecb96f62836974dd6c03c9fcaae26931fc9fce

See more details on using hashes here.

Provenance

The following attestation bundles were made for danom-0.6.0.tar.gz:

Publisher: publish.yaml on second-ed/danom

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file danom-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: danom-0.6.0-py3-none-any.whl
  • Upload date:
  • Size: 13.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for danom-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8df56167fc3c7473a6ac6ae41c89fc9f72e8cea1238e00e551077a6ae7584121
MD5 6072c8d487bef6296d59bf4260767735
BLAKE2b-256 51486bccbbc3c06657897ce095d1b1627bb99e78597409a2688bcf094e5bfeb0

See more details on using hashes here.

Provenance

The following attestation bundles were made for danom-0.6.0-py3-none-any.whl:

Publisher: publish.yaml on second-ed/danom

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page