taskkit

pypi: https://pypi.org/project/taskkit/

Overview

taskkit is a distributed task runner.

How to use

1. Implement TaskHandler

This is the core part.

import json
from typing import Any
from taskkit import TaskHandler, Task, DiscardTask


class Handler(TaskHandler):
    def handle(self, task: Task):
        # Use `task.group` and `task.name` to determine how to handle the task
        if task.group == '...':
            if task.name == 'foo':
                # decode the data that was encoded by `self.encode_data`, if needed
                data = json.loads(task.data)
                # do something with the `data`
                ...
                # return result for the task
                return ...

            elif task.name == 'bar':
                # do something
                return ...

        # you should raise DiscardTask if you want to discard the task
        raise DiscardTask

    def get_retry_interval(self,
                           task: Task,
                           exception: Exception) -> float | None:
        # Called when `handle` raises an exception. Return the number of
        # seconds to wait before retrying, as a float. To stop retrying,
        # return None to mark the task as failed, or raise DiscardTask to
        # discard it.
        return task.retry_count if task.retry_count < 10 else None

    def encode_data(self, group: str, task_name: str, data: Any) -> bytes:
        # serialize the task data to bytes
        return json.dumps(data).encode()

    def encode_result(self, task: Task, result: Any) -> bytes:
        # encode the result of the task
        return json.dumps(result).encode()

    def decode_result(self, task: Task, encoded: bytes) -> Any:
        # decode the result of the task
        return json.loads(encoded)
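
The retry policy and the JSON encoding used by the handler above can be tried in isolation, without taskkit installed. The following is a minimal standalone sketch: `retry_interval`, `encode`, and `decode` are hypothetical helpers that only mirror the bodies of `get_retry_interval`, `encode_data` / `encode_result`, and `decode_result`. They are not part of taskkit.

```python
import json
from typing import Any, Optional


def retry_interval(retry_count: int) -> Optional[float]:
    # Mirrors Handler.get_retry_interval above: wait `retry_count` seconds
    # before the next attempt, and stop retrying (None) once the count
    # reaches 10.
    return float(retry_count) if retry_count < 10 else None


def encode(data: Any) -> bytes:
    # Same encoding as Handler.encode_data / Handler.encode_result.
    return json.dumps(data).encode()


def decode(encoded: bytes) -> Any:
    # Same decoding as Handler.decode_result.
    return json.loads(encoded)


print([retry_interval(n) for n in (0, 1, 9, 10)])  # [0.0, 1.0, 9.0, None]
print(encode('SOME DATA'))                         # b'"SOME DATA"'
print(decode(encode({'some_data': 1})))            # {'some_data': 1}
```

Whatever encoding you choose, `decode(encode(x))` should return a value equal to `x` for every kind of data and result your tasks use.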

2. Make Kit

Use the Redis implementation

You can use the Redis backend like this:

from redis.client import Redis
from taskkit.impl.redis import make_kit

REDIS_HOST = '...'
REDIS_PORT = '...'

redis = Redis(host=REDIS_HOST, port=REDIS_PORT)
kit = make_kit(redis, Handler())

Use the Django implementation

  1. Add 'taskkit.contrib.django' to INSTALLED_APPS in your settings
  2. Run python manage.py migrate
  3. Create a kit instance as follows:

from taskkit.impl.django import make_kit

kit = make_kit(Handler())

3. Run workers

GROUP_NAME = 'Any task group name'

# this starts a busy loop
kit.start(
    # number of processes
    num_processes=3,
    # number of worker threads per process
    num_worker_threads_per_group={GROUP_NAME: 3})

# you can use `start_processes` to avoid the busy loop
kit.start_processes(
    num_processes=3,
    num_worker_threads_per_group={GROUP_NAME: 3},
    daemon=True)

4. Initiate task

from datetime import datetime, timedelta
from taskkit import ResultGetTimedOut


result = kit.initiate_task(
    GROUP_NAME,
    # task name
    'your task name',
    # task data which can be encoded by `Handler.encode_data`
    dict(some_data=1),
    # run the task after 10 or more seconds.
    due=datetime.now() + timedelta(seconds=10))

try:
    value = result.get(timeout=10)
except ResultGetTimedOut:
    ...

Scheduled Tasks

from datetime import timezone, timedelta
from taskkit import ScheduleEntry, ScheduleEntryDict, RegularSchedule, ScheduleEntriesCompatMapping

# define entries
# each key is a scheduler name
# each value is a list of ScheduleEntry instances
#       or a list of dicts conforming to ScheduleEntryDict
# 
# ScheduleEntryCompat: TypeAlias = ScheduleEntry | ScheduleEntryDict
# ScheduleEntriesCompat: TypeAlias = Sequence[ScheduleEntryCompat]
# ScheduleEntriesCompatMapping: TypeAlias = Mapping[str, ScheduleEntriesCompat]
#
schedule_entries: ScheduleEntriesCompatMapping = {
    'scheduler_name': [
        # You can use ScheduleEntry instance as follows. Note that the data
        # MUST be encoded by the same algorithm as `Handler.encode_data`.
        ScheduleEntry(
            # A key which can identify the schedule in the list
            key='...',
            # group name
            group=GROUP_NAME,
            # task name
            name='test2',

            # The data MUST be encoded by the same algorithm as
            # `Handler.encode_data`, so it would look like this
            # (`encode_data` is an instance method, so call it on an instance):
            data=Handler().encode_data(GROUP_NAME, 'test2', 'SOME DATA'),

            # The scheduler will initiate the task twice an hour,
            # at **:00:00 and **:30:00.
            schedule=RegularSchedule(
                seconds={0},
                minutes={0, 30},
            ),
        ),

        # You can also use the dict form of a schedule entry (recommended).
        # Note that in dict form the data MUST NOT be encoded, because `Kit`
        # takes care of encoding for convenience. The other properties are the
        # same as for ScheduleEntry. You can use ScheduleEntryDict to annotate
        # the dict.
        {
            'key': '...',
            'group': GROUP_NAME,
            'name': 'test3',

            # IT MUST NOT BE ENCODED
            'data': 2,

            'schedule': RegularSchedule(seconds={0}, minutes={30}),
        }
    ],

    # You can have multiple schedulers
    'another_scheduler': [
        # other entries ...
    ],
}

# pass the entries to kit.start
kit.start(
    num_processes=3,
    num_worker_threads_per_group={GROUP_NAME: 3},

    schedule_entries=schedule_entries,
    tzinfo=timezone(timedelta(hours=9), 'JST'))
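
To make the "twice an hour at **:00:00 and **:30:00" comment concrete, here is a rough standalone sketch of how such a schedule can be expanded into firing times. It does not use taskkit: `upcoming` is a hypothetical helper. It assumes that fields left out of the schedule (such as hours) match every value, which is what the comment in the example above suggests but is not confirmed here.

```python
from datetime import datetime, timedelta, timezone
from itertools import islice
from typing import Iterator, Set


def upcoming(start: datetime,
             seconds: Set[int],
             minutes: Set[int]) -> Iterator[datetime]:
    # Walk forward one second at a time from `start` (exclusive) and yield
    # every instant whose second and minute are both in the given sets.
    # Simple rather than efficient; this is only meant to show the semantics.
    current = start.replace(microsecond=0)
    while True:
        current += timedelta(seconds=1)
        if current.second in seconds and current.minute in minutes:
            yield current


JST = timezone(timedelta(hours=9), 'JST')
start = datetime(2024, 1, 1, 12, 10, 0, tzinfo=JST)

# Equivalent in spirit to RegularSchedule(seconds={0}, minutes={0, 30})
for fire_at in islice(upcoming(start, {0}, {0, 30}), 2):
    print(fire_at.isoformat())
# 2024-01-01T12:30:00+09:00
# 2024-01-01T13:00:00+09:00
```

The times are expressed in the zone of `start`, in the same way the `tzinfo` argument to `kit.start` presumably fixes the zone in which the schedule is interpreted.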
