Skip to main content

a toolbox with pythonic utils, tools

Project description

Pythonic toolbox

PyPI version License Supported Python versions Stability CodeQL Status Python3.6 Test Status SNYK Status

Table of Contents

README.md is auto generated by the script tests/generate_readme_markdown.py from testing files,

DO NOT EDIT DIRECTLY! ;)

python3 tests/generate_readme_markdown.py

Introduction

A python3.6+ toolbox with multi useful utils, functions, decorators in pythonic way, and is fully tested from python3.6 to python3.11 .

Installation

pip3 install pythonic-toolbox --upgrade

Usage

decorators

The decorators demos highlight reusable wrappers that harden function interfaces, making call sites more forgiving and resilient to transient failures.

ignore_unexpected_kwargs

Use ignore_unexpected_kwargs to accept forgiving keyword arguments without altering core logic or signatures.

import pytest
from pythonic_toolbox.decorators.common import ignore_unexpected_kwargs

# Following functions are named under Metasyntactic Variables, like:
# foobar, foo, bar, baz, qux, quux, quuz, corge,
# grault, garply, waldo, fred, plugh, xyzzy, thud

def foo(a, b=0, c=3):
    return a, b, c

dct = {'a': 1, 'b': 2, 'd': 4}
with pytest.raises(TypeError) as __:
    assert foo(**dct) == (1, 2, 3)

wrapped_foo = ignore_unexpected_kwargs(foo)
assert wrapped_foo(**dct) == (1, 2, 3)

assert wrapped_foo(0, 0, 0) == (0, 0, 0)
assert wrapped_foo(a=1, b=2, c=3) == (1, 2, 3)

@ignore_unexpected_kwargs
def bar(*args: int):
    return sum(args)

# should not change original behavior
assert bar(1, 2, 3) == 6
assert bar(1, 2, 3, unexpected='Gotcha') == 6
nums = [1, 2, 3]
assert bar(*nums, unexpected='Gotcha') == 6

@ignore_unexpected_kwargs
def qux(a, b, **kwargs):
    # function with Parameter.VAR_KEYWORD Aka **kwargs
    return a, b, kwargs.get('c', 3), kwargs.get('d', 4)

assert qux(**{'a': 1, 'b': 2, 'd': 4, 'e': 5}) == (1, 2, 3, 4)

class Person:
    @ignore_unexpected_kwargs
    def __init__(self, name, age, sex):
        self.name = name
        self.age = age
        self.sex = sex

    @classmethod
    @ignore_unexpected_kwargs
    def create(cls, name, age, sex):
        return cls(name, age, sex)

    @staticmethod
    @ignore_unexpected_kwargs
    def greetings(name):
        return f'Hello, I am {name}'

params = {
    'name': 'albert',
    'age': 34,
    'sex': 'male',
    'height': '170cm',
}
__ = Person(**params)
__ = Person('albert', 35, 'male', height='170cm')

# test cases for classmethod, staticmethod
__ = Person.create(**params)
assert Person.greetings(**params)

retry

retry wraps callables with configurable retry logic so transient errors can be retried transparently.

import pytest

from pythonic_toolbox.decorators.common import retry

# use decorator without any arguments, using retry default params
@retry
def func_fail_first_time():
    """func_fail_first_time"""
    self = func_fail_first_time
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'

assert func_fail_first_time() == 'ok'
assert func_fail_first_time.call_times == 2
assert func_fail_first_time.__doc__ == 'func_fail_first_time'

@retry(tries=2, delay=0.1)  # use decorator with customized params
def func_fail_twice():
    """func_fail_twice"""
    self = func_fail_twice
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times <= 2:
        raise Exception('Fail when called first, second time')
    return 'ok'

assert func_fail_twice() == 'ok'
assert func_fail_twice.call_times == 3
assert func_fail_twice.__doc__ == 'func_fail_twice'

@retry(tries=2, delay=0.1)
def func_fail_three_times():
    """func_fail_three_times"""
    self = func_fail_three_times
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times <= 3:  # 1, 2, 3
        raise Exception('Fail when called first, second, third time')
    return 'ok'

with pytest.raises(Exception) as exec_info:
    func_fail_three_times()
assert func_fail_three_times.call_times == 3
assert exec_info.value.args[0] == 'Fail when called first, second, third time'

def raw_func_fail_first_time():
    """func_fail_first_time"""
    self = raw_func_fail_first_time
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'

assert retry(raw_func_fail_first_time)() == 'ok'

# test cases when function has arguments, kwargs
@retry(tries=1, delay=0.1)
def func_fail_first_time_with_parameters(p1, p2):
    """func_fail_first_time"""
    self = func_fail_first_time_with_parameters
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return p1 + p2

assert func_fail_first_time_with_parameters(1, 2) == 3

def func_fail_first_time_with_parameters(p1, p2):
    """func_fail_first_time"""
    self = func_fail_first_time_with_parameters
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return p1 + p2

assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(1, 2) == 3
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(p1=1, p2=2) == 3

import asyncio

@retry
async def async_func_fail_first_time():
    """async_func_fail_first_time"""
    self = async_func_fail_first_time
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'

@retry(delay=0.1)
async def async_func_fail_first_time2():
    """async_func_fail_first_time2"""
    self = async_func_fail_first_time2
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'

async def async_main():
    assert await async_func_fail_first_time() == 'ok'
    assert async_func_fail_first_time.__doc__ == 'async_func_fail_first_time'
    assert async_func_fail_first_time.call_times == 2
    assert await async_func_fail_first_time2() == 'ok'
    assert async_func_fail_first_time2.call_times == 2
    assert async_func_fail_first_time2.__doc__ == 'async_func_fail_first_time2'

loop = asyncio.get_event_loop()
if loop.is_closed():
    loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(async_main())
finally:
    loop.close()

import random
fail_count = 0

@retry(delay=0.1)
async def always_fail_func():
    nonlocal fail_count
    fail_count += 1
    await asyncio.sleep(random.random())
    raise ValueError()

