Skip to main content

a toolbox with pythonic utils, tools

Project description

Pythonic toolbox

PyPI version License Supported Python versions Stability CodeQL Status Python3.6 Test Status SNYK Status

Table of Contents

README.md is auto generated by the script tests/generate_readme_markdown.py from testing files,

DO NOT EDIT DIRECTLY! ;)

python3 tests/generate_readme_markdown.py

Introduction

A python3.6+ toolbox with multi useful utils, functions, decorators in pythonic way, and is fully tested from python3.6 to python3.11 .

Installation

pip3 install pythonic-toolbox --upgrade

Usage

decorators

ignore_unexpected_kwargs

import pytest
from pythonic_toolbox.decorators.common import ignore_unexpected_kwargs

# Following functions are named under Metasyntactic Variables, like:
# foobar, foo, bar, baz, qux, quux, quuz, corge,
# grault, garply, waldo, fred, plugh, xyzzy, thud

def foo(a, b=0, c=3):
    return a, b, c

dct = {'a': 1, 'b': 2, 'd': 4}
with pytest.raises(TypeError) as __:
    assert foo(**dct) == (1, 2, 3)

wrapped_foo = ignore_unexpected_kwargs(foo)
assert wrapped_foo(**dct) == (1, 2, 3)

assert wrapped_foo(0, 0, 0) == (0, 0, 0)
assert wrapped_foo(a=1, b=2, c=3) == (1, 2, 3)

@ignore_unexpected_kwargs
def bar(*args: int):
    return sum(args)

# should not change original behavior
assert bar(1, 2, 3) == 6
assert bar(1, 2, 3, unexpected='Gotcha') == 6
nums = [1, 2, 3]
assert bar(*nums, unexpected='Gotcha') == 6

@ignore_unexpected_kwargs
def qux(a, b, **kwargs):
    # function with Parameter.VAR_KEYWORD Aka **kwargs
    return a, b, kwargs.get('c', 3), kwargs.get('d', 4)

assert qux(**{'a': 1, 'b': 2, 'd': 4, 'e': 5}) == (1, 2, 3, 4)

class Person:
    @ignore_unexpected_kwargs
    def __init__(self, name, age, sex):
        self.name = name
        self.age = age
        self.sex = sex

    @classmethod
    @ignore_unexpected_kwargs
    def create(cls, name, age, sex):
        return cls(name, age, sex)

    @staticmethod
    @ignore_unexpected_kwargs
    def greetings(name):
        return f'Hello, I am {name}'

params = {
    'name': 'albert',
    'age': 34,
    'sex': 'male',
    'height': '170cm',
}
__ = Person(**params)
__ = Person('albert', 35, 'male', height='170cm')

# test cases for classmethod, staticmethod
__ = Person.create(**params)
assert Person.greetings(**params)

retry

import pytest

from pythonic_toolbox.decorators.common import retry

# use decorator without any arguments, using retry default params
@retry
def func_fail_first_time():
    """func_fail_first_time"""
    self = func_fail_first_time
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'

assert func_fail_first_time() == 'ok'
assert func_fail_first_time.call_times == 2
assert func_fail_first_time.__doc__ == 'func_fail_first_time'

@retry(tries=2, delay=0.1)  # use decorator with customized params
def func_fail_twice():
    """func_fail_twice"""
    self = func_fail_twice
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times <= 2:
        raise Exception('Fail when called first, second time')
    return 'ok'

assert func_fail_twice() == 'ok'
assert func_fail_twice.call_times == 3
assert func_fail_twice.__doc__ == 'func_fail_twice'

@retry(tries=2, delay=0.1)
def func_fail_three_times():
    """func_fail_three_times"""
    self = func_fail_three_times
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times <= 3:  # 1, 2, 3
        raise Exception('Fail when called first, second, third time')
    return 'ok'

with pytest.raises(Exception) as exec_info:
    func_fail_three_times()
assert func_fail_three_times.call_times == 3
assert exec_info.value.args[0] == 'Fail when called first, second, third time'

def raw_func_fail_first_time():
    """func_fail_first_time"""
    self = raw_func_fail_first_time
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'

assert retry(raw_func_fail_first_time)() == 'ok'

# test cases when function has arguments, kwargs
@retry(tries=1, delay=0.1)
def func_fail_first_time_with_parameters(p1, p2):
    """func_fail_first_time"""
    self = func_fail_first_time_with_parameters
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return p1 + p2

assert func_fail_first_time_with_parameters(1, 2) == 3

def func_fail_first_time_with_parameters(p1, p2):
    """func_fail_first_time"""
    self = func_fail_first_time_with_parameters
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return p1 + p2

assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(1, 2) == 3
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(p1=1, p2=2) == 3

import asyncio

@retry
async def async_func_fail_first_time():
    """async_func_fail_first_time"""
    self = async_func_fail_first_time
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'

@retry(delay=0.1)
async def async_func_fail_first_time2():
    """async_func_fail_first_time2"""
    self = async_func_fail_first_time2
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'

async def async_main():
    assert await async_func_fail_first_time() == 'ok'
    assert async_func_fail_first_time.__doc__ == 'async_func_fail_first_time'
    assert async_func_fail_first_time.call_times == 2
    assert await async_func_fail_first_time2() == 'ok'
    assert async_func_fail_first_time2.call_times == 2
    assert async_func_fail_first_time2.__doc__ == 'async_func_fail_first_time2'

loop = asyncio.get_event_loop()
if loop.is_closed():
    loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(async_main())
finally:
    loop.close()

import random
fail_count = 0

@retry(delay=0.1)
async def always_fail_func():
    nonlocal fail_count
    fail_count += 1
    await asyncio.sleep(random.random())
    raise ValueError()

async def async_main_for_always_fail():
    nonlocal fail_count
    tasks = [always_fail_func() for i in range(0, 3)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    assert all(map(lambda e: isinstance(e, ValueError), results))
    assert fail_count == 2 * 3  # each func run twice, three func calls

loop = asyncio.get_event_loop()
if loop.is_closed():
    loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(async_main_for_always_fail())
finally:
    loop.close()

deque_utils

deque_pop_any

from collections import deque

import pytest
from pythonic_toolbox.utils.deque_utils import deque_pop_any

queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=1) == 2
assert queue == deque([1, 3, 4, 5])

