a distributed task runner
taskkit
pypi: https://pypi.org/project/taskkit/
Overview
taskkit is a distributed task runner.
How to use
1. Implement TaskHandler
This is the core part.
import json
from typing import Any
from taskkit import TaskHandler, Task, DiscardTask


class Handler(TaskHandler):
    def handle(self, task: Task):
        # Use `task.group` and `task.name` to determine how to handle the task
        if task.group == '...':
            if task.name == 'foo':
                # decode the data that was encoded by `self.encode_data`, if needed
                data = json.loads(task.data)
                # do something with the `data`
                ...
                # return the result for the task
                return ...
            elif task.name == 'bar':
                # do something
                return ...

        # raise DiscardTask if you want to discard the task
        raise DiscardTask

    def get_retry_interval(self,
                           task: Task,
                           exception: Exception) -> float | None:
        # This method is called if the handle method raises an exception.
        # Return how long to wait before retrying the task, in seconds, as a
        # float. If you don't want to retry the task, return None to make the
        # task fail, or raise DiscardTask to discard the task.
        return task.retry_count if task.retry_count < 10 else None

    def encode_data(self, group: str, task_name: str, data: Any) -> bytes:
        # encode the task data so it can be serialized
        return json.dumps(data).encode()

    def encode_result(self, task: Task, result: Any) -> bytes:
        # encode the result of the task
        return json.dumps(result).encode()

    def decode_result(self, task: Task, encoded: bytes) -> Any:
        # decode the result of the task
        return json.loads(encoded)
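To make the retry rule in `get_retry_interval` concrete, here is a minimal sketch that runs without taskkit installed. `FakeTask` is a hypothetical stand-in that models only `retry_count`; it is not taskkit's `Task`, and whether taskkit starts `retry_count` at 0 or 1 is not stated here, so the range below is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeTask:
    # Hypothetical stand-in for taskkit's Task; only retry_count is modeled.
    retry_count: int


def get_retry_interval(task: FakeTask, exception: Exception) -> Optional[float]:
    # Same rule as the handler above: wait `retry_count` seconds before the
    # next attempt, and return None (fail the task) once the count reaches 10.
    return float(task.retry_count) if task.retry_count < 10 else None


# Intervals for retry counts 0..11 (assumed range, for illustration only).
intervals: List[Optional[float]] = [
    get_retry_interval(FakeTask(n), RuntimeError("boom")) for n in range(12)
]
print(intervals)
```

With this rule the wait grows linearly by one second per retry, and the task stops being retried once `retry_count` is 10 or more.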
2. Make Kit
Use redis impl
You can use the Redis backend like this:
from redis.client import Redis
from taskkit.impl.redis import make_kit
REDIS_HOST = '...'
REDIS_PORT = '...'
redis = Redis(host=REDIS_HOST, port=REDIS_PORT)
kit = make_kit(redis, Handler())
Use django impl
- Add 'taskkit.contrib.django' to INSTALLED_APPS in the settings
- Run python manage.py migrate
- Make a kit instance like below:
from taskkit.impl.django import make_kit
kit = make_kit(Handler())
3. Run workers
GROUP_NAME = 'Any task group name'

# this starts a busy loop
kit.start(
    # number of processes
    num_processes=3,
    # number of worker threads per process
    num_worker_threads_per_group={GROUP_NAME: 3})

# you can use `start_processes` to avoid the busy loop
kit.start_processes(
    num_processes=3,
    num_worker_threads_per_group={GROUP_NAME: 3},
    daemon=True)
4. Initiate task
from datetime import datetime, timedelta
from taskkit import ResultGetTimedOut

result = kit.initiate_task(
    GROUP_NAME,
    # task name
    'your task name',
    # task data which can be encoded by `Handler.encode_data`
    dict(some_data=1),
    # run the task after 10 or more seconds.
    due=datetime.now() + timedelta(seconds=10))

try:
    value = result.get(timeout=10)
except ResultGetTimedOut:
    ...
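The task data passed to `initiate_task` has to survive the encoding done by `Handler.encode_data` and the decoding done inside `Handler.handle`. The sketch below checks that round trip with plain `json`, without taskkit. The free functions `encode_data` and `decode_data` are hypothetical helpers that only mirror the JSON-based handler from step 1.

```python
import json
from typing import Any


def encode_data(data: Any) -> bytes:
    # Mirrors Handler.encode_data from step 1 (group and task name omitted).
    return json.dumps(data).encode()


def decode_data(encoded: bytes) -> Any:
    # Mirrors the json.loads(task.data) call inside Handler.handle.
    return json.loads(encoded)


payload = dict(some_data=1)
encoded = encode_data(payload)
decoded = decode_data(encoded)

# JSON has no tuple type, so a tuple comes back as a list.
tuple_round_trip = decode_data(encode_data((1, 2)))
```

With a JSON-based handler, it is safest to keep task data to JSON-native types (dicts with string keys, lists, strings, numbers, booleans, None), because other types either fail to encode or come back in a different form.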
Scheduled Tasks
from datetime import timezone, timedelta
from taskkit import ScheduleEntry, ScheduleEntryDict, RegularSchedule, ScheduleEntriesCompatMapping

# define entries
# key is a name for the scheduler
# value is a list of ScheduleEntry instances
# or a list of dicts conforming to ScheduleEntryDict
#
# ScheduleEntryCompat: TypeAlias = ScheduleEntry | ScheduleEntryDict
# ScheduleEntriesCompat: TypeAlias = Sequence[ScheduleEntryCompat]
# ScheduleEntriesCompatMapping: TypeAlias = Mapping[str, ScheduleEntriesCompat]
#
schedule_entries: ScheduleEntriesCompatMapping = {
    'scheduler_name': [
        # You can use a ScheduleEntry instance as follows. Note that the data
        # MUST be encoded by the same algorithm as `Handler.encode_data`.
        ScheduleEntry(
            # A key which can identify the schedule in the list
            key='...',
            # group name
            group=GROUP_NAME,
            # task name
            name='test2',
            # The data MUST BE encoded by the same algorithm as
            # `Handler.encode_data`, so it would look like this
            # (`encode_data` is an instance method, so call it on an instance):
            data=Handler().encode_data(GROUP_NAME, 'test2', 'SOME DATA'),
            # It means that the scheduler will initiate the task twice
            # an hour at **:00:00 and **:30:00.
            schedule=RegularSchedule(
                seconds={0},
                minutes={0, 30},
            ),
        ),
        # You can use the dict form of a schedule entry (recommended).
        # Note that in dict form, the data MUST NOT be encoded because `Kit`
        # takes care of encoding for convenience. Other properties are the same
        # as ScheduleEntry. You can also use ScheduleEntryDict for annotation.
        {
            'key': '...',
            'group': GROUP_NAME,
            'name': 'test3',
            # IT MUST NOT BE ENCODED
            'data': 2,
            'schedule': RegularSchedule(seconds={0}, minutes={30}),
        }
    ],
    # You can have multiple schedulers
    'another_scheduler': [
        # other entries ...
    ],
}

# pass the entries with kit.start
kit.start(
    num_processes=3,
    num_worker_threads_per_group={GROUP_NAME: 3},
    schedule_entries=schedule_entries,
    tzinfo=timezone(timedelta(hours=9), 'JST'))
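The comment on `RegularSchedule(seconds={0}, minutes={0, 30})` says the task fires twice an hour, at **:00:00 and **:30:00. The sketch below illustrates that set-matching idea in plain Python. `fire_times` is a hypothetical helper written for this illustration; it is not taskkit's scheduler, and it models only the `seconds` and `minutes` sets shown above.

```python
from datetime import datetime, timedelta
from typing import List, Set


def fire_times(start: datetime, count: int,
               seconds: Set[int], minutes: Set[int]) -> List[datetime]:
    # Hypothetical helper: step forward one second at a time and collect the
    # moments (strictly after `start`) whose second and minute are both in
    # the given sets.
    current = start.replace(microsecond=0)
    found: List[datetime] = []
    while len(found) < count:
        current += timedelta(seconds=1)
        if current.second in seconds and current.minute in minutes:
            found.append(current)
    return found


start = datetime(2024, 1, 1, 10, 5, 0)
times = fire_times(start, 3, seconds={0}, minutes={0, 30})
for t in times:
    print(t.isoformat())
```

Starting from 10:05:00, the first three matches are 10:30:00, 11:00:00 and 11:30:00, which agrees with the "twice an hour" description. The one-second scan is chosen for clarity, not efficiency.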