async def async_main_for_always_fail():
    nonlocal fail_count
    tasks = [always_fail_func() for i in range(0, 3)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    assert all(map(lambda e: isinstance(e, ValueError), results))
    assert fail_count == 2 * 3  # each func run twice, three func calls

loop = asyncio.get_event_loop()
if loop.is_closed():
    loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(async_main_for_always_fail())
finally:
    loop.close()

deque_utils

deque_utils focuses on ergonomic helpers for Python's double-ended queues, emphasizing efficient mutation patterns.

deque_pop_any

deque_pop_any removes the first matching element from a deque while preserving O(n) traversal semantics.

from collections import deque

import pytest
from pythonic_toolbox.utils.deque_utils import deque_pop_any

queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=1) == 2
assert queue == deque([1, 3, 4, 5])

# edge case: same as deque.popleft()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=0) == 1
assert queue == deque([2, 3, 4, 5])

# edge case: same as deque.popright()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=len(queue) - 1) == 5
assert queue == deque([1, 2, 3, 4])

queue = deque([1, 2, 3, 4, 5])
with pytest.raises(IndexError) as exec_info:
    deque_pop_any(queue, idx=102)

# edge case: pop from empty deque
queue = deque()
with pytest.raises(IndexError) as exec_info:
    deque_pop_any(queue, idx=0)
assert exec_info.value.args[0] == 'pop from empty deque'

deque_split

deque_split partitions a deque into multiple deques based on a predicate, keeping operations efficient for queue-like workloads.

import pytest

from collections import deque

from pythonic_toolbox.utils.deque_utils import deque_split

queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=3)
assert queue1 == deque([1, 2, 3])
assert queue2 == deque([4, 5])

queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=0)
assert queue1 == deque([])
assert queue2 == deque([1, 2, 3, 4, 5])

queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=100)
assert queue1 == deque([1, 2, 3, 4, 5])
assert queue2 == deque([])

with pytest.raises(ValueError) as exec_info:
    deque_split(deque([1, 2, 3, 4, 5]), -1)
assert exec_info.value.args[0] == 'num must be integer: 0 <= num <= sys.maxsize'

dict_utils

The dict_utils section collects richer dictionary abstractions and traversal helpers for working with nested mappings.

DictObj

DictObj exposes dictionary keys as attributes, enabling dot-style access in dynamic data structures.

from copy import deepcopy

import pytest
from pythonic_toolbox.utils.dict_utils import DictObj

naive_dct = {
    'key1': 'val1',
    'key2': 'val2',
}

obj = DictObj(naive_dct)

# test basic functional methods like dict
assert len(obj) == 2
assert bool(obj) is True
# same behavior like ordinary dict according to the python version (FILO for popitem for 3.6+)
assert obj.popitem() == ('key2', 'val2')
assert obj.popitem() == ('key1', 'val1')
with pytest.raises(KeyError) as __:
    obj.popitem()

# a key can be treated like an attribute
# an attribute can be treated like a key
obj.key3 = 'val3'
assert obj.pop('key3') == 'val3'
with pytest.raises(KeyError) as __:
    obj.pop('key4')
obj.key5 = 'val5'
del obj.key5
with pytest.raises(KeyError) as __:
    obj.pop('key5')
with pytest.raises(AttributeError) as __:
    del obj.key5

# test deepcopy
obj = DictObj({'languages': ['Chinese', 'English']})
copied_obj = deepcopy(obj)
assert copied_obj == obj
copied_obj.languages = obj.languages + ['Japanese']
assert obj.languages == ['Chinese', 'English']
assert copied_obj.languages == ['Chinese', 'English', 'Japanese']
assert copied_obj != obj

person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}

person = DictObj(person_dct)
assert DictObj(person_dct) == DictObj(person_dct)
assert person.to_dict() == person_dct
assert set(person.keys()) == {'name', 'age', 'sex', 'languages'}
assert hasattr(person, 'name') is True
assert person.name == 'Albert'
assert person['name'] == 'Albert'
person.languages.append('Japanese')
assert person.languages == ['Chinese', 'English', 'Japanese']

person.height = '170'
assert person['height'] == '170'
assert 'height' in person
assert 'height' in person.keys()
assert hasattr(person, 'height') is True
del person['height']
assert 'height' not in person
assert 'height' not in person.keys()
person['height'] = '170cm'

person.update({'weight': '50'})
weight_val = person.pop('weight')
assert weight_val == '50'
person.update(DictObj({'weight': '50kg'}))
assert person.weight == '50kg'

expected = {
    'name': 'Albert', 'age': '34', 'sex': 'Male',
    'languages': ['Chinese', 'English', 'Japanese'],  # appended new language
    'height': '170cm',  # new added attribute
    'weight': '50kg',  # new added attribute
}
assert person.to_dict() == expected

repr_expected: str = ("{'name': 'Albert', 'age': '34', 'sex': 'Male', "
                      "'languages': ['Chinese', 'English', 'Japanese'],"
                      " 'height': '170cm', 'weight': '50kg'}")
assert repr(person) == repr_expected

# nested structure will be detected, and changed to DictObj
chessboard_data = {
    'position': [
        [{'name': 'knight'}, {'name': 'pawn'}],
        [{'name': 'pawn'}, {'name': 'queen'}],
    ]
}
chessboard_obj = DictObj(chessboard_data)
# test comparing instances of DictObj
assert DictObj(chessboard_data) == DictObj(chessboard_data)
assert isinstance(chessboard_obj.position, list)
assert len(chessboard_obj.position) == 2
assert isinstance(chessboard_obj.position[0][0], DictObj)
assert chessboard_obj.position[0][0].name == 'knight'
assert chessboard_obj.position[1][1].name == 'queen'

# edge case empty DictObj
empty_dict_obj = DictObj({})
assert len(empty_dict_obj) == 0
assert bool(empty_dict_obj) is False

obj_dict = DictObj({'data': 'oops'})
assert obj_dict.data == 'oops'

# params validation
invalid_key_dct = {
    1: '1',
}

# test when dict's key is not str
with pytest.raises(ValueError) as __:
    __ = DictObj(invalid_key_dct)

complicated_key_dct = {
    '1abc': 'Gotcha',  # '1abc' is not valid identifier for Python, so obj.1abc will cause SyntaxError
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
}

obj_dict = DictObj(complicated_key_dct)
assert obj_dict['1abc'] == 'Gotcha'
assert getattr(obj_dict, '1abc') == 'Gotcha'
# you can access '1abc' as attribute by adding prefix '_'
assert obj_dict._1abc == 'Gotcha'
del obj_dict._1abc

assert obj_dict['class'] == 'MyClass'
assert getattr(obj_dict, 'class') == 'MyClass'
# you can access 'class' as attribute by adding prefix '_'
assert obj_dict._class == 'MyClass'