# edge case: same as deque.popleft()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=0) == 1
assert queue == deque([2, 3, 4, 5])

# edge case: same as deque.popright()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=len(queue) - 1) == 5
assert queue == deque([1, 2, 3, 4])

queue = deque([1, 2, 3, 4, 5])
with pytest.raises(IndexError) as exec_info:
    deque_pop_any(queue, idx=102)

# edge case: pop from empty deque
queue = deque()
with pytest.raises(IndexError) as exec_info:
    deque_pop_any(queue, idx=0)
assert exec_info.value.args[0] == 'pop from empty deque'

deque_split

import pytest

from collections import deque

from pythonic_toolbox.utils.deque_utils import deque_split

queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=3)
assert queue1 == deque([1, 2, 3])
assert queue2 == deque([4, 5])

queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=0)
assert queue1 == deque([])
assert queue2 == deque([1, 2, 3, 4, 5])

queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=100)
assert queue1 == deque([1, 2, 3, 4, 5])
assert queue2 == deque([])

with pytest.raises(ValueError) as exec_info:
    deque_split(deque([1, 2, 3, 4, 5]), -1)
assert exec_info.value.args[0] == 'num must be integer: 0 <= num <= sys.maxsize'

dict_utils

DictObj

from copy import deepcopy

import pytest
from pythonic_toolbox.utils.dict_utils import DictObj

naive_dct = {
    'key1': 'val1',
    'key2': 'val2',
}

obj = DictObj(naive_dct)

# test basic functional methods like dict
assert len(obj) == 2
assert bool(obj) is True
# same behavior like ordinary dict according to the python version (FILO for popitem for 3.6+)
assert obj.popitem() == ('key2', 'val2')
assert obj.popitem() == ('key1', 'val1')
with pytest.raises(KeyError) as __:
    obj.popitem()

# a key can be treated like an attribute
# an attribute can be treated like a key
obj.key3 = 'val3'
assert obj.pop('key3') == 'val3'
with pytest.raises(KeyError) as __:
    obj.pop('key4')
obj.key5 = 'val5'
del obj.key5
with pytest.raises(KeyError) as __:
    obj.pop('key5')
with pytest.raises(AttributeError) as __:
    del obj.key5

# test deepcopy
obj = DictObj({'languages': ['Chinese', 'English']})
copied_obj = deepcopy(obj)
assert copied_obj == obj
copied_obj.languages = obj.languages + ['Japanese']
assert obj.languages == ['Chinese', 'English']
assert copied_obj.languages == ['Chinese', 'English', 'Japanese']
assert copied_obj != obj

person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}

person = DictObj(person_dct)
assert DictObj(person_dct) == DictObj(person_dct)
assert person.to_dict() == person_dct
assert set(person.keys()) == {'name', 'age', 'sex', 'languages'}
assert hasattr(person, 'name') is True
assert person.name == 'Albert'
assert person['name'] == 'Albert'
person.languages.append('Japanese')
assert person.languages == ['Chinese', 'English', 'Japanese']

person.height = '170'
assert person['height'] == '170'
assert 'height' in person
assert 'height' in person.keys()
assert hasattr(person, 'height') is True
del person['height']
assert 'height' not in person
assert 'height' not in person.keys()
person['height'] = '170cm'

person.update({'weight': '50'})
weight_val = person.pop('weight')
assert weight_val == '50'
person.update(DictObj({'weight': '50kg'}))
assert person.weight == '50kg'

expected = {
    'name': 'Albert', 'age': '34', 'sex': 'Male',
    'languages': ['Chinese', 'English', 'Japanese'],  # appended new language
    'height': '170cm',  # new added attribute
    'weight': '50kg',  # new added attribute
}
assert person.to_dict() == expected

repr_expected: str = ("{'name': 'Albert', 'age': '34', 'sex': 'Male', "
                      "'languages': ['Chinese', 'English', 'Japanese'],"
                      " 'height': '170cm', 'weight': '50kg'}")
assert repr(person) == repr_expected

# nested structure will be detected, and changed to DictObj
chessboard_data = {
    'position': [
        [{'name': 'knight'}, {'name': 'pawn'}],
        [{'name': 'pawn'}, {'name': 'queen'}],
    ]
}
chessboard_obj = DictObj(chessboard_data)
# test comparing instances of DictObj
assert DictObj(chessboard_data) == DictObj(chessboard_data)
assert isinstance(chessboard_obj.position, list)
assert len(chessboard_obj.position) == 2
assert isinstance(chessboard_obj.position[0][0], DictObj)
assert chessboard_obj.position[0][0].name == 'knight'
assert chessboard_obj.position[1][1].name == 'queen'

# edge case empty DictObj
empty_dict_obj = DictObj({})
assert len(empty_dict_obj) == 0
assert bool(empty_dict_obj) is False

obj_dict = DictObj({'data': 'oops'})
assert obj_dict.data == 'oops'

# params validation
invalid_key_dct = {
    1: '1',
}

# test when dict's key is not str
with pytest.raises(ValueError) as __:
    __ = DictObj(invalid_key_dct)

complicated_key_dct = {
    '1abc': 'Gotcha',  # '1abc' is not valid identifier for Python, so obj.1abc will cause SyntaxError
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
}

obj_dict = DictObj(complicated_key_dct)
assert obj_dict['1abc'] == 'Gotcha'
assert getattr(obj_dict, '1abc') == 'Gotcha'
# you can access '1abc' as attribute by adding prefix '_'
assert obj_dict._1abc == 'Gotcha'
del obj_dict._1abc

assert obj_dict['class'] == 'MyClass'
assert getattr(obj_dict, 'class') == 'MyClass'
# you can access 'class' as attribute by adding prefix '_'
assert obj_dict._class == 'MyClass'

# test re-assign new value for 'class'
obj_dict._class = 'MyClass2'
assert obj_dict._class == 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
assert getattr(obj_dict, 'class') == 'MyClass2'
del obj_dict._class

