A package for creating task helpers.

Project description

Task helpers - a package for creating task helpers.

The package allows you to work with tasks. The idea is that you can create a task and send it somewhere for execution or processing (to a worker) without blocking the current block of code while waiting for the result. Likewise, many clients (from different threads) can each send tasks for processing and wait for their own results.

Usage example

# Run Redis (this can be done in many ways, not necessarily through Docker):
docker run -p 6379:6379 redis

Client side:

import redis

from task_helpers.couriers.redis import RedisClientTaskCourier

task_courier = RedisClientTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "bulk_data_saving"


def to_save(task_data):
    # Adding a task to the queue.
    task_id = task_courier.add_task_to_queue(
        queue_name=QUEUE_NAME,
        task_data=task_data)

    # Wait for the task to be completed by the worker.
    saved_object = task_courier.wait_for_task_result(
        queue_name=QUEUE_NAME,
        task_id=task_id)
    return saved_object


if __name__ == "__main__":
    # Many clients can add tasks to the queue at the same time.
    task_data = {
        "name": "tomato",
        "price": "12.45"
    }
    saved_object = to_save(task_data=task_data)
    print(saved_object)
    # {'name': 'tomato', 'price': '12.45', 'id': UUID('...'), 'status': 'active'}

Worker side:

import uuid
import redis

from task_helpers.couriers.redis import RedisWorkerTaskCourier
from task_helpers.workers.base import BaseWorker

task_courier = RedisWorkerTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "bulk_data_saving"


class BulkSaveWorker(BaseWorker):
    queue_name = QUEUE_NAME

    def bulk_saving_plug(self, tasks):
        # A stub that emulates bulk saving: assigns an id and a status
        # to each task_data dict in place.
        for task_id, task_data in tasks:
            task_data["id"] = uuid.uuid4()
            task_data["status"] = "active"
        return tasks

    def perform_tasks(self, tasks):
        tasks = self.bulk_saving_plug(tasks)
        # Bulk saving of the task_data dicts would go here
        # (it's faster than saving one object at a time).

        print(f"saved {len(tasks)} objects.")
        # saved 1 objects.

        return tasks


if __name__ == "__main__":
    worker = BulkSaveWorker(task_courier=task_courier)
    worker.perform(total_iterations=500)
    # The worker will finish after 500 iterations.
    # (Limiting iterations helps prevent memory leaks in long-running workers.)

Installation

pip install task_helpers

The couriers module

The couriers module is responsible for sending tasks from the client to the worker, returning results back, and checking the execution status.

Client side methods (ClientTaskCourier):

  • get_task_result - returns the result of the task, if it exists.
  • wait_for_task_result - waits for the result of the task to appear, then returns it.
  • add_task_to_queue - adds one task to the queue for processing.
  • bulk_add_tasks_to_queue - adds many tasks to the queue for processing.
  • check_for_done - checks whether the task has completed (see the sketch after this list).
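
For illustration, here is a minimal polling client built on these methods. It is a sketch, not the package's documented API surface: the tasks_data argument name for bulk_add_tasks_to_queue and the exact check_for_done signature are assumptions modeled on the single-task calls in the usage example above.

import time

import redis

from task_helpers.couriers.redis import RedisClientTaskCourier

task_courier = RedisClientTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "bulk_data_saving"

# Enqueue several tasks at once (the "tasks_data" argument name is assumed).
task_ids = task_courier.bulk_add_tasks_to_queue(
    queue_name=QUEUE_NAME,
    tasks_data=[{"name": "tomato"}, {"name": "cucumber"}])

# Instead of blocking in wait_for_task_result, poll check_for_done so the
# client can do other work between checks.
for task_id in task_ids:
    while not task_courier.check_for_done(
            queue_name=QUEUE_NAME, task_id=task_id):
        time.sleep(0.1)  # do other work here
    print(task_courier.get_task_result(
        queue_name=QUEUE_NAME, task_id=task_id))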

Worker side methods (WorkerTaskCourier):

  • get_task - pops one task from the queue and returns it.
  • bulk_get_tasks - pops many tasks from the queue and returns them.
  • wait_for_task - waits for a task to appear, pops it from the queue, and returns it.
  • return_task_result - returns the result of processing the task to the client side.
  • bulk_return_task_results - returns the results of processing multiple tasks to the client side (a hand-rolled loop using these methods follows this list).
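
To show how the worker-side methods fit together without the workers module, here is a hand-rolled loop. It assumes wait_for_task returns a (task_id, task_data) tuple and that return_task_result takes a task_result keyword; both are assumptions based on the task shape described for ClassicWorker below, not verified against the package.

import redis

from task_helpers.couriers.redis import RedisWorkerTaskCourier

task_courier = RedisWorkerTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "bulk_data_saving"

while True:
    # Block until a task appears, then pop it from the queue.
    task_id, task_data = task_courier.wait_for_task(queue_name=QUEUE_NAME)

    task_result = {**task_data, "status": "active"}  # process the task here

    # Send the result back to the client waiting on this task_id.
    task_courier.return_task_result(
        queue_name=QUEUE_NAME,
        task_id=task_id,
        task_result=task_result)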

ClientWorkerTaskCourier:

  • all of the above methods (a combined-usage sketch follows).
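
A combined courier is handy when the client and worker run in the same process, e.g. in tests. A sketch, assuming the Redis implementation is exposed as RedisClientWorkerTaskCourier in task_helpers.couriers.redis (the class name is an assumption):

import redis

from task_helpers.couriers.redis import RedisClientWorkerTaskCourier

# One instance exposes both the client-side and worker-side methods.
courier = RedisClientWorkerTaskCourier(redis_connection=redis.Redis())

task_id = courier.add_task_to_queue(           # client-side method
    queue_name="test_queue", task_data={"n": 1})
popped_id, task_data = courier.wait_for_task(  # worker-side method
    queue_name="test_queue")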

The workers module

The workers module is intended for executing and processing tasks.

BaseWorker

A worker that can process many tasks in one iteration. (This can be useful when the task_data items are objects on which some operations can be performed in bulk.)

BaseWorker methods:

  • wait_for_tasks - waits for tasks in the queue, pops them, and returns them;
  • perform_tasks - abstract method for processing tasks. Should return a list of tasks.
  • return_task_results - method for sending task results to the clients.
  • perform - the main method that starts the task worker. The total_iterations argument is required (how many processing iterations the worker should do). A compact sketch follows this list.
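
The worker-side usage example above shows the full pattern; for reference, a minimal subclass only needs queue_name and an overridden perform_tasks. A compact sketch:

from task_helpers.workers.base import BaseWorker


class EchoWorker(BaseWorker):
    queue_name = "echo"

    def perform_tasks(self, tasks):
        # tasks is a list of (task_id, task_data) tuples; return the same
        # shape with results filled in so return_task_results can send
        # each result back to its client.
        return [(task_id, {"echo": task_data})
                for task_id, task_data in tasks]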

ClassicWorker

Classic worker, where each task is a tuple: (task_id, task_data). task_data is a dictionary with the keys "function", "args" and "kwargs"; "args" and "kwargs" are optional.

ClassicWorker methods:

  • perform_tasks - method for processing tasks; returns a list of tasks. Each task is a tuple (task_id, task_data), where task_data is a dictionary with the keys "function", "args" and "kwargs". The method calls the function, unpacking "args" and "kwargs" into the call, and returns the execution result; "args" and "kwargs" are optional. See the round-trip sketch below.
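
A round-trip sketch, assuming ClassicWorker lives in task_helpers.workers.classic and a combined Redis courier named RedisClientWorkerTaskCourier exists (both names are assumptions), and that the function is importable so it can be serialized through Redis:

import redis

from task_helpers.couriers.redis import RedisClientWorkerTaskCourier
from task_helpers.workers.classic import ClassicWorker  # module path assumed

courier = RedisClientWorkerTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "classic_demo"


def add(a, b=0):
    return a + b


class DemoWorker(ClassicWorker):
    queue_name = QUEUE_NAME


# Client side: task_data carries the callable plus its arguments.
task_id = courier.add_task_to_queue(
    queue_name=QUEUE_NAME,
    task_data={"function": add, "args": (2,), "kwargs": {"b": 3}})

# Worker side: ClassicWorker calls add(2, b=3) and returns the result.
DemoWorker(task_courier=courier).perform(total_iterations=1)

print(courier.get_task_result(queue_name=QUEUE_NAME, task_id=task_id))  # 5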

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

task_helpers-1.2.0.tar.gz (16.6 kB)

Uploaded Source

Built Distribution

task_helpers-1.2.0-py3-none-any.whl (12.7 kB)

Uploaded Python 3

File details

Details for the file task_helpers-1.2.0.tar.gz.

File metadata

  • Download URL: task_helpers-1.2.0.tar.gz
  • Upload date:
  • Size: 16.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.6

File hashes

Hashes for task_helpers-1.2.0.tar.gz:

  • SHA256: 502512cb95ea4dd880975340857b182f5b4f9baf0c9f18d0062a5fbfe51dd587
  • MD5: 5b1b94a0833d5bf7a3f71b663a5ecba4
  • BLAKE2b-256: b19c903ba61ab0d27e48fca6e6ce2dbf4735ec50264b93a8fafd4dc825d62b8f

See more details on using hashes.

File details

Details for the file task_helpers-1.2.0-py3-none-any.whl.

File metadata

  • Download URL: task_helpers-1.2.0-py3-none-any.whl
  • Upload date:
  • Size: 12.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.6

File hashes

Hashes for task_helpers-1.2.0-py3-none-any.whl:

  • SHA256: c0a32e771dbf9a19de7a5452dbd80cf73694f4dd022875ec406a336bb9b818c8
  • MD5: 2270a76cf6360166e048231a7e23ffc9
  • BLAKE2b-256: cb31df042837a05db7320c3fc1d1ad071285e053e10e8410821d2e7efe9e6106

See more details on using hashes.