# test re-assign new value for 'class'
obj_dict._class = 'MyClass2'
assert obj_dict._class == 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
assert getattr(obj_dict, 'class') == 'MyClass2'
del obj_dict._class

# if assign new attributes (_2, _try), ObjDict will treat it like what the originally are
# this is fully considered by design, you're not encouraged to mess up keys
obj_dict._2x = 'NewAttr'
assert obj_dict._2x == 'NewAttr'
assert obj_dict['_2x'] == 'NewAttr'
with pytest.raises(KeyError):
    __ = obj_dict['2x']
with pytest.raises(AttributeError):
    __ = getattr(obj_dict, '2x')

obj_dict._try = 'NewAttr'
assert obj_dict._try == 'NewAttr'
assert obj_dict['_try'] == 'NewAttr'
with pytest.raises(KeyError):
    __ = obj_dict['NewAttr']
with pytest.raises(AttributeError):
    __ = getattr(obj_dict, 'NewAttr')

# Demo for messing up key 'class'
# delete and re-assign _class
complicated_key_dct = {
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
}
obj_dict = DictObj(complicated_key_dct)

assert obj_dict['class'] == 'MyClass'
obj_dict._class = 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
del obj_dict._class

# obj_dict has no knowledge about 'class' or '_class'
# so '_class' is a brand-new attribute, and will be stored as '_class'
obj_dict._class = 'MyClass3'
with pytest.raises(KeyError):
    # Oops!!! by-design
    # 'class' cannot be accessed as key anymore,
    # because we store '_class' as key as other valid keys behave
    assert obj_dict['class'] == 'MyClass3'
assert obj_dict['_class'] == 'MyClass3'

# thread safe testing
import sys
from threading import Thread
from pythonic_toolbox.decorators.decorator_utils import method_synchronized

class MyObjDict(DictObj):
    # implement a thread-safe method to increase the value of cnt
    @method_synchronized
    def increase_cnt_by_n(self, n):
        self.cnt += n

def increase_cnt_by_100(dict_obj):
    for i in range(100):
        dict_obj.increase_cnt_by_n(1)

sw_interval = sys.getswitchinterval()
try:
    sys.setswitchinterval(0.0001)
    my_dict_obj = MyObjDict({'cnt': 0})
    threads = [Thread(target=increase_cnt_by_100, args=(my_dict_obj,)) for _ in range(100)]
    [t.start() for t in threads]
    [t.join() for t in threads]
    assert my_dict_obj.cnt == 10000
finally:
    sys.setswitchinterval(sw_interval)

# test copy/deepcopy of DictObj
import copy

person = DictObj({'name': 'albert', 'age': 33})
team = DictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader

deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader

FinalDictObj

FinalDictObj freezes dictionaries after construction, safeguarding nested data against accidental mutation.

from typing import cast

import pytest
from pythonic_toolbox.utils.dict_utils import FinalDictObj

person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}

fixed_person = FinalDictObj(person_dct)
assert fixed_person.name == 'Albert'

# FINAL means once initialized, you cannot change the key/attribute anymore
with pytest.raises(RuntimeError) as exec_info:
    fixed_person.name = 'Steve'
expected_error_str = 'Cannot modify attribute/item in an already initialized FinalDictObj'
assert exec_info.value.args[0] == expected_error_str

with pytest.raises(RuntimeError) as __:
    fixed_person.popitem()

with pytest.raises(RuntimeError) as __:
    fixed_person.pop('name')

assert isinstance(fixed_person.languages, tuple)
with pytest.raises(AttributeError) as exec_info:
    # list values are changed into tuple to avoid being modified
    cast(list, fixed_person.languages).append('Japanese')
expected_error_str = "'tuple' object has no attribute 'append'"
assert exec_info.value.args[0] == expected_error_str
assert fixed_person.to_dict() == person_dct

# nested structure will be detected, and changed to FinalDictObj
chessboard_data = {
    'position': [
        [{'name': 'knight'}, {'name': 'pawn'}],
        [{'name': 'pawn'}, {'name': 'queen'}],
    ]
}
chessboard_obj = FinalDictObj(chessboard_data)
# test comparing instances of FinalDictObj
assert FinalDictObj(chessboard_data) == FinalDictObj(chessboard_data)
assert isinstance(chessboard_obj.position, tuple)
assert isinstance(chessboard_obj.position[0][0], FinalDictObj)
assert chessboard_obj.position[1][1].name == 'queen'
with pytest.raises(RuntimeError) as __:
    chessboard_obj.position[1][1].name = 'knight'

# test for keyword/non-identifier key as attribute
final_obj_dict = FinalDictObj({
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
})
assert final_obj_dict['class'] == 'MyClass'
assert getattr(final_obj_dict, 'class') == 'MyClass'
assert final_obj_dict._class == 'MyClass'

# test copy/deepcopy of FileDictObj
import copy
person = FinalDictObj({'name': 'albert', 'age': 33})
team = FinalDictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
assert team.leader == shallow_copy_of_team.leader

deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader

RangeKeyDict

RangeKeyDict associates lookup results with numeric ranges, yielding logarithmic-time queries backed by bisect searches.

import pytest
from pythonic_toolbox.utils.dict_utils import RangeKeyDict

# test normal case
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
    (float('-inf'), 0): 'Negative',
    (0, 60): 'F',  # 0 <= val < 60
    (60, 70): 'D',  # 60 <= val < 70
    (70, 80): 'C',  # 70 <= val < 80
    (80, 90): 'B',  # 80 <= val < 90
    (90, 100): 'A',  # 90 <= val < 100
    100: 'A+',  # val == 100
})

# Big O of querying is O(log n), n is the number of ranges, due to using bisect inside
assert range_key_dict[-1] == 'Negative'
assert range_key_dict[0] == 'F'
assert range_key_dict[55] == 'F'
assert range_key_dict[60] == 'D'
assert range_key_dict[75] == 'C'
assert range_key_dict[85] == 'B'
assert range_key_dict[95] == 'A'
assert range_key_dict[100] == 'A+'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict['95']  # when key is not comparable with other integer keys
assert exec_info.value.args[0] == "KeyError: '95' is not comparable with other keys"

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[150]
assert exec_info.value.args[0] == 'KeyError: 150'

assert range_key_dict.get(150, 'N/A') == 'N/A'

