Skip to main content

a distributed task runner

Project description

taskkit

pypi: https://pypi.org/project/taskkit/

Overview

taskkit is a distributed task runner.

How to use

1. Implement TaskHandler

This is the core part.

import json
from typing import Any
from taskkit import TaskHandler, Task, DiscardTask


class Handler(TaskHandler):
    def handle(self, task: Task):
        # Use `tagk.group` and `task.name` to determine how to handle the task
        if task.group == '...':
            if task.name == 'foo':
                # decode the data which encoded by `self.encode_data` if needed
                data = json.loads(task.data)
                # do something with the `data`
                ...
                # return result for the task
                return ...

            elif task.name == 'bar':
                # do something
                return ...

        # you should raise DiscardTask if you want to discard the task
        raise DiscardTask

    def get_retry_interval(self,
                           task: Task,
                           exception: Exception) -> float | None:
        # This method will be called if the handle method raises exceptions. You
        # should return how long time should be wait to retry the task in seconds
        # as float. If you don't want to retry the task, you can return None to
        # make the task fail or raise DiscardTask to discard the task.
        return task.retry_count if task.retry_count < 10 else None

    def encode_data(self, group: str, task_name: str, data: Any) -> bytes:
        # encode data of tasks for serializing it
        return json.dumps(data).encode()

    def encode_result(self, task: Task, result: Any) -> bytes:
        # encode the result of the task
        return json.dumps(result).encode()

    def decode_result(self, task: Task, encoded: bytes) -> Any:
        # decode the result of the task
        return json.loads(encoded)

2. Make Kit

Use redis impl

You can use redis backend like this:

from redis.client import Redis
from taskkit.impl.redis import make_kit

REDIS_HOST = '...'
REDIS_PORT = '...'

redis = Redis(host=REDIS_HOST, port=REDIS_PORT)
kit = make_kit(redis, Handler())

Use django impl

  1. Add 'taskkit.contrib.django' to INSTALLED_APPS in the settings
  2. Run python manage.py migrate
  3. Make a kit instance like below:
from redis.client import Redis
from taskkit.impl.django import make_kit

kit = make_kit(Handler())

3. Run workers

GROUP_NAME = 'Any task group name'

# it starts busy loop
kit.start(
    # number of processes
    num_processes=3,
    # number of worker threads per process
    num_worker_threads_per_group={GROUP_NAME: 3})

# you can use `start_processes` to avoid busy loop
kit.start_processes(
    num_processes=3,
    num_worker_threads_per_group={GROUP_NAME: 3},
    daemon=True)

4. Initiate task

from datetime import timedelta
from taskkit import ResultGetTimedOut


result = kit.initiate_task(
    GROUP_NAME,
    # task name
    'your task name',
    # task data which can be encoded by `Handler.encode_data`
    dict(some_data=1),
    # run the task after 10 or more seconds.
    due=datetime.now() + timedelta(seconds=10))

try:
    value = result.get(timeout=10)
except ResultGetTimedOut:
    ...

Scheduled Tasks

from datetime import timezone, timedelta
from taskkit import ScheduleEntry, RegularSchedule

# define entries
# key is a name for scheduler
# value is a list of instances of ScheduleEntry
schedule_entries = {
    'scheduler_name': [
        ScheduleEntry(
            # A key which can identify the schedule in the list
            key='...',
            # group name
            group=GROUP_NAME,
            # task name
            name='test2',
            # task data encoded by the same algorithm as `Handler.encode_data`
            data=b'...',

            # It means that the scheduler will initiate the task twice
            # an hour at **:00:00 and **:30:00.
            schedule=RegularSchedule(
                seconds={0},
                minutes={0, 30},
            ),
        ),
    ],

    # You can have multiple schedulers
    'another_scheduler': [
        # other entries ...
    ],
}

# pass the entries with kit.start
kit.start(
    num_processes=3,
    num_worker_threads_per_group={GROUP_NAME: 3},

    schedule_entries=schedule_entries,
    tzinfo=timezone(timedelta(hours=9), 'JST'))

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

taskkit-0.1.2.tar.gz (21.6 kB view details)

Uploaded Source

Built Distribution

taskkit-0.1.2-py3-none-any.whl (27.8 kB view details)

Uploaded Python 3

File details

Details for the file taskkit-0.1.2.tar.gz.

File metadata

  • Download URL: taskkit-0.1.2.tar.gz
  • Upload date:
  • Size: 21.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.9.15

File hashes

Hashes for taskkit-0.1.2.tar.gz
Algorithm Hash digest
SHA256 0a28046620c4c906d3e1dd65ee1400c8abfb25028a00b1eae6256927c13be8b9
MD5 6b95ffaa7d781b43b9ab3224fa3e3103
BLAKE2b-256 0ac97823b1313690712cdd07a0643e94ac5b768e269f0c153349b3a5932bccfb

See more details on using hashes here.

File details

Details for the file taskkit-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: taskkit-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 27.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.9.15

File hashes

Hashes for taskkit-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 3fe5db363a263d01a429c7116397afb8a180c5a5cf021eaf2c13e85f1fde654b
MD5 ed6c0061a803c2c460f0df5085d15d80
BLAKE2b-256 f05c36f7a50d1317ad12b01f67f14e376bac39a92e446c9e4e30a5d3b697b1a9

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page