# if assign new attributes (_2, _try), ObjDict will treat it like what the originally are
# this is fully considered by design, you're not encouraged to mess up keys
obj_dict._2x = 'NewAttr'
assert obj_dict._2x == 'NewAttr'
assert obj_dict['_2x'] == 'NewAttr'
with pytest.raises(KeyError):
    __ = obj_dict['2x']
with pytest.raises(AttributeError):
    __ = getattr(obj_dict, '2x')

obj_dict._try = 'NewAttr'
assert obj_dict._try == 'NewAttr'
assert obj_dict['_try'] == 'NewAttr'
with pytest.raises(KeyError):
    __ = obj_dict['NewAttr']
with pytest.raises(AttributeError):
    __ = getattr(obj_dict, 'NewAttr')

# Demo for messing up key 'class'
# delete and re-assign _class
complicated_key_dct = {
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
}
obj_dict = DictObj(complicated_key_dct)

assert obj_dict['class'] == 'MyClass'
obj_dict._class = 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
del obj_dict._class

# obj_dict has no knowledge about 'class' or '_class'
# so '_class' is a brand-new attribute, and will be stored as '_class'
obj_dict._class = 'MyClass3'
with pytest.raises(KeyError):
    # Oops!!! by-design
    # 'class' cannot be accessed as key anymore,
    # because we store '_class' as key as other valid keys behave
    assert obj_dict['class'] == 'MyClass3'
assert obj_dict['_class'] == 'MyClass3'

# thread safe testing
import sys
from threading import Thread
from pythonic_toolbox.decorators.decorator_utils import method_synchronized

class MyObjDict(DictObj):
    # implement a thread-safe method to increase the value of cnt
    @method_synchronized
    def increase_cnt_by_n(self, n):
        self.cnt += n

def increase_cnt_by_100(dict_obj):
    for i in range(100):
        dict_obj.increase_cnt_by_n(1)

sw_interval = sys.getswitchinterval()
try:
    sys.setswitchinterval(0.0001)
    my_dict_obj = MyObjDict({'cnt': 0})
    threads = [Thread(target=increase_cnt_by_100, args=(my_dict_obj,)) for _ in range(100)]
    [t.start() for t in threads]
    [t.join() for t in threads]
    assert my_dict_obj.cnt == 10000
finally:
    sys.setswitchinterval(sw_interval)

# test copy/deepcopy of DictObj
import copy

person = DictObj({'name': 'albert', 'age': 33})
team = DictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader

deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader

FinalDictObj

from typing import cast

import pytest
from pythonic_toolbox.utils.dict_utils import FinalDictObj

person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}

fixed_person = FinalDictObj(person_dct)
assert fixed_person.name == 'Albert'

# FINAL means once initialized, you cannot change the key/attribute anymore
with pytest.raises(RuntimeError) as exec_info:
    fixed_person.name = 'Steve'
expected_error_str = 'Cannot modify attribute/item in an already initialized FinalDictObj'
assert exec_info.value.args[0] == expected_error_str

with pytest.raises(RuntimeError) as __:
    fixed_person.popitem()

with pytest.raises(RuntimeError) as __:
    fixed_person.pop('name')

assert isinstance(fixed_person.languages, tuple)
with pytest.raises(AttributeError) as exec_info:
    # list values are changed into tuple to avoid being modified
    cast(list, fixed_person.languages).append('Japanese')
expected_error_str = "'tuple' object has no attribute 'append'"
assert exec_info.value.args[0] == expected_error_str
assert fixed_person.to_dict() == person_dct

# nested structure will be detected, and changed to FinalDictObj
chessboard_data = {
    'position': [
        [{'name': 'knight'}, {'name': 'pawn'}],
        [{'name': 'pawn'}, {'name': 'queen'}],
    ]
}
chessboard_obj = FinalDictObj(chessboard_data)
# test comparing instances of FinalDictObj
assert FinalDictObj(chessboard_data) == FinalDictObj(chessboard_data)
assert isinstance(chessboard_obj.position, tuple)
assert isinstance(chessboard_obj.position[0][0], FinalDictObj)
assert chessboard_obj.position[1][1].name == 'queen'
with pytest.raises(RuntimeError) as __:
    chessboard_obj.position[1][1].name = 'knight'

# test for keyword/non-identifier key as attribute
final_obj_dict = FinalDictObj({
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
})
assert final_obj_dict['class'] == 'MyClass'
assert getattr(final_obj_dict, 'class') == 'MyClass'
assert final_obj_dict._class == 'MyClass'

# test copy/deepcopy of FileDictObj
import copy
person = FinalDictObj({'name': 'albert', 'age': 33})
team = FinalDictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
assert team.leader == shallow_copy_of_team.leader

deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader

RangeKeyDict

import pytest
from pythonic_toolbox.utils.dict_utils import RangeKeyDict

# test normal case
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
    (float('-inf'), 0): 'Negative',
    (0, 60): 'F',  # 0 <= val < 60
    (60, 70): 'D',  # 60 <= val < 70
    (70, 80): 'C',  # 70 <= val < 80
    (80, 90): 'B',  # 80 <= val < 90
    (90, 100): 'A',  # 90 <= val < 100
    100: 'A+',  # val == 100
})

# Big O of querying is O(log n), n is the number of ranges, due to using bisect inside
assert range_key_dict[-1] == 'Negative'
assert range_key_dict[0] == 'F'
assert range_key_dict[55] == 'F'
assert range_key_dict[60] == 'D'
assert range_key_dict[75] == 'C'
assert range_key_dict[85] == 'B'
assert range_key_dict[95] == 'A'
assert range_key_dict[100] == 'A+'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict['95']  # when key is not comparable with other integer keys
assert exec_info.value.args[0] == "KeyError: '95' is not comparable with other keys"

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[150]
assert exec_info.value.args[0] == 'KeyError: 150'

assert range_key_dict.get(150, 'N/A') == 'N/A'

# test comparison with other RangeKeyDict
assert RangeKeyDict({(0, 10): '1'}) == RangeKeyDict({(0, 10): '1'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 10): '2'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 1000): '1'})