# test comparison with other RangeKeyDict
assert RangeKeyDict({(0, 10): '1'}) == RangeKeyDict({(0, 10): '1'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 10): '2'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 1000): '1'})

with pytest.raises(ValueError):
    # [1, 1) is not a valid range
    # there's no value x satisfy 1 <= x < 1
    RangeKeyDict({(1, 1): '1'})

with pytest.raises(ValueError):
    # [1, -1) is not a valid range
    RangeKeyDict({(1, -1): '1'})

# validate input keys types and detect range overlaps(segment intersect)
with pytest.raises(ValueError) as exec_info:
    RangeKeyDict({
        (0, 10): 'val-between-0-and-10',
        (0, 5): 'val-between-0-and-5'
    })
expected_error_msg = ("Duplicated left boundary key 0 detected: "
                      "(0, 10): 'val-between-0-and-10', (0, 5): 'val-between-0-and-5'")
assert exec_info.value.args[0] == expected_error_msg

with pytest.raises(ValueError) as exec_info:
    RangeKeyDict({
        (0, 10): 'val-between-0-and-10',
        (5, 15): 'val-between-5-and-15'
    })
expected_error_msg = ("Overlap detected: "
                      "(0, 10): 'val-between-0-and-10', (5, 15): 'val-between-5-and-15'")
assert exec_info.value.args[0] == expected_error_msg

# test RangeKeyDict with no continuous ranges
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
    (0, 60): 'F',  # 0 <= val < 60
    (70, 80): 'C',  # 70 <= val < 80
})

assert range_key_dict[10] == 'F'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[-100]
assert exec_info.value.args[0] == 'KeyError: -100'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[65]
assert exec_info.value.args[0] == 'KeyError: 65'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[100]
assert exec_info.value.args[0] == 'KeyError: 100'

from functools import total_ordering

@total_ordering
class Age:
    def __init__(self, val: float):
        if not isinstance(val, (int, float)):
            raise ValueError('Invalid age value')
        self.val = val

    def __le__(self, other):
        return self.val <= other.val

    def __repr__(self):
        return f'Age({repr(self.val)})'

    def __hash__(self):
        return hash(self.val)

age_categories_map: RangeKeyDict[Age, str] = RangeKeyDict({
    (Age(0), Age(2)): 'Baby',
    (Age(2), Age(15)): 'Children',
    (Age(15), Age(25)): 'Youth',
    (Age(25), Age(65)): 'Adults',
    (Age(65), Age(123)): 'Seniors',
})

assert age_categories_map[Age(0.5)] == 'Baby'
assert age_categories_map[Age(12)] == 'Children'
assert age_categories_map[Age(20)] == 'Youth'
assert age_categories_map[Age(35)] == 'Adults'
assert age_categories_map[Age(70)] == 'Seniors'

StrKeyIdDict

StrKeyIdDict assigns deterministic integer identifiers to string keys while maintaining bidirectional lookups.

import pytest
from pythonic_toolbox.utils.dict_utils import StrKeyIdDict

data = {1: 'a', 2: 'b', '3': 'c'}
my_dict = StrKeyIdDict(data)

# usage: value can be accessed by id (str: int-like/uuid-like/whatever) or id (int)
assert my_dict['1'] == my_dict[1] == 'a'
assert my_dict.keys() == {'1', '2', '3'}  # all keys are str type
my_dict['4'] = 'd'
assert my_dict['4'] == 'd'
my_dict[4] = 'd'
assert my_dict['4'] == 'd'
my_dict.update({4: 'd'})
assert my_dict['4'] == 'd'

# test comparing instances of the class
assert StrKeyIdDict(data) == StrKeyIdDict(data)
assert StrKeyIdDict(data) != StrKeyIdDict(dict(data, **{'4': 'd'}))
assert StrKeyIdDict(data) == {'1': 'a', '2': 'b', '3': 'c'}
assert StrKeyIdDict(data) != {'1': 'a', '2': 'b', '3': 'd'}
assert StrKeyIdDict(data) != {1: 'a', 2: 'b', 3: 'c'}  # StrKeyIdDict assumes all keys are strings

# test delete key
del my_dict[4]
assert my_dict.keys() == {'1', '2', '3'}  # '4' is not in the dict anymore

# assign value to an arbitrary string key that is not in the dict
my_dict.update({'some-uuid': 'something'})
assert my_dict['some-uuid'] == 'something'

with pytest.raises(TypeError):
    # key '1', 1 both stands for key '1',
    # so we get duplicated keys when initializing instance, oops!
    my_dict = StrKeyIdDict({'1': 'a', 1: 'A'})

assert my_dict.get(1) == 'a'
assert my_dict.get('NotExistKey') is None
assert my_dict.get('NotExistKey', 'NotExistValue') == 'NotExistValue'

# test edge cases
assert StrKeyIdDict() == {}

# test shallow copy
my_dict[5] = ['e1', 'e2', 'e3']
copy_dict = my_dict.copy()
copy_dict[1] = 'A'
assert my_dict[1] == 'a'
my_dict['5'].append('e4')
assert copy_dict['5'] == ['e1', 'e2', 'e3', 'e4']

# test deep copy
from copy import deepcopy

copy_dict = deepcopy(my_dict)
my_dict[5].append('e5')
assert my_dict['5'] == ['e1', 'e2', 'e3', 'e4', 'e5']
assert copy_dict[5] == ['e1', 'e2', 'e3', 'e4']

# test constructor
my_dict = StrKeyIdDict(uuid1='a', uuid2='b')
assert my_dict['uuid1'] == 'a'

# test constructor (from keys)
my_dict = StrKeyIdDict.fromkeys([1, 2, 3], None)
assert my_dict == {'1': None, '2': None, '3': None}
# test update and overwrite
my_dict.update(StrKeyIdDict({1: 'a', 2: 'b', 3: 'c', 4: 'd'}))
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}

