Dynamic distributed preprocessing task loader and dispatcher
Kagurazaka
Dynamic task dispatching and data processing framework.
To be brief, kagurazaka abstracts any workload into the following two parts:
- Executor (Backbone): Execution model (e.g. process / thread pool, GPU, etc.)
- Task: The real workload to be executed
Given a task, kagurazaka will enable you to easily dispatch the workload, featuring:
- Argument parsing through CLI, YAML, or both, with the flexibility to override default values and mix values into the code where placeholders are defined
- Sharding support, including hard sharding (data sharded) and soft sharding (index sharded)
- Cache management, avoiding redundant computation
- Automatic parallelization by specifying the backbone through a single command-line option
Kagurazaka also keeps task definitions concise and reusable by providing a template compilation system.
Please refer to Basic Task Definition and Task Template Decorators for more details.
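The idea behind soft (index) sharding can be illustrated with a short sketch. The helper `soft_shard` below is hypothetical and not part of kagurazaka. It assumes that every worker sees the same full task list and keeps only the entries whose index falls into its shard, chosen here by round-robin (`index % shard_num == shard_idx`). The selection rule kagurazaka actually uses may differ.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def soft_shard(items: Sequence[T], shard_idx: int, shard_num: int) -> List[T]:
    """Keep the items whose position belongs to shard `shard_idx` (hypothetical helper)."""
    if shard_num < 1 or not 0 <= shard_idx < shard_num:
        raise ValueError("require shard_num >= 1 and 0 <= shard_idx < shard_num")
    # Round-robin assignment (an assumption): item i goes to shard i % shard_num.
    return [item for i, item in enumerate(items) if i % shard_num == shard_idx]


files = ["a.py", "b.py", "c.py", "d.py", "e.py"]
# Two workers, each started with a different shard_idx, split the same list.
shards = [soft_shard(files, idx, 2) for idx in range(2)]
print(shards)
```

Hard sharding, by contrast, would split the data itself (for example into separate input folders), so no index arithmetic is needed at load time.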
Installation
pip install kagurazaka
For development,
pip install -e .
Usage
General usage of kagurazaka is as follows:
$ kagurazaka -hk
usage: kagurazaka [--backbone {auto,mp,torch_vanilla_single}] [--mixin-cfg MIXIN_CFG] [--mixin MIXIN [MIXIN ...]] [-h] [-hb] [-hk] task_file_path task_name
Dynamically load a Python class from a module.
positional arguments:
task_file_path The file path of the task file
task_name The name of the task class to use
options:
--backbone {auto,mp,torch_vanilla_single}
The backbone to use (default: auto)
--mixin-cfg MIXIN_CFG
The file path of the mixin config file to use.
--mixin MIXIN [MIXIN ...]
The mixin to use. Has higher priority than --mixin-cfg. Provide in the format of "key=value".
-h, --help Show help message for task and exit
-hb, --help-backbone Show help message for chosen backbone and exit
-hk, --help-kagurazaka
Show Kagurazaka help message and exit
Different backbones might support different additional arguments.
mp Backbone
Uses a process pool as the execution model to parallelize the task.
$ kagurazaka ... ... -hb
usage: kagurazaka [--num_process NUM_PROCESS] [--chunksize CHUNKSIZE] [-hb]
Kagurazaka MP Backbone.
options:
--num_process NUM_PROCESS
Number of processes to use, for normal task (default: number of CPU cores)
--chunksize CHUNKSIZE
Number of tasks to be sent to a worker process at a time, for normal task (default: 1)
-hb, --help-backbone Show help message for chosen backbone and exit
torch_vanilla_single Backbone
Uses a single GPU and a PyTorch DataLoader as the execution model to parallelize the task.
$ kagurazaka ... ... -hb
usage: kagurazaka [--device DEVICE] [-hb]
Kagurazaka Torch Vanilla Single GPU Backbone.
options:
--device DEVICE The device to use
-hb, --help-backbone Show help message for chosen backbone and exit
Basic Task Definition
Normal Task (i.e. CPU Task) - KagurazakaTask
The following methods should be implemented in the subclass:
- def __init__(self, args) -> None - Initialize the task with command line arguments
  - args can be passed to ArgumentParser.parse_args() to parse the remaining command line arguments
- def generate_tasks(self) -> list - Return a list of workload inputs; each element is a dictionary holding one workload input
- def is_executed(self, **kwargs) -> bool - Check if the task has already been executed
  - **kwargs is the input of the task (contents released by generate_tasks)
- def process(self, **kwargs) -> None - Process the task
  - **kwargs is the input of the task (contents released by generate_tasks)
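A minimal sketch of such a task is shown below. To keep it runnable on its own, `KagurazakaTask` is replaced by an empty stand-in class, the flags `--input_dir` and `--output_dir` are hypothetical, and `run_sequentially` is only a simplified guess at what a backbone does with the four methods. It also assumes that `args` is a list of the remaining command line strings.

```python
import argparse
import tempfile
from pathlib import Path


class KagurazakaTask:
    """Stand-in for the real base class so this sketch runs on its own."""


class UppercaseTask(KagurazakaTask):
    def __init__(self, args) -> None:
        # Parse the arguments left over after kagurazaka consumed its own.
        parser = argparse.ArgumentParser()
        parser.add_argument("--input_dir", type=Path, required=True)   # hypothetical flag
        parser.add_argument("--output_dir", type=Path, required=True)  # hypothetical flag
        self.args = parser.parse_args(args)

    def generate_tasks(self) -> list:
        # One dict per unit of work; its keys arrive later as **kwargs.
        return [
            {"src": p, "dst": self.args.output_dir / p.name}
            for p in sorted(self.args.input_dir.glob("*.txt"))
        ]

    def is_executed(self, **kwargs) -> bool:
        # Treat an existing output file as already-finished work.
        return kwargs["dst"].exists()

    def process(self, **kwargs) -> None:
        kwargs["dst"].write_text(kwargs["src"].read_text().upper())


def run_sequentially(task) -> int:
    """Simplified driver loop (assumption): skip finished work, process the rest."""
    processed = 0
    for kwargs in task.generate_tasks():
        if not task.is_executed(**kwargs):
            task.process(**kwargs)
            processed += 1
    return processed


with tempfile.TemporaryDirectory() as tmp:
    in_dir, out_dir = Path(tmp, "in"), Path(tmp, "out")
    in_dir.mkdir()
    out_dir.mkdir()
    (in_dir / "a.txt").write_text("hello")
    task = UppercaseTask(["--input_dir", str(in_dir), "--output_dir", str(out_dir)])
    first_run = run_sequentially(task)   # processes a.txt
    second_run = run_sequentially(task)  # is_executed() now skips a.txt
    result_text = (out_dir / "a.txt").read_text()
```

The second pass does no work because `is_executed` reports the output as present, which is the kind of redundant computation the cache management is meant to avoid.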
Torch Task Vanilla V1 - KagurazakaVanillaTorchTaskV1
The following methods should be implemented in the subclass (for each section, choose one of the options):
Model / Dataloader
- def load_model(self, device) -> None: Load the model on a device
- def get_dataset(self) -> Dataset: Return a Dataset object, each element should always be a dict
- def get_dataloader(self) -> DataLoader: Return a DataLoader object
Bookkeeping
Option A - Easiest:
- def is_each_executed(self, **kwargs) -> bool - Check if the task has already been executed for each element in the batch
  - **kwargs is the input of the task (contents released from the dict returned by the dataset)
Option B - If the above is not expressive enough, i.e. the check needs to be done at the batch level, here is the second option:
- def is_executed(self, batch) -> bool - Check if the task has already been executed
  - batch is the input of the task (the batch obtained from the dataloader)
Process
Option A - Easiest:
- def inference(self, device, batch_size, **batch) -> torch.Tensor | dict | list - Run inference on the batch
  - **batch is the input of the task (the batch obtained from the dataloader)
  - The return value can be a tensor, a dict, or a list
- def postprocess_each(self, batch_idx, batch_size, **input_batch_and_result) -> None - Postprocess each element in the batch
  - **input_batch_and_result consists of two parts:
    - **batch: the input of the task (the batch obtained from the dataloader)
    - result (if result is a tensor or a list) or **result (if result is a dict): the result of the inference
Option B - If postprocess_each is not enough:
- Replace postprocess_each in Option A with postprocess
  - def postprocess(self, batch_size, results, **batch) -> None
Option C - If none of the above is enough, i.e. you want to take over the whole process, here is the third option:
- def process(self, device, batch) -> None - Only implement this method
  - device is the device to use
  - batch is the input of the task (the batch obtained from the dataloader)
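The Option A flow (inference followed by postprocess_each) can be sketched without PyTorch. Everything here is illustrative: `DemoTask` and `run_batch` are hypothetical, plain lists stand in for tensors, and the sketch assumes that postprocess_each receives the whole batch plus `batch_idx` and picks out its own element. The real backbone may pass data differently.

```python
class DemoTask:
    """Hypothetical task: 'inference' just measures string lengths."""

    def __init__(self) -> None:
        self.seen = []

    def inference(self, device, batch_size, **batch):
        # Returns a list, one entry per batch element.
        return [len(name) for name in batch["name"]]

    def postprocess_each(self, batch_idx, batch_size, **input_batch_and_result) -> None:
        # Assumption: the full batch is passed, indexed here by batch_idx.
        name = input_batch_and_result["name"][batch_idx]
        value = input_batch_and_result["result"][batch_idx]
        self.seen.append((name, value))


def run_batch(task, device, batch: dict) -> None:
    """Simplified stand-in for the backbone's per-batch loop."""
    batch_size = len(next(iter(batch.values())))
    result = task.inference(device, batch_size, **batch)
    # A dict result is spread into keyword arguments; anything else is passed as `result`.
    extra = dict(result) if isinstance(result, dict) else {"result": result}
    for batch_idx in range(batch_size):
        task.postprocess_each(batch_idx, batch_size, **batch, **extra)


task = DemoTask()
run_batch(task, "cpu", {"name": ["a", "bcd"]})
print(task.seen)
```

Under this reading, a batch field that is itself named result would collide with the inference output when both are passed as keyword arguments.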
Task Template Decorators
Template decorators can help to simplify the task definition by providing a set of default methods and fields.
Normal Task Template @moemoecue_normal
The following compulsory methods can be omitted when using the template decorator:
- def generate_tasks(self) -> list
  - When input_mode is not None
- def is_executed(self, **kwargs) -> bool
  - When finish_check_mode is not None and output_mode is not None
Further, the following parameters are injected automatically:
- input to **kwargs in process(...) and is_executed(...)
  - When input_mode is not None
- output to **kwargs in process(...) and is_executed(...)
  - When output_mode is not None
- Other fields might also be injected into **kwargs based on the input_mode and output_mode
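As a rough mental model, a template decorator can be thought of as a class decorator that fills in methods the class does not define. The sketch below is not kagurazaka's implementation: `fill_generate_tasks` is a hypothetical decorator factory, and the rule that a method written by the user takes precedence over the generated one is an assumption.

```python
def fill_generate_tasks(input_items):
    """Hypothetical decorator factory: supply generate_tasks when the class lacks one."""

    def decorate(cls):
        # Only add the default if the class body did not define the method itself.
        if "generate_tasks" not in vars(cls):

            def generate_tasks(self) -> list:
                # Each task dict carries an `input` field, mirroring the injected parameter.
                return [{"input": item} for item in input_items]

            cls.generate_tasks = generate_tasks
        return cls

    return decorate


@fill_generate_tasks(["a.py", "b.py"])
class ListedTask:
    def process(self, **kwargs) -> None:
        print("processing", kwargs["input"])


@fill_generate_tasks(["ignored.py"])
class CustomTask:
    def generate_tasks(self) -> list:
        return [{"input": "custom.py"}]


generated = ListedTask().generate_tasks()
custom = CustomTask().generate_tasks()
```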
Torch Task Vanilla V1 Template @moemoecue_torch_vanilla_v1
The following compulsory methods can be omitted when using the template decorator:
- def get_dataset(self) -> Dataset
  - When input_mode is not None and def load_data(self, **kwargs) -> dict is implemented
- def get_dataloader(self) -> DataLoader
  - When dataloader (dataloader parameters, except dataset) is not None
- def is_each_executed(self, **kwargs) -> bool
  - When finish_check_mode is not None and output_mode is not None
Further, the following parameters are injected automatically:
- input to batch, and to wherever batch is released
- output to postprocess_each
- Other fields might also be injected into batch based on the input_mode
- Other fields might also be injected into postprocess_each based on the output_mode
[!IMPORTANT] As mentioned, for input_mode to take effect for this template, the torch task should also implement KagurazakaVanillaTorchMixinV1.
Configuration - Mixin, YAML, CLI and Template
In this section, we will briefly introduce the template configuration options for kagurazaka.
Note that when using template decorators, some fields are compulsory but may differ from one run to another. To handle this, kagurazaka provides a placeholder and mixin mechanism that makes the task definition more flexible.
For any field (even inside a dict or class), one can define it as a placeholder by setting it to KAGURAZAKA_MIXIN_PLACEHOLDER. When the task is loaded, the placeholder will be replaced with the true value of the field through the mixin mechanism.
The mixin process is as follows:
- Highest priority: CLI arguments --mixin, which is a list of "key=value" pairs
- Second highest priority: YAML file specified by --mixin-cfg
- Lowest priority: original value of the field
Example: for the following template decorator:
@moemoecue_normal(
    hard_shard_mode=NoHardShard(
        input_path=KAGURAZAKA_MIXIN_PLACEHOLDER,
        output_path=KAGURAZAKA_MIXIN_PLACEHOLDER
    ),
    soft_shard=SoftShard(
        shard_idx=0,
        shard_num=1,
    ),
    input_mode=InputByExts(exts=[".py"]),
    output_mode=OutputByFilenameSubfoldering(file_to_write=KAGURAZAKA_MIXIN_PLACEHOLDER),
    finish_check_mode=FinishCheckByOutputFileExistence(output_field_suffix=KAGURAZAKA_MIXIN_PLACEHOLDER)
)
class TestTask(KagurazakaTask):
    pass
we can provide a YAML file like this:
output_mode:
  file_to_write:
    test: "test.txt"
finish_check_mode:
  output_field_suffix: "test"
and specify the input / output through CLI arguments:
$ kagurazaka <TASK_FILE_PATH> <TASK_CLASS_NAME> --mixin-cfg <YAML_PATH> --mixin hard_shard_mode.input_path=<INPUT_PATH> --mixin hard_shard_mode.output_path=<OUTPUT_PATH>
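The priority order can be made concrete with a small sketch that works on plain nested dicts. It is illustrative only: `resolve`, `deep_update`, `set_by_path`, and the string `PLACEHOLDER` are hypothetical stand-ins, the YAML file is represented by an already-loaded dict, the paths are made-up sample values, and CLI values are kept as strings since the type conversion kagurazaka applies is not covered here.

```python
import copy

PLACEHOLDER = "<PLACEHOLDER>"  # stand-in for KAGURAZAKA_MIXIN_PLACEHOLDER


def set_by_path(tree: dict, dotted_key: str, value) -> None:
    """Assign value at a dotted path such as 'a.b.c', creating dicts as needed."""
    *parents, leaf = dotted_key.split(".")
    node = tree
    for name in parents:
        if not isinstance(node.get(name), dict):
            node[name] = {}
        node = node[name]
    node[leaf] = value


def deep_update(base: dict, override: dict) -> None:
    """Recursively copy override into base; override wins on conflicts."""
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            deep_update(base[key], value)
        else:
            base[key] = value


def resolve(defaults: dict, yaml_cfg: dict, cli_pairs: list) -> dict:
    """Apply sources from lowest to highest priority: defaults, YAML, then CLI."""
    resolved = copy.deepcopy(defaults)
    deep_update(resolved, yaml_cfg)
    for pair in cli_pairs:
        key, _, value = pair.partition("=")
        set_by_path(resolved, key, value)
    return resolved


defaults = {
    "hard_shard_mode": {"input_path": PLACEHOLDER, "output_path": PLACEHOLDER},
    "output_mode": {"file_to_write": PLACEHOLDER},
    "finish_check_mode": {"output_field_suffix": PLACEHOLDER},
}
yaml_cfg = {
    "output_mode": {"file_to_write": {"test": "test.txt"}},
    "finish_check_mode": {"output_field_suffix": "test"},
}
cli_pairs = [
    "hard_shard_mode.input_path=/data/in",    # made-up sample path
    "hard_shard_mode.output_path=/data/out",  # made-up sample path
]
resolved = resolve(defaults, yaml_cfg, cli_pairs)
print(resolved)
```

Because the CLI pairs are applied last, a key given in both the YAML file and on the command line ends up with the command line value.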
Reserved Keywords
When defining the task and designing the field names, please avoid using the following keywords:
- Used in torch task processing
  - result
- Used by template decorators
  - input
  - output
  - output_*
Reserved CLI Arguments
- -h, --help - Show help message for task and exit
- -hb, --help-backbone - Show help message for chosen backbone and exit
- -hk, --help-kagurazaka - Show Kagurazaka help message and exit
- --backbone - The backbone to use (default: auto)
- --mixin-cfg - The file path of the mixin config file to use.
- --mixin - The mixin to use. Has higher priority than --mixin-cfg. Provide in the format of "key=value".
Roadmap
- Sharding tool
- File triplet jsonl exporter
- CPU affinity