tasksflow

Handling complex workflows through tasks

English | 简体中文

When dealing with complex workflows, it's often necessary to break them down into multiple tasks and then combine these tasks together. This library provides a simple way to define and execute these tasks.

Quick Start

Install tasksflow

pip install tasksflow

Create some simple tasks

import tasksflow.pool
import tasksflow.task

class Task1(tasksflow.task.Task):
    def run(self):
        return {"a": 1, "b": 2}


class Task2(tasksflow.task.Task):
    def run(self, a: int, b: int):
        return {"c": a + b}

tasks = [Task1(), Task2()]
p = tasksflow.pool.Pool(tasks)
result = p.run() # run tasks in pool
print(result) # {'a': 1, 'b': 2, 'c': 3}

Usage

Task

Each task has multiple input parameters and multiple output parameters. Taking Task2 as an example:

class Task2(tasksflow.task.Task):
    def run(self, a: int, b: int):
        return {"c": a + b}

Task2 takes input parameters a and b, and outputs parameter c.

Each task must inherit from tasksflow.task.Task and override the run method. The parameters of run are the inputs the task requires, and the return value should be a dict mapping parameter names to values. Parameter names must be unique across tasks: no two tasks may return the same parameter.

There's no need to declare dependencies between tasks explicitly. As soon as all the parameters a task's run method asks for have been produced by earlier tasks, that task is automatically invoked with those values.
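
For example, in the sketch below (the Fetch, Parse, and Summarize classes are illustrative, not part of tasksflow), Summarize is invoked only once both of the parameters it asks for have been produced:

import tasksflow.pool
import tasksflow.task

class Fetch(tasksflow.task.Task):
    def run(self):
        return {"raw": "1,2,3"}

class Parse(tasksflow.task.Task):
    def run(self, raw: str):
        return {"numbers": [int(x) for x in raw.split(",")]}

class Summarize(tasksflow.task.Task):
    # depends on the outputs of both Fetch and Parse, inferred from parameter names
    def run(self, raw: str, numbers: list):
        return {"total": sum(numbers)}

p = tasksflow.pool.Pool([Fetch(), Parse(), Summarize()])
print(p.run())  # {'raw': '1,2,3', 'numbers': [1, 2, 3], 'total': 6}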

Pool

tasksflow.pool.Pool is a pool of tasks used for running a series of tasks. The common usage is:

tasks = [Task1(), Task2()]
p = tasksflow.pool.Pool(tasks)
result = p.run() # run tasks in pool

Initialize the task pool with tasksflow.pool.Pool(tasks) and execute the task list with result = p.run(). The result is a single dict merging every task's parameter->value pairs.
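
Because the result is one merged dict, an individual task's output can be read directly:

result = p.run()
print(result["c"]) # 3, produced by Task2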

Advanced

Cache

For most tasks, as long as the input parameters and the task code stay the same, the output will also be the same. By default, tasksflow therefore caches each task's code and inputs/outputs. When a task runs again and hits the cache, execution is skipped and the cached output is used directly. This can substantially speed up development, since developers don't need to rerun earlier tasks while working on later ones.
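
A sketch of the effect (assuming the default SQLite cache at cache.db, described below):

result1 = tasksflow.pool.Pool([Task1(), Task2()]).run() # first run: both tasks execute
result2 = tasksflow.pool.Pool([Task1(), Task2()]).run() # unchanged code and inputs: cache hit, no re-execution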

Disabling Task Cache

Some tasks rely on external factors such as the network or the current time, so they may produce different outputs even with identical input parameters and task code. In such cases you may want to disable caching and force the task to execute. This is declared when the task is initialized.

tasks = [Task1(), Task2(enable_cache=False)]

Here, Task2 is passed enable_cache=False, indicating that caching should be disabled for this task.
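
For instance, a task whose output depends on the current time should opt out of caching (an illustrative sketch; the Now class is not part of tasksflow):

import time
import tasksflow.task

class Now(tasksflow.task.Task):
    def run(self):
        # same code and inputs on every run, but the output changes,
        # so a cached value would be stale
        return {"now": time.time()}

tasks = [Now(enable_cache=False)]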

Cache Implementation

By default, the pool creates a SQLite database at cache.db and stores the cached task code and inputs/outputs there. To customize the storage path:

from pathlib import Path
p = tasksflow.pool.Pool(tasks, cache_provider=tasksflow.cache.SqliteCacheProvider(Path("mycache.db")))

You can also use MemoryCacheProvider instead of SqliteCacheProvider; it keeps the cache in memory and is mainly useful for testing.

p = tasksflow.pool.Pool(tasks, cache_provider=tasksflow.cache.MemoryCacheProvider())

Or you can write a custom provider by inheriting from tasksflow.cache.CacheProvider and implementing the get and set methods, then pass it to the Pool.
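
A minimal sketch of the idea (the get/set signatures below are assumptions; check tasksflow.cache.CacheProvider for the actual interface):

from typing import Any, Optional
import tasksflow.cache

class DictCacheProvider(tasksflow.cache.CacheProvider):
    # hypothetical in-memory provider; method signatures are assumed, not verified
    def __init__(self):
        self._store: dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        return self._store.get(key)

    def set(self, key: str, value: Any) -> None:
        self._store[key] = value

p = tasksflow.pool.Pool(tasks, cache_provider=DictCacheProvider())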

Executer

By default, the pool uses tasksflow.executer.MultiprocessExecuter, which runs each task in a separate process. When a task completes, the tasks that depend on its output are invoked automatically.

You can also use tasksflow.executer.SerialExecuter, which executes tasks sequentially according to the order in tasks.

p = tasksflow.pool.Pool(tasks, executer=tasksflow.executer.SerialExecuter())

Or you can create a custom executer.

from typing import Any
import tasksflow.executer
import tasksflow.task

class MyExecuter(tasksflow.executer.Executer):
    def run(self, tasks: list[tasksflow.task.Task]) -> dict[str, Any]:
        # execute the tasks and return the merged parameter -> value dict
        ...

p = tasksflow.pool.Pool(tasks, executer=MyExecuter())
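
As a rough illustration of the contract (an assumption-laden sketch, not tasksflow's actual implementation), a serial executer could wire each task's run parameters from the outputs gathered so far:

import inspect
from typing import Any
import tasksflow.executer
import tasksflow.task

class VerboseSerialExecuter(tasksflow.executer.Executer):
    # hypothetical: runs tasks in list order, printing each one as it starts
    def run(self, tasks: list[tasksflow.task.Task]) -> dict[str, Any]:
        params: dict[str, Any] = {}
        for task in tasks:
            # task.run is a bound method, so 'self' does not appear in its signature
            names = inspect.signature(task.run).parameters
            kwargs = {name: params[name] for name in names}
            print(f"running {type(task).__name__}")
            params.update(task.run(**kwargs))
        return params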

Logging

tasksflow uses the loguru module for logging. You can control whether tasksflow's logs are printed using the following code. By default, tasksflow's logs are disabled.

# install loguru with `pip install loguru`

from loguru import logger
logger.enable("tasksflow")
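
To silence the logs again, call logger.disable("tasksflow"), which restores the default behavior.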

More Realistic Example

As an example, scrape the titles from https://webscraper.io/test-sites. Using tasksflow, the job is decomposed into two subtasks: requesting the page and parsing it. When developing the parsing subtask, the cached response is reused, so the page does not have to be requested again.

Install dependencies

pip install tasksflow requests lxml

Write the code

import tasksflow.pool
import tasksflow.task
from lxml import etree
import requests


class TaskRequest(tasksflow.task.Task):
    def run(self):
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
        }
        url = "https://webscraper.io/test-sites"
        resp = requests.get(url, headers=headers).text
        return {"resp": resp}


class TaskParse(tasksflow.task.Task):
    def run(self, resp: str):
        html = etree.HTML(resp, etree.HTMLParser())
        title_elements = html.xpath("/html/body/div[1]/div[3]/div[*]/div[1]/h2/a")
        titles = [title.text.strip() for title in title_elements]
        return {"titles": titles}


def main():
    tasks = [TaskRequest(), TaskParse()]
    p = tasksflow.pool.Pool(tasks)
    result = p.run()
    print(f"titles: {result['titles']}")
    # titles: ['E-commerce site', 'E-commerce site with pagination links', 'E-commerce site with AJAX pagination links', 'E-commerce site with "Load more" buttons', 'E-commerce site that loads items while scrolling', 'Table playground']


if __name__ == "__main__":
    main()

Development

See dev_zh_CN.md