my_dict = StrKeyIdDict([(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')])
assert my_dict['1'] == my_dict[1] == 'a'

# reassign StrKeyIdDict instance to another StrKeyIdDict instance
my_dict = StrKeyIdDict(my_dict)
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
assert dict(my_dict) == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}

# test case when "key" is "data", which is a reserved keyword inside StrKeyIdDict
my_dict = StrKeyIdDict({'data': 'data_value', '1': 'a'})
assert my_dict['data'] == 'data_value'
assert my_dict['1'] == 'a'
# delete key 'data', should not affect other keys
del my_dict['data']
assert my_dict['1'] == 'a'

collect_leaves

collect_leaves traverses nested dictionaries and gathers terminal values into a flat structure.

from pythonic_toolbox.utils.dict_utils import collect_leaves

# a nested dict-like struct
my_dict = {
    'node_1': {
        'node_1_1': {
            'node_1_1_1': 'A',
        },
        'node_1_2': {
            'node_1_2_1': 'B',
            'node_1_2_2': 'C',
            'node_1_2_3': None,
        },
        'node_1_3': [  # dict list
            {
                'node_1_3_1_1': 'D',
                'node_1_3_1_2': 'E',
            },
            {
                'node_1_3_2_1': 'FF',
                'node_1_3_2_2': 'GG',
            }
        ]
    }}

expected = ['A', 'B', 'C', None, 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict) == expected

expected = ['A', 'B', 'C', 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf) == expected

assert collect_leaves(my_dict, keypath_pred=lambda kp: len(kp) == 1) == []

expected = ['B', 'C']
assert collect_leaves(my_dict, keypath_pred=lambda kp: kp[-1] in {'node_1_2_1', 'node_1_2_2'}) == expected

expected = ['C']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf == 'C') == expected
assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: kp[-1] == 'node_1_2_2',
                      leaf_pred=lambda lf: lf == 'C') == expected

assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: kp[-1] == 'node_1_1_1',
                      leaf_pred=lambda lf: lf == 'C') == []

expected = ['D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3') == expected

expected = ['FF', 'GG']
assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3',
                      leaf_pred=lambda lf: isinstance(lf, str) and len(lf) == 2) == expected

# edge cases
assert collect_leaves([]) == []
assert collect_leaves({}) == []
assert collect_leaves(None) == []

dict_until

dict_until repeatedly applies mutations to a mapping until a predicate signals completion.

from pythonic_toolbox.utils.dict_utils import dict_until

data = {'full_name': 'Albert Lee', 'pen_name': None}
assert dict_until(data, keys=['name', 'full_name']) == 'Albert Lee'
assert dict_until(data, keys=['full_name', 'name']) == 'Albert Lee'
assert dict_until(data, keys=['name', 'english_name']) is None
assert dict_until(data, keys=['name', 'english_name'], default='anonymous') == 'anonymous'
# test when pen_name is set None on purpose
assert dict_until(data, keys=['pen_name'], default='anonymous') is None
# test when value with None value is not acceptable
assert dict_until(data, keys=['pen_name'], terminate=lambda x: x is not None, default='anonymous') == 'anonymous'

select_list_of_dicts

select_list_of_dicts filters lists of dictionaries using expressive selection predicates.

from pythonic_toolbox.utils.dict_utils import select_list_of_dicts

dict_lst = [
    {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # another Peter Parker from multiverse
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # age unknown for Carol Danvers, no age field
    {'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
    {'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'},
]

assert select_list_of_dicts(dict_lst, look_like={'name': 'Peter Parker'}) == [
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'}]

assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}) == [
    {'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
    {'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'}]

assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}, keys=['name']) == [
    {'name': 'Carol Danvers'}, {'name': 'Natasha Romanoff'}]

# unique is supported for return list
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age']) == [
    {'name': 'Tony Stark', 'age': 49},
    {'name': 'Peter Parker', 'age': 16},
    {'name': 'Peter Parker', 'age': 16},
]

assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age'], unique=True) == [
    {'name': 'Tony Stark', 'age': 49},
    {'name': 'Peter Parker', 'age': 16}]

# dict keys are ordered as the keys passed-in
assert list(select_list_of_dicts(dict_lst, keys=['name', 'age'], unique=True)[0].keys()) == ['name', 'age']
assert list(select_list_of_dicts(dict_lst, keys=['age', 'name'], unique=True)[0].keys()) == ['age', 'name']

# locate Captain Marvel, with default val for missing key
assert select_list_of_dicts(dict_lst,
                            look_like={'alias': 'Captain Marvel'},
                            keys=['name', 'sex', 'age', 'alias'],
                            val_for_missing_key='Unknown')[0]['age'] == 'Unknown'

# edge cases, get the original dict
assert select_list_of_dicts([]) == []
assert select_list_of_dicts(dict_lst) == dict_lst

# new list of dicts is returned, leaving the original list of dicts untouched
black_widow = select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]
black_widow['age'] += 1
assert black_widow['age'] == 36
# we don't modify the original dict data, Natasha is always 35 years old
assert select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]['age'] == 35

# preds provide more flexibility, filter the ones with age info
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0])) == 4
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0], unique=True)) == 3

# combine look_like and preds parameters
expected = [{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
                            preds=[lambda d: 'age' in d, lambda d: d['age'] > 20]) == expected

# empty list is returned if no dict matches the criteria
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
                            preds=[lambda d: 'sex' in d and d['sex'] == 'female']) == []

unique_list_of_dicts

unique_list_of_dicts collapses dictionaries into a unique list based on configurable identity keys.

from pythonic_toolbox.utils.dict_utils import unique_list_of_dicts

dict_lst = [
    {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # Peter Parkers from multiverse in same age.
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # test same dict, but the order of dict is different
    {'name': 'Peter Parker', 'sex': 'male', 'alias': 'Spider Man', 'age': 16},
]

# Only one Peter Parker will be kept, for all data are exactly same.
assert unique_list_of_dicts(dict_lst) == [
    {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
]

# edge cases
assert unique_list_of_dicts([]) == []

walk_leaves

walk_leaves yields a generator over nested key paths and leaf values for introspection-heavy workflows.

from pythonic_toolbox.utils.dict_utils import walk_leaves

data = {
    'k1': {
        'k1_1': 1,
        'k1_2': 2,
    },
    'k2': 'N/A',  # stands for not available
}

expected = {
    'k1': {
        'k1_1': 2,
        'k1_2': 4,
    },
    'k2': 'N/A',  # stands for not available
}
assert walk_leaves(data) == data  # no transform function provided, just a deepcopy
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected

# if inplace is set True, will change data inplace, return nothing
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected

data = [{'name': 'lml', 'age': 33}, {'name': 'albert', 'age': 18}]
expected = [{'name': 'lml', 'age': 66}, {'name': 'albert', 'age': 36}]
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected

# edge cases
assert walk_leaves(None) is None
assert walk_leaves([]) == []
assert walk_leaves({}) == {}
assert walk_leaves(None, inplace=True) is None
assert walk_leaves([], inplace=True) is None
assert walk_leaves({}, inplace=True) is None

functional_utils

functional_utils gathers lightweight functional-programming inspired helpers that compose common iterable transformations.

filter_multi

filter_multi composes multiple predicates for iterative filtering, with lfilter_multi providing a list materialization helper.

from pythonic_toolbox.utils.functional_utils import lfilter_multi, filter_multi
from collections.abc import Iterable

def is_even(x):
    return x % 2 == 0

def is_divisible_by_5(x):
    return x % 5 == 0

# select numbers which are divisible by 2 and 5
assert lfilter_multi([is_even, is_divisible_by_5], range(1, 30)) == [10, 20]
assert lfilter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20]) == [10, 20]

