yakits
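A lightweight data-processing pipeline tool that supports synchronous and asynchronous functions through one interface, with multi-process and thread-pool concurrency, batching, automatic retries, checkpoint resume, and structured logging. It is meant for building stable batch-processing and streaming-refresh jobs with minimal code.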
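Core idea
- Only three hooks are needed: load_func produces the input, pipeline_func processes each item, and save_func or output_path persists the output.
- Synchronous functions run in a thread pool; asynchronous functions run on an event loop started inside each process. Both share the same interface.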
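The shared interface for both function types can be pictured with a minimal sketch using only the standard library. This is not the yakits implementation: `run_batch` and the two sample hooks are hypothetical names, and the only assumption taken from the description above is that sync hooks go to a thread pool while async hooks go to an event loop.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def run_batch(func: Callable[[Dict[str, Any]], Any],
              batch: List[Dict[str, Any]],
              thread_cnt: int = 2) -> None:
    """Run func over every item in batch, picking the executor by function type."""
    if inspect.iscoroutinefunction(func):
        # Async hook: one event loop drives the whole batch concurrently.
        async def _gather() -> None:
            await asyncio.gather(*(func(item) for item in batch))

        asyncio.run(_gather())
    else:
        # Sync hook: a thread pool works through the batch in parallel.
        with ThreadPoolExecutor(max_workers=thread_cnt) as pool:
            list(pool.map(func, batch))  # list() re-raises any worker exception


def upper_sync(item: Dict[str, Any]) -> None:
    item["text"] = item["text"].upper()


async def upper_async(item: Dict[str, Any]) -> None:
    await asyncio.sleep(0)  # stand-in for real asynchronous I/O
    item["text"] = item["text"].upper()


sync_items = [{"text": "a"}, {"text": "b"}]
async_items = [{"text": "x"}, {"text": "y"}]
run_batch(upper_sync, sync_items)
run_batch(upper_async, async_items)
print(sync_items, async_items)
```

The caller never has to say which kind of function it passed; the check on the function type is what lets both cases use one entry point.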
Installation
- Requirement: Python >= 3.12
- Install from source:
pip install -e .
Quick start
Minimal runnable examples for the synchronous and asynchronous cases follow.
Synchronous example
from typing import Dict, Iterator, Tuple, List
from async_dag import run_pipeline, Context

def load() -> Iterator[Tuple[int, Dict]]:
    for i, text in enumerate(["a", "b", "c"]):
        yield i, {"text": text}

def pipeline(ctx: Context) -> None:
    ctx.payload["text"] = ctx.payload["text"].upper()

results: List[Context] = []

def save(ctx: Context) -> None:
    results.append(ctx)

run_pipeline(
    load_func=load,
    pipeline_func=pipeline,
    save_func=save,
    process_cnt=1,
    thread_cnt=2,
    batch_size=3,
)
print([c.payload["text"] for c in results])  # ["A", "B", "C"]
Asynchronous example
import asyncio
from typing import Dict, Iterator, Tuple, List
from async_dag import run_pipeline, Context

def load() -> Iterator[Tuple[int, Dict]]:
    for i, text in enumerate(["x", "y"]):
        yield i, {"text": text}

async def pipeline(ctx: Context) -> None:
    await asyncio.sleep(0.1)
    ctx.payload["text"] += "|done"

results: List[Context] = []

def save(ctx: Context) -> None:
    results.append(ctx)

run_pipeline(
    load_func=load,
    pipeline_func=pipeline,
    save_func=save,
    process_cnt=1,
    batch_size=2,
)
print([c.payload["text"] for c in results])  # ["x|done", "y|done"]
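File output and checkpoint resume
If save_func is not provided, results can be written directly via output_path; a *.task_info file with the same name is generated automatically to support resuming an interrupted run.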
from async_dag import run_pipeline, Context

def load():
    for i in range(5):
        yield i, {"value": i}

def pipeline(ctx: Context):
    ctx.payload["value"] *= 2

run_pipeline(
    load_func=load,
    pipeline_func=pipeline,
    output_path="outputs.tsv",
    process_cnt=1,
    thread_cnt=1,
    resume=True,
)
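Two files are produced after the run:
- outputs.tsv: one result per line, written as idx\tJSON(payload)
- outputs.tsv.task_info: records the completed idx values and their elapsed time, so the next run can skip data already processed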
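Reading the results back is straightforward given the idx\tJSON(payload) line format. The sketch below is an illustration, not part of yakits: `parse_output_lines` is a hypothetical helper, and it assumes idx is an integer, as in the examples. The *.task_info format is not described here, so the sketch does not touch it.

```python
import json
from typing import Any, Dict, Iterable, Iterator, Tuple


def parse_output_lines(lines: Iterable[str]) -> Iterator[Tuple[int, Dict[str, Any]]]:
    """Yield (idx, payload) pairs from lines shaped like 'idx<TAB>JSON'."""
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # tolerate blank lines
        # Split on the first tab only; JSON escapes tabs inside strings,
        # so the payload itself never contains a raw tab.
        idx, payload = line.split("\t", 1)
        yield int(idx), json.loads(payload)  # assumption: idx is an integer


# Sample lines in the documented format, as the doubling example might write them.
sample = ['0\t{"value": 0}\n', '1\t{"value": 2}\n', '2\t{"value": 4}\n']
records = dict(parse_output_lines(sample))
print(records)
```

To read a real file, pass an open file object instead of the sample list, for example `with open("outputs.tsv", encoding="utf-8") as f: records = dict(parse_output_lines(f))`.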
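Parameter overview
- load_func: generator that yields (idx, payload) one item at a time
- pipeline_func: processing logic; may be a synchronous function or an async def coroutine
- save_func / output_path: choose one; output_path is required when save_func is not provided
- process_cnt: number of worker processes; both asynchronous and synchronous functions can run concurrently across processes
- thread_cnt: number of threads per process; only takes effect for synchronous functions
- buffer_size: upper bound on the internal queue buffer, which limits memory use
- batch_size: number of contexts sent to a worker at a time; synchronous functions require batch_size >= thread_cnt
- max_retries: maximum number of automatic retries after a failure
- resume: when true, reads *.task_info and skips idx values already completed
- run_debug: when true, processes only one small batch, which is convenient for local debugging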
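To make batch_size concrete, here is a minimal sketch of grouping a stream of items into fixed-size batches. `iter_batches` is a hypothetical helper, not the library's internal code. One plausible reading of the batch_size >= thread_cnt requirement is that a smaller batch would leave some threads in the pool with nothing to do.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group items into lists of at most batch_size, preserving order."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(items)
    while True:
        batch = list(islice(it, batch_size))  # pull up to batch_size items
        if not batch:
            return  # input exhausted
        yield batch


batches = list(iter_batches(range(7), batch_size=3))
print(batches)  # the last batch is shorter when the input does not divide evenly
```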
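Logging and tracing
- Use Context.info / warning / error to record structured logs; call Context.print_logs() after processing to print them.
- If colorlog is installed, logs are printed in color; otherwise output falls back to the standard console format.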
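The optional-dependency fallback described above is a common pattern and can be sketched as follows. This is an assumption about the general approach, not the contents of async_dag/logger.py; `build_logger`, the logger name, and the format strings are all hypothetical.

```python
import logging


def build_logger(name: str = "pipeline_demo") -> logging.Logger:
    """Return a console logger, colored when colorlog is importable."""
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger  # already configured; avoid duplicate handlers

    handler = logging.StreamHandler()
    try:
        import colorlog  # optional third-party dependency

        handler.setFormatter(colorlog.ColoredFormatter(
            "%(log_color)s%(asctime)s %(levelname)s %(message)s"))
    except ImportError:
        # Fall back to the plain standard-library formatter.
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(message)s"))

    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


logger = build_logger()
logger.info("pipeline started")
```

Doing the import inside the function keeps colorlog optional: the module loads without it, and only the formatter differs.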
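Code structure
- async_dag/context.py: context model and logging helpers
- async_dag/pipeline.py: producer/consumer, process and thread concurrency, retries, and persistence
- async_dag/logger.py: colored and plain logger setup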
Running tests
The repository includes simple unit tests; see tests/ for examples. If pytest is installed, run:
pytest -q
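Design details
- Asynchronous functions are executed in batches on an event loop inside each process; synchronous functions run concurrently in a thread pool.
- Retries happen inside the worker; tasks that exceed max_retries are dropped and logged.
- When output_path is provided, the built-in saver also writes *.task_info to support checkpoint resume.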
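The retry rule above can be sketched as a small loop. This is an illustration under stated assumptions, not the worker code in async_dag/pipeline.py: `run_with_retries` is a hypothetical name, and the sketch assumes max_retries counts retries after the first attempt, which the description does not spell out.

```python
import logging
from typing import Any, Callable, Dict

log = logging.getLogger("retry_demo")


def run_with_retries(func: Callable[[Dict[str, Any]], None],
                     payload: Dict[str, Any],
                     max_retries: int = 2) -> bool:
    """Return True on success, False when the task is dropped."""
    # Assumption: one initial attempt plus up to max_retries retries.
    for attempt in range(max_retries + 1):
        try:
            func(payload)
            return True
        except Exception as exc:
            log.warning("attempt %d failed: %s", attempt + 1, exc)
    log.error("dropping task after %d retries", max_retries)
    return False


calls = {"n": 0}


def flaky(payload: Dict[str, Any]) -> None:
    """Fail twice, then succeed, to exercise the retry path."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    payload["ok"] = True


def always_fails(payload: Dict[str, Any]) -> None:
    raise RuntimeError("permanent failure")


item: Dict[str, Any] = {}
succeeded = run_with_retries(flaky, item, max_retries=2)
dropped = not run_with_retries(always_fails, {}, max_retries=1)
print(succeeded, dropped, item)
```

Returning a boolean rather than raising lets the caller keep processing the rest of the batch when one task is dropped.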