a distributed task runner
taskkit
pypi: https://pypi.org/project/taskkit/
Overview
taskkit is a distributed task runner.
How to use
1. Implement TaskHandler
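The handler is the core part: it decides how each task is processed, when it is retried, and how task data and results are serialized.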
import json
from typing import Any
from taskkit import TaskHandler, Task, DiscardTask


class Handler(TaskHandler):
    def handle(self, task: Task):
        # Use `task.group` and `task.name` to determine how to handle the task
        if task.group == '...':
            if task.name == 'foo':
                # decode the data that was encoded by `self.encode_data`, if needed
                data = json.loads(task.data)
                # do something with the `data`
                ...
                # return the result for the task
                return ...
            elif task.name == 'bar':
                # do something
                return ...
        # raise DiscardTask if you want to discard the task
        raise DiscardTask

    def get_retry_interval(self,
                           task: Task,
                           exception: Exception) -> float | None:
        # This method is called if the handle method raises an exception.
        # Return how long to wait before retrying the task, in seconds, as a
        # float. If you don't want to retry the task, return None to make the
        # task fail, or raise DiscardTask to discard the task.
        return task.retry_count if task.retry_count < 10 else None

    def encode_data(self, group: str, task_name: str, data: Any) -> bytes:
        # encode the task data so that it can be serialized
        return json.dumps(data).encode()

    def encode_result(self, task: Task, result: Any) -> bytes:
        # encode the result of the task
        return json.dumps(result).encode()

    def decode_result(self, task: Task, encoded: bytes) -> Any:
        # decode the result of the task
        return json.loads(encoded)
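The retry contract described in `get_retry_interval` can be tried out without a running backend. The following is a minimal sketch and not part of taskkit: `FakeTask` is a hypothetical stand-in that models only `retry_count`, and the exponential variant is an alternative policy swapped in for illustration, not something the library prescribes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in for taskkit's Task; only retry_count is modeled."""
    retry_count: int


def linear_interval(task: FakeTask, max_retries: int = 10) -> Optional[float]:
    # Same policy as the handler above: wait `retry_count` seconds, and
    # return None (let the task fail) once `max_retries` is reached.
    if task.retry_count >= max_retries:
        return None
    return float(task.retry_count)


def exponential_interval(task: FakeTask,
                         base: float = 1.0,
                         cap: float = 60.0,
                         max_retries: int = 10) -> Optional[float]:
    # Alternative policy (an assumption, not from taskkit): double the wait
    # on each retry, never exceeding `cap` seconds.
    if task.retry_count >= max_retries:
        return None
    return min(cap, base * 2 ** task.retry_count)
```

Either function body could be used inside `get_retry_interval`; the capped exponential form spreads retries out more when a dependency stays unavailable for a while.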
2. Make Kit
Use redis impl
You can use the Redis backend like this:
from redis.client import Redis
from taskkit.impl.redis import make_kit
REDIS_HOST = '...'
REDIS_PORT = '...'
redis = Redis(host=REDIS_HOST, port=REDIS_PORT)
kit = make_kit(redis, Handler())
Use django impl
- Add 'taskkit.contrib.django' to INSTALLED_APPS in the settings
- Run python manage.py migrate
- Make a kit instance like below:
from redis.client import Redis
from taskkit.impl.django import make_kit
kit = make_kit(Handler())
3. Run workers
GROUP_NAME = 'Any task group name'

# this starts a busy loop
kit.start(
    # number of processes
    num_processes=3,
    # number of worker threads per process, for each group
    num_worker_threads_per_group={GROUP_NAME: 3})

# you can use `start_processes` to avoid the busy loop
kit.start_processes(
    num_processes=3,
    num_worker_threads_per_group={GROUP_NAME: 3},
    daemon=True)
4. Initiate task
from datetime import datetime, timedelta
from taskkit import ResultGetTimedOut

result = kit.initiate_task(
    GROUP_NAME,
    # task name
    'your task name',
    # task data which can be encoded by `Handler.encode_data`
    dict(some_data=1),
    # run the task after 10 or more seconds
    due=datetime.now() + timedelta(seconds=10))

try:
    value = result.get(timeout=10)
except ResultGetTimedOut:
    ...
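The task data passed to `initiate_task` has to survive both the encode step in the handler and the `json.loads` call in `handle`. A quick way to check a payload, assuming the JSON-based `encode_data` from step 1 (the standalone helper names below are illustrative and not taskkit API):

```python
import json
from typing import Any


def encode_data(data: Any) -> bytes:
    # Mirrors Handler.encode_data from step 1: JSON text encoded as bytes.
    return json.dumps(data).encode()


def decode_data(encoded: bytes) -> Any:
    # Mirrors the json.loads(task.data) call in Handler.handle.
    return json.loads(encoded)


payload = dict(some_data=1)
encoded = encode_data(payload)

# The payload round-trips unchanged through encode and decode.
assert decode_data(encoded) == payload
```

Values that JSON cannot represent, such as `datetime` objects, make `json.dumps` raise `TypeError`, so convert them to strings or numbers before initiating the task.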
Scheduled Tasks
from datetime import timezone, timedelta
from taskkit import ScheduleEntry, RegularSchedule

# define entries
# key is a name for the scheduler
# value is a list of instances of ScheduleEntry
schedule_entries = {
    'scheduler_name': [
        ScheduleEntry(
            # A key which can identify the schedule in the list
            key='...',
            # group name
            group=GROUP_NAME,
            # task name
            name='test2',
            # task data encoded by the same algorithm as `Handler.encode_data`
            data=b'...',
            # It means that the scheduler will initiate the task twice
            # an hour at **:00:00 and **:30:00.
            schedule=RegularSchedule(
                seconds={0},
                minutes={0, 30},
            ),
        ),
    ],

    # You can have multiple schedulers
    'another_scheduler': [
        # other entries ...
    ],
}

# pass the entries with kit.start
kit.start(
    num_processes=3,
    num_worker_threads_per_group={GROUP_NAME: 3},
    schedule_entries=schedule_entries,
    tzinfo=timezone(timedelta(hours=9), 'JST'))
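To make the `seconds={0}`, `minutes={0, 30}` example concrete, the sketch below lists the instants in a time window whose second and minute fields fall in the given sets. It is a plain-Python illustration of the matching rule described in the comment above, not taskkit's implementation; `matching_times` is a hypothetical helper, and it assumes that fields left unspecified (such as the hour) match every value.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Set

JST = timezone(timedelta(hours=9), 'JST')


def matching_times(start: datetime, end: datetime,
                   seconds: Set[int], minutes: Set[int]) -> Iterator[datetime]:
    # Walk the window [start, end) one second at a time and yield each
    # instant whose second and minute are both in the requested sets.
    current = start.replace(microsecond=0)
    if current < start:
        current += timedelta(seconds=1)
    while current < end:
        if current.second in seconds and current.minute in minutes:
            yield current
        current += timedelta(seconds=1)


window_start = datetime(2023, 1, 1, 9, 0, 0, tzinfo=JST)
window_end = window_start + timedelta(hours=1)
fire_times = list(matching_times(window_start, window_end, {0}, {0, 30}))
print([t.strftime('%H:%M:%S') for t in fire_times])
# prints ['09:00:00', '09:30:00']
```

The window is half-open, so 10:00:00 belongs to the next hour and the one-hour window yields exactly two instants.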
Download files
Source Distribution
taskkit-0.1.2.tar.gz (21.6 kB)
Built Distribution
taskkit-0.1.2-py3-none-any.whl (27.8 kB)
File details
Details for the file taskkit-0.1.2.tar.gz.
File metadata
- Download URL: taskkit-0.1.2.tar.gz
- Upload date:
- Size: 21.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.15
File hashes
Algorithm | Hash digest
---|---
SHA256 | 0a28046620c4c906d3e1dd65ee1400c8abfb25028a00b1eae6256927c13be8b9
MD5 | 6b95ffaa7d781b43b9ab3224fa3e3103
BLAKE2b-256 | 0ac97823b1313690712cdd07a0643e94ac5b768e269f0c153349b3a5932bccfb
File details
Details for the file taskkit-0.1.2-py3-none-any.whl.
File metadata
- Download URL: taskkit-0.1.2-py3-none-any.whl
- Upload date:
- Size: 27.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.15
File hashes
Algorithm | Hash digest
---|---
SHA256 | 3fe5db363a263d01a429c7116397afb8a180c5a5cf021eaf2c13e85f1fde654b
MD5 | ed6c0061a803c2c460f0df5085d15d80
BLAKE2b-256 | f05c36f7a50d1317ad12b01f67f14e376bac39a92e446c9e4e30a5d3b697b1a9