from itertools import count, takewhile
# if you want to pass an iterator, make sure the iterator will end/break,
# Note: a bare count(start=0, step=2) will generate number like 0, 2, 4, 6, .... (never ends)
even_numbers_less_equal_than_50 = takewhile(lambda x: x <= 50, count(start=0, step=2))
expected = [0, 10, 20, 30, 40, 50]
assert lfilter_multi([is_even, is_divisible_by_5], even_numbers_less_equal_than_50) == expected

# testing for filter_multi, not converted to list directly
num_iterator = filter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20])
assert type(num_iterator) is filter
assert isinstance(num_iterator, Iterable)
expected = [10, 20]
for idx, value in enumerate(num_iterator):
    assert value == expected[idx]

# when items are infinite, choose filter_multi instead of lfilter_multi
expected = [0, 10, 20, 30, 40, 50]
for idx, value in enumerate(filter_multi([is_even, is_divisible_by_5], count(start=0, step=1))):
    if value > 50:
        break
    else:
        assert value == expected[idx]

list_utils

Utilities in list_utils provide expressive patterns for curation, ordering, and restructuring of list data.

filter_allowable

filter_allowable retains items that match allowable values, whether they are literal matches or resolved dynamically.

from pythonic_toolbox.utils.list_utils import filter_allowable

fruits = ['apple', 'banana', 'orange']
vegetables = ['carrot', 'potato', 'tomato']
meats = ['beef', 'chicken', 'fish']

foods = fruits + vegetables + meats

assert list(filter_allowable(foods)) == foods
assert list(filter_allowable(foods, allow_list=[], block_list=[])) == foods
assert list(filter_allowable(foods, allow_list=['apple', 'banana', 'blueberry'])) == ['apple', 'banana']
assert list(filter_allowable(foods, allow_list=[], block_list=foods)) == []
assert list(filter_allowable(foods, block_list=meats)) == fruits + vegetables
assert list(filter_allowable(foods, allow_list=['apple'], block_list=[])) == ['apple']
assert list(filter_allowable(foods, allow_list=['apple'], block_list=['apple'])) == []
assert list(filter_allowable(foods + ['blueberry'], allow_list=[], block_list=foods)) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=[])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=['apple', 'banana'])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=['orange'], block_list=['apple', 'banana'])) == []

# test cases with parameter key
assert list(filter_allowable(foods, allow_list=['a', 'b'], key=lambda x: x[0])) == ['apple', 'banana', 'beef']

# test some basic cases
assert list(filter_allowable()) == []
assert list(filter_allowable(candidates=None)) == []
assert list(filter_allowable(candidates=[])) == []
assert list(filter_allowable(candidates=[], allow_list=[], block_list=[])) == []

sort_with_custom_orders

sort_with_custom_orders sorts sequences according to bespoke priority orders or fallback comparators.

from operator import itemgetter
from typing import List

import pytest
from pythonic_toolbox.utils.list_utils import sort_with_custom_orders

# basic usage
values = ['branch2', 'branch1', 'branch3', 'master', 'release']
expected = ['master', 'release', 'branch1', 'branch2', 'branch3']
assert sort_with_custom_orders(values, prefix_orders=['master', 'release']) == expected
assert sort_with_custom_orders(values, prefix_orders=['master', 'release'], reverse=True) == expected[::-1]

values = [1, 2, 3, 9, 9]
expected = [9, 9, 1, 2, 3]
assert sort_with_custom_orders(values, prefix_orders=[9, 8, 7]) == expected

values = [1, 2, 3, 9]
expected = [9, 2, 3, 1]
assert sort_with_custom_orders(values, prefix_orders=[9], suffix_orders=[1]) == expected

assert sort_with_custom_orders([]) == []
assert sort_with_custom_orders([], prefix_orders=[], suffix_orders=[]) == []
assert sort_with_custom_orders([], prefix_orders=['master']) == []

# tests for unhashable values
values = [[2, 2], [1, 1], [3, 3], [6, 0]]
assert sort_with_custom_orders(values, prefix_orders=[[3, 3]]) == [[3, 3], [1, 1], [2, 2], [6, 0]]
# if "key" is provided, items are sorted in order of key(item)
# items in prefix_orders/suffix_orders don't need to be one-one correspondence with items to sort
# sum([6]) == sum([3, 3]) == sum([6, 0])
assert sort_with_custom_orders(values, prefix_orders=[[6]], key=sum) == [[3, 3], [6, 0], [1, 1], [2, 2]]

# tests for list of dicts
values = [{2: 2}, {1: 1}, {1: 2}]
assert sort_with_custom_orders(values, prefix_orders=[{2: 2}],
                               key=lambda data: sum(data.values())) == [{2: 2}, {1: 2}, {1: 1}]

branch_info: List[dict] = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'release', 'commit_id': 'v1.1'}]
# Assume that we prefer choosing branch in order: release > master > others (develop, hotfix etc.)
res = sort_with_custom_orders(branch_info,
                              prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
                              key=itemgetter('branch'))
expected = [{'branch': 'release', 'commit_id': 'v1.1'}, {'branch': 'master', 'commit_id': 'v1.2'}]
assert res == expected

branch_info = [{'branch': 'develop', 'commit_id': 'v1.3'}, {'branch': 'master', 'commit_id': 'v1.2'}]
res = sort_with_custom_orders(branch_info,
                              prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
                              key=itemgetter('branch'))
expected = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'develop', 'commit_id': 'v1.3'}]
assert res == expected

# tests for exceptions
with pytest.raises(ValueError) as exec_info:
    sort_with_custom_orders([1, 2, 3], prefix_orders=[3], suffix_orders=[3])
assert exec_info.value.args[0] == 'prefix and suffix contains same value'

with pytest.raises(ValueError) as exec_info:
    sort_with_custom_orders([1, 2, 3], prefix_orders=[1, 1])
assert exec_info.value.args[0] == 'prefix_orders contains duplicated values'

# tests for class
class Person:
    def __init__(self, id, name, age):
        self.id = id
        self.name = name
        self.age = age

    def __lt__(self, other: 'Person'):
        return self.age < other.age

    def __eq__(self, other: 'Person'):
        return self.age == other.age

    def __hash__(self):
        return self.id

    def __str__(self):
        return f'Person({self.id}, {self.name}, {self.age})'

    def __repr__(self):
        return str(self)

Albert = Person(1, 'Albert', 28)
Alice = Person(2, 'Alice', 26)
Menglong = Person(3, 'Menglong', 33)

persons = [Albert, Alice, Menglong]
expected = [Alice, Albert, Menglong]
assert sort_with_custom_orders(persons) == expected

expected = [Menglong, Alice, Albert]
assert sort_with_custom_orders(persons, prefix_orders=[Menglong, Person(4, 'Anyone', 40)]) == expected

unpack_list

unpack_list unpacks nested iterables into positional variables with clear error reporting.

import pytest
from pythonic_toolbox.utils.list_utils import unpack_list

first, second, third = unpack_list(['a', 'b', 'c', 'd'], target_num=3)
assert first == 'a' and second == 'b' and third == 'c'

first, second, third = unpack_list(['a', 'b'], target_num=3, default=None)
assert first == 'a' and second == 'b' and third is None

first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None

first, second, third = unpack_list([], target_num=3, default=0)
assert first == second == third == 0

first, second, *rest = unpack_list(['a', 'b', 'c'], target_num=4, default='x')
assert first == 'a' and second == 'b' and rest == ['c', 'x']

# test case for type range
first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None

def fib():
    a, b = 0, 1
    while 1:
        yield a
        a, b = b, a + b

# test case for type generator
fib_generator = fib()  # generates data like [0, 1, 1, 2, 3, 5, 8, 13, 21 ...]
first, second, third, *rest = unpack_list(fib_generator, target_num=6)
assert first == 0 and second == 1 and third == 1
assert rest == [2, 3, 5]
seventh, eighth = unpack_list(fib_generator, target_num=2)
assert seventh == 8 and eighth == 13

# test edge case, nothing to unpack
empty = unpack_list([], target_num=0, default=None)
assert empty == []

res = unpack_list([], target_num=2, default=None)
assert res == [None, None]

empty = unpack_list(['a', 'b'], target_num=0, default=None)
assert empty == []

empty = unpack_list(range(0, 0), target_num=0)
assert empty == []

empty = unpack_list(iter([]), target_num=0, default=None)
assert empty == []

with pytest.raises(ValueError):
    # ValueError: not enough values to unpack (expected 3, got 2)
    first, second, third = unpack_list([1, 2], target_num=2)

until

until iterates through data until a stopping condition is met, mirroring familiar functional-programming patterns.

from itertools import count

from pythonic_toolbox.utils.list_utils import until

# basic usage
counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x > 10) == 11

assert until([1, 2, 3], lambda x: x > 10, default=11) == 11

# test case for when there's no default value and no item in the iterable satisfies the condition
assert until([1, 2, 3], lambda x: x > 10) is None

# edge cases
assert until([], default=3) == 3  # nothing provided, return default
assert until(None, lambda x: x > 10, default=11) == 11

# test case for when there's no item in the counter satisfies the condition
# the following codes will run forever, so comment them out
# counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
# assert until(counter, lambda x: x % 2 == 0) is None

# test case for when max_iter_num is provided, only iterate the counter for max_iter_num times
counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x % 2 == 0, default=None, max_iter_num=100) is None

numbers = [1, 2, 3, 4, 5, 6]
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=1) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=4) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=5) == 5
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=100) == 5

string_utils

The string_utils module streamlines templating and value substitution when building dynamic strings.

substitute_string_template_dict

substitute_string_template_dict safely fills placeholders in string templates using dictionary-based parameters.

from unittest.mock import patch, PropertyMock

import pytest
from pythonic_toolbox.utils.string_utils import substitute_string_template_dict, CycleError

# simple usage
# both $variable ${variable} declarations are supported in string template format
str_template_dict = {
    'greeting': 'Good Morning, Everyone!',
    'first_name': 'Albert',
    'last_name': 'Lee',
    'full_name': '$first_name $last_name',
    'age': 34,
    'speech': '$greeting, I am $full_name, a ${age}-year-old programmer, very glad to meet you!'
}
output_dict = substitute_string_template_dict(str_template_dict)
assert output_dict['full_name'] == 'Albert Lee'
expected_speech = 'Good Morning, Everyone!, I am Albert Lee, a 34-year-old programmer, very glad to meet you!'
assert output_dict['speech'] == expected_speech

# complex usage, with dynamic values, and multi value-providing holders
str_template_dict = {
    'first_name': 'Daenerys',
    'last_name': 'Targaryen',
    'nick_name': 'Dany',
    'full_name': '$first_name $last_name',
    'speech': "$nick_name: I'm $full_name ($title1, $title2, $title3), it's $current_time_str, $greeting!",
}

variables_dict = {'title1': 'Queen of Meereen',
                  'title2': 'Mother of Dragons'}

class DynamicVariables:
    @property
    def current_time_str(self):
        import datetime
        return datetime.datetime.now().strftime("%H:%M:%S")

class DefaultUnknownTitle:
    """
    A class will always return UnknownTitle, when try to access attribute like
    title1, title2, ..., titleX
    """

    def __getattribute__(self, item):
        if isinstance(item, str) and item.startswith('title') and item[len(item) - 1:].isdigit():
            return 'UnknownTitle'
        return super(DefaultUnknownTitle, self).__getattribute__(item)

expected_speech = ("Dany: I'm Daenerys Targaryen (Queen of Meereen, Mother of Dragons, UnknownTitle), "
                   "it's 08:00:00, good morning everyone!")

# using mock to make DynamicVariables().current_time_str always return 08:00:00
with patch.object(DynamicVariables, 'current_time_str', return_value='08:00:00', new_callable=PropertyMock):
    output_dict = substitute_string_template_dict(str_template_dict, variables_dict, DynamicVariables(),
                                                  DefaultUnknownTitle(),
                                                  greeting='good morning everyone')
    assert output_dict['speech'] == expected_speech

# edge cases
assert substitute_string_template_dict({}) == {}

# cycle detection
str_template_dict = {
    'variable_a': 'Hello $variable_b',  # variable_a depends on variable_b
    'variable_b': 'Hello $variable_a',  # variable_b depends on variable_a, it's a cycle!
}

with pytest.raises(CycleError) as exec_info:
    substitute_string_template_dict(str_template_dict)

context

context demonstrates context managers that gracefully gate execution paths based on runtime conditions.

SkipContext

SkipContext conditionally suppresses execution within a context manager, ideal for pre-emptive locking or runtime flags.

import itertools

import pytest
from pythonic_toolbox.utils.context_utils import SkipContext

# Usage: define a class that inherits the SkipContext,
# and takes control of the skip or not logic
class MyWorkStation(SkipContext):

    def __init__(self, week_day: str):
        working_days = {'monday', 'tuesday', 'wednesday', 'thursday', 'friday'}
        weekends = {'saturday', 'sunday'}

        if week_day.lower() not in working_days.union(weekends):
            raise ValueError(f'Invalid weekday {week_day}')

        skip = True if week_day.lower() in weekends else False
        super(MyWorkStation, self).__init__(skip=skip)

seven_week_days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
logged_opening_days = []
total_working_hours = 0

for cur_week_day in seven_week_days:
    # MyWorkStation will skip the code block when encountering weekends
    with MyWorkStation(week_day=cur_week_day):
        # log this working day
        logged_opening_days.append(cur_week_day)
        # accumulate working hours, 8 hours on each working day
        total_working_hours += 8

# only working days are logged
assert logged_opening_days == ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday']
assert total_working_hours == 8 * 5

# test basic SkipContext
count_iterator = itertools.count(start=0, step=1)

flg_skip = True
with SkipContext(skip=flg_skip):
    # if skip = True, all codes inside the context will be skipped(not executed)
    next(count_iterator)  # this will not be executed
    assert sum([1, 1]) == 3
    raise Exception('Codes will not be executed')

assert next(count_iterator) == 0  # check previous context is skipped

flg_skip = False
with SkipContext(skip=flg_skip):
    # codes will be executed as normal, if skip = False
    next(count_iterator)  # generate value 1
    assert sum([1, 1]) == 2

assert next(count_iterator) == 2  # check previous context is executed

with pytest.raises(Exception) as exec_info:
    with SkipContext(skip=False):
        # if skip = False, this SkipContextManager is transparent,
        # internal exception will be detected as normal
        raise Exception('MyError')
assert exec_info.value.args[0] == 'MyError'

# another example: ensure there will be only one job, who acquire the lock, run the increase +1

from multiprocessing import Manager, Pool
import time

from pythonic_toolbox.utils.context_utils import SkipContext


def plain_cronjob_increase(ns, lock):
    start = time.time()
    with lock:
        now = time.time()
        if now - start >= 0.5:
            pass
        else:
            ns.cnt += 1
            time.sleep(1)
    return ns.cnt


class PreemptiveLockContext(SkipContext):
    def __init__(self, lock):
        self.start_time = time.perf_counter()
        self.lock = lock
        self.acquired = self.lock.acquire(timeout=0.5)
        skip = not self.acquired
        super(PreemptiveLockContext, self).__init__(skip=skip)

    def __exit__(self, type, value, traceback):
        if self.acquired:
            time.sleep(1)
            self.lock.release()
        if type is None:
            return  # No exception
        else:
            if issubclass(type, self.SkipContentException):
                return True  # Suppress special SkipWithBlockException
            return False


def cronjob_increase(ns, lock):
    # for those who cannot acquire the lock within some time
    # this context block will be skipped, quite simple
    with PreemptiveLockContext(lock):
        ns.cnt += 1
    return ns.cnt



manager = Manager()
lock = manager.Lock()
ns = manager.Namespace()
pool = Pool(2)

ns.cnt = 0
processes = [pool.apply_async(plain_cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
result = [p.get() for p in processes]
assert result == [1, 1]
assert ns.cnt == 1

# reset global cnt=0
ns.cnt = 0
processes = [pool.apply_async(cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
result = [p.get() for p in processes]
assert result == [1, 1]
assert ns.cnt == 1

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pythonic-toolbox-1.1.40.tar.gz (57.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pythonic_toolbox-1.1.40-py3-none-any.whl (33.1 kB view details)

Uploaded Python 3

File details

Details for the file pythonic-toolbox-1.1.40.tar.gz.

File metadata

  • Download URL: pythonic-toolbox-1.1.40.tar.gz
  • Upload date:
  • Size: 57.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.10.0 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/1.0.0 urllib3/1.26.18 tqdm/4.64.1 importlib-metadata/4.8.3 keyring/23.4.1 rfc3986/1.5.0 colorama/0.4.5 CPython/3.6.15

File hashes

Hashes for pythonic-toolbox-1.1.40.tar.gz
Algorithm Hash digest
SHA256 a6786239c13dbd929d74d0759c8039abd81ef84d859f4d99f600df09edf774d0
MD5 ca757286a98b1bba481f614f970efedc
BLAKE2b-256 1f156904e12e7c4c93194ccf7489f8033da0fd484a3801ae7bdc09198377b2ae

See more details on using hashes here.

File details

Details for the file pythonic_toolbox-1.1.40-py3-none-any.whl.

File metadata

  • Download URL: pythonic_toolbox-1.1.40-py3-none-any.whl
  • Upload date:
  • Size: 33.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.10.0 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/1.0.0 urllib3/1.26.18 tqdm/4.64.1 importlib-metadata/4.8.3 keyring/23.4.1 rfc3986/1.5.0 colorama/0.4.5 CPython/3.6.15

File hashes

Hashes for pythonic_toolbox-1.1.40-py3-none-any.whl
Algorithm Hash digest
SHA256 76d4f8434d85f33e6dd64f2e93e2af63677713ff3dd675b4adbd106468e127dc
MD5 790e543c1c13d63458d58ec0a5cf5994
BLAKE2b-256 7c80d6a77888c0cdfcdc6b69d8b4a8fea63d530f2e0eef951ae14f9a4d65aa45

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page