with pytest.raises(ValueError):
    # [1, 1) is not a valid range
    # there's no value x satisfy 1 <= x < 1
    RangeKeyDict({(1, 1): '1'})

with pytest.raises(ValueError):
    # [1, -1) is not a valid range
    RangeKeyDict({(1, -1): '1'})

# validate input keys types and detect range overlaps(segment intersect)
with pytest.raises(ValueError) as exec_info:
    RangeKeyDict({
        (0, 10): 'val-between-0-and-10',
        (0, 5): 'val-between-0-and-5'
    })
expected_error_msg = ("Duplicated left boundary key 0 detected: "
                      "(0, 10): 'val-between-0-and-10', (0, 5): 'val-between-0-and-5'")
assert exec_info.value.args[0] == expected_error_msg

with pytest.raises(ValueError) as exec_info:
    RangeKeyDict({
        (0, 10): 'val-between-0-and-10',
        (5, 15): 'val-between-5-and-15'
    })
expected_error_msg = ("Overlap detected: "
                      "(0, 10): 'val-between-0-and-10', (5, 15): 'val-between-5-and-15'")
assert exec_info.value.args[0] == expected_error_msg

# test RangeKeyDict with no continuous ranges
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
    (0, 60): 'F',  # 0 <= val < 60
    (70, 80): 'C',  # 70 <= val < 80
})

assert range_key_dict[10] == 'F'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[-100]
assert exec_info.value.args[0] == 'KeyError: -100'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[65]
assert exec_info.value.args[0] == 'KeyError: 65'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[100]
assert exec_info.value.args[0] == 'KeyError: 100'

from functools import total_ordering

@total_ordering
class Age:
    def __init__(self, val: float):
        if not isinstance(val, (int, float)):
            raise ValueError('Invalid age value')
        self.val = val

    def __le__(self, other):
        return self.val <= other.val

    def __repr__(self):
        return f'Age({repr(self.val)})'

    def __hash__(self):
        return hash(self.val)

age_categories_map: RangeKeyDict[Age, str] = RangeKeyDict({
    (Age(0), Age(2)): 'Baby',
    (Age(2), Age(15)): 'Children',
    (Age(15), Age(25)): 'Youth',
    (Age(25), Age(65)): 'Adults',
    (Age(65), Age(123)): 'Seniors',
})

assert age_categories_map[Age(0.5)] == 'Baby'
assert age_categories_map[Age(12)] == 'Children'
assert age_categories_map[Age(20)] == 'Youth'
assert age_categories_map[Age(35)] == 'Adults'
assert age_categories_map[Age(70)] == 'Seniors'

StrKeyIdDict

import pytest
from pythonic_toolbox.utils.dict_utils import StrKeyIdDict

data = {1: 'a', 2: 'b', '3': 'c'}
my_dict = StrKeyIdDict(data)

# usage: value can be accessed by id (str: int-like/uuid-like/whatever) or id (int)
assert my_dict['1'] == my_dict[1] == 'a'
assert my_dict.keys() == {'1', '2', '3'}  # all keys are str type
my_dict['4'] = 'd'
assert my_dict['4'] == 'd'
my_dict[4] = 'd'
assert my_dict['4'] == 'd'
my_dict.update({4: 'd'})
assert my_dict['4'] == 'd'

# test comparing instances of the class
assert StrKeyIdDict(data) == StrKeyIdDict(data)
assert StrKeyIdDict(data) != StrKeyIdDict(dict(data, **{'4': 'd'}))
assert StrKeyIdDict(data) == {'1': 'a', '2': 'b', '3': 'c'}
assert StrKeyIdDict(data) != {'1': 'a', '2': 'b', '3': 'd'}
assert StrKeyIdDict(data) != {1: 'a', 2: 'b', 3: 'c'}  # StrKeyIdDict assumes all keys are strings

# test delete key
del my_dict[4]
assert my_dict.keys() == {'1', '2', '3'}  # '4' is not in the dict anymore

# assign value to an arbitrary string key that is not in the dict
my_dict.update({'some-uuid': 'something'})
assert my_dict['some-uuid'] == 'something'

with pytest.raises(TypeError):
    # key '1', 1 both stands for key '1',
    # so we get duplicated keys when initializing instance, oops!
    my_dict = StrKeyIdDict({'1': 'a', 1: 'A'})

assert my_dict.get(1) == 'a'
assert my_dict.get('NotExistKey') is None
assert my_dict.get('NotExistKey', 'NotExistValue') == 'NotExistValue'

# test edge cases
assert StrKeyIdDict() == {}

# test shallow copy
my_dict[5] = ['e1', 'e2', 'e3']
copy_dict = my_dict.copy()
copy_dict[1] = 'A'
assert my_dict[1] == 'a'
my_dict['5'].append('e4')
assert copy_dict['5'] == ['e1', 'e2', 'e3', 'e4']

# test deep copy
from copy import deepcopy

copy_dict = deepcopy(my_dict)
my_dict[5].append('e5')
assert my_dict['5'] == ['e1', 'e2', 'e3', 'e4', 'e5']
assert copy_dict[5] == ['e1', 'e2', 'e3', 'e4']

# test constructor
my_dict = StrKeyIdDict(uuid1='a', uuid2='b')
assert my_dict['uuid1'] == 'a'

# test constructor (from keys)
my_dict = StrKeyIdDict.fromkeys([1, 2, 3], None)
assert my_dict == {'1': None, '2': None, '3': None}
# test update and overwrite
my_dict.update(StrKeyIdDict({1: 'a', 2: 'b', 3: 'c', 4: 'd'}))
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}

