Handling Complex Workflows with Tasks
Project description
tasksflow
Handling Complex Workflows with Tasks
When dealing with complex workflows, it's often necessary to decompose them into multiple tasks and then combine these tasks together. This library provides a simple way to define and execute these tasks.
Quick Start
Install tasksflow
pip install tasksflow
Create some simple tasks
import tasksflow.pool
import tasksflow.task
import tasksflow.cache
import tasksflow.executer
from pathlib import Path
class Task1(tasksflow.task.Task):
def run(self):
return {"a": 1, "b": 2}
class Task2(tasksflow.task.Task):
def run(self, a: int, b: int):
return {"c": a + b}
tasks = [Task1(), Task2()]
p = tasksflow.pool.Pool(tasks)
result = p.run() # run tasks in pool
print(result) # {"a": 1, "b": 2, "c": 3}
Usage
Task
Each task has multiple input parameters and multiple output parameters. For example, take Task2:
class Task2(tasksflow.task.Task):
def run(self, a: int, b: int):
return {"c": a + b}
Task2 takes input parameters a
and b
, and outputs parameter c
.
Each task needs to inherit from tasksflow.task.Task
and override the run
method. The parameters of the run
method are the inputs required for this task, and the return value should be a dict of parameter -> value or None. Make sure that each task returns non-repetitive parameters.
There's no need to explicitly define the dependency relationship between tasks. Once the preceding task returns the values corresponding to the parameters, the subsequent tasks that depend on these parameters will be automatically invoked with these values.
Pool
tasksflow.pool.Pool
is a pool of tasks used to run a series of tasks. Common usage:
tasks = [Task1(), Task2()]
p = tasksflow.pool.Pool(tasks)
result = p.run() # run tasks in pool
Initialize the pool using tasksflow.pool.Pool(tasks)
, then execute the list of tasks with result = p.run()
and retrieve the results. The result is a dict containing the parameter -> value mapping for all tasks.
Advanced
Cache
For most tasks, if the input parameters and the task code are the same, the output result will also be the same. By default, tasksflow
caches the code and inputs/outputs of tasks. When running tasks again and hitting the cache, the execution process is skipped, and the output is directly used. The caching feature can effectively improve development efficiency, as developers don't need to rerun preceding tasks when developing subsequent tasks.
Disabling Task Cache
Some tasks may depend on external factors such as the network or time, and even if the input parameters and task code are the same, the results may differ. In such cases, you may need to disable caching to force task execution. This can be done by declaring cache disable during task initialization.
tasks = [Task1(), Task2(enable_cache=False)]
Where Task2 is passed enable_cache=False
, disabling automatic caching.
Cache Implementation
By default, the pool
will create a SQLite database at cache.db
to cache the task code and inputs. If you want to customize the storage path:
from pathlib import Path
p = tasksflow.pool.Pool(tasks, cache_provider=tasksflow.cache.SqliteCacheProvider(Path("mycache.db")))
You can also use MemoryCacheProvider
instead of SqliteCacheProvider
to store the cache in memory, commonly used for testing:
p = tasksflow.pool.Pool(tasks, cache_provider=tasksflow.cache.MemoryCacheProvider())
Or you can customize the CacheProvider
by inheriting from tasksflow.cache.CacheProvider
and implementing the get
and set
methods. Then pass your custom CacheProvider
to the Pool
.
Executer
By default, the pool
uses tasksflow.executer.multiprocess_run
, which creates a separate process for each task. After a task is completed, it automatically invokes the subsequent tasks that depend on it based on its output.
You can also use tasksflow.executer.serial_run
, which executes tasks in the order specified.
p = tasksflow.pool.Pool(tasks, executer=tasksflow.executer.serial_run)
You can also define your own executor:
from typing import Any
def my_executer(tasks: list[tasksflow.task.Task]) -> dict[str, Any]:
pass
p = tasksflow.pool.Pool(tasks, executer=my_executer)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for tasksflow-0.2.3-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 57602c680d714dd64f6b3fdad68a811b287df714cb69797cf259a791fa6775d3 |
|
MD5 | ca416e0736604dec7493fb9fca70b65d |
|
BLAKE2b-256 | f08351df0d686a9e4f07da9dfe5fbafc436f087a42ec7205ff3bc3a38ba90331 |