
An execution framework for I/O-heavy tasks, with persistence and concurrency


Kofu


Kofu (Japanese for "Miner") is a robust task execution framework with persistence, designed for I/O-heavy workloads like web scraping and LLM synthetic data generation on a single machine. It focuses on local concurrent execution, not distributed cluster computing.

Features

  • Persistent Execution: Survive restarts/crashes with SQLite-backed state
  • Concurrent Processing: Thread-based parallelism with configurable workers
  • Atomic Operations: Batch updates with transaction safety
  • Automatic Retries: Configurable retry logic with exponential backoff
  • Progress Tracking: Built-in tqdm integration for execution monitoring
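The retry behaviour listed above can be pictured with a small standalone sketch. This is not Kofu's implementation: `retry_with_backoff`, `max_retries`, `base_delay`, and the injectable `sleep` argument are hypothetical names used only to illustrate exponential backoff.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on any exception with delays of base_delay * 2**attempt."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            # Delay doubles after every failed attempt: 1x, 2x, 4x, ...
            sleep(base_delay * (2 ** attempt))
```

Passing `sleep` as a parameter keeps the sketch easy to test, since a test can record the delays instead of waiting for them.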

Installation

uv add kofu

Quick Start

import requests

from kofu import LocalThreadedExecutor
from kofu.store import SingleSQLiteTaskStore
from kofu.tasks import SimpleFn

def fetch_url(url: str) -> dict:
    # A timeout keeps one stalled request from occupying a worker indefinitely
    response = requests.get(url, timeout=10)
    return {"status": response.status_code, "content": response.text[:100]}

# The first argument to SimpleFn is the task ID
tasks = [
    SimpleFn("example", fetch_url, args=("https://example.com",)),
    SimpleFn("python", fetch_url, args=("https://python.org",)),
]

store = SingleSQLiteTaskStore(directory="./tasks_db")
executor = LocalThreadedExecutor(tasks=tasks, store=store, max_concurrency=2)
executor.run()

results = executor.get_results()

Core Concepts

Tasks

Implement the Task protocol or use SimpleFn:

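import hashlib
from dataclasses import dataclass
from typing import Optional

from kofu import Task

@dataclass
class AnalysisTask:
    input_data: str
    _task_id: Optional[str] = None

    def __post_init__(self):
        if self._task_id is None:
            # Built-in hash() is salted per process for strings, so it would
            # yield a different ID after a restart; use a stable digest instead.
            digest = hashlib.sha256(self.input_data.encode("utf-8")).hexdigest()
            self._task_id = f"analysis_{digest[:16]}"

    @property
    def id(self) -> str:
        return self._task_id

    def __call__(self) -> dict:
        return {"result": len(self.input_data)}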
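For readers unfamiliar with structural typing, the following sketch shows what a protocol with an `id` property and a `__call__` method could look like. `TaskLike` and `WordCount` are hypothetical names, not part of Kofu; the actual `Task` protocol may declare more than this.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class TaskLike(Protocol):
    """Hypothetical stand-in for a task protocol: an ID plus a call."""

    @property
    def id(self) -> str: ...

    def __call__(self) -> Any: ...


class WordCount:
    """Satisfies TaskLike without inheriting from it."""

    def __init__(self, task_id: str, text: str) -> None:
        self._task_id = task_id
        self.text = text

    @property
    def id(self) -> str:
        return self._task_id

    def __call__(self) -> dict:
        return {"words": len(self.text.split())}


task = WordCount("wc_1", "persist every result")
```

Because the check is structural, any class that exposes the same members qualifies, which is why `AnalysisTask` above needs no base class.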

Stores

Persistent storage backends:

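# Assumes JSONSerializer is importable from kofu.store, alongside Serializer
from kofu.store import JSONSerializer, SingleSQLiteTaskStore

store = SingleSQLiteTaskStore(
    directory="./data",
    serializer=JSONSerializer(compression_level=1)
)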
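To make the idea of SQLite-backed task state concrete, here is a minimal sketch built on the standard library's `sqlite3` module. The `TinyStore` class, its table layout, and its method names are assumptions made for illustration and do not describe the schema or API of `SingleSQLiteTaskStore`.

```python
import json
import sqlite3


class TinyStore:
    """Hypothetical single-table store: one row per task ID."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS tasks ("
            "id TEXT PRIMARY KEY, status TEXT NOT NULL, result TEXT)"
        )

    def add_pending(self, task_ids: list[str]) -> None:
        # INSERT OR IGNORE leaves tasks that are already recorded untouched.
        with self.conn:  # one transaction for the whole batch
            self.conn.executemany(
                "INSERT OR IGNORE INTO tasks (id, status) VALUES (?, 'pending')",
                [(task_id,) for task_id in task_ids],
            )

    def complete(self, results: dict[str, object]) -> None:
        # The batch commits as a unit, or rolls back if any update fails.
        with self.conn:
            self.conn.executemany(
                "UPDATE tasks SET status = 'completed', result = ? WHERE id = ?",
                [(json.dumps(value), task_id) for task_id, value in results.items()],
            )

    def pending_ids(self) -> list[str]:
        rows = self.conn.execute(
            "SELECT id FROM tasks WHERE status = 'pending' ORDER BY id"
        )
        return [row[0] for row in rows]
```

Pointing `path` at a file instead of `:memory:` is what lets the recorded state outlive the process, so a restarted run can ask for `pending_ids()` and continue from there.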

Executors

Configure execution parameters:

executor = LocalThreadedExecutor(
    tasks=[task1, task2],
    store=store,
    max_concurrency=4,
    retry=3,
    batch_size=50
)
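The effect of a concurrency limit such as `max_concurrency` can be sketched with the standard library's thread pool. This is an illustration of thread-based parallelism in general, not of how `LocalThreadedExecutor` is written; `slow_io` and `run_all` are hypothetical helpers.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_io(task_id: str) -> str:
    time.sleep(0.05)  # stands in for a network call
    return task_id.upper()


def run_all(task_ids: list[str], max_concurrency: int = 4) -> dict[str, str]:
    """Run slow_io for every ID with at most max_concurrency threads."""
    results: dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # Map each future back to its task ID so results can be keyed by ID.
        futures = {pool.submit(slow_io, tid): tid for tid in task_ids}
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results
```

Threads suit this kind of workload because the interpreter releases the GIL while a thread waits on I/O, so the waits overlap.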

Advanced Usage

Resume Execution

# After crash/restart
store = SingleSQLiteTaskStore(directory="./tasks_db")
pending = store.get_pending_task_ids()

# Recreate tasks (or load them from somewhere).
# recreated_tasks is a dict mapping each task ID to its task object.
executor = LocalThreadedExecutor(
    tasks=[recreated_tasks[task_id] for task_id in pending],
    store=store
)
executor.run()

LLM Processing

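from openai import OpenAI

from kofu import LocalThreadedExecutor
from kofu.store import SingleSQLiteTaskStore
from kofu.tasks import SimpleFn

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def llm_task(prompt: str) -> dict:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return {"content": response.choices[0].message.content}

# Placeholder prompts; replace with your own
prompts = ["Summarize the history of SQLite.", "Explain exponential backoff."]

tasks = [SimpleFn(f"prompt_{i}", llm_task, args=(p,)) for i, p in enumerate(prompts)]
store = SingleSQLiteTaskStore(directory="./llm_tasks")
executor = LocalThreadedExecutor(tasks=tasks, store=store, max_concurrency=5)
executor.run()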

Custom Serialization

from typing import Any

# dumps/loads come from the standalone `bson` package, not PyMongo's bundled bson module
import bson

from kofu.store import Serializer, SingleSQLiteTaskStore

class BSONSerializer(Serializer):
    def serialize(self, obj) -> bytes:
        return bson.dumps(obj)

    def deserialize(self, data: bytes) -> Any:
        return bson.loads(data)

store = SingleSQLiteTaskStore(directory="./data", serializer=BSONSerializer())
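If a third-party dependency is not wanted, the same `serialize`/`deserialize` shape can be filled in with the standard library alone. The sketch below is a hypothetical `CompressedJSONSerializer`; it deliberately does not subclass Kofu's `Serializer` so that it runs on its own, and plugging it into a store would presumably require inheriting from `Serializer` as in the example above.

```python
import json
import zlib
from typing import Any


class CompressedJSONSerializer:
    """Hypothetical serializer: compact JSON, compressed with zlib."""

    def __init__(self, compression_level: int = 6) -> None:
        self.compression_level = compression_level

    def serialize(self, obj: Any) -> bytes:
        # Compact separators drop the spaces json.dumps adds by default.
        raw = json.dumps(obj, separators=(",", ":")).encode("utf-8")
        return zlib.compress(raw, self.compression_level)

    def deserialize(self, data: bytes) -> Any:
        return json.loads(zlib.decompress(data).decode("utf-8"))
```

JSON keeps stored results readable by other tools once decompressed, at the cost of supporting only JSON-compatible types (tuples, for example, come back as lists).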

API Overview

Component              Description
Task                   Protocol defining task interface
SimpleFn               Ready-to-use task wrapper for functions
SingleSQLiteTaskStore  Production-ready SQLite persistence
LocalThreadedExecutor  Thread-pool based task executor
TaskStatus             Enum (PENDING/COMPLETED/FAILED)

Contributing

Contributions welcome! Please open an issue first to discuss proposed changes.

git clone https://github.com/avyuh/kofu
cd kofu
uv sync --extra dev
uv run pytest tests/

License

MIT License. See LICENSE for details.