my_dict = StrKeyIdDict([(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')])
assert my_dict['1'] == my_dict[1] == 'a'

# reassign StrKeyIdDict instance to another StrKeyIdDict instance
my_dict = StrKeyIdDict(my_dict)
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
assert dict(my_dict) == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}

# test case when "key" is "data", which is a reserved keyword inside StrKeyIdDict
my_dict = StrKeyIdDict({'data': 'data_value', '1': 'a'})
assert my_dict['data'] == 'data_value'
assert my_dict['1'] == 'a'
# delete key 'data', should not affect other keys
del my_dict['data']
assert my_dict['1'] == 'a'

collect_leaves

from pythonic_toolbox.utils.dict_utils import collect_leaves

# a nested dict-like struct
my_dict = {
    'node_1': {
        'node_1_1': {
            'node_1_1_1': 'A',
        },
        'node_1_2': {
            'node_1_2_1': 'B',
            'node_1_2_2': 'C',
            'node_1_2_3': None,
        },
        'node_1_3': [  # dict list
            {
                'node_1_3_1_1': 'D',
                'node_1_3_1_2': 'E',
            },
            {
                'node_1_3_2_1': 'FF',
                'node_1_3_2_2': 'GG',
            }
        ]
    }}

expected = ['A', 'B', 'C', None, 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict) == expected

expected = ['A', 'B', 'C', 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf) == expected

assert collect_leaves(my_dict, keypath_pred=lambda kp: len(kp) == 1) == []

expected = ['B', 'C']
assert collect_leaves(my_dict, keypath_pred=lambda kp: kp[-1] in {'node_1_2_1', 'node_1_2_2'}) == expected

expected = ['C']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf == 'C') == expected
assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: kp[-1] == 'node_1_2_2',
                      leaf_pred=lambda lf: lf == 'C') == expected

assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: kp[-1] == 'node_1_1_1',
                      leaf_pred=lambda lf: lf == 'C') == []

expected = ['D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3') == expected

expected = ['FF', 'GG']
assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3',
                      leaf_pred=lambda lf: isinstance(lf, str) and len(lf) == 2) == expected

# edge cases
assert collect_leaves([]) == []
assert collect_leaves({}) == []
assert collect_leaves(None) == []

dict_until

from pythonic_toolbox.utils.dict_utils import dict_until

data = {'full_name': 'Albert Lee', 'pen_name': None}
assert dict_until(data, keys=['name', 'full_name']) == 'Albert Lee'
assert dict_until(data, keys=['full_name', 'name']) == 'Albert Lee'
assert dict_until(data, keys=['name', 'english_name']) is None
assert dict_until(data, keys=['name', 'english_name'], default='anonymous') == 'anonymous'
# test when pen_name is set None on purpose
assert dict_until(data, keys=['pen_name'], default='anonymous') is None
# test when value with None value is not acceptable
assert dict_until(data, keys=['pen_name'], terminate=lambda x: x is not None, default='anonymous') == 'anonymous'

select_list_of_dicts

from pythonic_toolbox.utils.dict_utils import select_list_of_dicts

dict_lst = [
    {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # another Peter Parker from multiverse
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # age unknown for Carol Danvers, no age field
    {'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
    {'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'},
]

assert select_list_of_dicts(dict_lst, look_like={'name': 'Peter Parker'}) == [
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'}]

assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}) == [
    {'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
    {'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'}]

assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}, keys=['name']) == [
    {'name': 'Carol Danvers'}, {'name': 'Natasha Romanoff'}]

# unique is supported for return list
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age']) == [
    {'name': 'Tony Stark', 'age': 49},
    {'name': 'Peter Parker', 'age': 16},
    {'name': 'Peter Parker', 'age': 16},
]

assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age'], unique=True) == [
    {'name': 'Tony Stark', 'age': 49},
    {'name': 'Peter Parker', 'age': 16}]

# dict keys are ordered as the keys passed-in
assert list(select_list_of_dicts(dict_lst, keys=['name', 'age'], unique=True)[0].keys()) == ['name', 'age']
assert list(select_list_of_dicts(dict_lst, keys=['age', 'name'], unique=True)[0].keys()) == ['age', 'name']

# locate Captain Marvel, with default val for missing key
assert select_list_of_dicts(dict_lst,
                            look_like={'alias': 'Captain Marvel'},
                            keys=['name', 'sex', 'age', 'alias'],
                            val_for_missing_key='Unknown')[0]['age'] == 'Unknown'

# edge cases, get the original dict
assert select_list_of_dicts([]) == []
assert select_list_of_dicts(dict_lst) == dict_lst

# new list of dicts is returned, leaving the original list of dicts untouched
black_widow = select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]
black_widow['age'] += 1
assert black_widow['age'] == 36
# we don't modify the original dict data, Natasha is always 35 years old
assert select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]['age'] == 35

# preds provide more flexibility, filter the ones with age info
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0])) == 4
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0], unique=True)) == 3

# combine look_like and preds parameters
expected = [{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
                            preds=[lambda d: 'age' in d, lambda d: d['age'] > 20]) == expected

# empty list is returned if no dict matches the criteria
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
                            preds=[lambda d: 'sex' in d and d['sex'] == 'female']) == []

unique_list_of_dicts

from pythonic_toolbox.utils.dict_utils import unique_list_of_dicts

dict_lst = [
    {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # Peter Parkers from multiverse in same age.
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # test same dict, but the order of dict is different
    {'name': 'Peter Parker', 'sex': 'male', 'alias': 'Spider Man', 'age': 16},
]

# Only one Peter Parker will be kept, for all data are exactly same.
assert unique_list_of_dicts(dict_lst) == [
    {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
]

# edge cases
assert unique_list_of_dicts([]) == []

walk_leaves

from pythonic_toolbox.utils.dict_utils import walk_leaves

data = {
    'k1': {
        'k1_1': 1,
        'k1_2': 2,
    },
    'k2': 'N/A',  # stands for not available
}

expected = {
    'k1': {
        'k1_1': 2,
        'k1_2': 4,
    },
    'k2': 'N/A',  # stands for not available
}
assert walk_leaves(data) == data  # no transform function provided, just a deepcopy
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected

# if inplace is set True, will change data inplace, return nothing
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected

data = [{'name': 'lml', 'age': 33}, {'name': 'albert', 'age': 18}]
expected = [{'name': 'lml', 'age': 66}, {'name': 'albert', 'age': 36}]
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected

# edge cases
assert walk_leaves(None) is None
assert walk_leaves([]) == []
assert walk_leaves({}) == {}
assert walk_leaves(None, inplace=True) is None
assert walk_leaves([], inplace=True) is None
assert walk_leaves({}, inplace=True) is None

functional_utils

filter_multi

from pythonic_toolbox.utils.functional_utils import lfilter_multi, filter_multi
from collections.abc import Iterable

def is_even(x):
    return x % 2 == 0

def is_divisible_by_5(x):
    return x % 5 == 0

# select numbers which are divisible by 2 and 5
assert lfilter_multi([is_even, is_divisible_by_5], range(1, 30)) == [10, 20]
assert lfilter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20]) == [10, 20]

from itertools import count, takewhile
# if you want to pass an iterator, make sure the iterator will end/break,
# Note: a bare count(start=0, step=2) will generate number like 0, 2, 4, 6, .... (never ends)
even_numbers_less_equal_than_50 = takewhile(lambda x: x <= 50, count(start=0, step=2))
expected = [0, 10, 20, 30, 40, 50]
assert lfilter_multi([is_even, is_divisible_by_5], even_numbers_less_equal_than_50) == expected

# testing for filter_multi, not converted to list directly
num_iterator = filter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20])
assert type(num_iterator) is filter
assert isinstance(num_iterator, Iterable)
expected = [10, 20]
for idx, value in enumerate(num_iterator):
    assert value == expected[idx]

# when items are infinite, choose filter_multi instead of lfilter_multi
expected = [0, 10, 20, 30, 40, 50]
for idx, value in enumerate(filter_multi([is_even, is_divisible_by_5], count(start=0, step=1))):
    if value > 50:
        break
    else:
        assert value == expected[idx]

list_utils

filter_allowable

from pythonic_toolbox.utils.list_utils import filter_allowable

fruits = ['apple', 'banana', 'orange']
vegetables = ['carrot', 'potato', 'tomato']
meats = ['beef', 'chicken', 'fish']

foods = fruits + vegetables + meats

assert list(filter_allowable(foods)) == foods
assert list(filter_allowable(foods, allow_list=[], block_list=[])) == foods
assert list(filter_allowable(foods, allow_list=['apple', 'banana', 'blueberry'])) == ['apple', 'banana']
assert list(filter_allowable(foods, allow_list=[], block_list=foods)) == []
assert list(filter_allowable(foods, block_list=meats)) == fruits + vegetables
assert list(filter_allowable(foods, allow_list=['apple'], block_list=[])) == ['apple']
assert list(filter_allowable(foods, allow_list=['apple'], block_list=['apple'])) == []
assert list(filter_allowable(foods + ['blueberry'], allow_list=[], block_list=foods)) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=[])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=['apple', 'banana'])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=['orange'], block_list=['apple', 'banana'])) == []

# test cases with parameter key
assert list(filter_allowable(foods, allow_list=['a', 'b'], key=lambda x: x[0])) == ['apple', 'banana', 'beef']

# test some basic cases
assert list(filter_allowable()) == []
assert list(filter_allowable(candidates=None)) == []
assert list(filter_allowable(candidates=[])) == []
assert list(filter_allowable(candidates=[], allow_list=[], block_list=[])) == []

sort_with_custom_orders

from operator import itemgetter
from typing import List

import pytest
from pythonic_toolbox.utils.list_utils import sort_with_custom_orders

# basic usage
values = ['branch2', 'branch1', 'branch3', 'master', 'release']
expected = ['master', 'release', 'branch1', 'branch2', 'branch3']
assert sort_with_custom_orders(values, prefix_orders=['master', 'release']) == expected
assert sort_with_custom_orders(values, prefix_orders=['master', 'release'], reverse=True) == expected[::-1]

values = [1, 2, 3, 9, 9]
expected = [9, 9, 1, 2, 3]
assert sort_with_custom_orders(values, prefix_orders=[9, 8, 7]) == expected

values = [1, 2, 3, 9]
expected = [9, 2, 3, 1]
assert sort_with_custom_orders(values, prefix_orders=[9], suffix_orders=[1]) == expected

assert sort_with_custom_orders([]) == []
assert sort_with_custom_orders([], prefix_orders=[], suffix_orders=[]) == []
assert sort_with_custom_orders([], prefix_orders=['master']) == []

# tests for unhashable values
values = [[2, 2], [1, 1], [3, 3], [6, 0]]
assert sort_with_custom_orders(values, prefix_orders=[[3, 3]]) == [[3, 3], [1, 1], [2, 2], [6, 0]]
# if "key" is provided, items are sorted in order of key(item)
# items in prefix_orders/suffix_orders don't need to be one-one correspondence with items to sort
# sum([6]) == sum([3, 3]) == sum([6, 0])
assert sort_with_custom_orders(values, prefix_orders=[[6]], key=sum) == [[3, 3], [6, 0], [1, 1], [2, 2]]

# tests for list of dicts
values = [{2: 2}, {1: 1}, {1: 2}]
assert sort_with_custom_orders(values, prefix_orders=[{2: 2}],
                               key=lambda data: sum(data.values())) == [{2: 2}, {1: 2}, {1: 1}]

branch_info: List[dict] = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'release', 'commit_id': 'v1.1'}]
# Assume that we prefer choosing branch in order: release > master > others (develop, hotfix etc.)
res = sort_with_custom_orders(branch_info,
                              prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
                              key=itemgetter('branch'))
expected = [{'branch': 'release', 'commit_id': 'v1.1'}, {'branch': 'master', 'commit_id': 'v1.2'}]
assert res == expected

branch_info = [{'branch': 'develop', 'commit_id': 'v1.3'}, {'branch': 'master', 'commit_id': 'v1.2'}]
res = sort_with_custom_orders(branch_info,
                              prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
                              key=itemgetter('branch'))
expected = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'develop', 'commit_id': 'v1.3'}]
assert res == expected

# tests for exceptions
with pytest.raises(ValueError) as exec_info:
    sort_with_custom_orders([1, 2, 3], prefix_orders=[3], suffix_orders=[3])
assert exec_info.value.args[0] == 'prefix and suffix contains same value'

