An execution framework for I/O-heavy tasks with memory persistence and concurrency
Kofu
Kofu (Japanese for "miner") is a robust task execution framework with persistence, designed for I/O-heavy workloads such as web scraping and LLM synthetic data generation on a single machine. It focuses on local concurrent execution, not distributed cluster computing.
Features
- Persistent Execution: Survive restarts/crashes with SQLite-backed state
- Concurrent Processing: Thread-based parallelism with configurable workers
- Atomic Operations: Batch updates with transaction safety
- Automatic Retries: Configurable retry logic with exponential backoff
- Progress Tracking: Built-in tqdm integration for execution monitoring
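The retry feature above relies on exponential backoff: after each failure the wait before the next attempt grows geometrically. The sketch below illustrates that general technique in plain Python; it is not Kofu's internal code, and the function name `retry_with_backoff`, its parameters, and the added random jitter are assumptions made for illustration.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying up to `retries` extra times with exponential backoff.

    Hypothetical helper for illustration; not part of Kofu's API.
    """
    if retries < 0:
        raise ValueError("retries must be >= 0")
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= retries:
                raise  # out of attempts: surface the last error
            # Delay doubles per attempt (0.5s, 1s, 2s, ...) up to max_delay;
            # random jitter keeps parallel workers from retrying in lockstep.
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))
            attempt += 1


calls = []


def flaky_fetch() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient network error")
    return "ok"


# sleep is injectable; a no-op here keeps the demo instant.
print(retry_with_backoff(flaky_fetch, retries=3, sleep=lambda s: None))  # ok
```

Injecting `sleep` is a design choice that keeps the backoff logic testable without real waiting.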
Installation
```bash
uv add kofu
```
Quick Start
```python
import requests

from kofu import LocalThreadedExecutor
from kofu.store import SingleSQLiteTaskStore
from kofu.tasks import SimpleFn


def fetch_url(url: str) -> dict:
    response = requests.get(url, timeout=30)
    # Keep the return value small and serializable: results are saved to the store.
    return {"status": response.status_code, "content": response.text[:100]}


# "example" and "python" are the task IDs; keep them unique and stable across runs.
tasks = [
    SimpleFn("example", fetch_url, args=("https://example.com",)),
    SimpleFn("python", fetch_url, args=("https://python.org",)),
]

store = SingleSQLiteTaskStore(directory="./tasks_db")
executor = LocalThreadedExecutor(tasks=tasks, store=store, max_concurrency=2)
executor.run()
results = executor.get_results()
```
Core Concepts
Tasks
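Implement the Task protocol (a stable `id` property and a zero-argument `__call__`, as in the example below) or wrap a plain function with SimpleFn: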
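```python
import hashlib
from dataclasses import dataclass
from typing import Optional

from kofu import Task


@dataclass
class AnalysisTask:
    input_data: str
    _task_id: Optional[str] = None

    def __post_init__(self):
        if self._task_id is None:
            # Use a content hash, not hash(): str hashes are randomized per
            # process, so hash()-based IDs would change after a restart.
            digest = hashlib.sha256(self.input_data.encode("utf-8")).hexdigest()
            self._task_id = f"analysis_{digest[:16]}"

    @property
    def id(self) -> str:
        return self._task_id

    def __call__(self) -> dict:
        return {"result": len(self.input_data)}


task: Task = AnalysisTask("some text")  # satisfies the Task protocol structurally
```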
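Since `AnalysisTask` does not inherit from any Kofu class, the Task protocol appears to be satisfied structurally. The sketch below models that idea with `typing.Protocol`; `TaskLike` and `WordCountTask` are hypothetical names, and `TaskLike` is only inferred from the example (an `id` property plus a zero-argument `__call__`). Kofu's real `Task` may declare more members.

```python
import hashlib
from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class TaskLike(Protocol):
    """Hypothetical stand-in for kofu.Task, inferred from the example above."""

    @property
    def id(self) -> str: ...

    def __call__(self) -> Any: ...


@dataclass
class WordCountTask:
    text: str

    @property
    def id(self) -> str:
        # Content-derived, so the same input maps to the same ID on every run.
        return "wc_" + hashlib.sha256(self.text.encode("utf-8")).hexdigest()[:16]

    def __call__(self) -> dict:
        return {"words": len(self.text.split())}


task = WordCountTask("to be or not to be")
# isinstance() on a runtime_checkable protocol only checks that the members
# exist; no subclassing is needed.
print(isinstance(task, TaskLike), task())
```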
Stores
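SingleSQLiteTaskStore persists task state in SQLite under the given directory; the serializer controls how stored values are encoded: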
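```python
# JSONSerializer is assumed to live in kofu.store next to Serializer.
from kofu.store import JSONSerializer, SingleSQLiteTaskStore

store = SingleSQLiteTaskStore(
    directory="./data",
    serializer=JSONSerializer(compression_level=1),
)
```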
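Conceptually, a SQLite-backed store records each task's status and result so that a later run can skip completed work, and writing a batch inside one transaction is what makes the updates atomic. The class below is a minimal sketch of that idea using the standard-library `sqlite3` module; `MiniTaskStore`, its table layout, and its methods are hypothetical, not Kofu's actual schema or API.

```python
import json
import sqlite3
from typing import Any, Iterable


class MiniTaskStore:
    """Toy SQLite-backed status/result store (illustrative, not Kofu's schema)."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS tasks ("
            " id TEXT PRIMARY KEY,"
            " status TEXT NOT NULL DEFAULT 'PENDING',"
            " result TEXT)"
        )
        self.conn.commit()

    def register(self, task_ids: Iterable[str]) -> None:
        # INSERT OR IGNORE keeps existing rows, so re-registering after a
        # restart does not reset tasks that already completed.
        with self.conn:
            self.conn.executemany(
                "INSERT OR IGNORE INTO tasks (id) VALUES (?)",
                [(tid,) for tid in task_ids],
            )

    def save_batch(self, results: dict[str, Any]) -> None:
        # `with conn:` wraps the batch in one transaction: it commits if every
        # UPDATE succeeds and rolls all of them back if any statement raises.
        with self.conn:
            self.conn.executemany(
                "UPDATE tasks SET status = 'COMPLETED', result = ? WHERE id = ?",
                [(json.dumps(r), tid) for tid, r in results.items()],
            )

    def pending_ids(self) -> list[str]:
        rows = self.conn.execute(
            "SELECT id FROM tasks WHERE status = 'PENDING' ORDER BY id"
        )
        return [row[0] for row in rows]


store = MiniTaskStore()
store.register(["a", "b", "c"])
store.save_batch({"a": {"status": 200}, "b": {"status": 404}})
print(store.pending_ids())  # ['c']
```

Passing a file path instead of `":memory:"` is what would make the state survive a process restart.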
Executors
Configure execution parameters:
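```python
executor = LocalThreadedExecutor(
    tasks=[task1, task2],  # any objects implementing the Task protocol
    store=store,
    max_concurrency=4,     # number of worker threads
    retry=3,               # retry attempts for a failing task
    batch_size=50,
)
```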
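`max_concurrency` corresponds to the size of a worker pool. The standard-library sketch below shows the general pattern of a bounded thread pool that skips already-completed IDs and records per-task errors instead of aborting the run; `run_tasks` and `make_square_task` are hypothetical names, and this is not necessarily how `LocalThreadedExecutor` is implemented.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable


def run_tasks(
    tasks: dict[str, Callable[[], Any]],
    done: set[str],
    max_concurrency: int = 4,
) -> tuple[dict[str, Any], dict[str, Exception]]:
    """Run every task whose ID is not in `done` on a bounded thread pool."""
    results: dict[str, Any] = {}
    errors: dict[str, Exception] = {}
    todo = {tid: fn for tid, fn in tasks.items() if tid not in done}
    # max_workers caps how many tasks run at once. Threads suit I/O-bound work
    # because CPython releases the GIL while a thread blocks on network I/O.
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        futures = {pool.submit(fn): tid for tid, fn in todo.items()}
        for future in as_completed(futures):
            tid = futures[future]
            try:
                results[tid] = future.result()
            except Exception as exc:  # record the failure; keep other tasks going
                errors[tid] = exc
    return results, errors


def make_square_task(n: int) -> Callable[[], int]:
    def task() -> int:
        time.sleep(0.05)  # stand-in for a network call
        return n * n
    return task


tasks = {f"sq_{n}": make_square_task(n) for n in range(5)}
results, errors = run_tasks(tasks, done={"sq_0"}, max_concurrency=2)
print(sorted(results.items()))  # [('sq_1', 1), ('sq_2', 4), ('sq_3', 9), ('sq_4', 16)]
```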
Advanced Usage
Resume Execution
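```python
from kofu import LocalThreadedExecutor
from kofu.store import SingleSQLiteTaskStore

# After a crash/restart, reopen the same store directory.
store = SingleSQLiteTaskStore(directory="./tasks_db")
pending = store.get_pending_task_ids()

# Recreate the tasks (or load them from somewhere). recreated_tasks must map
# task ID -> task object, built so each task gets the same ID as before.
executor = LocalThreadedExecutor(
    tasks=[recreated_tasks[task_id] for task_id in pending],
    store=store,
)
executor.run()
```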
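Resuming depends on rebuilding task objects whose IDs match the ones recorded before the crash. One way to do that, sketched below in plain Python, is to regenerate every task from the original inputs and index them by ID. `FetchTask` and `rebuild_pending` are hypothetical names, and `pending` stands in for whatever `get_pending_task_ids()` returns (assumed here to be a list of ID strings).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FetchTask:
    """Hypothetical task whose ID is derived purely from its input."""

    url: str

    @property
    def id(self) -> str:
        return f"fetch:{self.url}"

    def __call__(self) -> dict:
        return {"url": self.url}  # placeholder for the real fetch


def rebuild_pending(urls: list[str], pending_ids: list[str]) -> list[FetchTask]:
    # Regenerate every task from the original inputs and index it by ID; since
    # IDs depend only on the input, they match the IDs stored before the crash.
    by_id = {task.id: task for task in (FetchTask(u) for u in urls)}
    missing = [tid for tid in pending_ids if tid not in by_id]
    if missing:
        raise KeyError(f"cannot rebuild tasks for IDs: {missing}")
    return [by_id[tid] for tid in pending_ids]


urls = ["https://example.com", "https://python.org"]
pending = ["fetch:https://python.org"]  # stand-in for store.get_pending_task_ids()
print([task.url for task in rebuild_pending(urls, pending)])  # ['https://python.org']
```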
LLM Processing
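```python
from openai import OpenAI
from kofu import LocalThreadedExecutor
from kofu.store import SingleSQLiteTaskStore
from kofu.tasks import SimpleFn

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def llm_task(prompt: str) -> dict:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
    )
    return {"content": response.choices[0].message.content}

prompts = ["Write a haiku about mining.", "Summarize the CAP theorem."]  # your inputs
# IDs come from list position, so keep `prompts` in the same order across runs.
tasks = [SimpleFn(f"prompt_{i}", llm_task, args=(p,)) for i, p in enumerate(prompts)]
executor = LocalThreadedExecutor(
    tasks=tasks, store=SingleSQLiteTaskStore(directory="./llm_tasks"), max_concurrency=5
)
executor.run()
```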
Custom Serialization
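```python
from typing import Any

# Standalone `bson` package from PyPI (provides dumps/loads); pymongo ships
# an incompatible `bson` module (encode/decode), so don't install both.
import bson

from kofu.store import Serializer, SingleSQLiteTaskStore


class BSONSerializer(Serializer):
    def serialize(self, obj: Any) -> bytes:
        # BSON documents are mappings, so task results should be dicts.
        return bson.dumps(obj)

    def deserialize(self, data: bytes) -> Any:
        return bson.loads(data)

store = SingleSQLiteTaskStore(directory="./data", serializer=BSONSerializer())
```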
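A custom serializer only needs the `serialize`/`deserialize` pair shown above. The `compression_level` argument of `JSONSerializer` suggests compressed JSON, but how Kofu compresses is not documented here, so the sketch below simply assumes zlib. `CompressedJSONSerializer` is hypothetical and kept standalone (it does not subclass `kofu.store.Serializer`) so it runs without Kofu installed.

```python
import json
import zlib
from typing import Any


class CompressedJSONSerializer:
    """Hypothetical JSON + zlib serializer with a serialize/deserialize pair."""

    def __init__(self, compression_level: int = 1) -> None:
        # zlib levels run from 0 (no compression) to 9 (smallest, slowest).
        if not 0 <= compression_level <= 9:
            raise ValueError("compression_level must be between 0 and 9")
        self.compression_level = compression_level

    def serialize(self, obj: Any) -> bytes:
        raw = json.dumps(obj, separators=(",", ":")).encode("utf-8")
        return zlib.compress(raw, self.compression_level)

    def deserialize(self, data: bytes) -> Any:
        return json.loads(zlib.decompress(data).decode("utf-8"))


s = CompressedJSONSerializer(compression_level=6)
payload = {"status": 200, "content": "<html>" + "x" * 1000 + "</html>"}
blob = s.serialize(payload)
assert s.deserialize(blob) == payload  # lossless round trip
print(len(blob) < len(json.dumps(payload)))  # True: repetitive text compresses well
```

Low levels such as 1 trade some size for speed, which usually suits scraped pages written at high throughput.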
API Overview
| Component | Description |
|---|---|
| `Task` | Protocol defining the task interface |
| `SimpleFn` | Ready-to-use task wrapper for functions |
| `SingleSQLiteTaskStore` | Production-ready SQLite persistence |
| `LocalThreadedExecutor` | Thread-pool-based task executor |
| `TaskStatus` | Enum (`PENDING`/`COMPLETED`/`FAILED`) |
Contributing
Contributions are welcome! Please open an issue first to discuss proposed changes.
```bash
git clone https://github.com/avyuh/kofu
cd kofu
uv sync --extra dev
uv run pytest tests/
```
License
MIT License. See LICENSE for details.
Download files
File details
Details for the file kofu-0.3.0.tar.gz.
File metadata
- Download URL: kofu-0.3.0.tar.gz
- Upload date:
- Size: 30.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `bbb64052889f41189798fff73216149957a5b9cc9eb1ea074a96dda929b833cc` |
| MD5 | `77136429e82a44a6de641973c8b42ff7` |
| BLAKE2b-256 | `0755c0ba81eac9e306873896fc7af571ba16e4995e4fbc91fb0fd26441afc981` |
File details
Details for the file kofu-0.3.0-py3-none-any.whl.
File metadata
- Download URL: kofu-0.3.0-py3-none-any.whl
- Upload date:
- Size: 22.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `b3ff4128265073c238ad65c4cefb0c75fa4887fe559359af4d43df0192692db1` |
| MD5 | `79ada3b9b56b599cbb3f7205df515732` |
| BLAKE2b-256 | `e358c459940d00a68913832e25ad8fe9b7046e684ca8dd7326d0be5fbc7d37c2` |