a toolbox with pythonic utils, tools
Project description
Pythonic toolbox
Table of Contents
- Introduction
- Installation
- Usage
README.md is auto generated by the script tests/generate_readme_markdown.py from testing files,
DO NOT EDIT DIRECTLY! ;)
python3 tests/generate_readme_markdown.py
Introduction
A python3.6+ toolbox with multi useful utils, functions, decorators in pythonic way, and is fully tested from python3.6 to python3.11 .
Installation
pip3 install pythonic-toolbox --upgrade
Usage
decorators
The decorators demos highlight reusable wrappers that harden function interfaces, making call sites more forgiving and resilient to transient failures.
ignore_unexpected_kwargs
Use ignore_unexpected_kwargs to accept forgiving keyword arguments without altering core logic or signatures.
import pytest
from pythonic_toolbox.decorators.common import ignore_unexpected_kwargs
# Following functions are named under Metasyntactic Variables, like:
# foobar, foo, bar, baz, qux, quux, quuz, corge,
# grault, garply, waldo, fred, plugh, xyzzy, thud
def foo(a, b=0, c=3):
return a, b, c
dct = {'a': 1, 'b': 2, 'd': 4}
with pytest.raises(TypeError) as __:
assert foo(**dct) == (1, 2, 3)
wrapped_foo = ignore_unexpected_kwargs(foo)
assert wrapped_foo(**dct) == (1, 2, 3)
assert wrapped_foo(0, 0, 0) == (0, 0, 0)
assert wrapped_foo(a=1, b=2, c=3) == (1, 2, 3)
@ignore_unexpected_kwargs
def bar(*args: int):
return sum(args)
# should not change original behavior
assert bar(1, 2, 3) == 6
assert bar(1, 2, 3, unexpected='Gotcha') == 6
nums = [1, 2, 3]
assert bar(*nums, unexpected='Gotcha') == 6
@ignore_unexpected_kwargs
def qux(a, b, **kwargs):
# function with Parameter.VAR_KEYWORD Aka **kwargs
return a, b, kwargs.get('c', 3), kwargs.get('d', 4)
assert qux(**{'a': 1, 'b': 2, 'd': 4, 'e': 5}) == (1, 2, 3, 4)
class Person:
@ignore_unexpected_kwargs
def __init__(self, name, age, sex):
self.name = name
self.age = age
self.sex = sex
@classmethod
@ignore_unexpected_kwargs
def create(cls, name, age, sex):
return cls(name, age, sex)
@staticmethod
@ignore_unexpected_kwargs
def greetings(name):
return f'Hello, I am {name}'
params = {
'name': 'albert',
'age': 34,
'sex': 'male',
'height': '170cm',
}
__ = Person(**params)
__ = Person('albert', 35, 'male', height='170cm')
# test cases for classmethod, staticmethod
__ = Person.create(**params)
assert Person.greetings(**params)
retry
retry wraps callables with configurable retry logic so transient errors can be retried transparently.
import pytest
from pythonic_toolbox.decorators.common import retry
# use decorator without any arguments, using retry default params
@retry
def func_fail_first_time():
"""func_fail_first_time"""
self = func_fail_first_time
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
assert func_fail_first_time() == 'ok'
assert func_fail_first_time.call_times == 2
assert func_fail_first_time.__doc__ == 'func_fail_first_time'
@retry(tries=2, delay=0.1) # use decorator with customized params
def func_fail_twice():
"""func_fail_twice"""
self = func_fail_twice
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times <= 2:
raise Exception('Fail when called first, second time')
return 'ok'
assert func_fail_twice() == 'ok'
assert func_fail_twice.call_times == 3
assert func_fail_twice.__doc__ == 'func_fail_twice'
@retry(tries=2, delay=0.1)
def func_fail_three_times():
"""func_fail_three_times"""
self = func_fail_three_times
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times <= 3: # 1, 2, 3
raise Exception('Fail when called first, second, third time')
return 'ok'
with pytest.raises(Exception) as exec_info:
func_fail_three_times()
assert func_fail_three_times.call_times == 3
assert exec_info.value.args[0] == 'Fail when called first, second, third time'
def raw_func_fail_first_time():
"""func_fail_first_time"""
self = raw_func_fail_first_time
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
assert retry(raw_func_fail_first_time)() == 'ok'
# test cases when function has arguments, kwargs
@retry(tries=1, delay=0.1)
def func_fail_first_time_with_parameters(p1, p2):
"""func_fail_first_time"""
self = func_fail_first_time_with_parameters
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return p1 + p2
assert func_fail_first_time_with_parameters(1, 2) == 3
def func_fail_first_time_with_parameters(p1, p2):
"""func_fail_first_time"""
self = func_fail_first_time_with_parameters
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return p1 + p2
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(1, 2) == 3
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(p1=1, p2=2) == 3
import asyncio
@retry
async def async_func_fail_first_time():
"""async_func_fail_first_time"""
self = async_func_fail_first_time
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
@retry(delay=0.1)
async def async_func_fail_first_time2():
"""async_func_fail_first_time2"""
self = async_func_fail_first_time2
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
async def async_main():
assert await async_func_fail_first_time() == 'ok'
assert async_func_fail_first_time.__doc__ == 'async_func_fail_first_time'
assert async_func_fail_first_time.call_times == 2
assert await async_func_fail_first_time2() == 'ok'
assert async_func_fail_first_time2.call_times == 2
assert async_func_fail_first_time2.__doc__ == 'async_func_fail_first_time2'
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(async_main())
finally:
loop.close()
import random
fail_count = 0
@retry(delay=0.1)
async def always_fail_func():
nonlocal fail_count
fail_count += 1
await asyncio.sleep(random.random())
raise ValueError()
async def async_main_for_always_fail():
nonlocal fail_count
tasks = [always_fail_func() for i in range(0, 3)]
results = await asyncio.gather(*tasks, return_exceptions=True)
assert all(map(lambda e: isinstance(e, ValueError), results))
assert fail_count == 2 * 3 # each func run twice, three func calls
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(async_main_for_always_fail())
finally:
loop.close()
deque_utils
deque_utils focuses on ergonomic helpers for Python's double-ended queues, emphasizing efficient mutation patterns.
deque_pop_any
deque_pop_any removes the first matching element from a deque while preserving O(n) traversal semantics.
from collections import deque
import pytest
from pythonic_toolbox.utils.deque_utils import deque_pop_any
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=1) == 2
assert queue == deque([1, 3, 4, 5])
# edge case: same as deque.popleft()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=0) == 1
assert queue == deque([2, 3, 4, 5])
# edge case: same as deque.popright()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=len(queue) - 1) == 5
assert queue == deque([1, 2, 3, 4])
queue = deque([1, 2, 3, 4, 5])
with pytest.raises(IndexError) as exec_info:
deque_pop_any(queue, idx=102)
# edge case: pop from empty deque
queue = deque()
with pytest.raises(IndexError) as exec_info:
deque_pop_any(queue, idx=0)
assert exec_info.value.args[0] == 'pop from empty deque'
deque_split
deque_split partitions a deque into multiple deques based on a predicate, keeping operations efficient for queue-like workloads.
import pytest
from collections import deque
from pythonic_toolbox.utils.deque_utils import deque_split
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=3)
assert queue1 == deque([1, 2, 3])
assert queue2 == deque([4, 5])
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=0)
assert queue1 == deque([])
assert queue2 == deque([1, 2, 3, 4, 5])
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=100)
assert queue1 == deque([1, 2, 3, 4, 5])
assert queue2 == deque([])
with pytest.raises(ValueError) as exec_info:
deque_split(deque([1, 2, 3, 4, 5]), -1)
assert exec_info.value.args[0] == 'num must be integer: 0 <= num <= sys.maxsize'
dict_utils
The dict_utils section collects richer dictionary abstractions and traversal helpers for working with nested mappings.
DictObj
DictObj exposes dictionary keys as attributes, enabling dot-style access in dynamic data structures.
from copy import deepcopy
import pytest
from pythonic_toolbox.utils.dict_utils import DictObj
naive_dct = {
'key1': 'val1',
'key2': 'val2',
}
obj = DictObj(naive_dct)
# test basic functional methods like dict
assert len(obj) == 2
assert bool(obj) is True
# same behavior like ordinary dict according to the python version (FILO for popitem for 3.6+)
assert obj.popitem() == ('key2', 'val2')
assert obj.popitem() == ('key1', 'val1')
with pytest.raises(KeyError) as __:
obj.popitem()
# a key can be treated like an attribute
# an attribute can be treated like a key
obj.key3 = 'val3'
assert obj.pop('key3') == 'val3'
with pytest.raises(KeyError) as __:
obj.pop('key4')
obj.key5 = 'val5'
del obj.key5
with pytest.raises(KeyError) as __:
obj.pop('key5')
with pytest.raises(AttributeError) as __:
del obj.key5
# test deepcopy
obj = DictObj({'languages': ['Chinese', 'English']})
copied_obj = deepcopy(obj)
assert copied_obj == obj
copied_obj.languages = obj.languages + ['Japanese']
assert obj.languages == ['Chinese', 'English']
assert copied_obj.languages == ['Chinese', 'English', 'Japanese']
assert copied_obj != obj
person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}
person = DictObj(person_dct)
assert DictObj(person_dct) == DictObj(person_dct)
assert person.to_dict() == person_dct
assert set(person.keys()) == {'name', 'age', 'sex', 'languages'}
assert hasattr(person, 'name') is True
assert person.name == 'Albert'
assert person['name'] == 'Albert'
person.languages.append('Japanese')
assert person.languages == ['Chinese', 'English', 'Japanese']
person.height = '170'
assert person['height'] == '170'
assert 'height' in person
assert 'height' in person.keys()
assert hasattr(person, 'height') is True
del person['height']
assert 'height' not in person
assert 'height' not in person.keys()
person['height'] = '170cm'
person.update({'weight': '50'})
weight_val = person.pop('weight')
assert weight_val == '50'
person.update(DictObj({'weight': '50kg'}))
assert person.weight == '50kg'
expected = {
'name': 'Albert', 'age': '34', 'sex': 'Male',
'languages': ['Chinese', 'English', 'Japanese'], # appended new language
'height': '170cm', # new added attribute
'weight': '50kg', # new added attribute
}
assert person.to_dict() == expected
repr_expected: str = ("{'name': 'Albert', 'age': '34', 'sex': 'Male', "
"'languages': ['Chinese', 'English', 'Japanese'],"
" 'height': '170cm', 'weight': '50kg'}")
assert repr(person) == repr_expected
# nested structure will be detected, and changed to DictObj
chessboard_data = {
'position': [
[{'name': 'knight'}, {'name': 'pawn'}],
[{'name': 'pawn'}, {'name': 'queen'}],
]
}
chessboard_obj = DictObj(chessboard_data)
# test comparing instances of DictObj
assert DictObj(chessboard_data) == DictObj(chessboard_data)
assert isinstance(chessboard_obj.position, list)
assert len(chessboard_obj.position) == 2
assert isinstance(chessboard_obj.position[0][0], DictObj)
assert chessboard_obj.position[0][0].name == 'knight'
assert chessboard_obj.position[1][1].name == 'queen'
# edge case empty DictObj
empty_dict_obj = DictObj({})
assert len(empty_dict_obj) == 0
assert bool(empty_dict_obj) is False
obj_dict = DictObj({'data': 'oops'})
assert obj_dict.data == 'oops'
# params validation
invalid_key_dct = {
1: '1',
}
# test when dict's key is not str
with pytest.raises(ValueError) as __:
__ = DictObj(invalid_key_dct)
complicated_key_dct = {
'1abc': 'Gotcha', # '1abc' is not valid identifier for Python, so obj.1abc will cause SyntaxError
'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError
}
obj_dict = DictObj(complicated_key_dct)
assert obj_dict['1abc'] == 'Gotcha'
assert getattr(obj_dict, '1abc') == 'Gotcha'
# you can access '1abc' as attribute by adding prefix '_'
assert obj_dict._1abc == 'Gotcha'
del obj_dict._1abc
assert obj_dict['class'] == 'MyClass'
assert getattr(obj_dict, 'class') == 'MyClass'
# you can access 'class' as attribute by adding prefix '_'
assert obj_dict._class == 'MyClass'
# test re-assign new value for 'class'
obj_dict._class = 'MyClass2'
assert obj_dict._class == 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
assert getattr(obj_dict, 'class') == 'MyClass2'
del obj_dict._class
# if assign new attributes (_2, _try), ObjDict will treat it like what the originally are
# this is fully considered by design, you're not encouraged to mess up keys
obj_dict._2x = 'NewAttr'
assert obj_dict._2x == 'NewAttr'
assert obj_dict['_2x'] == 'NewAttr'
with pytest.raises(KeyError):
__ = obj_dict['2x']
with pytest.raises(AttributeError):
__ = getattr(obj_dict, '2x')
obj_dict._try = 'NewAttr'
assert obj_dict._try == 'NewAttr'
assert obj_dict['_try'] == 'NewAttr'
with pytest.raises(KeyError):
__ = obj_dict['NewAttr']
with pytest.raises(AttributeError):
__ = getattr(obj_dict, 'NewAttr')
# Demo for messing up key 'class'
# delete and re-assign _class
complicated_key_dct = {
'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError
}
obj_dict = DictObj(complicated_key_dct)
assert obj_dict['class'] == 'MyClass'
obj_dict._class = 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
del obj_dict._class
# obj_dict has no knowledge about 'class' or '_class'
# so '_class' is a brand-new attribute, and will be stored as '_class'
obj_dict._class = 'MyClass3'
with pytest.raises(KeyError):
# Oops!!! by-design
# 'class' cannot be accessed as key anymore,
# because we store '_class' as key as other valid keys behave
assert obj_dict['class'] == 'MyClass3'
assert obj_dict['_class'] == 'MyClass3'
# thread safe testing
import sys
from threading import Thread
from pythonic_toolbox.decorators.decorator_utils import method_synchronized
class MyObjDict(DictObj):
# implement a thread-safe method to increase the value of cnt
@method_synchronized
def increase_cnt_by_n(self, n):
self.cnt += n
def increase_cnt_by_100(dict_obj):
for i in range(100):
dict_obj.increase_cnt_by_n(1)
sw_interval = sys.getswitchinterval()
try:
sys.setswitchinterval(0.0001)
my_dict_obj = MyObjDict({'cnt': 0})
threads = [Thread(target=increase_cnt_by_100, args=(my_dict_obj,)) for _ in range(100)]
[t.start() for t in threads]
[t.join() for t in threads]
assert my_dict_obj.cnt == 10000
finally:
sys.setswitchinterval(sw_interval)
# test copy/deepcopy of DictObj
import copy
person = DictObj({'name': 'albert', 'age': 33})
team = DictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader
FinalDictObj
FinalDictObj freezes dictionaries after construction, safeguarding nested data against accidental mutation.
from typing import cast
import pytest
from pythonic_toolbox.utils.dict_utils import FinalDictObj
person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}
fixed_person = FinalDictObj(person_dct)
assert fixed_person.name == 'Albert'
# FINAL means once initialized, you cannot change the key/attribute anymore
with pytest.raises(RuntimeError) as exec_info:
fixed_person.name = 'Steve'
expected_error_str = 'Cannot modify attribute/item in an already initialized FinalDictObj'
assert exec_info.value.args[0] == expected_error_str
with pytest.raises(RuntimeError) as __:
fixed_person.popitem()
with pytest.raises(RuntimeError) as __:
fixed_person.pop('name')
assert isinstance(fixed_person.languages, tuple)
with pytest.raises(AttributeError) as exec_info:
# list values are changed into tuple to avoid being modified
cast(list, fixed_person.languages).append('Japanese')
expected_error_str = "'tuple' object has no attribute 'append'"
assert exec_info.value.args[0] == expected_error_str
assert fixed_person.to_dict() == person_dct
# nested structure will be detected, and changed to FinalDictObj
chessboard_data = {
'position': [
[{'name': 'knight'}, {'name': 'pawn'}],
[{'name': 'pawn'}, {'name': 'queen'}],
]
}
chessboard_obj = FinalDictObj(chessboard_data)
# test comparing instances of FinalDictObj
assert FinalDictObj(chessboard_data) == FinalDictObj(chessboard_data)
assert isinstance(chessboard_obj.position, tuple)
assert isinstance(chessboard_obj.position[0][0], FinalDictObj)
assert chessboard_obj.position[1][1].name == 'queen'
with pytest.raises(RuntimeError) as __:
chessboard_obj.position[1][1].name = 'knight'
# test for keyword/non-identifier key as attribute
final_obj_dict = FinalDictObj({
'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError
})
assert final_obj_dict['class'] == 'MyClass'
assert getattr(final_obj_dict, 'class') == 'MyClass'
assert final_obj_dict._class == 'MyClass'
# test copy/deepcopy of FileDictObj
import copy
person = FinalDictObj({'name': 'albert', 'age': 33})
team = FinalDictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
assert team.leader == shallow_copy_of_team.leader
deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader
RangeKeyDict
RangeKeyDict associates lookup results with numeric ranges, yielding logarithmic-time queries backed by bisect searches.
import pytest
from pythonic_toolbox.utils.dict_utils import RangeKeyDict
# test normal case
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
(float('-inf'), 0): 'Negative',
(0, 60): 'F', # 0 <= val < 60
(60, 70): 'D', # 60 <= val < 70
(70, 80): 'C', # 70 <= val < 80
(80, 90): 'B', # 80 <= val < 90
(90, 100): 'A', # 90 <= val < 100
100: 'A+', # val == 100
})
# Big O of querying is O(log n), n is the number of ranges, due to using bisect inside
assert range_key_dict[-1] == 'Negative'
assert range_key_dict[0] == 'F'
assert range_key_dict[55] == 'F'
assert range_key_dict[60] == 'D'
assert range_key_dict[75] == 'C'
assert range_key_dict[85] == 'B'
assert range_key_dict[95] == 'A'
assert range_key_dict[100] == 'A+'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict['95'] # when key is not comparable with other integer keys
assert exec_info.value.args[0] == "KeyError: '95' is not comparable with other keys"
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[150]
assert exec_info.value.args[0] == 'KeyError: 150'
assert range_key_dict.get(150, 'N/A') == 'N/A'
# test comparison with other RangeKeyDict
assert RangeKeyDict({(0, 10): '1'}) == RangeKeyDict({(0, 10): '1'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 10): '2'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 1000): '1'})
with pytest.raises(ValueError):
# [1, 1) is not a valid range
# there's no value x satisfy 1 <= x < 1
RangeKeyDict({(1, 1): '1'})
with pytest.raises(ValueError):
# [1, -1) is not a valid range
RangeKeyDict({(1, -1): '1'})
# validate input keys types and detect range overlaps(segment intersect)
with pytest.raises(ValueError) as exec_info:
RangeKeyDict({
(0, 10): 'val-between-0-and-10',
(0, 5): 'val-between-0-and-5'
})
expected_error_msg = ("Duplicated left boundary key 0 detected: "
"(0, 10): 'val-between-0-and-10', (0, 5): 'val-between-0-and-5'")
assert exec_info.value.args[0] == expected_error_msg
with pytest.raises(ValueError) as exec_info:
RangeKeyDict({
(0, 10): 'val-between-0-and-10',
(5, 15): 'val-between-5-and-15'
})
expected_error_msg = ("Overlap detected: "
"(0, 10): 'val-between-0-and-10', (5, 15): 'val-between-5-and-15'")
assert exec_info.value.args[0] == expected_error_msg
# test RangeKeyDict with no continuous ranges
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
(0, 60): 'F', # 0 <= val < 60
(70, 80): 'C', # 70 <= val < 80
})
assert range_key_dict[10] == 'F'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[-100]
assert exec_info.value.args[0] == 'KeyError: -100'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[65]
assert exec_info.value.args[0] == 'KeyError: 65'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[100]
assert exec_info.value.args[0] == 'KeyError: 100'
from functools import total_ordering
@total_ordering
class Age:
def __init__(self, val: float):
if not isinstance(val, (int, float)):
raise ValueError('Invalid age value')
self.val = val
def __le__(self, other):
return self.val <= other.val
def __repr__(self):
return f'Age({repr(self.val)})'
def __hash__(self):
return hash(self.val)
age_categories_map: RangeKeyDict[Age, str] = RangeKeyDict({
(Age(0), Age(2)): 'Baby',
(Age(2), Age(15)): 'Children',
(Age(15), Age(25)): 'Youth',
(Age(25), Age(65)): 'Adults',
(Age(65), Age(123)): 'Seniors',
})
assert age_categories_map[Age(0.5)] == 'Baby'
assert age_categories_map[Age(12)] == 'Children'
assert age_categories_map[Age(20)] == 'Youth'
assert age_categories_map[Age(35)] == 'Adults'
assert age_categories_map[Age(70)] == 'Seniors'
StrKeyIdDict
StrKeyIdDict assigns deterministic integer identifiers to string keys while maintaining bidirectional lookups.
import pytest
from pythonic_toolbox.utils.dict_utils import StrKeyIdDict
data = {1: 'a', 2: 'b', '3': 'c'}
my_dict = StrKeyIdDict(data)
# usage: value can be accessed by id (str: int-like/uuid-like/whatever) or id (int)
assert my_dict['1'] == my_dict[1] == 'a'
assert my_dict.keys() == {'1', '2', '3'} # all keys are str type
my_dict['4'] = 'd'
assert my_dict['4'] == 'd'
my_dict[4] = 'd'
assert my_dict['4'] == 'd'
my_dict.update({4: 'd'})
assert my_dict['4'] == 'd'
# test comparing instances of the class
assert StrKeyIdDict(data) == StrKeyIdDict(data)
assert StrKeyIdDict(data) != StrKeyIdDict(dict(data, **{'4': 'd'}))
assert StrKeyIdDict(data) == {'1': 'a', '2': 'b', '3': 'c'}
assert StrKeyIdDict(data) != {'1': 'a', '2': 'b', '3': 'd'}
assert StrKeyIdDict(data) != {1: 'a', 2: 'b', 3: 'c'} # StrKeyIdDict assumes all keys are strings
# test delete key
del my_dict[4]
assert my_dict.keys() == {'1', '2', '3'} # '4' is not in the dict anymore
# assign value to an arbitrary string key that is not in the dict
my_dict.update({'some-uuid': 'something'})
assert my_dict['some-uuid'] == 'something'
with pytest.raises(TypeError):
# key '1', 1 both stands for key '1',
# so we get duplicated keys when initializing instance, oops!
my_dict = StrKeyIdDict({'1': 'a', 1: 'A'})
assert my_dict.get(1) == 'a'
assert my_dict.get('NotExistKey') is None
assert my_dict.get('NotExistKey', 'NotExistValue') == 'NotExistValue'
# test edge cases
assert StrKeyIdDict() == {}
# test shallow copy
my_dict[5] = ['e1', 'e2', 'e3']
copy_dict = my_dict.copy()
copy_dict[1] = 'A'
assert my_dict[1] == 'a'
my_dict['5'].append('e4')
assert copy_dict['5'] == ['e1', 'e2', 'e3', 'e4']
# test deep copy
from copy import deepcopy
copy_dict = deepcopy(my_dict)
my_dict[5].append('e5')
assert my_dict['5'] == ['e1', 'e2', 'e3', 'e4', 'e5']
assert copy_dict[5] == ['e1', 'e2', 'e3', 'e4']
# test constructor
my_dict = StrKeyIdDict(uuid1='a', uuid2='b')
assert my_dict['uuid1'] == 'a'
# test constructor (from keys)
my_dict = StrKeyIdDict.fromkeys([1, 2, 3], None)
assert my_dict == {'1': None, '2': None, '3': None}
# test update and overwrite
my_dict.update(StrKeyIdDict({1: 'a', 2: 'b', 3: 'c', 4: 'd'}))
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
my_dict = StrKeyIdDict([(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')])
assert my_dict['1'] == my_dict[1] == 'a'
# reassign StrKeyIdDict instance to another StrKeyIdDict instance
my_dict = StrKeyIdDict(my_dict)
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
assert dict(my_dict) == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
# test case when "key" is "data", which is a reserved keyword inside StrKeyIdDict
my_dict = StrKeyIdDict({'data': 'data_value', '1': 'a'})
assert my_dict['data'] == 'data_value'
assert my_dict['1'] == 'a'
# delete key 'data', should not affect other keys
del my_dict['data']
assert my_dict['1'] == 'a'
collect_leaves
collect_leaves traverses nested dictionaries and gathers terminal values into a flat structure.
from pythonic_toolbox.utils.dict_utils import collect_leaves
# a nested dict-like struct
my_dict = {
'node_1': {
'node_1_1': {
'node_1_1_1': 'A',
},
'node_1_2': {
'node_1_2_1': 'B',
'node_1_2_2': 'C',
'node_1_2_3': None,
},
'node_1_3': [ # dict list
{
'node_1_3_1_1': 'D',
'node_1_3_1_2': 'E',
},
{
'node_1_3_2_1': 'FF',
'node_1_3_2_2': 'GG',
}
]
}}
expected = ['A', 'B', 'C', None, 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict) == expected
expected = ['A', 'B', 'C', 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf) == expected
assert collect_leaves(my_dict, keypath_pred=lambda kp: len(kp) == 1) == []
expected = ['B', 'C']
assert collect_leaves(my_dict, keypath_pred=lambda kp: kp[-1] in {'node_1_2_1', 'node_1_2_2'}) == expected
expected = ['C']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf == 'C') == expected
assert collect_leaves(my_dict,
keypath_pred=lambda kp: kp[-1] == 'node_1_2_2',
leaf_pred=lambda lf: lf == 'C') == expected
assert collect_leaves(my_dict,
keypath_pred=lambda kp: kp[-1] == 'node_1_1_1',
leaf_pred=lambda lf: lf == 'C') == []
expected = ['D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict,
keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3') == expected
expected = ['FF', 'GG']
assert collect_leaves(my_dict,
keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3',
leaf_pred=lambda lf: isinstance(lf, str) and len(lf) == 2) == expected
# edge cases
assert collect_leaves([]) == []
assert collect_leaves({}) == []
assert collect_leaves(None) == []
dict_until
dict_until repeatedly applies mutations to a mapping until a predicate signals completion.
from pythonic_toolbox.utils.dict_utils import dict_until
data = {'full_name': 'Albert Lee', 'pen_name': None}
assert dict_until(data, keys=['name', 'full_name']) == 'Albert Lee'
assert dict_until(data, keys=['full_name', 'name']) == 'Albert Lee'
assert dict_until(data, keys=['name', 'english_name']) is None
assert dict_until(data, keys=['name', 'english_name'], default='anonymous') == 'anonymous'
# test when pen_name is set None on purpose
assert dict_until(data, keys=['pen_name'], default='anonymous') is None
# test when value with None value is not acceptable
assert dict_until(data, keys=['pen_name'], terminate=lambda x: x is not None, default='anonymous') == 'anonymous'
select_list_of_dicts
select_list_of_dicts filters lists of dictionaries using expressive selection predicates.
from pythonic_toolbox.utils.dict_utils import select_list_of_dicts
dict_lst = [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# another Peter Parker from multiverse
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# age unknown for Carol Danvers, no age field
{'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
{'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'},
]
assert select_list_of_dicts(dict_lst, look_like={'name': 'Peter Parker'}) == [
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}) == [
{'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
{'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}, keys=['name']) == [
{'name': 'Carol Danvers'}, {'name': 'Natasha Romanoff'}]
# unique is supported for return list
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age']) == [
{'name': 'Tony Stark', 'age': 49},
{'name': 'Peter Parker', 'age': 16},
{'name': 'Peter Parker', 'age': 16},
]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age'], unique=True) == [
{'name': 'Tony Stark', 'age': 49},
{'name': 'Peter Parker', 'age': 16}]
# dict keys are ordered as the keys passed-in
assert list(select_list_of_dicts(dict_lst, keys=['name', 'age'], unique=True)[0].keys()) == ['name', 'age']
assert list(select_list_of_dicts(dict_lst, keys=['age', 'name'], unique=True)[0].keys()) == ['age', 'name']
# locate Captain Marvel, with default val for missing key
assert select_list_of_dicts(dict_lst,
look_like={'alias': 'Captain Marvel'},
keys=['name', 'sex', 'age', 'alias'],
val_for_missing_key='Unknown')[0]['age'] == 'Unknown'
# edge cases, get the original dict
assert select_list_of_dicts([]) == []
assert select_list_of_dicts(dict_lst) == dict_lst
# new list of dicts is returned, leaving the original list of dicts untouched
black_widow = select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]
black_widow['age'] += 1
assert black_widow['age'] == 36
# we don't modify the original dict data, Natasha is always 35 years old
assert select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]['age'] == 35
# preds provide more flexibility, filter the ones with age info
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0])) == 4
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0], unique=True)) == 3
# combine look_like and preds parameters
expected = [{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
preds=[lambda d: 'age' in d, lambda d: d['age'] > 20]) == expected
# empty list is returned if no dict matches the criteria
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
preds=[lambda d: 'sex' in d and d['sex'] == 'female']) == []
unique_list_of_dicts
unique_list_of_dicts collapses dictionaries into a unique list based on configurable identity keys.
from pythonic_toolbox.utils.dict_utils import unique_list_of_dicts
dict_lst = [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# Peter Parkers from multiverse in same age.
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# test same dict, but the order of dict is different
{'name': 'Peter Parker', 'sex': 'male', 'alias': 'Spider Man', 'age': 16},
]
# Only one Peter Parker will be kept, for all data are exactly same.
assert unique_list_of_dicts(dict_lst) == [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
]
# edge cases
assert unique_list_of_dicts([]) == []
walk_leaves
walk_leaves yields a generator over nested key paths and leaf values for introspection-heavy workflows.
from pythonic_toolbox.utils.dict_utils import walk_leaves
data = {
'k1': {
'k1_1': 1,
'k1_2': 2,
},
'k2': 'N/A', # stands for not available
}
expected = {
'k1': {
'k1_1': 2,
'k1_2': 4,
},
'k2': 'N/A', # stands for not available
}
assert walk_leaves(data) == data # no transform function provided, just a deepcopy
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
# if inplace is set True, will change data inplace, return nothing
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected
data = [{'name': 'lml', 'age': 33}, {'name': 'albert', 'age': 18}]
expected = [{'name': 'lml', 'age': 66}, {'name': 'albert', 'age': 36}]
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected
# edge cases
assert walk_leaves(None) is None
assert walk_leaves([]) == []
assert walk_leaves({}) == {}
assert walk_leaves(None, inplace=True) is None
assert walk_leaves([], inplace=True) is None
assert walk_leaves({}, inplace=True) is None
functional_utils
functional_utils gathers lightweight functional-programming inspired helpers that compose common iterable transformations.
filter_multi
filter_multi composes multiple predicates for iterative filtering, with lfilter_multi providing a list materialization helper.
from pythonic_toolbox.utils.functional_utils import lfilter_multi, filter_multi
from collections.abc import Iterable
def is_even(x):
return x % 2 == 0
def is_divisible_by_5(x):
return x % 5 == 0
# select numbers which are divisible by 2 and 5
assert lfilter_multi([is_even, is_divisible_by_5], range(1, 30)) == [10, 20]
assert lfilter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20]) == [10, 20]
from itertools import count, takewhile
# if you want to pass an iterator, make sure the iterator will end/break,
# Note: a bare count(start=0, step=2) will generate number like 0, 2, 4, 6, .... (never ends)
even_numbers_less_equal_than_50 = takewhile(lambda x: x <= 50, count(start=0, step=2))
expected = [0, 10, 20, 30, 40, 50]
assert lfilter_multi([is_even, is_divisible_by_5], even_numbers_less_equal_than_50) == expected
# testing for filter_multi, not converted to list directly
num_iterator = filter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20])
assert type(num_iterator) is filter
assert isinstance(num_iterator, Iterable)
expected = [10, 20]
for idx, value in enumerate(num_iterator):
assert value == expected[idx]
# when items are infinite, choose filter_multi instead of lfilter_multi
expected = [0, 10, 20, 30, 40, 50]
for idx, value in enumerate(filter_multi([is_even, is_divisible_by_5], count(start=0, step=1))):
if value > 50:
break
else:
assert value == expected[idx]
list_utils
Utilities in list_utils provide expressive patterns for curation, ordering, and restructuring of list data.
filter_allowable
filter_allowable retains items that match allowable values, whether they are literal matches or resolved dynamically.
from pythonic_toolbox.utils.list_utils import filter_allowable
fruits = ['apple', 'banana', 'orange']
vegetables = ['carrot', 'potato', 'tomato']
meats = ['beef', 'chicken', 'fish']
foods = fruits + vegetables + meats
assert list(filter_allowable(foods)) == foods
assert list(filter_allowable(foods, allow_list=[], block_list=[])) == foods
assert list(filter_allowable(foods, allow_list=['apple', 'banana', 'blueberry'])) == ['apple', 'banana']
assert list(filter_allowable(foods, allow_list=[], block_list=foods)) == []
assert list(filter_allowable(foods, block_list=meats)) == fruits + vegetables
assert list(filter_allowable(foods, allow_list=['apple'], block_list=[])) == ['apple']
assert list(filter_allowable(foods, allow_list=['apple'], block_list=['apple'])) == []
assert list(filter_allowable(foods + ['blueberry'], allow_list=[], block_list=foods)) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=[])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=['apple', 'banana'])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=['orange'], block_list=['apple', 'banana'])) == []
# test cases with parameter key
assert list(filter_allowable(foods, allow_list=['a', 'b'], key=lambda x: x[0])) == ['apple', 'banana', 'beef']
# test some basic cases
assert list(filter_allowable()) == []
assert list(filter_allowable(candidates=None)) == []
assert list(filter_allowable(candidates=[])) == []
assert list(filter_allowable(candidates=[], allow_list=[], block_list=[])) == []
sort_with_custom_orders
sort_with_custom_orders sorts sequences according to bespoke priority orders or fallback comparators.
from operator import itemgetter
from typing import List
import pytest
from pythonic_toolbox.utils.list_utils import sort_with_custom_orders
# basic usage
values = ['branch2', 'branch1', 'branch3', 'master', 'release']
expected = ['master', 'release', 'branch1', 'branch2', 'branch3']
assert sort_with_custom_orders(values, prefix_orders=['master', 'release']) == expected
assert sort_with_custom_orders(values, prefix_orders=['master', 'release'], reverse=True) == expected[::-1]
values = [1, 2, 3, 9, 9]
expected = [9, 9, 1, 2, 3]
assert sort_with_custom_orders(values, prefix_orders=[9, 8, 7]) == expected
values = [1, 2, 3, 9]
expected = [9, 2, 3, 1]
assert sort_with_custom_orders(values, prefix_orders=[9], suffix_orders=[1]) == expected
assert sort_with_custom_orders([]) == []
assert sort_with_custom_orders([], prefix_orders=[], suffix_orders=[]) == []
assert sort_with_custom_orders([], prefix_orders=['master']) == []
# tests for unhashable values
values = [[2, 2], [1, 1], [3, 3], [6, 0]]
assert sort_with_custom_orders(values, prefix_orders=[[3, 3]]) == [[3, 3], [1, 1], [2, 2], [6, 0]]
# if "key" is provided, items are sorted in order of key(item)
# items in prefix_orders/suffix_orders don't need to be one-one correspondence with items to sort
# sum([6]) == sum([3, 3]) == sum([6, 0])
assert sort_with_custom_orders(values, prefix_orders=[[6]], key=sum) == [[3, 3], [6, 0], [1, 1], [2, 2]]
# tests for list of dicts
values = [{2: 2}, {1: 1}, {1: 2}]
assert sort_with_custom_orders(values, prefix_orders=[{2: 2}],
key=lambda data: sum(data.values())) == [{2: 2}, {1: 2}, {1: 1}]
branch_info: List[dict] = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'release', 'commit_id': 'v1.1'}]
# Assume that we prefer choosing branch in order: release > master > others (develop, hotfix etc.)
res = sort_with_custom_orders(branch_info,
prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
key=itemgetter('branch'))
expected = [{'branch': 'release', 'commit_id': 'v1.1'}, {'branch': 'master', 'commit_id': 'v1.2'}]
assert res == expected
branch_info = [{'branch': 'develop', 'commit_id': 'v1.3'}, {'branch': 'master', 'commit_id': 'v1.2'}]
res = sort_with_custom_orders(branch_info,
prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
key=itemgetter('branch'))
expected = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'develop', 'commit_id': 'v1.3'}]
assert res == expected
# tests for exceptions
with pytest.raises(ValueError) as exec_info:
sort_with_custom_orders([1, 2, 3], prefix_orders=[3], suffix_orders=[3])
assert exec_info.value.args[0] == 'prefix and suffix contains same value'
with pytest.raises(ValueError) as exec_info:
sort_with_custom_orders([1, 2, 3], prefix_orders=[1, 1])
assert exec_info.value.args[0] == 'prefix_orders contains duplicated values'
# tests for class
class Person:
def __init__(self, id, name, age):
self.id = id
self.name = name
self.age = age
def __lt__(self, other: 'Person'):
return self.age < other.age
def __eq__(self, other: 'Person'):
return self.age == other.age
def __hash__(self):
return self.id
def __str__(self):
return f'Person({self.id}, {self.name}, {self.age})'
def __repr__(self):
return str(self)
Albert = Person(1, 'Albert', 28)
Alice = Person(2, 'Alice', 26)
Menglong = Person(3, 'Menglong', 33)
persons = [Albert, Alice, Menglong]
expected = [Alice, Albert, Menglong]
assert sort_with_custom_orders(persons) == expected
expected = [Menglong, Alice, Albert]
assert sort_with_custom_orders(persons, prefix_orders=[Menglong, Person(4, 'Anyone', 40)]) == expected
unpack_list
unpack_list unpacks nested iterables into positional variables with clear error reporting.
import pytest
from pythonic_toolbox.utils.list_utils import unpack_list
first, second, third = unpack_list(['a', 'b', 'c', 'd'], target_num=3)
assert first == 'a' and second == 'b' and third == 'c'
first, second, third = unpack_list(['a', 'b'], target_num=3, default=None)
assert first == 'a' and second == 'b' and third is None
first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None
first, second, third = unpack_list([], target_num=3, default=0)
assert first == second == third == 0
first, second, *rest = unpack_list(['a', 'b', 'c'], target_num=4, default='x')
assert first == 'a' and second == 'b' and rest == ['c', 'x']
# test case for type range
first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None
def fib():
a, b = 0, 1
while 1:
yield a
a, b = b, a + b
# test case for type generator
fib_generator = fib() # generates data like [0, 1, 1, 2, 3, 5, 8, 13, 21 ...]
first, second, third, *rest = unpack_list(fib_generator, target_num=6)
assert first == 0 and second == 1 and third == 1
assert rest == [2, 3, 5]
seventh, eighth = unpack_list(fib_generator, target_num=2)
assert seventh == 8 and eighth == 13
# test edge case, nothing to unpack
empty = unpack_list([], target_num=0, default=None)
assert empty == []
res = unpack_list([], target_num=2, default=None)
assert res == [None, None]
empty = unpack_list(['a', 'b'], target_num=0, default=None)
assert empty == []
empty = unpack_list(range(0, 0), target_num=0)
assert empty == []
empty = unpack_list(iter([]), target_num=0, default=None)
assert empty == []
with pytest.raises(ValueError):
# ValueError: not enough values to unpack (expected 3, got 2)
first, second, third = unpack_list([1, 2], target_num=2)
until
until iterates through data until a stopping condition is met, mirroring familiar functional-programming patterns.
from itertools import count
from pythonic_toolbox.utils.list_utils import until
# basic usage
counter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x > 10) == 11
assert until([1, 2, 3], lambda x: x > 10, default=11) == 11
# test case for when there's no default value and no item in the iterable satisfies the condition
assert until([1, 2, 3], lambda x: x > 10) is None
# edge cases
assert until([], default=3) == 3 # nothing provided, return default
assert until(None, lambda x: x > 10, default=11) == 11
# test case for when there's no item in the counter satisfies the condition
# the following codes will run forever, so comment them out
# counter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...
# assert until(counter, lambda x: x % 2 == 0) is None
# test case for when max_iter_num is provided, only iterate the counter for max_iter_num times
counter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x % 2 == 0, default=None, max_iter_num=100) is None
numbers = [1, 2, 3, 4, 5, 6]
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=1) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=4) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=5) == 5
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=100) == 5
string_utils
The string_utils module streamlines templating and value substitution when building dynamic strings.
substitute_string_template_dict
substitute_string_template_dict safely fills placeholders in string templates using dictionary-based parameters.
from unittest.mock import patch, PropertyMock
import pytest
from pythonic_toolbox.utils.string_utils import substitute_string_template_dict, CycleError
# simple usage
# both $variable ${variable} declarations are supported in string template format
str_template_dict = {
'greeting': 'Good Morning, Everyone!',
'first_name': 'Albert',
'last_name': 'Lee',
'full_name': '$first_name $last_name',
'age': 34,
'speech': '$greeting, I am $full_name, a ${age}-year-old programmer, very glad to meet you!'
}
output_dict = substitute_string_template_dict(str_template_dict)
assert output_dict['full_name'] == 'Albert Lee'
expected_speech = 'Good Morning, Everyone!, I am Albert Lee, a 34-year-old programmer, very glad to meet you!'
assert output_dict['speech'] == expected_speech
# complex usage, with dynamic values, and multi value-providing holders
str_template_dict = {
'first_name': 'Daenerys',
'last_name': 'Targaryen',
'nick_name': 'Dany',
'full_name': '$first_name $last_name',
'speech': "$nick_name: I'm $full_name ($title1, $title2, $title3), it's $current_time_str, $greeting!",
}
variables_dict = {'title1': 'Queen of Meereen',
'title2': 'Mother of Dragons'}
class DynamicVariables:
@property
def current_time_str(self):
import datetime
return datetime.datetime.now().strftime("%H:%M:%S")
class DefaultUnknownTitle:
"""
A class will always return UnknownTitle, when try to access attribute like
title1, title2, ..., titleX
"""
def __getattribute__(self, item):
if isinstance(item, str) and item.startswith('title') and item[len(item) - 1:].isdigit():
return 'UnknownTitle'
return super(DefaultUnknownTitle, self).__getattribute__(item)
expected_speech = ("Dany: I'm Daenerys Targaryen (Queen of Meereen, Mother of Dragons, UnknownTitle), "
"it's 08:00:00, good morning everyone!")
# using mock to make DynamicVariables().current_time_str always return 08:00:00
with patch.object(DynamicVariables, 'current_time_str', return_value='08:00:00', new_callable=PropertyMock):
output_dict = substitute_string_template_dict(str_template_dict, variables_dict, DynamicVariables(),
DefaultUnknownTitle(),
greeting='good morning everyone')
assert output_dict['speech'] == expected_speech
# edge cases
assert substitute_string_template_dict({}) == {}
# cycle detection
str_template_dict = {
'variable_a': 'Hello $variable_b', # variable_a depends on variable_b
'variable_b': 'Hello $variable_a', # variable_b depends on variable_a, it's a cycle!
}
with pytest.raises(CycleError) as exec_info:
substitute_string_template_dict(str_template_dict)
context
context demonstrates context managers that gracefully gate execution paths based on runtime conditions.
SkipContext
SkipContext conditionally suppresses execution within a context manager, ideal for pre-emptive locking or runtime flags.
import itertools
import pytest
from pythonic_toolbox.utils.context_utils import SkipContext
# Usage: define a class that inherits the SkipContext,
# and takes control of the skip or not logic
class MyWorkStation(SkipContext):
def __init__(self, week_day: str):
working_days = {'monday', 'tuesday', 'wednesday', 'thursday', 'friday'}
weekends = {'saturday', 'sunday'}
if week_day.lower() not in working_days.union(weekends):
raise ValueError(f'Invalid weekday {week_day}')
skip = True if week_day.lower() in weekends else False
super(MyWorkStation, self).__init__(skip=skip)
seven_week_days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
logged_opening_days = []
total_working_hours = 0
for cur_week_day in seven_week_days:
# MyWorkStation will skip the code block when encountering weekends
with MyWorkStation(week_day=cur_week_day):
# log this working day
logged_opening_days.append(cur_week_day)
# accumulate working hours, 8 hours on each working day
total_working_hours += 8
# only working days are logged
assert logged_opening_days == ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday']
assert total_working_hours == 8 * 5
# test basic SkipContext
count_iterator = itertools.count(start=0, step=1)
flg_skip = True
with SkipContext(skip=flg_skip):
# if skip = True, all codes inside the context will be skipped(not executed)
next(count_iterator) # this will not be executed
assert sum([1, 1]) == 3
raise Exception('Codes will not be executed')
assert next(count_iterator) == 0 # check previous context is skipped
flg_skip = False
with SkipContext(skip=flg_skip):
# codes will be executed as normal, if skip = False
next(count_iterator) # generate value 1
assert sum([1, 1]) == 2
assert next(count_iterator) == 2 # check previous context is executed
with pytest.raises(Exception) as exec_info:
with SkipContext(skip=False):
# if skip = False, this SkipContextManager is transparent,
# internal exception will be detected as normal
raise Exception('MyError')
assert exec_info.value.args[0] == 'MyError'
# another example: ensure there will be only one job, who acquire the lock, run the increase +1
from multiprocessing import Manager, Pool
import time
from pythonic_toolbox.utils.context_utils import SkipContext
def plain_cronjob_increase(ns, lock):
start = time.time()
with lock:
now = time.time()
if now - start >= 0.5:
pass
else:
ns.cnt += 1
time.sleep(1)
return ns.cnt
class PreemptiveLockContext(SkipContext):
def __init__(self, lock):
self.start_time = time.perf_counter()
self.lock = lock
self.acquired = self.lock.acquire(timeout=0.5)
skip = not self.acquired
super(PreemptiveLockContext, self).__init__(skip=skip)
def __exit__(self, type, value, traceback):
if self.acquired:
time.sleep(1)
self.lock.release()
if type is None:
return # No exception
else:
if issubclass(type, self.SkipContentException):
return True # Suppress special SkipWithBlockException
return False
def cronjob_increase(ns, lock):
# for those who cannot acquire the lock within some time
# this context block will be skipped, quite simple
with PreemptiveLockContext(lock):
ns.cnt += 1
return ns.cnt
manager = Manager()
lock = manager.Lock()
ns = manager.Namespace()
pool = Pool(2)
ns.cnt = 0
processes = [pool.apply_async(plain_cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
result = [p.get() for p in processes]
assert result == [1, 1]
assert ns.cnt == 1
# reset global cnt=0
ns.cnt = 0
processes = [pool.apply_async(cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
result = [p.get() for p in processes]
assert result == [1, 1]
assert ns.cnt == 1
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pythonic-toolbox-1.1.40.tar.gz.
File metadata
- Download URL: pythonic-toolbox-1.1.40.tar.gz
- Upload date:
- Size: 57.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.10.0 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/1.0.0 urllib3/1.26.18 tqdm/4.64.1 importlib-metadata/4.8.3 keyring/23.4.1 rfc3986/1.5.0 colorama/0.4.5 CPython/3.6.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a6786239c13dbd929d74d0759c8039abd81ef84d859f4d99f600df09edf774d0
|
|
| MD5 |
ca757286a98b1bba481f614f970efedc
|
|
| BLAKE2b-256 |
1f156904e12e7c4c93194ccf7489f8033da0fd484a3801ae7bdc09198377b2ae
|
File details
Details for the file pythonic_toolbox-1.1.40-py3-none-any.whl.
File metadata
- Download URL: pythonic_toolbox-1.1.40-py3-none-any.whl
- Upload date:
- Size: 33.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.10.0 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/1.0.0 urllib3/1.26.18 tqdm/4.64.1 importlib-metadata/4.8.3 keyring/23.4.1 rfc3986/1.5.0 colorama/0.4.5 CPython/3.6.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
76d4f8434d85f33e6dd64f2e93e2af63677713ff3dd675b4adbd106468e127dc
|
|
| MD5 |
790e543c1c13d63458d58ec0a5cf5994
|
|
| BLAKE2b-256 |
7c80d6a77888c0cdfcdc6b69d8b4a8fea63d530f2e0eef951ae14f9a4d65aa45
|