with pytest.raises(ValueError) as exec_info:
    sort_with_custom_orders([1, 2, 3], prefix_orders=[1, 1])
assert exec_info.value.args[0] == 'prefix_orders contains duplicated values'

# tests for class
class Person:
    def __init__(self, id, name, age):
        self.id = id
        self.name = name
        self.age = age

    def __lt__(self, other: 'Person'):
        return self.age < other.age

    def __eq__(self, other: 'Person'):
        return self.age == other.age

    def __hash__(self):
        return self.id

    def __str__(self):
        return f'Person({self.id}, {self.name}, {self.age})'

    def __repr__(self):
        return str(self)

Albert = Person(1, 'Albert', 28)
Alice = Person(2, 'Alice', 26)
Menglong = Person(3, 'Menglong', 33)

persons = [Albert, Alice, Menglong]
expected = [Alice, Albert, Menglong]
assert sort_with_custom_orders(persons) == expected

expected = [Menglong, Alice, Albert]
assert sort_with_custom_orders(persons, prefix_orders=[Menglong, Person(4, 'Anyone', 40)]) == expected

unpack_list

import pytest
from pythonic_toolbox.utils.list_utils import unpack_list

first, second, third = unpack_list(['a', 'b', 'c', 'd'], target_num=3)
assert first == 'a' and second == 'b' and third == 'c'

first, second, third = unpack_list(['a', 'b'], target_num=3, default=None)
assert first == 'a' and second == 'b' and third is None

first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None

first, second, third = unpack_list([], target_num=3, default=0)
assert first == second == third == 0

first, second, *rest = unpack_list(['a', 'b', 'c'], target_num=4, default='x')
assert first == 'a' and second == 'b' and rest == ['c', 'x']

# test case for type range
first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None

def fib():
    a, b = 0, 1
    while 1:
        yield a
        a, b = b, a + b

# test case for type generator
fib_generator = fib()  # generates data like [0, 1, 1, 2, 3, 5, 8, 13, 21 ...]
first, second, third, *rest = unpack_list(fib_generator, target_num=6)
assert first == 0 and second == 1 and third == 1
assert rest == [2, 3, 5]
seventh, eighth = unpack_list(fib_generator, target_num=2)
assert seventh == 8 and eighth == 13

# test edge case, nothing to unpack
empty = unpack_list([], target_num=0, default=None)
assert empty == []

res = unpack_list([], target_num=2, default=None)
assert res == [None, None]

empty = unpack_list(['a', 'b'], target_num=0, default=None)
assert empty == []

empty = unpack_list(range(0, 0), target_num=0)
assert empty == []

empty = unpack_list(iter([]), target_num=0, default=None)
assert empty == []

with pytest.raises(ValueError):
    # ValueError: not enough values to unpack (expected 3, got 2)
    first, second, third = unpack_list([1, 2], target_num=2)

until

from itertools import count

from pythonic_toolbox.utils.list_utils import until

# basic usage
counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x > 10) == 11

assert until([1, 2, 3], lambda x: x > 10, default=11) == 11

# test case for when there's no default value and no item in the iterable satisfies the condition
assert until([1, 2, 3], lambda x: x > 10) is None

# edge cases
assert until([], default=3) == 3  # nothing provided, return default
assert until(None, lambda x: x > 10, default=11) == 11

# test case for when there's no item in the counter satisfies the condition
# the following codes will run forever, so comment them out
# counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
# assert until(counter, lambda x: x % 2 == 0) is None

# test case for when max_iter_num is provided, only iterate the counter for max_iter_num times
counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x % 2 == 0, default=None, max_iter_num=100) is None

numbers = [1, 2, 3, 4, 5, 6]
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=1) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=4) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=5) == 5
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=100) == 5

string_utils

substitute_string_template_dict

from unittest.mock import patch, PropertyMock

import pytest
from pythonic_toolbox.utils.string_utils import substitute_string_template_dict, CycleError

# simple usage
# both $variable ${variable} declarations are supported in string template format
str_template_dict = {
    'greeting': 'Good Morning, Everyone!',
    'first_name': 'Albert',
    'last_name': 'Lee',
    'full_name': '$first_name $last_name',
    'age': 34,
    'speech': '$greeting, I am $full_name, a ${age}-year-old programmer, very glad to meet you!'
}
output_dict = substitute_string_template_dict(str_template_dict)
assert output_dict['full_name'] == 'Albert Lee'
expected_speech = 'Good Morning, Everyone!, I am Albert Lee, a 34-year-old programmer, very glad to meet you!'
assert output_dict['speech'] == expected_speech

# complex usage, with dynamic values, and multi value-providing holders
str_template_dict = {
    'first_name': 'Daenerys',
    'last_name': 'Targaryen',
    'nick_name': 'Dany',
    'full_name': '$first_name $last_name',
    'speech': "$nick_name: I'm $full_name ($title1, $title2, $title3), it's $current_time_str, $greeting!",
}

variables_dict = {'title1': 'Queen of Meereen',
                  'title2': 'Mother of Dragons'}

class DynamicVariables:
    @property
    def current_time_str(self):
        import datetime
        return datetime.datetime.now().strftime("%H:%M:%S")

class DefaultUnknownTitle:
    """
    A class will always return UnknownTitle, when try to access attribute like
    title1, title2, ..., titleX
    """

    def __getattribute__(self, item):
        if isinstance(item, str) and item.startswith('title') and item[len(item) - 1:].isdigit():
            return 'UnknownTitle'
        return super(DefaultUnknownTitle, self).__getattribute__(item)

expected_speech = ("Dany: I'm Daenerys Targaryen (Queen of Meereen, Mother of Dragons, UnknownTitle), "
                   "it's 08:00:00, good morning everyone!")

# using mock to make DynamicVariables().current_time_str always return 08:00:00
with patch.object(DynamicVariables, 'current_time_str', return_value='08:00:00', new_callable=PropertyMock):
    output_dict = substitute_string_template_dict(str_template_dict, variables_dict, DynamicVariables(),
                                                  DefaultUnknownTitle(),
                                                  greeting='good morning everyone')
    assert output_dict['speech'] == expected_speech

