Functional streams and monads
Project description
danom
API Reference
Ok
Frozen instance of an Ok monad used to wrap successful operations.
Ok.and_then
Ok.and_then(self, func: collections.abc.Callable[[~T], danom._result.Result], **kwargs: dict) -> danom._result.Result
Pipe another function that returns a monad.
>>> Ok(1).and_then(add_one) == Ok(2)
>>> Ok(1).and_then(raise_err) == Err(error=TypeError())
Ok.is_ok
Ok.is_ok(self) -> Literal[True]
Returns True if the result type is Ok.
>>> Ok().is_ok() == True
Ok.match
Ok.match(self, if_ok_func: collections.abc.Callable[[~T], danom._result.Result], _if_err_func: collections.abc.Callable[[~T], danom._result.Result]) -> danom._result.Result
Map Ok func to Ok and Err func to Err
>>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2)
>>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok')
>>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError')
Ok.unwrap
Ok.unwrap(self) -> ~T
Unwrap the Ok monad and get the inner value.
>>> Ok().unwrap() == None
>>> Ok(1).unwrap() == 1
>>> Ok("ok").unwrap() == 'ok'
Err
Frozen instance of an Err monad used to wrap failed operations.
Err.and_then
Err.and_then(self, _: 'Callable[[T], Result]', **_kwargs: 'dict') -> 'Self'
Pipe another function that returns a monad. For Err will return original error.
>>> Err(error=TypeError()).and_then(add_one) == Err(error=TypeError())
>>> Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError())
Err.is_ok
Err.is_ok(self) -> 'Literal[False]'
Returns False if the result type is Err.
Err().is_ok() == False
Err.match
Err.match(self, _if_ok_func: 'Callable[[T], Result]', if_err_func: 'Callable[[T], Result]') -> 'Result'
Map Ok func to Ok and Err func to Err
>>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2)
>>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok')
>>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError')
Err.unwrap
Err.unwrap(self) -> 'None'
Unwrap the Err monad will raise the inner error.
>>> Err(error=TypeError()).unwrap() raise TypeError(...)
Stream
A lazy iterator with functional operations.
Stream.collect
Stream.collect(self) -> 'tuple'
Materialise the sequence from the Stream.
>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.collect() == (1, 2, 3, 4)
Stream.filter
Stream.filter(self, *fns: 'Callable[[T], bool]') -> 'Self'
Filter the stream based on a predicate. Will return a new Stream with the modified sequence.
>>> Stream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2)
Simple functions can be passed in sequence to compose more complex filters
>>> Stream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)
Stream.from_iterable
Stream.from_iterable(it: 'Iterable') -> 'Self'
This is the recommended way of creating a Stream object.
>>> Stream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3)
Stream.map
Stream.map(self, *fns: 'Callable[[T], U]') -> 'Self'
Map a function to the elements in the Stream. Will return a new Stream with the modified sequence.
>>> Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (1, 2, 3, 4)
This can also be mixed with safe functions:
>>> Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (Ok(inner=1), Ok(inner=2), Ok(inner=3), Ok(inner=4))
>>> @safe
... def two_div_value(x: float) -> float:
... return 2 / x
>>> Stream.from_iterable([0, 1, 2, 4]).map(two_div_value).collect() == (Err(error=ZeroDivisionError('division by zero')), Ok(inner=2.0), Ok(inner=1.0), Ok(inner=0.5))
Simple functions can be passed in sequence to compose more complex transformations
>>> Stream.from_iterable(range(5)).map(mul_two, add_one).collect() == (1, 3, 5, 7, 9)
Stream.par_collect
Stream.par_collect(self, workers: 'int' = 4, *, use_threads: 'bool' = False) -> 'tuple'
Materialise the sequence from the Stream in parallel.
>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.par_collect() == (1, 2, 3, 4)
Use the workers arg to select the number of workers to use. Use -1 to use all available processors (except 1).
Defaults to 4.
>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.par_collect(workers=-1) == (1, 2, 3, 4)
For smaller I/O bound tasks use the use_threads flag as True.
If False the processing will use ProcessPoolExecutor else it will use ThreadPoolExecutor.
>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.par_collect(use_threads=True) == (1, 2, 3, 4)
Stream.partition
Stream.partition(self, fn: 'Callable[[T], bool]', *, workers: 'int' = 1, use_threads: 'bool' = False) -> 'tuple[Self, Self]'
Similar to filter except splits the True and False values. Will return a two new Stream with the partitioned sequences.
Each partition is independently replayable.
>>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0)
>>> part1.collect() == (0, 2)
>>> part2.collect() == (1, 3)
As partition triggers an action, the parameters will be forwarded to the collect call if the workers are greater than 1.
>>> Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4)
>>> part1.map(add_one).par_collect() == (4, 7, 10)
>>> part2.collect() == (2, 4, 5, 7, 8, 10, 11)
safe
safe
safe(func: collections.abc.Callable[~P, ~T]) -> collections.abc.Callable[~P, danom._result.Result]
Decorator for functions that wraps the function in a try except returns Ok on success else Err.
>>> @safe
... def add_one(a: int) -> int:
... return a + 1
>>> add_one(1) == Ok(inner=2)
safe_method
safe_method
safe_method(func: collections.abc.Callable[~P, ~T]) -> collections.abc.Callable[~P, danom._result.Result]
The same as safe except it forwards on the self of the class instance to the wrapped function.
>>> class Adder:
... def __init__(self, result: int = 0) -> None:
... self.result = result
...
... @safe_method
... def add_one(self, a: int) -> int:
... return self.result + 1
>>> Adder.add_one(1) == Ok(inner=1)
compose
compose
compose(*fns: collections.abc.Callable[[T], U]) -> collections.abc.Callable[[T], U]
Compose multiple functions into one.
The functions will be called in sequence with the result of one being used as the input for the next.
>>> add_two = compose(add_one, add_one)
>>> add_two(0) == 2
>>> add_two = compose(add_one, add_one, is_even)
>>> add_two(0) == True
all_of
all_of
all_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool]
True if all of the given functions return True.
>>> is_valid_user = all_of(is_subscribed, is_active, has_2fa)
>>> is_valid_user(user) == True
any_of
any_of
any_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool]
True if any of the given functions return True.
>>> is_eligible = any_of(has_coupon, is_vip, is_staff)
>>> is_eligible(user) == True
identity
identity
identity(x: T) -> T
Basic identity function.
>>> identity("abc") == "abc"
>>> identity(1) == 1
>>> identity(ComplexDataType(a=1, b=2, c=3)) == ComplexDataType(a=1, b=2, c=3)
invert
invert
invert(func: collections.abc.Callable[~P, bool]) -> collections.abc.Callable[~P, bool]
Invert a boolean function so it returns False where it would've returned True.
>>> invert(has_len)("abc") == False
>>> invert(has_len)("") == True
new_type
new_type
new_type(name: 'str', base_type: 'type', validators: 'Callable | Sequence[Callable] | None' = None, converters: 'Callable | Sequence[Callable] | None' = None, *, frozen: 'bool' = True)
Create a NewType based on another type.
>>> def is_positive(value):
... return value >= 0
>>> ValidBalance = new_type("ValidBalance", float, validators=[is_positive])
>>> ValidBalance("20") == ValidBalance(inner=20.0)
Unlike an inherited class, the type will not return True for an isinstance check.
>>> isinstance(ValidBalance(20.0), ValidBalance) == True
>>> isinstance(ValidBalance(20.0), float) == False
The methods of the given base_type will be forwarded to the specialised type.
Alternatively the map method can be used to return a new type instance with the transformation.
>>> def has_len(email: str) -> bool:
... return len(email) > 0
>>> Email = new_type("Email", str, validators=[has_len])
>>> Email("some_email@domain.com").upper() == "SOME_EMAIL@DOMAIN.COM"
>>> Email("some_email@domain.com").map(str.upper) == Email(inner='SOME_EMAIL@DOMAIN.COM')
::
Repo map
├── .github
│ └── workflows
│ ├── ci_tests.yaml
│ └── publish.yaml
├── dev_tools
│ ├── __init__.py
│ ├── update_cov.py
│ └── update_readme.py
├── src
│ └── danom
│ ├── __init__.py
│ ├── _err.py
│ ├── _new_type.py
│ ├── _ok.py
│ ├── _result.py
│ ├── _safe.py
│ ├── _stream.py
│ └── _utils.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_api.py
│ ├── test_err.py
│ ├── test_new_type.py
│ ├── test_ok.py
│ ├── test_result.py
│ ├── test_safe.py
│ ├── test_stream.py
│ └── test_utils.py
├── .pre-commit-config.yaml
├── README.md
├── pyproject.toml
├── ruff.toml
└── uv.lock
::
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file danom-0.5.0.tar.gz.
File metadata
- Download URL: danom-0.5.0.tar.gz
- Upload date:
- Size: 8.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c2ced1c92b74b4fd53fa9e256cd0834f4a28eec58e54fcf8f7398877eeaeeeba
|
|
| MD5 |
24e85df334e02e50d6d067923d57bf78
|
|
| BLAKE2b-256 |
5ad5534d5900cea61211a029d134dfd774986bca83c9e638957e8d9bf407c528
|
Provenance
The following attestation bundles were made for danom-0.5.0.tar.gz:
Publisher:
publish.yaml on second-ed/danom
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
danom-0.5.0.tar.gz -
Subject digest:
c2ced1c92b74b4fd53fa9e256cd0834f4a28eec58e54fcf8f7398877eeaeeeba - Sigstore transparency entry: 751494881
- Sigstore integration time:
-
Permalink:
second-ed/danom@76b750dc65aee431607fc4c3e447b6d429a2289b -
Branch / Tag:
refs/tags/v0.5.0 - Owner: https://github.com/second-ed
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yaml@76b750dc65aee431607fc4c3e447b6d429a2289b -
Trigger Event:
release
-
Statement type:
File details
Details for the file danom-0.5.0-py3-none-any.whl.
File metadata
- Download URL: danom-0.5.0-py3-none-any.whl
- Upload date:
- Size: 13.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4c7723e6700efb2befd7383cee37c61f40bef4d4c5798eb425399b0ea7e6cbd6
|
|
| MD5 |
41df766ae730d437359f945745991b99
|
|
| BLAKE2b-256 |
18f85a7125d5ffbdd8f8ac5ed18dccb18decb9a865a0eb5d50fa58dd0bde218a
|
Provenance
The following attestation bundles were made for danom-0.5.0-py3-none-any.whl:
Publisher:
publish.yaml on second-ed/danom
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
danom-0.5.0-py3-none-any.whl -
Subject digest:
4c7723e6700efb2befd7383cee37c61f40bef4d4c5798eb425399b0ea7e6cbd6 - Sigstore transparency entry: 751494916
- Sigstore integration time:
-
Permalink:
second-ed/danom@76b750dc65aee431607fc4c3e447b6d429a2289b -
Branch / Tag:
refs/tags/v0.5.0 - Owner: https://github.com/second-ed
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yaml@76b750dc65aee431607fc4c3e447b6d429a2289b -
Trigger Event:
release
-
Statement type: