A package for creating task helpers.
Project description
Task helpers - a package for creating task helpers.
The package allows you to work with tasks. The idea is that it would be possible to create a task and send it for execution / processing somewhere (to the worker), without waiting for the result to be executed in the same block of code. Or, for example, different clients (from different threads) can send many tasks for processing and each wait for its own result.
Usage example
Client side:
import redis
from task_helpers.couriers.redis import RedisClientTaskCourier
task_courier = RedisClientTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "bulk_data_saving"
def to_save(task_data):
# add a task to the queue
task_id = task_courier.add_task_to_queue(
queue_name=QUEUE_NAME,
task_data=task_data)
# waiting for the task to complete in the worker.
saved_object = task_courier.wait_for_task_result(
queue_name=QUEUE_NAME,
task_id=task_id)
return saved_object
if __name__ == "__main__":
# Many clients send data
task_data = {
"id": "123",
"name": "tomato",
"price": "12.45"
}
saved_object = to_save(task_data=task_data)
print(saved_object)
# {'name': 'tomato', 'price': '12.45', 'id': '123')}
Worker side:
import redis
from task_helpers.couriers.redis import RedisWorkerTaskCourier
from task_helpers.workers.base import BaseWorker
task_courier = RedisWorkerTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "bulk_data_saving"
class BulkSaveWorker(BaseWorker):
queue_name = QUEUE_NAME
def perform_tasks(self, tasks):
data_dicts = [task_data for task_id, task_data in tasks]
# bulk saving data_dicts (it's faster than saving 1 at a time.)
print(f"saved {len(data_dicts)} objects: \n{data_dicts}")
# saved 1 objects:
# [{'id': '123', 'name': 'tomato', 'price': '12.45'}]
return tasks
if __name__ == "__main__":
worker = BulkSaveWorker(task_courier=task_courier)
worker.perform(total_iterations=500)
# the worker will complete its work after 500 iterations
# (in the future functionality it is necessary to prevent memory leaks)
The couriers module
the couriers module is responsible for sending tasks from the worker to the client and back, as well as checking the execution status.
Client side methods (ClientTaskCourier):
- get_task_result - returns task retuls, if it exists.
- wait_for_task_result - Waits for the task result to appear.
- add_task_to_queue - adds a task to the queue for processing;
Worker side methods (WorkerTaskCourier):
- get_tasks - pops tasks from queue and returns it.
- get_task - returns one task from queue.
- wait_for_task - waits for task and returns it.
- return_task_result - returns result to the client side.
ClientWorkerTaskCourier:
- all of the above
The workers module
The workers module is intended for executing and processing tasks.
BaseWorker
A worker that can process many tasks in one iteration. (This can be useful if task_data are objects on which some operations can be done in bulk)
BaseWorker methods:
- wait_for_tasks - waits for tasks in the queue, pops and returns them;
- perform_tasks - abstract method for processing tasks. Should return a list of tasks.
- return_task_results - method method for sending task results to the clients.
- perform - the main method that starts the task worker. total_iterations argument are required (how many processing iterations the worker should do.)
ClassicWorker
Сlassic worker, where the task is a tuple: (task_id, task_data). task_data is a dictionary with keys "function", "args" and "kwargs". Arguments "args" and "kwargs" are optional.
ClassicWorker methods:
- perform_tasks - Method for processing tasks. Returns a list of tasks. task is a tuple: (task_id, task_data). task_data is a dictionary with keys "function", "args" and "kwargs". Calls a function with args "args" and kwargs "kwargs", unpacking them, and returns the execution result. Arguments "args" and "kwargs" are optional.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for task_helpers-1.0.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 211c839fd64cb733ff215ae85e1fcad5ae2281aa0ec3f273bf084fb6b6f05d90 |
|
MD5 | e5b886e0f27a9e3d29ddd824d023b175 |
|
BLAKE2b-256 | cf9d0b5c9a5a2eb6914ed0c6b304ba22b76256087a856c09cdb3d723f294652f |