# edge cases
assert substitute_string_template_dict({}) == {}

# cycle detection
str_template_dict = {
    'variable_a': 'Hello $variable_b',  # variable_a depends on variable_b
    'variable_b': 'Hello $variable_a',  # variable_b depends on variable_a, it's a cycle!
}

with pytest.raises(CycleError) as exec_info:
    substitute_string_template_dict(str_template_dict)

context

SkipContext

import itertools

import pytest
from pythonic_toolbox.utils.context_utils import SkipContext

# Usage: define a class that inherits the SkipContext,
# and takes control of the skip or not logic
class MyWorkStation(SkipContext):

    def __init__(self, week_day: str):
        working_days = {'monday', 'tuesday', 'wednesday', 'thursday', 'friday'}
        weekends = {'saturday', 'sunday'}

        if week_day.lower() not in working_days.union(weekends):
            raise ValueError(f'Invalid weekday {week_day}')

        skip = True if week_day.lower() in weekends else False
        super(MyWorkStation, self).__init__(skip=skip)

seven_week_days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
logged_opening_days = []
total_working_hours = 0

for cur_week_day in seven_week_days:
    # MyWorkStation will skip the code block when encountering weekends
    with MyWorkStation(week_day=cur_week_day):
        # log this working day
        logged_opening_days.append(cur_week_day)
        # accumulate working hours, 8 hours on each working day
        total_working_hours += 8

# only working days are logged
assert logged_opening_days == ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday']
assert total_working_hours == 8 * 5

# test basic SkipContext
count_iterator = itertools.count(start=0, step=1)

flg_skip = True
with SkipContext(skip=flg_skip):
    # if skip = True, all codes inside the context will be skipped(not executed)
    next(count_iterator)  # this will not be executed
    assert sum([1, 1]) == 3
    raise Exception('Codes will not be executed')

assert next(count_iterator) == 0  # check previous context is skipped

flg_skip = False
with SkipContext(skip=flg_skip):
    # codes will be executed as normal, if skip = False
    next(count_iterator)  # generate value 1
    assert sum([1, 1]) == 2

assert next(count_iterator) == 2  # check previous context is executed

with pytest.raises(Exception) as exec_info:
    with SkipContext(skip=False):
        # if skip = False, this SkipContextManager is transparent,
        # internal exception will be detected as normal
        raise Exception('MyError')
assert exec_info.value.args[0] == 'MyError'

# another example: ensure there will be only one job, who acquire the lock, run the increase +1

from multiprocessing import Manager, Pool
import time

from pythonic_toolbox.utils.context_utils import SkipContext


def plain_cronjob_increase(ns, lock):
    start = time.time()
    with lock:
        now = time.time()
        if now - start >= 0.5:
            pass
        else:
            ns.cnt += 1
            time.sleep(1)
    return ns.cnt


class PreemptiveLockContext(SkipContext):
    def __init__(self, lock):
        self.start_time = time.perf_counter()
        self.lock = lock
        self.acquired = self.lock.acquire(timeout=0.5)
        skip = not self.acquired
        super(PreemptiveLockContext, self).__init__(skip=skip)

    def __exit__(self, type, value, traceback):
        if self.acquired:
            time.sleep(1)
            self.lock.release()
        if type is None:
            return  # No exception
        else:
            if issubclass(type, self.SkipContentException):
                return True  # Suppress special SkipWithBlockException
            return False


def cronjob_increase(ns, lock):
    # for those who cannot acquire the lock within some time
    # this context block will be skipped, quite simple
    with PreemptiveLockContext(lock):
        ns.cnt += 1
    return ns.cnt



manager = Manager()
lock = manager.Lock()
ns = manager.Namespace()
pool = Pool(2)

ns.cnt = 0
processes = [pool.apply_async(plain_cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
result = [p.get() for p in processes]
assert result == [1, 1]
assert ns.cnt == 1

# reset global cnt=0
ns.cnt = 0
processes = [pool.apply_async(cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
result = [p.get() for p in processes]
assert result == [1, 1]
assert ns.cnt == 1

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pythonic-toolbox-1.1.39.tar.gz (53.9 kB view details)

Uploaded Source

Built Distribution

pythonic_toolbox-1.1.39-py3-none-any.whl (31.8 kB view details)

Uploaded Python 3

File details

Details for the file pythonic-toolbox-1.1.39.tar.gz.

File metadata

  • Download URL: pythonic-toolbox-1.1.39.tar.gz
  • Upload date:
  • Size: 53.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.10.0 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/1.0.0 urllib3/1.26.18 tqdm/4.64.1 importlib-metadata/4.8.3 keyring/23.4.1 rfc3986/1.5.0 colorama/0.4.5 CPython/3.6.15

File hashes

Hashes for pythonic-toolbox-1.1.39.tar.gz
Algorithm Hash digest
SHA256 35cf55a0dfb41c799d8b059f1f175ab708e87365d71365432d508f641bd8520a
MD5 2b7b94f2c2144e1ee4ef58646c1ee76d
BLAKE2b-256 5f7e5d121510194fc42afeaaa426fd30a0b514248f04478e64543c9884bec106

See more details on using hashes here.

File details

Details for the file pythonic_toolbox-1.1.39-py3-none-any.whl.

File metadata

  • Download URL: pythonic_toolbox-1.1.39-py3-none-any.whl
  • Upload date:
  • Size: 31.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.10.0 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/1.0.0 urllib3/1.26.18 tqdm/4.64.1 importlib-metadata/4.8.3 keyring/23.4.1 rfc3986/1.5.0 colorama/0.4.5 CPython/3.6.15

File hashes

Hashes for pythonic_toolbox-1.1.39-py3-none-any.whl
Algorithm Hash digest
SHA256 9064b346ecd655d2889475cc17dd6617a74d18f2e90956b95ec6f3904609ce49
MD5 01730d524f79a32bb5ee582eae99d9d8
BLAKE2b-256 99ea03fdf7e398b8fa88d19531a024e257f6139d59d261f87169e72d90472a53

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page