danom

API Reference

Ok

Frozen instance of an Ok monad used to wrap successful operations.

Ok.and_then

Ok.and_then(self, func: collections.abc.Callable[[~T], danom._result.Result], **kwargs: dict) -> danom._result.Result

Pipe another function that returns a monad.

>>> Ok(1).and_then(add_one) == Ok(2)
>>> Ok(1).and_then(raise_err) == Err(error=TypeError())

Ok.is_ok

Ok.is_ok(self) -> Literal[True]

Returns True if the result type is Ok.

>>> Ok().is_ok() == True

Ok.match

Ok.match(self, if_ok_func: collections.abc.Callable[[~T], danom._result.Result], _if_err_func: collections.abc.Callable[[~T], danom._result.Result]) -> danom._result.Result

Apply if_ok_func when the result is Ok and if_err_func when the result is Err.

>>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2)
>>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok')
>>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError')
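The dispatch that match performs can be pictured with a pair of stand-in classes. This is a minimal sketch, not danom's implementation: SketchOk, SketchErr, add_one, and error_type_name are hypothetical names, and the handlers are assumed to return a wrapped result themselves, as the signature above suggests.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class SketchOk:
    """Hypothetical stand-in for danom's Ok; not the library class."""

    inner: Any = None

    def match(self, if_ok_func: Callable, _if_err_func: Callable):
        # An Ok only ever runs the "ok" handler; the "err" handler is ignored.
        return if_ok_func(self.inner)


@dataclass(frozen=True)
class SketchErr:
    """Hypothetical stand-in for danom's Err; not the library class."""

    error: Any = None

    def match(self, _if_ok_func: Callable, if_err_func: Callable):
        # An Err only ever runs the "err" handler.
        return if_err_func(self.error)


def add_one(x: int) -> SketchOk:
    return SketchOk(x + 1)


def error_type_name(err: Exception) -> SketchOk:
    return SketchOk(type(err).__name__)


ok_result = SketchOk(1).match(add_one, error_type_name)
err_result = SketchErr(TypeError()).match(add_one, error_type_name)
```

Both branches end in a wrapped value here, which mirrors the documented examples where the Err case is mapped to an Ok holding the error's type name.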

Ok.unwrap

Ok.unwrap(self) -> ~T

Unwrap the Ok monad and get the inner value.

>>> Ok().unwrap() == None
>>> Ok(1).unwrap() == 1
>>> Ok("ok").unwrap() == 'ok'

Err

Frozen instance of an Err monad used to wrap failed operations.

Err.and_then

Err.and_then(self, _: 'Callable[[T], Result]', **_kwargs: 'dict') -> 'Self'

Pipe another function that returns a monad. For Err, the function is not called and the original error is returned unchanged.

>>> Err(error=TypeError()).and_then(add_one) == Err(error=TypeError())
>>> Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError())
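The short-circuit behaviour of and_then can be illustrated with stand-in classes. This is a sketch under stated assumptions rather than danom's code: SketchOk, SketchErr, parse_int, and reciprocal are hypothetical, and each step is assumed to return a wrapped result.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class SketchOk:
    """Hypothetical stand-in for danom's Ok; not the library class."""

    inner: Any = None

    def and_then(self, func: Callable, **kwargs):
        # Run the next step on the wrapped value; func returns a result itself.
        return func(self.inner, **kwargs)


@dataclass(frozen=True)
class SketchErr:
    """Hypothetical stand-in for danom's Err; not the library class."""

    error: Any = None

    def and_then(self, _func: Callable, **_kwargs):
        # Skip the step and pass the original error along unchanged.
        return self


def parse_int(text: str):
    try:
        return SketchOk(int(text))
    except ValueError as exc:
        return SketchErr(exc)


def reciprocal(x: int):
    if x == 0:
        return SketchErr(ZeroDivisionError("division by zero"))
    return SketchOk(1 / x)


good = SketchOk("4").and_then(parse_int).and_then(reciprocal)   # SketchOk(inner=0.25)
bad = SketchOk("four").and_then(parse_int).and_then(reciprocal)  # stays the ValueError
```

In the second chain reciprocal never runs, because the Err produced by parse_int is carried through the remaining steps.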

Err.is_ok

Err.is_ok(self) -> 'Literal[False]'

Returns False if the result type is Err.

>>> Err().is_ok() == False

Err.match

Err.match(self, _if_ok_func: 'Callable[[T], Result]', if_err_func: 'Callable[[T], Result]') -> 'Result'

Apply if_ok_func when the result is Ok and if_err_func when the result is Err.

>>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2)
>>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok')
>>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError')

Err.unwrap

Err.unwrap(self) -> 'None'

Unwrapping an Err raises the inner error.

>>> Err(error=TypeError()).unwrap()  # raises TypeError(...)

Stream

A lazy iterator with functional operations.

Stream.collect

Stream.collect(self) -> 'tuple'

Materialise the sequence from the Stream.

>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.collect() == (1, 2, 3, 4)

Stream.filter

Stream.filter(self, *fns: 'Callable[[T], bool]') -> 'Self'

Filter the stream based on a predicate. Will return a new Stream with the modified sequence.

>>> Stream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2)

Several predicates can be passed in sequence to compose a more complex filter; an element is kept only if it satisfies all of them:

>>> Stream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)
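The documented result (0, 15) implies that the predicates are combined with a logical AND. A minimal sketch of that idea in plain Python follows; filter_all is a hypothetical helper, not part of danom, and the two predicates are assumed definitions matching their names.

```python
from collections.abc import Callable, Iterable


def filter_all(items: Iterable[int], *fns: Callable[[int], bool]) -> tuple[int, ...]:
    # Keep an item only when every predicate accepts it (logical AND).
    return tuple(x for x in items if all(fn(x) for fn in fns))


def divisible_by_3(x: int) -> bool:
    return x % 3 == 0


def divisible_by_5(x: int) -> bool:
    return x % 5 == 0


result = filter_all(range(20), divisible_by_3, divisible_by_5)  # (0, 15)
```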

Stream.from_iterable

Stream.from_iterable(it: 'Iterable') -> 'Self'

This is the recommended way of creating a Stream object.

>>> Stream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3)

Stream.map

Stream.map(self, *fns: 'Callable[[T], U]') -> 'Self'

Map a function to the elements in the Stream. Will return a new Stream with the modified sequence.

>>> Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (1, 2, 3, 4)

When the mapped function is decorated with safe, each element is returned wrapped in Ok or Err:

>>> Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (Ok(inner=1), Ok(inner=2), Ok(inner=3), Ok(inner=4))

>>> @safe
... def two_div_value(x: float) -> float:
...     return 2 / x

>>> Stream.from_iterable([0, 1, 2, 4]).map(two_div_value).collect() == (Err(error=ZeroDivisionError('division by zero')), Ok(inner=2.0), Ok(inner=1.0), Ok(inner=0.5))

Several functions can be passed in sequence to compose a more complex transformation; they are applied to each element from left to right:

>>> Stream.from_iterable(range(5)).map(mul_two, add_one).collect() == (1, 3, 5, 7, 9)
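The output (1, 3, 5, 7, 9) is what left-to-right application gives: mul_two first, then add_one. A small sketch of that composition in plain Python, assuming a hypothetical helper map_all and the obvious definitions of mul_two and add_one:

```python
from collections.abc import Callable, Iterable
from functools import reduce


def map_all(items: Iterable[int], *fns: Callable[[int], int]) -> tuple[int, ...]:
    # Thread each item through the functions in the order they were given.
    return tuple(reduce(lambda acc, fn: fn(acc), fns, x) for x in items)


def mul_two(x: int) -> int:
    return x * 2


def add_one(x: int) -> int:
    return x + 1


result = map_all(range(5), mul_two, add_one)  # (1, 3, 5, 7, 9)
```

Swapping the order to add_one, mul_two would give (2, 4, 6, 8, 10) instead, so argument order matters.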

Stream.partition

Stream.partition(self, fn: 'Callable[[T], bool]') -> 'tuple[Self, Self]'

Similar to filter, except that it splits the elements into those for which the predicate is True and those for which it is False. Returns two new Streams with the partitioned sequences.

Each partition is independently replayable.

>>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0)
>>> part1.collect() == (0, 2)
>>> part2.collect() == (1, 3)
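One common way to split a single iterable into two lazy halves is itertools.tee. The sketch below shows that technique only; it is not how danom implements partition, the helper name partition_lazy is hypothetical, and the resulting generators are single-use rather than replayable.

```python
from collections.abc import Callable, Iterable, Iterator
from itertools import tee


def partition_lazy(
    items: Iterable[int], fn: Callable[[int], bool]
) -> tuple[Iterator[int], Iterator[int]]:
    # tee gives two independent iterators over the same underlying source.
    left, right = tee(items)
    # Note: fn is evaluated once per item on each side, so twice in total.
    return (x for x in left if fn(x)), (x for x in right if not fn(x))


evens, odds = partition_lazy([0, 1, 2, 3], lambda x: x % 2 == 0)
even_values = tuple(evens)  # (0, 2)
odd_values = tuple(odds)    # (1, 3)
```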

Stream.to_par_stream

Stream.to_par_stream(self) -> 'ParStream'

Convert Stream to ParStream. This will incur a collect.

>>> Stream.from_iterable([0, 1, 2, 3]).to_par_stream().map(some_expensive_cpu_task).collect() == (1, 2, 3, 4)

ParStream

A parallel iterator with functional operations.

ParStream.collect

ParStream.collect(self, workers: 'int' = 4, *, use_threads: 'bool' = False) -> 'tuple'

Materialise the sequence from the ParStream.

>>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.collect() == (1, 2, 3, 4)

Use the workers argument to set the number of workers. Pass -1 to use all available processors except one. Defaults to 4.

>>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.collect(workers=-1) == (1, 2, 3, 4)

For smaller I/O-bound tasks, set the use_threads flag to True:

>>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.collect(use_threads=True) == (1, 2, 3, 4)
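The workers and use_threads options map naturally onto the standard library's concurrent.futures executors. The following is a sketch of that pattern, not ParStream's actual implementation; resolve_workers and parallel_map are hypothetical names, and the handling of -1 is an assumption based on the description above.

```python
import os
from collections.abc import Callable, Iterable
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def resolve_workers(workers: int) -> int:
    # Assumption: -1 means "every processor but one", with a floor of one worker.
    if workers == -1:
        return max((os.cpu_count() or 2) - 1, 1)
    return workers


def parallel_map(
    fn: Callable, items: Iterable, workers: int = 4, *, use_threads: bool = False
) -> tuple:
    # Threads suit I/O-bound work; processes suit CPU-bound work.
    pool_cls = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
    with pool_cls(max_workers=resolve_workers(workers)) as pool:
        # Executor.map yields results in input order.
        return tuple(pool.map(fn, items))


def add_one(x: int) -> int:
    return x + 1


result = parallel_map(add_one, [0, 1, 2, 3], use_threads=True)  # (1, 2, 3, 4)
```

With use_threads=False the mapped function has to be picklable (for example a module-level function), and on platforms that spawn new processes the call should sit under an `if __name__ == "__main__":` guard.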

ParStream.filter

ParStream.filter(self, *fns: 'Callable[[T], bool]') -> 'Self'

Filter the par stream based on a predicate. Will return a new ParStream with the modified sequence.

>>> ParStream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2)

Several predicates can be passed in sequence to compose a more complex filter; an element is kept only if it satisfies all of them:

>>> ParStream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)

ParStream.from_iterable

ParStream.from_iterable(it: 'Iterable') -> 'Self'

This is the recommended way of creating a ParStream object.

>>> ParStream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3)

ParStream.map

ParStream.map(self, *fns: 'Callable[[T], U]') -> 'Self'

Map functions to the elements in the ParStream in parallel. Will return a new ParStream with the modified sequence.

>>> ParStream.from_iterable([0, 1, 2, 3]).map(add_one, add_one).collect() == (2, 3, 4, 5)

ParStream.partition

ParStream.partition(self, _fn: 'Callable[[T], bool]') -> 'tuple[Self, Self]'

Partition isn't implemented for ParStream. Convert to Stream with the to_stream() method and then call partition.

ParStream.to_stream

ParStream.to_stream(self) -> 'Stream'

Convert ParStream to Stream. This will incur a collect.

>>> ParStream.from_iterable([0, 1, 2, 3]).to_stream().map(some_memory_hungry_task).collect() == (1, 2, 3, 4)

safe

safe(func: collections.abc.Callable[~P, ~T]) -> collections.abc.Callable[~P, danom._result.Result]

Decorator that wraps a function in a try/except block. The decorated function returns Ok on success and Err if an exception is raised.

>>> @safe
... def add_one(a: int) -> int:
...     return a + 1

>>> add_one(1) == Ok(inner=2)
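A decorator of this kind can be sketched in a few lines. The version below is illustrative only and is not danom's source: sketch_safe, SketchOk, and SketchErr are hypothetical names, and catching Exception broadly is an assumption about which errors get wrapped.

```python
import functools
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class SketchOk:
    """Hypothetical stand-in for danom's Ok; not the library class."""

    inner: Any = None


@dataclass(frozen=True)
class SketchErr:
    """Hypothetical stand-in for danom's Err; not the library class."""

    error: Any = None


def sketch_safe(func: Callable) -> Callable:
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        try:
            return SketchOk(func(*args, **kwargs))
        except Exception as exc:  # assumption: any Exception becomes an Err
            return SketchErr(exc)

    return wrapper


@sketch_safe
def two_div_value(x: float) -> float:
    return 2 / x


ok = two_div_value(4)   # SketchOk(inner=0.5)
err = two_div_value(0)  # SketchErr wrapping a ZeroDivisionError
```

The caller never sees the exception; it receives a value it can inspect or chain instead.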

safe_method

safe_method(func: collections.abc.Callable[~P, ~T]) -> collections.abc.Callable[~P, danom._result.Result]

The same as safe, except that it forwards the self of the class instance to the wrapped method.

>>> class Adder:
...     def __init__(self, result: int = 0) -> None:
...         self.result = result
...
...     @safe_method
...     def add_one(self, a: int) -> int:
...         return self.result + a

>>> Adder().add_one(1) == Ok(inner=1)

Repo map

├── .github
│   └── workflows
│       ├── ci_tests.yaml
│       └── publish.yaml
├── dev_tools
│   ├── __init__.py
│   └── update_readme.py
├── src
│   └── danom
│       ├── __init__.py
│       ├── _err.py
│       ├── _ok.py
│       ├── _result.py
│       ├── _safe.py
│       └── _stream.py
├── tests
│   ├── __init__.py
│   ├── test_api.py
│   ├── test_err.py
│   ├── test_ok.py
│   ├── test_result.py
│   ├── test_safe.py
│   └── test_stream.py
├── .pre-commit-config.yaml
├── README.md
├── pyproject.toml
├── ruff.toml
└── uv.lock

Source Distribution

danom-0.3.0.tar.gz (5.9 kB)

Uploaded Source

Built Distribution

danom-0.3.0-py3-none-any.whl (9.3 kB)

Uploaded Python 3

File details

Details for the file danom-0.3.0.tar.gz.

File metadata

  • Download URL: danom-0.3.0.tar.gz
  • Upload date:
  • Size: 5.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for danom-0.3.0.tar.gz
Algorithm Hash digest
SHA256 fd71e0fc352149213b34e347c035e3ea34bd1f0b280d2d6908f1cc8cf07e070b
MD5 adfc7dc04ea196518fe9605c8155ef35
BLAKE2b-256 6dab775b5f86eca7f6f0ad9cf8f72cce701b1a5750566636ec78722ce0f30f52

Provenance

The following attestation bundles were made for danom-0.3.0.tar.gz:

Publisher: publish.yaml on second-ed/danom

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file danom-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: danom-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 9.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for danom-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 52e82e6e6c01b6ef5d7cea378bb3a0ea72bdb71e864f1486a8a4ee072fc6c961
MD5 cc4c9c35802156d7fc9872623e0e27de
BLAKE2b-256 f16a74b9866d5077793dadab3c390d5608883454e15465527462ef1e9c3e4dc3

Provenance

The following attestation bundles were made for danom-0.3.0-py3-none-any.whl:

Publisher: publish.yaml on second-ed/danom

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
