a toolbox with pythonic utils, tools
Project description
Pythonic toolbox
Table of Contents
- Introduction
- Installation
- Usage
README.md is auto generated by the script tests/generate_readme_markdown.py from testing files,
python3 tests/generate_readme_markdown.py
A python3.6+ toolbox with multi useful utils, functions, decorators in pythonic way, and is fully tested from python3.6 to python3.11 .
pip3 install pythonic-toolbox --upgrade
import pytest
from pythonic_toolbox.decorators.common import ignore_unexpected_kwargs
# Following functions are named under Metasyntactic Variables, like:
# foobar, foo, bar, baz, qux, quux, quuz, corge,
# grault, garply, waldo, fred, plugh, xyzzy, thud
def foo(a, b=0, c=3):
return a, b, c
dct = {'a': 1, 'b': 2, 'd': 4}
with pytest.raises(TypeError) as __:
assert foo(**dct) == (1, 2, 3)
wrapped_foo = ignore_unexpected_kwargs(foo)
assert wrapped_foo(**dct) == (1, 2, 3)
assert wrapped_foo(0, 0, 0) == (0, 0, 0)
assert wrapped_foo(a=1, b=2, c=3) == (1, 2, 3)
def bar(*args: int):
return sum(args)
# should not change original behavior
assert bar(1, 2, 3) == 6
assert bar(1, 2, 3, unexpected='Gotcha') == 6
nums = [1, 2, 3]
assert bar(*nums, unexpected='Gotcha') == 6
def qux(a, b, **kwargs):
# function with Parameter.VAR_KEYWORD Aka **kwargs
return a, b, kwargs.get('c', 3), kwargs.get('d', 4)
assert qux(**{'a': 1, 'b': 2, 'd': 4, 'e': 5}) == (1, 2, 3, 4)
class Person:
def __init__(self, name, age, sex):
self.name = name
self.age = age
self.sex = sex
def create(cls, name, age, sex):
return cls(name, age, sex)
def greetings(name):
return f'Hello, I am {name}'
params = {
'name': 'albert',
'age': 34,
'sex': 'male',
'height': '170cm',
__ = Person(**params)
__ = Person('albert', 35, 'male', height='170cm')
# test cases for classmethod, staticmethod
__ = Person.create(**params)
assert Person.greetings(**params)
import pytest
from pythonic_toolbox.decorators.common import retry
# use decorator without any arguments, using retry default params
def func_fail_first_time():
self = func_fail_first_time
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
assert func_fail_first_time() == 'ok'
assert func_fail_first_time.call_times == 2
assert func_fail_first_time.__doc__ == 'func_fail_first_time'
@retry(tries=2, delay=0.1) # use decorator with customized params
def func_fail_twice():
self = func_fail_twice
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times <= 2:
raise Exception('Fail when called first, second time')
return 'ok'
assert func_fail_twice() == 'ok'
assert func_fail_twice.call_times == 3
assert func_fail_twice.__doc__ == 'func_fail_twice'
@retry(tries=2, delay=0.1)
def func_fail_three_times():
self = func_fail_three_times
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times <= 3: # 1, 2, 3
raise Exception('Fail when called first, second, third time')
return 'ok'
with pytest.raises(Exception) as exec_info:
assert func_fail_three_times.call_times == 3
assert exec_info.value.args[0] == 'Fail when called first, second, third time'
def raw_func_fail_first_time():
self = raw_func_fail_first_time
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
assert retry(raw_func_fail_first_time)() == 'ok'
# test cases when function has arguments, kwargs
@retry(tries=1, delay=0.1)
def func_fail_first_time_with_parameters(p1, p2):
self = func_fail_first_time_with_parameters
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return p1 + p2
assert func_fail_first_time_with_parameters(1, 2) == 3
def func_fail_first_time_with_parameters(p1, p2):
self = func_fail_first_time_with_parameters
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return p1 + p2
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(1, 2) == 3
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(p1=1, p2=2) == 3
import asyncio
async def async_func_fail_first_time():
self = async_func_fail_first_time
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
async def async_func_fail_first_time2():
self = async_func_fail_first_time2
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
async def async_main():
assert await async_func_fail_first_time() == 'ok'
assert async_func_fail_first_time.__doc__ == 'async_func_fail_first_time'
assert async_func_fail_first_time.call_times == 2
assert await async_func_fail_first_time2() == 'ok'
assert async_func_fail_first_time2.call_times == 2
assert async_func_fail_first_time2.__doc__ == 'async_func_fail_first_time2'
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
import random
fail_count = 0
async def always_fail_func():
nonlocal fail_count
fail_count += 1
await asyncio.sleep(random.random())
raise ValueError()
async def async_main_for_always_fail():
nonlocal fail_count
tasks = [always_fail_func() for i in range(0, 3)]
results = await asyncio.gather(*tasks, return_exceptions=True)
assert all(map(lambda e: isinstance(e, ValueError), results))
assert fail_count == 2 * 3 # each func run twice, three func calls
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
from collections import deque
import pytest
from pythonic_toolbox.utils.deque_utils import deque_pop_any
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=1) == 2
assert queue == deque([1, 3, 4, 5])
# edge case: same as deque.popleft()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=0) == 1
assert queue == deque([2, 3, 4, 5])
# edge case: same as deque.popright()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=len(queue) - 1) == 5
assert queue == deque([1, 2, 3, 4])
queue = deque([1, 2, 3, 4, 5])
with pytest.raises(IndexError) as exec_info:
deque_pop_any(queue, idx=102)
# edge case: pop from empty deque
queue = deque()
with pytest.raises(IndexError) as exec_info:
deque_pop_any(queue, idx=0)
assert exec_info.value.args[0] == 'pop from empty deque'
import pytest
from collections import deque
from pythonic_toolbox.utils.deque_utils import deque_split
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=3)
assert queue1 == deque([1, 2, 3])
assert queue2 == deque([4, 5])
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=0)
assert queue1 == deque([])
assert queue2 == deque([1, 2, 3, 4, 5])
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=100)
assert queue1 == deque([1, 2, 3, 4, 5])
assert queue2 == deque([])
with pytest.raises(ValueError) as exec_info:
deque_split(deque([1, 2, 3, 4, 5]), -1)
assert exec_info.value.args[0] == 'num must be integer: 0 <= num <= sys.maxsize'
from copy import deepcopy
import pytest
from pythonic_toolbox.utils.dict_utils import DictObj
naive_dct = {
'key1': 'val1',
'key2': 'val2',
obj = DictObj(naive_dct)
# test basic functional methods like dict
assert len(obj) == 2
assert bool(obj) is True
# same behavior like ordinary dict according to the python version (FILO for popitem for 3.6+)
assert obj.popitem() == ('key2', 'val2')
assert obj.popitem() == ('key1', 'val1')
with pytest.raises(KeyError) as __:
# a key can be treated like an attribute
# an attribute can be treated like a key
obj.key3 = 'val3'
assert obj.pop('key3') == 'val3'
with pytest.raises(KeyError) as __:
obj.key5 = 'val5'
del obj.key5
with pytest.raises(KeyError) as __:
with pytest.raises(AttributeError) as __:
del obj.key5
# test deepcopy
obj = DictObj({'languages': ['Chinese', 'English']})
copied_obj = deepcopy(obj)
assert copied_obj == obj
copied_obj.languages = obj.languages + ['Japanese']
assert obj.languages == ['Chinese', 'English']
assert copied_obj.languages == ['Chinese', 'English', 'Japanese']
assert copied_obj != obj
person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}
person = DictObj(person_dct)
assert DictObj(person_dct) == DictObj(person_dct)
assert person.to_dict() == person_dct
assert set(person.keys()) == {'name', 'age', 'sex', 'languages'}
assert hasattr(person, 'name') is True
assert person.name == 'Albert'
assert person['name'] == 'Albert'
assert person.languages == ['Chinese', 'English', 'Japanese']
person.height = '170'
assert person['height'] == '170'
assert 'height' in person
assert 'height' in person.keys()
assert hasattr(person, 'height') is True
del person['height']
assert 'height' not in person
assert 'height' not in person.keys()
person['height'] = '170cm'
person.update({'weight': '50'})
weight_val = person.pop('weight')
assert weight_val == '50'
person.update(DictObj({'weight': '50kg'}))
assert person.weight == '50kg'
expected = {
'name': 'Albert', 'age': '34', 'sex': 'Male',
'languages': ['Chinese', 'English', 'Japanese'], # appended new language
'height': '170cm', # new added attribute
'weight': '50kg', # new added attribute
assert person.to_dict() == expected
repr_expected: str = ("{'name': 'Albert', 'age': '34', 'sex': 'Male', "
"'languages': ['Chinese', 'English', 'Japanese'],"
" 'height': '170cm', 'weight': '50kg'}")
assert repr(person) == repr_expected
# nested structure will be detected, and changed to DictObj
chessboard_data = {
'position': [
[{'name': 'knight'}, {'name': 'pawn'}],
[{'name': 'pawn'}, {'name': 'queen'}],
chessboard_obj = DictObj(chessboard_data)
# test comparing instances of DictObj
assert DictObj(chessboard_data) == DictObj(chessboard_data)
assert isinstance(chessboard_obj.position, list)
assert len(chessboard_obj.position) == 2
assert isinstance(chessboard_obj.position[0][0], DictObj)
assert chessboard_obj.position[0][0].name == 'knight'
assert chessboard_obj.position[1][1].name == 'queen'
# edge case empty DictObj
empty_dict_obj = DictObj({})
assert len(empty_dict_obj) == 0
assert bool(empty_dict_obj) is False
obj_dict = DictObj({'data': 'oops'})
assert obj_dict.data == 'oops'
# params validation
invalid_key_dct = {
1: '1',
# test when dict's key is not str
with pytest.raises(ValueError) as __:
__ = DictObj(invalid_key_dct)
complicated_key_dct = {
'1abc': 'Gotcha', # '1abc' is not valid identifier for Python, so obj.1abc will cause SyntaxError
'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError
obj_dict = DictObj(complicated_key_dct)
assert obj_dict['1abc'] == 'Gotcha'
assert getattr(obj_dict, '1abc') == 'Gotcha'
# you can access '1abc' as attribute by adding prefix '_'
assert obj_dict._1abc == 'Gotcha'
del obj_dict._1abc
assert obj_dict['class'] == 'MyClass'
assert getattr(obj_dict, 'class') == 'MyClass'
# you can access 'class' as attribute by adding prefix '_'
assert obj_dict._class == 'MyClass'
# test re-assign new value for 'class'
obj_dict._class = 'MyClass2'
assert obj_dict._class == 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
assert getattr(obj_dict, 'class') == 'MyClass2'
del obj_dict._class
# if assign new attributes (_2, _try), ObjDict will treat it like what the originally are
# this is fully considered by design, you're not encouraged to mess up keys
obj_dict._2x = 'NewAttr'
assert obj_dict._2x == 'NewAttr'
assert obj_dict['_2x'] == 'NewAttr'
with pytest.raises(KeyError):
__ = obj_dict['2x']
with pytest.raises(AttributeError):
__ = getattr(obj_dict, '2x')
obj_dict._try = 'NewAttr'
assert obj_dict._try == 'NewAttr'
assert obj_dict['_try'] == 'NewAttr'
with pytest.raises(KeyError):
__ = obj_dict['NewAttr']
with pytest.raises(AttributeError):
__ = getattr(obj_dict, 'NewAttr')
# Demo for messing up key 'class'
# delete and re-assign _class
complicated_key_dct = {
'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError
obj_dict = DictObj(complicated_key_dct)
assert obj_dict['class'] == 'MyClass'
obj_dict._class = 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
del obj_dict._class
# obj_dict has no knowledge about 'class' or '_class'
# so '_class' is a brand-new attribute, and will be stored as '_class'
obj_dict._class = 'MyClass3'
with pytest.raises(KeyError):
# Oops!!! by-design
# 'class' cannot be accessed as key anymore,
# because we store '_class' as key as other valid keys behave
assert obj_dict['class'] == 'MyClass3'
assert obj_dict['_class'] == 'MyClass3'
# thread safe testing
import sys
from threading import Thread
from pythonic_toolbox.decorators.decorator_utils import method_synchronized
class MyObjDict(DictObj):
# implement a thread-safe method to increase the value of cnt
def increase_cnt_by_n(self, n):
self.cnt += n
def increase_cnt_by_100(dict_obj):
for i in range(100):
sw_interval = sys.getswitchinterval()
my_dict_obj = MyObjDict({'cnt': 0})
threads = [Thread(target=increase_cnt_by_100, args=(my_dict_obj,)) for _ in range(100)]
[t.start() for t in threads]
[t.join() for t in threads]
assert my_dict_obj.cnt == 10000
# test copy/deepcopy of DictObj
import copy
person = DictObj({'name': 'albert', 'age': 33})
team = DictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader
from typing import cast
import pytest
from pythonic_toolbox.utils.dict_utils import FinalDictObj
person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}
fixed_person = FinalDictObj(person_dct)
assert fixed_person.name == 'Albert'
# FINAL means once initialized, you cannot change the key/attribute anymore
with pytest.raises(RuntimeError) as exec_info:
fixed_person.name = 'Steve'
expected_error_str = 'Cannot modify attribute/item in an already initialized FinalDictObj'
assert exec_info.value.args[0] == expected_error_str
with pytest.raises(RuntimeError) as __:
with pytest.raises(RuntimeError) as __:
assert isinstance(fixed_person.languages, tuple)
with pytest.raises(AttributeError) as exec_info:
# list values are changed into tuple to avoid being modified
cast(list, fixed_person.languages).append('Japanese')
expected_error_str = "'tuple' object has no attribute 'append'"
assert exec_info.value.args[0] == expected_error_str
assert fixed_person.to_dict() == person_dct
# nested structure will be detected, and changed to FinalDictObj
chessboard_data = {
'position': [
[{'name': 'knight'}, {'name': 'pawn'}],
[{'name': 'pawn'}, {'name': 'queen'}],
chessboard_obj = FinalDictObj(chessboard_data)
# test comparing instances of FinalDictObj
assert FinalDictObj(chessboard_data) == FinalDictObj(chessboard_data)
assert isinstance(chessboard_obj.position, tuple)
assert isinstance(chessboard_obj.position[0][0], FinalDictObj)
assert chessboard_obj.position[1][1].name == 'queen'
with pytest.raises(RuntimeError) as __:
chessboard_obj.position[1][1].name = 'knight'
# test for keyword/non-identifier key as attribute
final_obj_dict = FinalDictObj({
'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError
assert final_obj_dict['class'] == 'MyClass'
assert getattr(final_obj_dict, 'class') == 'MyClass'
assert final_obj_dict._class == 'MyClass'
# test copy/deepcopy of FileDictObj
import copy
person = FinalDictObj({'name': 'albert', 'age': 33})
team = FinalDictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
assert team.leader == shallow_copy_of_team.leader
deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader
import pytest
from pythonic_toolbox.utils.dict_utils import RangeKeyDict
# test normal case
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
(float('-inf'), 0): 'Negative',
(0, 60): 'F', # 0 <= val < 60
(60, 70): 'D', # 60 <= val < 70
(70, 80): 'C', # 70 <= val < 80
(80, 90): 'B', # 80 <= val < 90
(90, 100): 'A', # 90 <= val < 100
100: 'A+', # val == 100
# Big O of querying is O(log n), n is the number of ranges, due to using bisect inside
assert range_key_dict[-1] == 'Negative'
assert range_key_dict[0] == 'F'
assert range_key_dict[55] == 'F'
assert range_key_dict[60] == 'D'
assert range_key_dict[75] == 'C'
assert range_key_dict[85] == 'B'
assert range_key_dict[95] == 'A'
assert range_key_dict[100] == 'A+'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict['95'] # when key is not comparable with other integer keys
assert exec_info.value.args[0] == "KeyError: '95' is not comparable with other keys"
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[150]
assert exec_info.value.args[0] == 'KeyError: 150'
assert range_key_dict.get(150, 'N/A') == 'N/A'
# test comparison with other RangeKeyDict
assert RangeKeyDict({(0, 10): '1'}) == RangeKeyDict({(0, 10): '1'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 10): '2'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 1000): '1'})
with pytest.raises(ValueError):
# [1, 1) is not a valid range
# there's no value x satisfy 1 <= x < 1
RangeKeyDict({(1, 1): '1'})
with pytest.raises(ValueError):
# [1, -1) is not a valid range
RangeKeyDict({(1, -1): '1'})
# validate input keys types and detect range overlaps(segment intersect)
with pytest.raises(ValueError) as exec_info:
(0, 10): 'val-between-0-and-10',
(0, 5): 'val-between-0-and-5'
expected_error_msg = ("Duplicated left boundary key 0 detected: "
"(0, 10): 'val-between-0-and-10', (0, 5): 'val-between-0-and-5'")
assert exec_info.value.args[0] == expected_error_msg
with pytest.raises(ValueError) as exec_info:
(0, 10): 'val-between-0-and-10',
(5, 15): 'val-between-5-and-15'
expected_error_msg = ("Overlap detected: "
"(0, 10): 'val-between-0-and-10', (5, 15): 'val-between-5-and-15'")
assert exec_info.value.args[0] == expected_error_msg
# test RangeKeyDict with no continuous ranges
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
(0, 60): 'F', # 0 <= val < 60
(70, 80): 'C', # 70 <= val < 80
assert range_key_dict[10] == 'F'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[-100]
assert exec_info.value.args[0] == 'KeyError: -100'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[65]
assert exec_info.value.args[0] == 'KeyError: 65'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[100]
assert exec_info.value.args[0] == 'KeyError: 100'
from functools import total_ordering
class Age:
def __init__(self, val: float):
if not isinstance(val, (int, float)):
raise ValueError('Invalid age value')
self.val = val
def __le__(self, other):
return self.val <= other.val
def __repr__(self):
return f'Age({repr(self.val)})'
def __hash__(self):
return hash(self.val)
age_categories_map: RangeKeyDict[Age, str] = RangeKeyDict({
(Age(0), Age(2)): 'Baby',
(Age(2), Age(15)): 'Children',
(Age(15), Age(25)): 'Youth',
(Age(25), Age(65)): 'Adults',
(Age(65), Age(123)): 'Seniors',
assert age_categories_map[Age(0.5)] == 'Baby'
assert age_categories_map[Age(12)] == 'Children'
assert age_categories_map[Age(20)] == 'Youth'
assert age_categories_map[Age(35)] == 'Adults'
assert age_categories_map[Age(70)] == 'Seniors'
import pytest
from pythonic_toolbox.utils.dict_utils import StrKeyIdDict
data = {1: 'a', 2: 'b', '3': 'c'}
my_dict = StrKeyIdDict(data)
# usage: value can be accessed by id (str: int-like/uuid-like/whatever) or id (int)
assert my_dict['1'] == my_dict[1] == 'a'
assert my_dict.keys() == {'1', '2', '3'} # all keys are str type
my_dict['4'] = 'd'
assert my_dict['4'] == 'd'
my_dict[4] = 'd'
assert my_dict['4'] == 'd'
my_dict.update({4: 'd'})
assert my_dict['4'] == 'd'
# test comparing instances of the class
assert StrKeyIdDict(data) == StrKeyIdDict(data)
assert StrKeyIdDict(data) != StrKeyIdDict(dict(data, **{'4': 'd'}))
assert StrKeyIdDict(data) == {'1': 'a', '2': 'b', '3': 'c'}
assert StrKeyIdDict(data) != {'1': 'a', '2': 'b', '3': 'd'}
assert StrKeyIdDict(data) != {1: 'a', 2: 'b', 3: 'c'} # StrKeyIdDict assumes all keys are strings
# test delete key
del my_dict[4]
assert my_dict.keys() == {'1', '2', '3'} # '4' is not in the dict anymore
# assign value to an arbitrary string key that is not in the dict
my_dict.update({'some-uuid': 'something'})
assert my_dict['some-uuid'] == 'something'
with pytest.raises(TypeError):
# key '1', 1 both stands for key '1',
# so we get duplicated keys when initializing instance, oops!
my_dict = StrKeyIdDict({'1': 'a', 1: 'A'})
assert my_dict.get(1) == 'a'
assert my_dict.get('NotExistKey') is None
assert my_dict.get('NotExistKey', 'NotExistValue') == 'NotExistValue'
# test edge cases
assert StrKeyIdDict() == {}
# test shallow copy
my_dict[5] = ['e1', 'e2', 'e3']
copy_dict = my_dict.copy()
copy_dict[1] = 'A'
assert my_dict[1] == 'a'
assert copy_dict['5'] == ['e1', 'e2', 'e3', 'e4']
# test deep copy
from copy import deepcopy
copy_dict = deepcopy(my_dict)
assert my_dict['5'] == ['e1', 'e2', 'e3', 'e4', 'e5']
assert copy_dict[5] == ['e1', 'e2', 'e3', 'e4']
# test constructor
my_dict = StrKeyIdDict(uuid1='a', uuid2='b')
assert my_dict['uuid1'] == 'a'
# test constructor (from keys)
my_dict = StrKeyIdDict.fromkeys([1, 2, 3], None)
assert my_dict == {'1': None, '2': None, '3': None}
# test update and overwrite
my_dict.update(StrKeyIdDict({1: 'a', 2: 'b', 3: 'c', 4: 'd'}))
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
my_dict = StrKeyIdDict([(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')])
assert my_dict['1'] == my_dict[1] == 'a'
# reassign StrKeyIdDict instance to another StrKeyIdDict instance
my_dict = StrKeyIdDict(my_dict)
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
assert dict(my_dict) == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
# test case when "key" is "data", which is a reserved keyword inside StrKeyIdDict
my_dict = StrKeyIdDict({'data': 'data_value', '1': 'a'})
assert my_dict['data'] == 'data_value'
assert my_dict['1'] == 'a'
# delete key 'data', should not affect other keys
del my_dict['data']
assert my_dict['1'] == 'a'
from pythonic_toolbox.utils.dict_utils import collect_leaves
# a nested dict-like struct
my_dict = {
'node_1': {
'node_1_1': {
'node_1_1_1': 'A',
'node_1_2': {
'node_1_2_1': 'B',
'node_1_2_2': 'C',
'node_1_2_3': None,
'node_1_3': [ # dict list
'node_1_3_1_1': 'D',
'node_1_3_1_2': 'E',
'node_1_3_2_1': 'FF',
'node_1_3_2_2': 'GG',
expected = ['A', 'B', 'C', None, 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict) == expected
expected = ['A', 'B', 'C', 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf) == expected
assert collect_leaves(my_dict, keypath_pred=lambda kp: len(kp) == 1) == []
expected = ['B', 'C']
assert collect_leaves(my_dict, keypath_pred=lambda kp: kp[-1] in {'node_1_2_1', 'node_1_2_2'}) == expected
expected = ['C']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf == 'C') == expected
assert collect_leaves(my_dict,
keypath_pred=lambda kp: kp[-1] == 'node_1_2_2',
leaf_pred=lambda lf: lf == 'C') == expected
assert collect_leaves(my_dict,
keypath_pred=lambda kp: kp[-1] == 'node_1_1_1',
leaf_pred=lambda lf: lf == 'C') == []
expected = ['D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict,
keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3') == expected
expected = ['FF', 'GG']
assert collect_leaves(my_dict,
keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3',
leaf_pred=lambda lf: isinstance(lf, str) and len(lf) == 2) == expected
# edge cases
assert collect_leaves([]) == []
assert collect_leaves({}) == []
assert collect_leaves(None) == []
from pythonic_toolbox.utils.dict_utils import dict_until
data = {'full_name': 'Albert Lee', 'pen_name': None}
assert dict_until(data, keys=['name', 'full_name']) == 'Albert Lee'
assert dict_until(data, keys=['full_name', 'name']) == 'Albert Lee'
assert dict_until(data, keys=['name', 'english_name']) is None
assert dict_until(data, keys=['name', 'english_name'], default='anonymous') == 'anonymous'
# test when pen_name is set None on purpose
assert dict_until(data, keys=['pen_name'], default='anonymous') is None
# test when value with None value is not acceptable
assert dict_until(data, keys=['pen_name'], terminate=lambda x: x is not None, default='anonymous') == 'anonymous'
from pythonic_toolbox.utils.dict_utils import select_list_of_dicts
dict_lst = [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# another Peter Parker from multiverse
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# age unknown for Carol Danvers, no age field
{'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
{'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'},
assert select_list_of_dicts(dict_lst, look_like={'name': 'Peter Parker'}) == [
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}) == [
{'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
{'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}, keys=['name']) == [
{'name': 'Carol Danvers'}, {'name': 'Natasha Romanoff'}]
# unique is supported for return list
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age']) == [
{'name': 'Tony Stark', 'age': 49},
{'name': 'Peter Parker', 'age': 16},
{'name': 'Peter Parker', 'age': 16},
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age'], unique=True) == [
{'name': 'Tony Stark', 'age': 49},
{'name': 'Peter Parker', 'age': 16}]
# dict keys are ordered as the keys passed-in
assert list(select_list_of_dicts(dict_lst, keys=['name', 'age'], unique=True)[0].keys()) == ['name', 'age']
assert list(select_list_of_dicts(dict_lst, keys=['age', 'name'], unique=True)[0].keys()) == ['age', 'name']
# locate Captain Marvel, with default val for missing key
assert select_list_of_dicts(dict_lst,
look_like={'alias': 'Captain Marvel'},
keys=['name', 'sex', 'age', 'alias'],
val_for_missing_key='Unknown')[0]['age'] == 'Unknown'
# edge cases, get the original dict
assert select_list_of_dicts([]) == []
assert select_list_of_dicts(dict_lst) == dict_lst
# new list of dicts is returned, leaving the original list of dicts untouched
black_widow = select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]
black_widow['age'] += 1
assert black_widow['age'] == 36
# we don't modify the original dict data, Natasha is always 35 years old
assert select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]['age'] == 35
# preds provide more flexibility, filter the ones with age info
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0])) == 4
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0], unique=True)) == 3
# combine look_like and preds parameters
expected = [{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
preds=[lambda d: 'age' in d, lambda d: d['age'] > 20]) == expected
# empty list is returned if no dict matches the criteria
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
preds=[lambda d: 'sex' in d and d['sex'] == 'female']) == []
from pythonic_toolbox.utils.dict_utils import unique_list_of_dicts
dict_lst = [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# Peter Parkers from multiverse in same age.
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# test same dict, but the order of dict is different
{'name': 'Peter Parker', 'sex': 'male', 'alias': 'Spider Man', 'age': 16},
# Only one Peter Parker will be kept, for all data are exactly same.
assert unique_list_of_dicts(dict_lst) == [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# edge cases
assert unique_list_of_dicts([]) == []
from pythonic_toolbox.utils.dict_utils import walk_leaves
data = {
'k1': {
'k1_1': 1,
'k1_2': 2,
'k2': 'N/A', # stands for not available
expected = {
'k1': {
'k1_1': 2,
'k1_2': 4,
'k2': 'N/A', # stands for not available
assert walk_leaves(data) == data # no transform function provided, just a deepcopy
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
# if inplace is set True, will change data inplace, return nothing
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected
data = [{'name': 'lml', 'age': 33}, {'name': 'albert', 'age': 18}]
expected = [{'name': 'lml', 'age': 66}, {'name': 'albert', 'age': 36}]
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected
# edge cases
assert walk_leaves(None) is None
assert walk_leaves([]) == []
assert walk_leaves({}) == {}
assert walk_leaves(None, inplace=True) is None
assert walk_leaves([], inplace=True) is None
assert walk_leaves({}, inplace=True) is None
from pythonic_toolbox.utils.functional_utils import lfilter_multi, filter_multi
from collections.abc import Iterable
def is_even(x):
return x % 2 == 0
def is_divisible_by_5(x):
return x % 5 == 0
# select numbers which are divisible by 2 and 5
assert lfilter_multi([is_even, is_divisible_by_5], range(1, 30)) == [10, 20]
assert lfilter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20]) == [10, 20]
from itertools import count, takewhile
# if you want to pass an iterator, make sure the iterator will end/break,
# Note: a bare count(start=0, step=2) will generate number like 0, 2, 4, 6, .... (never ends)
even_numbers_less_equal_than_50 = takewhile(lambda x: x <= 50, count(start=0, step=2))
expected = [0, 10, 20, 30, 40, 50]
assert lfilter_multi([is_even, is_divisible_by_5], even_numbers_less_equal_than_50) == expected
# testing for filter_multi, not converted to list directly
num_iterator = filter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20])
assert type(num_iterator) is filter
assert isinstance(num_iterator, Iterable)
expected = [10, 20]
for idx, value in enumerate(num_iterator):
assert value == expected[idx]
# when items are infinite, choose filter_multi instead of lfilter_multi
expected = [0, 10, 20, 30, 40, 50]
for idx, value in enumerate(filter_multi([is_even, is_divisible_by_5], count(start=0, step=1))):
if value > 50:
assert value == expected[idx]
from pythonic_toolbox.utils.list_utils import filter_allowable
fruits = ['apple', 'banana', 'orange']
vegetables = ['carrot', 'potato', 'tomato']
meats = ['beef', 'chicken', 'fish']
foods = fruits + vegetables + meats
assert list(filter_allowable(foods)) == foods
assert list(filter_allowable(foods, allow_list=[], block_list=[])) == foods
assert list(filter_allowable(foods, allow_list=['apple', 'banana', 'blueberry'])) == ['apple', 'banana']
assert list(filter_allowable(foods, allow_list=[], block_list=foods)) == []
assert list(filter_allowable(foods, block_list=meats)) == fruits + vegetables
assert list(filter_allowable(foods, allow_list=['apple'], block_list=[])) == ['apple']
assert list(filter_allowable(foods, allow_list=['apple'], block_list=['apple'])) == []
assert list(filter_allowable(foods + ['blueberry'], allow_list=[], block_list=foods)) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=[])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=['apple', 'banana'])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=['orange'], block_list=['apple', 'banana'])) == []
# test cases with parameter key
assert list(filter_allowable(foods, allow_list=['a', 'b'], key=lambda x: x[0])) == ['apple', 'banana', 'beef']
# test some basic cases
assert list(filter_allowable()) == []
assert list(filter_allowable(candidates=None)) == []
assert list(filter_allowable(candidates=[])) == []
assert list(filter_allowable(candidates=[], allow_list=[], block_list=[])) == []
from operator import itemgetter
from typing import List
import pytest
from pythonic_toolbox.utils.list_utils import sort_with_custom_orders
# basic usage
values = ['branch2', 'branch1', 'branch3', 'master', 'release']
expected = ['master', 'release', 'branch1', 'branch2', 'branch3']
assert sort_with_custom_orders(values, prefix_orders=['master', 'release']) == expected
assert sort_with_custom_orders(values, prefix_orders=['master', 'release'], reverse=True) == expected[::-1]
values = [1, 2, 3, 9, 9]
expected = [9, 9, 1, 2, 3]
assert sort_with_custom_orders(values, prefix_orders=[9, 8, 7]) == expected
values = [1, 2, 3, 9]
expected = [9, 2, 3, 1]
assert sort_with_custom_orders(values, prefix_orders=[9], suffix_orders=[1]) == expected
assert sort_with_custom_orders([]) == []
assert sort_with_custom_orders([], prefix_orders=[], suffix_orders=[]) == []
assert sort_with_custom_orders([], prefix_orders=['master']) == []
# tests for unhashable values
values = [[2, 2], [1, 1], [3, 3], [6, 0]]
assert sort_with_custom_orders(values, prefix_orders=[[3, 3]]) == [[3, 3], [1, 1], [2, 2], [6, 0]]
# if "key" is provided, items are sorted in order of key(item)
# items in prefix_orders/suffix_orders don't need to be one-one correspondence with items to sort
# sum([6]) == sum([3, 3]) == sum([6, 0])
assert sort_with_custom_orders(values, prefix_orders=[[6]], key=sum) == [[3, 3], [6, 0], [1, 1], [2, 2]]
# tests for list of dicts
values = [{2: 2}, {1: 1}, {1: 2}]
assert sort_with_custom_orders(values, prefix_orders=[{2: 2}],
key=lambda data: sum(data.values())) == [{2: 2}, {1: 2}, {1: 1}]
branch_info: List[dict] = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'release', 'commit_id': 'v1.1'}]
# Assume that we prefer choosing branch in order: release > master > others (develop, hotfix etc.)
res = sort_with_custom_orders(branch_info,
prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
expected = [{'branch': 'release', 'commit_id': 'v1.1'}, {'branch': 'master', 'commit_id': 'v1.2'}]
assert res == expected
branch_info = [{'branch': 'develop', 'commit_id': 'v1.3'}, {'branch': 'master', 'commit_id': 'v1.2'}]
res = sort_with_custom_orders(branch_info,
prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
expected = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'develop', 'commit_id': 'v1.3'}]
assert res == expected
# tests for exceptions
with pytest.raises(ValueError) as exec_info:
sort_with_custom_orders([1, 2, 3], prefix_orders=[3], suffix_orders=[3])
assert exec_info.value.args[0] == 'prefix and suffix contains same value'
with pytest.raises(ValueError) as exec_info:
sort_with_custom_orders([1, 2, 3], prefix_orders=[1, 1])
assert exec_info.value.args[0] == 'prefix_orders contains duplicated values'
# tests for class
class Person:
def __init__(self, id, name, age):
self.id = id
self.name = name
self.age = age
def __lt__(self, other: 'Person'):
return self.age < other.age
def __eq__(self, other: 'Person'):
return self.age == other.age
def __hash__(self):
return self.id
def __str__(self):
return f'Person({self.id}, {self.name}, {self.age})'
def __repr__(self):
return str(self)
Albert = Person(1, 'Albert', 28)
Alice = Person(2, 'Alice', 26)
Menglong = Person(3, 'Menglong', 33)
persons = [Albert, Alice, Menglong]
expected = [Alice, Albert, Menglong]
assert sort_with_custom_orders(persons) == expected
expected = [Menglong, Alice, Albert]
assert sort_with_custom_orders(persons, prefix_orders=[Menglong, Person(4, 'Anyone', 40)]) == expected
import pytest
from pythonic_toolbox.utils.list_utils import unpack_list
first, second, third = unpack_list(['a', 'b', 'c', 'd'], target_num=3)
assert first == 'a' and second == 'b' and third == 'c'
first, second, third = unpack_list(['a', 'b'], target_num=3, default=None)
assert first == 'a' and second == 'b' and third is None
first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None
first, second, third = unpack_list([], target_num=3, default=0)
assert first == second == third == 0
first, second, *rest = unpack_list(['a', 'b', 'c'], target_num=4, default='x')
assert first == 'a' and second == 'b' and rest == ['c', 'x']
# test case for type range
first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None
def fib():
a, b = 0, 1
while 1:
yield a
a, b = b, a + b
# test case for type generator
fib_generator = fib() # generates data like [0, 1, 1, 2, 3, 5, 8, 13, 21 ...]
first, second, third, *rest = unpack_list(fib_generator, target_num=6)
assert first == 0 and second == 1 and third == 1
assert rest == [2, 3, 5]
seventh, eighth = unpack_list(fib_generator, target_num=2)
assert seventh == 8 and eighth == 13
# test edge case, nothing to unpack
empty = unpack_list([], target_num=0, default=None)
assert empty == []
res = unpack_list([], target_num=2, default=None)
assert res == [None, None]
empty = unpack_list(['a', 'b'], target_num=0, default=None)
assert empty == []
empty = unpack_list(range(0, 0), target_num=0)
assert empty == []
empty = unpack_list(iter([]), target_num=0, default=None)
assert empty == []
with pytest.raises(ValueError):
# ValueError: not enough values to unpack (expected 3, got 2)
first, second, third = unpack_list([1, 2], target_num=2)
from itertools import count
from pythonic_toolbox.utils.list_utils import until
# basic usage
counter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x > 10) == 11
assert until([1, 2, 3], lambda x: x > 10, default=11) == 11
# test case for when there's no default value and no item in the iterable satisfies the condition
assert until([1, 2, 3], lambda x: x > 10) is None
# edge cases
assert until([], default=3) == 3 # nothing provided, return default
assert until(None, lambda x: x > 10, default=11) == 11
# test case for when there's no item in the counter satisfies the condition
# the following codes will run forever, so comment them out
# counter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...
# assert until(counter, lambda x: x % 2 == 0) is None
# test case for when max_iter_num is provided, only iterate the counter for max_iter_num times
counter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x % 2 == 0, default=None, max_iter_num=100) is None
numbers = [1, 2, 3, 4, 5, 6]
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=1) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=4) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=5) == 5
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=100) == 5
from unittest.mock import patch, PropertyMock
import pytest
from pythonic_toolbox.utils.string_utils import substitute_string_template_dict, CycleError
# simple usage
# both $variable ${variable} declarations are supported in string template format
str_template_dict = {
'greeting': 'Good Morning, Everyone!',
'first_name': 'Albert',
'last_name': 'Lee',
'full_name': '$first_name $last_name',
'age': 34,
'speech': '$greeting, I am $full_name, a ${age}-year-old programmer, very glad to meet you!'
output_dict = substitute_string_template_dict(str_template_dict)
assert output_dict['full_name'] == 'Albert Lee'
expected_speech = 'Good Morning, Everyone!, I am Albert Lee, a 34-year-old programmer, very glad to meet you!'
assert output_dict['speech'] == expected_speech
# complex usage, with dynamic values, and multi value-providing holders
str_template_dict = {
'first_name': 'Daenerys',
'last_name': 'Targaryen',
'nick_name': 'Dany',
'full_name': '$first_name $last_name',
'speech': "$nick_name: I'm $full_name ($title1, $title2, $title3), it's $current_time_str, $greeting!",
variables_dict = {'title1': 'Queen of Meereen',
'title2': 'Mother of Dragons'}
class DynamicVariables:
def current_time_str(self):
import datetime
return datetime.datetime.now().strftime("%H:%M:%S")
class DefaultUnknownTitle:
A class will always return UnknownTitle, when try to access attribute like
title1, title2, ..., titleX
def __getattribute__(self, item):
if isinstance(item, str) and item.startswith('title') and item[len(item) - 1:].isdigit():
return 'UnknownTitle'
return super(DefaultUnknownTitle, self).__getattribute__(item)
expected_speech = ("Dany: I'm Daenerys Targaryen (Queen of Meereen, Mother of Dragons, UnknownTitle), "
"it's 08:00:00, good morning everyone!")
# using mock to make DynamicVariables().current_time_str always return 08:00:00
with patch.object(DynamicVariables, 'current_time_str', return_value='08:00:00', new_callable=PropertyMock):
output_dict = substitute_string_template_dict(str_template_dict, variables_dict, DynamicVariables(),
greeting='good morning everyone')
assert output_dict['speech'] == expected_speech
# edge cases
assert substitute_string_template_dict({}) == {}
# cycle detection
str_template_dict = {
'variable_a': 'Hello $variable_b', # variable_a depends on variable_b
'variable_b': 'Hello $variable_a', # variable_b depends on variable_a, it's a cycle!
with pytest.raises(CycleError) as exec_info:
import itertools
import pytest
from pythonic_toolbox.utils.context_utils import SkipContext
# Usage: define a class that inherits the SkipContext,
# and takes control of the skip or not logic
class MyWorkStation(SkipContext):
def __init__(self, week_day: str):
working_days = {'monday', 'tuesday', 'wednesday', 'thursday', 'friday'}
weekends = {'saturday', 'sunday'}
if week_day.lower() not in working_days.union(weekends):
raise ValueError(f'Invalid weekday {week_day}')
skip = True if week_day.lower() in weekends else False
super(MyWorkStation, self).__init__(skip=skip)
seven_week_days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
logged_opening_days = []
total_working_hours = 0
for cur_week_day in seven_week_days:
# MyWorkStation will skip the code block when encountering weekends
with MyWorkStation(week_day=cur_week_day):
# log this working day
# accumulate working hours, 8 hours on each working day
total_working_hours += 8
# only working days are logged
assert logged_opening_days == ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday']
assert total_working_hours == 8 * 5
# test basic SkipContext
count_iterator = itertools.count(start=0, step=1)
flg_skip = True
with SkipContext(skip=flg_skip):
# if skip = True, all codes inside the context will be skipped(not executed)
next(count_iterator) # this will not be executed
assert sum([1, 1]) == 3
raise Exception('Codes will not be executed')
assert next(count_iterator) == 0 # check previous context is skipped
flg_skip = False
with SkipContext(skip=flg_skip):
# codes will be executed as normal, if skip = False
next(count_iterator) # generate value 1
assert sum([1, 1]) == 2
assert next(count_iterator) == 2 # check previous context is executed
with pytest.raises(Exception) as exec_info:
with SkipContext(skip=False):
# if skip = False, this SkipContextManager is transparent,
# internal exception will be detected as normal
raise Exception('MyError')
assert exec_info.value.args[0] == 'MyError'
# another example: ensure there will be only one job, who acquire the lock, run the increase +1
from multiprocessing import Manager, Pool
import time
from pythonic_toolbox.utils.context_utils import SkipContext
def plain_cronjob_increase(ns, lock):
start = time.time()
with lock:
now = time.time()
if now - start >= 0.5:
ns.cnt += 1
return ns.cnt
class PreemptiveLockContext(SkipContext):
def __init__(self, lock):
self.start_time = time.perf_counter()
self.lock = lock
self.acquired = self.lock.acquire(timeout=0.5)
skip = not self.acquired
super(PreemptiveLockContext, self).__init__(skip=skip)
def __exit__(self, type, value, traceback):
if self.acquired:
if type is None:
return # No exception
if issubclass(type, self.SkipContentException):
return True # Suppress special SkipWithBlockException
return False
def cronjob_increase(ns, lock):
# for those who cannot acquire the lock within some time
# this context block will be skipped, quite simple
with PreemptiveLockContext(lock):
ns.cnt += 1
return ns.cnt
manager = Manager()
lock = manager.Lock()
ns = manager.Namespace()
pool = Pool(2)
ns.cnt = 0
processes = [pool.apply_async(plain_cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
result = [p.get() for p in processes]
assert result == [1, 1]
assert ns.cnt == 1
# reset global cnt=0
ns.cnt = 0
processes = [pool.apply_async(cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
result = [p.get() for p in processes]
assert result == [1, 1]
assert ns.cnt == 1
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
(53.9 kB
view details)
Built Distribution
File details
Details for the file pythonic-toolbox-1.1.39.tar.gz
File metadata
- Download URL: pythonic-toolbox-1.1.39.tar.gz
- Upload date:
- Size: 53.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.10.0 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/1.0.0 urllib3/1.26.18 tqdm/4.64.1 importlib-metadata/4.8.3 keyring/23.4.1 rfc3986/1.5.0 colorama/0.4.5 CPython/3.6.15
File hashes
Algorithm | Hash digest | |
SHA256 | 35cf55a0dfb41c799d8b059f1f175ab708e87365d71365432d508f641bd8520a |
MD5 | 2b7b94f2c2144e1ee4ef58646c1ee76d |
BLAKE2b-256 | 5f7e5d121510194fc42afeaaa426fd30a0b514248f04478e64543c9884bec106 |
File details
Details for the file pythonic_toolbox-1.1.39-py3-none-any.whl
File metadata
- Download URL: pythonic_toolbox-1.1.39-py3-none-any.whl
- Upload date:
- Size: 31.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.10.0 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/1.0.0 urllib3/1.26.18 tqdm/4.64.1 importlib-metadata/4.8.3 keyring/23.4.1 rfc3986/1.5.0 colorama/0.4.5 CPython/3.6.15
File hashes
Algorithm | Hash digest | |
SHA256 | 9064b346ecd655d2889475cc17dd6617a74d18f2e90956b95ec6f3904609ce49 |
MD5 | 01730d524f79a32bb5ee582eae99d9d8 |
BLAKE2b-256 | 99ea03fdf7e398b8fa88d19531a024e257f6139d59d261f87169e72d90472a53 |