Threads utilities in Python
Threads Utilities
Threads utilities in Python.
Threads pool manager
Description
The threads pool manager manages a pool of task executors that execute tasks.
Queue Placement Algorithms
Algorithm | Description |
---|---|
STRICT_PRIORITY | Higher priority tasks are added to the queue before lower priority tasks. |
AVOID_STARVATION_PRIORITY | Tasks are added to the queue in STRICT_PRIORITY order. If tasks are not added to the last place in the queue avoid_starvation_amount times, the next task is added to the last place in the queue and flagged with the avoid_starvation flag. Later tasks are added to the queue in AVOID_STARVATION_PRIORITY order after the flagged task, so the flagged task is executed before them. |
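One way to read the two rules above is as insertion logic on a list. The sketch below is a toy model under stated assumptions, not the library's implementation: `QueuedTask`, `PlacementQueue`, and their methods are hypothetical names, tasks of equal priority are assumed to keep arrival order, and clearing the flag once the flagged task leaves the queue is left out.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class QueuedTask:
    # Hypothetical stand-in for a queued task; not a library class.
    name: str
    priority: int
    avoid_starvation_flag: bool = False


class PlacementQueue:
    """Toy model of the two placement rules described in the table."""

    def __init__(self, avoid_starvation_amount: int = 1) -> None:
        self.avoid_starvation_amount = avoid_starvation_amount
        self.tasks: List[QueuedTask] = []
        self._jump_count = 0  # placements that did not land in the last place

    def _priority_index(self, priority: int, start: int = 0) -> int:
        # First position at or after `start` holding a lower-priority task.
        # Equal priorities keep arrival order (an assumption of this sketch).
        for i in range(start, len(self.tasks)):
            if self.tasks[i].priority < priority:
                return i
        return len(self.tasks)

    def add_strict(self, task: QueuedTask) -> None:
        self.tasks.insert(self._priority_index(task.priority), task)

    def add_avoid_starvation(self, task: QueuedTask) -> None:
        if self._jump_count >= self.avoid_starvation_amount:
            # Enough tasks have jumped the queue: this one goes last, flagged.
            task.avoid_starvation_flag = True
            self.tasks.append(task)
            self._jump_count = 0
            return
        # Priority order applies only behind the last flagged task.
        start = 0
        for i, queued in enumerate(self.tasks):
            if queued.avoid_starvation_flag:
                start = i + 1
        index = self._priority_index(task.priority, start)
        if index < len(self.tasks):
            self._jump_count += 1
        self.tasks.insert(index, task)
```

With `avoid_starvation_amount=1`, adding two priority 1 tasks, then two priority 2 tasks, then a priority 3 task through `add_avoid_starvation` leaves the second priority 2 task flagged at the end of the queue, with the priority 3 task placed behind it.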
Executors extension pool
The threads pool manager has an executors extension pool that extends the pool of task executors when the pool is full and at least one task executor has run for more than executors_timeout_ms.
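The condition above can be written as a small predicate. This is a minimal sketch, assuming the manager tracks each running executor's start time in milliseconds; `should_extend_pool` and its parameters are hypothetical names, not part of the library's API.

```python
import time
from typing import Iterable, Optional


def should_extend_pool(
    active_executors: int,
    max_pool_size: int,
    executor_start_times_ms: Iterable[int],
    executors_timeout_ms: int,
    now_ms: Optional[int] = None,
) -> bool:
    """Return True when the pool is full and some executor exceeded the timeout."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    pool_is_full = active_executors >= max_pool_size
    # At least one executor must have run longer than the timeout.
    has_slow_executor = any(
        now_ms - start_ms > executors_timeout_ms
        for start_ms in executor_start_times_ms
    )
    return pool_is_full and has_slow_executor
```

Both conditions must hold: a full pool of fast executors, or a slow executor in a pool that still has room, does not trigger the extension in this sketch.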
ThreadsPoolManager class
Methods
Method | Description | Parameters | Returns |
---|---|---|---|
add_task | Adds a task to the threads pool. | task (Union[ThreadTask, MethodTask]): the task to add to the threads pool. task_id (Optional[str], default None): task identifier. priority (int, default 1): task priority; a higher number is a higher priority. queue_placement (QueuePlacementEnum): queue placement algorithm. | None |
get_task | Gets a task from the threads pool. | task_id (str): task identifier. | Optional[Union[ThreadTask, MethodTask]]: the task if it is found, otherwise None. |
is_task_exists | Checks whether a task exists in the threads pool. | task_id (str): task identifier. | bool: True if the task exists, otherwise False. |
reset_metrics | Resets the metrics. | None | None |
start | Starts the threads pool. | None | None |
shutdown | Shuts down the threads pool. | None | None |
active_tasks_amount | Returns the number of active tasks in the threads pool. | @property | int: active tasks amount. |
avoid_starvation_amount | Returns the avoid starvation amount. | @property | int: avoid starvation amount. |
executors_extension_pool_size | Returns the executors extension pool size. | @property | int: executors extension pool size. |
executors_timeout_ms | Returns the executors timeout in milliseconds. | @property | int: executors timeout in milliseconds. |
executors_timeout_ms | Sets the executors timeout in milliseconds. | @setter: executors timeout in milliseconds. | None |
finished_tasks | Returns the finished tasks. The finished tasks list is truncated after each call. | @property | List[Union[ThreadTask, MethodTask]]: finished tasks. |
max_executors_extension_pool_size | Returns the max executors extension pool size. | @property | int: max executors extension pool size. |
max_executors_extension_pool_size | Sets the max executors extension pool size. | @setter: max executors extension pool size. | None |
max_executors_pool_size | Returns the max executors pool size. | @property | int: max executors pool size. |
max_executors_pool_size | Sets the max executors pool size. | @setter: max executors pool size. | None |
max_queue_size | Returns the queue size limit. | @property | int: max queue size. |
max_queue_size | Sets the max queue size. | @setter: max queue size. | None |
metrics | Returns the metrics. | @property | ThreadsPoolManagerMetrics: metrics. |
name | Returns the threads pool name. | @property | str: threads pool name. |
name | Sets the threads pool name. | @setter: threads pool name. | None |
queue | Returns the queue. | @property | Queue: queue. |
queue_size | Returns the queue size. | @property | int: queue size. |
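The finished_tasks entry says the list is truncated after each call. A minimal sketch of that read-and-clear pattern follows, assuming "truncated" means the list is emptied once it has been returned; `FinishedTasksBuffer` and `record` are hypothetical names used only for illustration, and plain strings stand in for task objects.

```python
import threading
from typing import List


class FinishedTasksBuffer:
    """Hypothetical stand-in illustrating a drain-on-read property."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._finished: List[str] = []

    def record(self, task_id: str) -> None:
        # Called when a task finishes; the lock guards concurrent writers.
        with self._lock:
            self._finished.append(task_id)

    @property
    def finished_tasks(self) -> List[str]:
        # Hand back everything collected so far and start a fresh list,
        # so each finished task is reported exactly once.
        with self._lock:
            drained, self._finished = self._finished, []
        return drained
```

Under this reading, a caller that needs the finished tasks more than once should keep the returned list, because a second read returns only tasks that finished in between.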
TaskExecutor class
Methods
Method | Description | Parameters | Returns |
---|---|---|---|
start | Starts the task executor. | None | None |
avoid_starvation_flag | Returns the avoid starvation flag. | @property | bool |
avoid_starvation_flag | Sets the avoid starvation flag. | @setter | None |
exception | Returns the exception, if one was raised in the method executed by a MethodTask. | @property | Exception |
stack_trace | Returns the stack trace, if an exception was raised in the method executed by a MethodTask. | @property | str: exception stack trace. |
priority | Returns the priority. | @property | int |
task | Returns the task. | @property | Union[ThreadTask, MethodTask] |
task_id | Returns the task id. | @property | str |
ThreadTask class
Task that runs a thread.
Methods
Method | Description | Parameters | Returns |
---|---|---|---|
execute | Executes the task. | None | None |
task_instance | Returns the task instance. | @property | Thread: the task's Thread instance. |
alive_date_ms | Returns the task alive date in ms. | @property | int: alive date in ms. |
start_date_ms | Returns the task start date in ms. | @property | int: start date in ms. |
start_date_ms | Sets the task start date in ms. | @setter | None |
task_state | Returns the task state. | @property | TaskStateEnum: task state. |
task_state | Sets the task state. | @setter | None |
MethodTask class
Task that runs a method.
Methods
Method | Description | Parameters | Returns |
---|---|---|---|
execute | Executes the task. | None | None |
alive_date_ms | Returns the task alive date in ms. | @property | int: alive date in ms. |
exception | Returns the exception, if one was raised. | @property | Exception |
stack_trace | Returns the stack trace, if an exception was raised. | @property | str: exception stack trace. |
start_date_ms | Returns the task start date in ms. | @property | int: start date in ms. |
start_date_ms | Sets the task start date in ms. | @setter | None |
task_state | Returns the task state. | @property | TaskStateEnum: task state. |
task_state | Sets the task state. | @setter | None |
ThreadsPoolManagerMetrics class
Dataclass that holds the threads pool manager metrics.
Properties
Property | Description | Type |
---|---|---|
avoid_starvation_counter | Counts the times that the AVOID_STARVATION_PRIORITY flag has been raised. | int |
executed_method_counter | Counts the times that a MethodTask has been executed. | int |
executed_task_counter | Counts the times that a ThreadTask or MethodTask has been executed. | int |
executed_thread_counter | Counts the times that a ThreadTask has been executed. | int |
max_execution_date_ms | Max execution date in ms. | int |
max_queue_size | Max size that the queue has reached. | int |
method_tasks_counter_dict | Counts the times that a MethodTask has been executed. The key is the task priority. | dict[int, int] |
tasks_priority_counter_dict | Counts the times that a ThreadTask or MethodTask has been executed. The key is the task priority. | dict[int, int] |
thread_tasks_counter_dict | Counts the times that a ThreadTask has been executed. The key is the task priority. | dict[int, int] |
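The counter dictionaries above map a task priority to an execution count. The sketch below shows how such counters could be maintained; `MetricsSketch` and its methods are hypothetical and only mirror the shape of the table, not the library's dataclass.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class MetricsSketch:
    # Hypothetical stand-in; field names are illustrative only.
    executed_counter: int = 0
    max_queue_size: int = 0
    priority_counter_dict: Dict[int, int] = field(default_factory=dict)

    def record_execution(self, priority: int) -> None:
        # One more executed task overall, and one more for this priority.
        self.executed_counter += 1
        self.priority_counter_dict[priority] = (
            self.priority_counter_dict.get(priority, 0) + 1
        )

    def record_queue_size(self, size: int) -> None:
        # Keep the largest queue size seen so far.
        self.max_queue_size = max(self.max_queue_size, size)
```

Recording one execution at priority 1 and two at priority 2 yields a dictionary of the form `{1: 1, 2: 2}`.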
TaskStateEnum Enum
Task state enum.
Enum | Description |
---|---|
QUEUE | Task is in queue. |
EXECUTORS_POOL | Task is in executors pool. |
EXECUTED | Task is finished. |
QueuePlacementEnum Enum
Queue placement enum.
Enum | Description |
---|---|
STRICT_PRIORITY | Strict Priority. |
AVOID_STARVATION_PRIORITY | Avoid Starvation Priority. |
Examples:
- Add tasks with STRICT_PRIORITY queue placement.

  Create a threads pool manager with an executors pool size of 2.

  ```python
  from nrt_threads_utils.threads_pool_manager.threads_pool_manager import ThreadsPoolManager

  threads_pool_manager = ThreadsPoolManager(executors_pool_size=2)
  threads_pool_manager.start()
  ```

  Add two tasks to the threads pool manager. The tasks are executed right away because both executors are free.

  ```python
  from nrt_threads_utils.threads_pool_manager.tasks import ThreadTask

  # CustomThread is not defined here; it is assumed to be a user-defined
  # threading.Thread subclass.
  t_1 = CustomThread()
  t_2 = CustomThread()

  threads_pool_manager.add_task(ThreadTask(t_1), priority=1)
  threads_pool_manager.add_task(ThreadTask(t_2), priority=1)
  ```

  Add 2 tasks in priority 1. The default queue placement is STRICT_PRIORITY.

  ```python
  t_3 = CustomThread()
  t_4 = CustomThread()

  threads_pool_manager.add_task(ThreadTask(t_3), priority=1)
  threads_pool_manager.add_task(ThreadTask(t_4), priority=1)
  ```

  Add a task in priority 2 (higher priority). The default queue placement is STRICT_PRIORITY.

  ```python
  t_5 = CustomThread()

  threads_pool_manager.add_task(ThreadTask(t_5), priority=2)
  ```

  The task in priority 2 is executed before the queued tasks in priority 1.
- Add tasks with AVOID_STARVATION_PRIORITY queue placement.

  Create a threads pool manager with an executors pool size of 2. avoid_starvation_amount is set to 1.

  ```python
  from nrt_threads_utils.threads_pool_manager.threads_pool_manager import ThreadsPoolManager

  threads_pool_manager = ThreadsPoolManager(executors_pool_size=2)
  threads_pool_manager.avoid_starvation_amount = 1
  threads_pool_manager.start()
  ```

  Add 2 tasks in priority 1.

  ```python
  from nrt_threads_utils.threads_pool_manager.tasks import ThreadTask
  from nrt_threads_utils.threads_pool_manager.enums import QueuePlacementEnum

  # CustomThread is not defined here; it is assumed to be a user-defined
  # threading.Thread subclass.
  t_1 = CustomThread()
  t_2 = CustomThread()

  threads_pool_manager.add_task(
      ThreadTask(t_1),
      priority=1,
      queue_placement=QueuePlacementEnum.AVOID_STARVATION_PRIORITY)

  threads_pool_manager.add_task(
      ThreadTask(t_2),
      priority=1,
      queue_placement=QueuePlacementEnum.AVOID_STARVATION_PRIORITY)
  ```

  Add a task in priority 2 (higher priority).

  ```python
  t_3 = CustomThread()

  threads_pool_manager.add_task(
      ThreadTask(t_3),
      priority=2,
      queue_placement=QueuePlacementEnum.AVOID_STARVATION_PRIORITY)
  ```

  The task in priority 2 is executed before the tasks in priority 1. The avoid starvation counter is increased by 1 because the task is not appended to the end of the queue.

  Add another task in priority 2 (higher priority).

  ```python
  t_4 = CustomThread()

  threads_pool_manager.add_task(
      ThreadTask(t_4),
      priority=2,
      queue_placement=QueuePlacementEnum.AVOID_STARVATION_PRIORITY)
  ```

  The task is flagged with avoid_starvation_flag and added to the end of the queue. The next tasks are added in AVOID_STARVATION_PRIORITY order after the flagged task.
- Metrics.

  Create a method that sleeps for 10 seconds.

  Code

  ```python
  from nrt_threads_utils.threads_pool_manager.threads_pool_manager import ThreadsPoolManager
  from nrt_threads_utils.threads_pool_manager.tasks import MethodTask
  from time import sleep


  def sleep_10_sec():
      sleep(10)


  threads_pool_manager = ThreadsPoolManager(executors_pool_size=1)

  try:
      threads_pool_manager.start()

      threads_pool_manager.add_task(MethodTask(sleep_10_sec), priority=1)
      sleep(0.2)
      threads_pool_manager.add_task(MethodTask(sleep_10_sec), priority=2)
      threads_pool_manager.add_task(MethodTask(sleep_10_sec), priority=2)

      metrics = threads_pool_manager.metrics

      print(f'Max queue size: {metrics.max_queue_size}')
      print(f'Max execution date ms {metrics.max_execution_date_ms}')
      print(f'Executed tasks counter: {metrics.executed_tasks_counter}')
      print(f'Executed threads counter: {metrics.executed_threads_counter}')
      print(f'Executed methods counter: {metrics.executed_methods_counter}')
      print(f'Avoid starvation counter: {metrics.avoid_starvation_counter}')
      print(f'Tasks priority counter dict {metrics.tasks_priority_counter_dict}')
  finally:
      threads_pool_manager.shutdown()
      threads_pool_manager.join()
  ```

  Output

  ```
  Max queue size: 2
  Max execution date ms 10000
  Executed tasks counter: 3
  Executed threads counter: 0
  Executed methods counter: 3
  Avoid starvation counter: 0
  Tasks priority counter dict {1: 1, 2: 2}
  ```
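The ThreadTask examples above use CustomThread without defining it. Since ThreadTask exposes its task_instance as a Thread, a plausible reading is that CustomThread is any user-defined `threading.Thread` subclass. The sketch below is one such class; the `work_seconds` parameter and the `finished` attribute are assumptions added for illustration.

```python
import threading
import time


class CustomThread(threading.Thread):
    """Assumed shape of the CustomThread used in the examples."""

    def __init__(self, work_seconds: float = 0.1) -> None:
        super().__init__()
        self.work_seconds = work_seconds
        self.finished = False

    def run(self) -> None:
        # Simulate some work, then mark the thread as done.
        time.sleep(self.work_seconds)
        self.finished = True
```

Run on its own, the class behaves like any other thread: `start()` launches it and `join()` waits for `run` to complete.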