a distributed task runner
Project description
taskkit
pypi: https://pypi.org/project/taskkit/
Overview
taskkit is a distributed task runner.
How to use
1. Implement TaskHandler
This is the core part.
import json
from typing import Any
from taskkit import TaskHandler, Task, DiscardTask
class Handler(TaskHandler):
def handle(self, task: Task):
# Use `task.group` and `task.name` to determine how to handle the task
if task.group == '...':
if task.name == 'foo':
# decode the data, which was encoded by `self.encode_data`, if needed
data = json.loads(task.data)
# do something with the `data`
...
# return result for the task
return ...
elif task.name == 'bar':
# do something
return ...
# you should raise DiscardTask if you want to discard the task
raise DiscardTask
def get_retry_interval(self,
task: Task,
exception: Exception) -> float | None:
# This method is called if the handle method raises an exception. You
# should return how long to wait before retrying the task, in seconds,
# as a float. If you don't want to retry the task, you can return None to
# make the task fail or raise DiscardTask to discard the task.
return task.retry_count if task.retry_count < 10 else None
def encode_data(self, group: str, task_name: str, data: Any) -> bytes:
# encode data of tasks for serializing it
return json.dumps(data).encode()
def encode_result(self, task: Task, result: Any) -> bytes:
# encode the result of the task
return json.dumps(result).encode()
def decode_result(self, task: Task, encoded: bytes) -> Any:
# decode the result of the task
return json.loads(encoded)
2. Make Kit
Use redis impl
You can use redis backend like this:
from redis.client import Redis
from taskkit.impl.redis import make_kit
REDIS_HOST = '...'
REDIS_PORT = '...'
redis = Redis(host=REDIS_HOST, port=REDIS_PORT)
kit = make_kit(redis, Handler())
Use django impl
- Add 'taskkit.contrib.django' to INSTALLED_APPS in the settings
- Run python manage.py migrate
- Make a kit instance like below:
from redis.client import Redis
from taskkit.impl.django import make_kit
kit = make_kit(Handler())
3. Run workers
GROUP_NAME = 'Any task group name'
# it starts a busy loop
kit.start(
# number of processes
num_processes=3,
# number of worker threads per process
num_worker_threads_per_group={GROUP_NAME: 3})
# you can use `start_processes` to avoid the busy loop
kit.start_processes(
num_processes=3,
num_worker_threads_per_group={GROUP_NAME: 3},
daemon=True)
4. Initiate task
from datetime import datetime, timedelta
from taskkit import ResultGetTimedOut
result = kit.initiate_task(
GROUP_NAME,
# task name
'your task name',
# task data which can be encoded by `Handler.encode_data`
dict(some_data=1),
# run the task after 10 or more seconds.
due=datetime.now() + timedelta(seconds=10))
try:
value = result.get(timeout=10)
except ResultGetTimedOut:
...
Scheduled Tasks
from datetime import timezone, timedelta
from taskkit import ScheduleEntry, ScheduleEntryDict, RegularSchedule, ScheduleEntriesCompatMapping
# define entries
# key is a name for scheduler
# value is a list of instances of ScheduleEntry
# or a list of dicts conforming to ScheduleEntryDict
#
# ScheduleEntryCompat: TypeAlias = ScheduleEntry | ScheduleEntryDict
# ScheduleEntriesCompat: TypeAlias = Sequence[ScheduleEntryCompat]
# ScheduleEntriesCompatMapping: TypeAlias = Mapping[str, ScheduleEntriesCompat]
#
schedule_entries: ScheduleEntriesCompatMapping = {
'scheduler_name': [
# You can use ScheduleEntry instance as follows. Note that the data
# MUST be encoded by the same algorithm as `Handler.encode_data`.
ScheduleEntry(
# A key which can identify the schedule in the list
key='...',
# group name
group=GROUP_NAME,
# task name
name='test2',
# The data MUST BE encoded by the same algorithm as
# `Handler.encode_data`, so it would look like:
data=Handler().encode_data(GROUP_NAME, 'test2', 'SOME DATA'),
# It means that the scheduler will initiate the task twice
# an hour at **:00:00 and **:30:00.
schedule=RegularSchedule(
seconds={0},
minutes={0, 30},
),
),
# You can use dict form of schedule entry (recommended).
# Note that in dict form, the data MUST NOT be encoded because `Kit`
# takes care of encoding for convenience. Other properties are the same
# as in ScheduleEntry. You can also use ScheduleEntryDict for annotation.
{
'key': '...',
'group': GROUP_NAME,
'name': 'test3',
# IT MUST NOT BE ENCODED
'data': 2,
'schedule': RegularSchedule(seconds={0}, minutes={30}),
}
],
# You can have multiple schedulers
'another_scheduler': [
# other entries ...
],
}
# pass the entries with kit.start
kit.start(
num_processes=3,
num_worker_threads_per_group={GROUP_NAME: 3},
schedule_entries=schedule_entries,
tzinfo=timezone(timedelta(hours=9), 'JST'))
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
taskkit-0.1.6.tar.gz
(20.4 kB
view details)
Built Distribution
taskkit-0.1.6-py3-none-any.whl
(25.3 kB
view details)
File details
Details for the file taskkit-0.1.6.tar.gz.
File metadata
- Download URL: taskkit-0.1.6.tar.gz
- Upload date:
- Size: 20.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 4c3880d313e1e4b86e4f37d889e1cfa9c8e2451f23d42da2c1cfce38c73f64b4 |
|
MD5 | 007aba49a3dec4ed4d17ec9b4694abe9 |
|
BLAKE2b-256 | 31d2a6387f135e0183f02783b3f6cbbe38d47c675348e42860cba3150bdd7f8d |
File details
Details for the file taskkit-0.1.6-py3-none-any.whl.
File metadata
- Download URL: taskkit-0.1.6-py3-none-any.whl
- Upload date:
- Size: 25.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 2ec291fdf6b0206a1f5e4f6dc7bbce01304b4cb43ba83e787054d092faa5c193 |
|
MD5 | 968c2f87cc4b0378a9ba68e950956329 |
|
BLAKE2b-256 | 9d1f00fa3a49bb1dd60824970cee42d0c8508870bba979932f64fba55ad10969 |