taskkit
pypi: https://pypi.org/project/taskkit/
Overview
taskkit is a distributed task runner.
How to use
1. Implement TaskHandler
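The handler is the core part: it decides how each task is executed, when it is retried, and how its data and result are serialized.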
import json
from typing import Any

from taskkit import TaskHandler, Task, DiscardTask


class Handler(TaskHandler):
    def handle(self, task: Task):
        # Use `task.group` and `task.name` to determine how to handle the task
        if task.group == '...':
            if task.name == 'foo':
                # decode the data that was encoded by `self.encode_data`, if needed
                data = json.loads(task.data)
                # do something with the `data`
                ...
                # return the result for the task
                return ...
            elif task.name == 'bar':
                # do something
                return ...
        # raise DiscardTask if you want to discard the task
        raise DiscardTask

    def get_retry_interval(self,
                           task: Task,
                           exception: Exception) -> float | None:
        # This method is called if the `handle` method raises an exception.
        # Return how long to wait before retrying the task, in seconds, as a
        # float. If you don't want to retry the task, return None to make the
        # task fail, or raise DiscardTask to discard the task.
        return task.retry_count if task.retry_count < 10 else None

    def encode_data(self, group: str, task_name: str, data: Any) -> bytes:
        # encode the task data so that it can be serialized
        return json.dumps(data).encode()

    def encode_result(self, task: Task, result: Any) -> bytes:
        # encode the result of the task
        return json.dumps(result).encode()

    def decode_result(self, task: Task, encoded: bytes) -> Any:
        # decode the result of the task
        return json.loads(encoded)
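The `get_retry_interval` above waits `retry_count` seconds and gives up once the count reaches ten. As a sketch of an alternative policy, the helper below computes a capped exponential backoff. The name `backoff_interval` and its parameters are hypothetical and not part of taskkit, and it assumes `retry_count` starts at 0. The only taskkit behaviour it relies on is the one described in the handler comments: a float means "retry after this many seconds" and None means "fail the task".

```python
from typing import Optional


def backoff_interval(retry_count: int,
                     base: float = 1.0,
                     cap: float = 60.0,
                     max_retries: int = 10) -> Optional[float]:
    """Hypothetical helper: capped exponential backoff, in seconds.

    Returns None once `retry_count` reaches `max_retries`, which a
    handler could pass through to signal "do not retry".
    """
    if retry_count >= max_retries:
        return None
    # the delay doubles with every retry but never exceeds `cap`
    return min(cap, base * (2 ** retry_count))


for count in (0, 3, 6, 10):
    print(count, backoff_interval(count))
```

Inside a handler this could be used as `return backoff_interval(task.retry_count)`.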
2. Make Kit
Use the Redis implementation
You can use the Redis backend like this:
from redis.client import Redis
from taskkit.impl.redis import make_kit
REDIS_HOST = '...'
REDIS_PORT = '...'
redis = Redis(host=REDIS_HOST, port=REDIS_PORT)
kit = make_kit(redis, Handler())
Use the Django implementation
- Add 'taskkit.contrib.django' to INSTALLED_APPS in the settings
- Run python manage.py migrate
- Make a kit instance like below:
from taskkit.impl.django import make_kit
kit = make_kit(Handler())
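For the first step of the Django setup, the settings change might look like the fragment below. Only the 'taskkit.contrib.django' entry comes from the instructions above; the other apps are placeholders standing in for whatever your project already lists.

```python
# settings.py (fragment)
INSTALLED_APPS = [
    # placeholders for the apps your project already uses
    'django.contrib.contenttypes',
    'django.contrib.auth',
    # the taskkit Django app; run `python manage.py migrate` after adding it
    'taskkit.contrib.django',
]
```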
3. Run workers
GROUP_NAME = 'Any task group name'

# this starts a busy loop
kit.start(
    # number of processes
    num_processes=3,
    # number of worker threads for each group, per process
    num_worker_threads_per_group={GROUP_NAME: 3})

# you can use `start_processes` instead to avoid the busy loop
kit.start_processes(
    num_processes=3,
    num_worker_threads_per_group={GROUP_NAME: 3},
    daemon=True)
4. Initiate task
from datetime import datetime, timedelta
from taskkit import ResultGetTimedOut

result = kit.initiate_task(
    GROUP_NAME,
    # task name
    'your task name',
    # task data; it must be encodable by `Handler.encode_data`
    dict(some_data=1),
    # run the task after 10 or more seconds.
    due=datetime.now() + timedelta(seconds=10))

try:
    value = result.get(timeout=10)
except ResultGetTimedOut:
    ...
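Because the task data passes through `Handler.encode_data`, it has to be something that encoder can serialize. Assuming the JSON-based handler from step 1, the standalone sketch below mirrors its `json.dumps(...).encode()` and `json.loads(...)` calls so a payload can be checked before a task is initiated. `encode` and `decode` are illustrative names, not taskkit functions.

```python
import json
from datetime import datetime
from typing import Any


def encode(data: Any) -> bytes:
    # same expression as `Handler.encode_data` in step 1
    return json.dumps(data).encode()


def decode(encoded: bytes) -> Any:
    # same expression as `Handler.decode_result` in step 1
    return json.loads(encoded)


payload = dict(some_data=1)
print(decode(encode(payload)) == payload)

# values that JSON cannot represent are rejected with TypeError
try:
    encode({'when': datetime.now()})
except TypeError:
    print('datetime is not JSON serializable; convert it first')
```

If a payload needs such values, one option is to convert them (for example with `datetime.isoformat()`) before passing the data on.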
Scheduled Tasks
from datetime import timezone, timedelta
from taskkit import ScheduleEntry, ScheduleEntryDict, RegularSchedule, ScheduleEntriesCompatMapping

# define entries
# the key is a name for the scheduler
# the value is a list of ScheduleEntry instances
# or a list of dicts conforming to ScheduleEntryDict
#
# ScheduleEntryCompat: TypeAlias = ScheduleEntry | ScheduleEntryDict
# ScheduleEntriesCompat: TypeAlias = Sequence[ScheduleEntryCompat]
# ScheduleEntriesCompatMapping: TypeAlias = Mapping[str, ScheduleEntriesCompat]
#
schedule_entries: ScheduleEntriesCompatMapping = {
    'scheduler_name': [
        # You can use a ScheduleEntry instance as follows. Note that the data
        # MUST be encoded by the same algorithm as `Handler.encode_data`.
        ScheduleEntry(
            # A key that identifies the schedule in the list
            key='...',
            # group name
            group=GROUP_NAME,
            # task name
            name='test2',
            # The data MUST BE encoded by the same algorithm as
            # `Handler.encode_data`, so it would look like this
            # (`encode_data` is an instance method, so call it on an instance):
            data=Handler().encode_data(GROUP_NAME, 'test2', 'SOME DATA'),
            # This means that the scheduler will initiate the task twice
            # an hour, at **:00:00 and **:30:00.
            schedule=RegularSchedule(
                seconds={0},
                minutes={0, 30},
            ),
        ),

        # You can use the dict form of a schedule entry (recommended).
        # Note that in dict form the data MUST NOT be encoded, because `Kit`
        # takes care of encoding for convenience. Other properties are the same
        # as for ScheduleEntry. You can also use ScheduleEntryDict for annotation.
        {
            'key': '...',
            'group': GROUP_NAME,
            'name': 'test3',
            # IT MUST NOT BE ENCODED
            'data': 2,
            'schedule': RegularSchedule(seconds={0}, minutes={30}),
        }
    ],

    # You can have multiple schedulers
    'another_scheduler': [
        # other entries ...
    ],
}

# pass the entries to kit.start
kit.start(
    num_processes=3,
    num_worker_threads_per_group={GROUP_NAME: 3},
    schedule_entries=schedule_entries,
    tzinfo=timezone(timedelta(hours=9), 'JST'))
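To make the meaning of `RegularSchedule(seconds={0}, minutes={0, 30})` concrete, here is a standalone sketch that lists the times such a rule would match within one hour. It does not use or reproduce taskkit's scheduler: `matches` and `fire_times` are hypothetical helpers that only model the behaviour described in the comments above, namely that a time matches when its second and its minute are both in the given sets.

```python
from datetime import datetime, timedelta
from typing import Iterator, Set


def matches(t: datetime, seconds: Set[int], minutes: Set[int]) -> bool:
    # a time matches when both its second and its minute are listed
    return t.second in seconds and t.minute in minutes


def fire_times(start: datetime, end: datetime,
               seconds: Set[int], minutes: Set[int]) -> Iterator[datetime]:
    """Yield every whole second in [start, end) that matches the rule."""
    t = start.replace(microsecond=0)
    if t < start:
        # `start` had a fractional part, so begin at the next whole second
        t += timedelta(seconds=1)
    while t < end:
        if matches(t, seconds, minutes):
            yield t
        t += timedelta(seconds=1)


start = datetime(2024, 1, 1, 9, 0, 0)
times = list(fire_times(start, start + timedelta(hours=1), {0}, {0, 30}))
print([t.strftime('%H:%M:%S') for t in times])  # ['09:00:00', '09:30:00']
```

Scanning second by second is deliberately naive; it is meant to show which instants qualify, not how a scheduler should find them efficiently.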