Skip to main content

No project description provided

Project description

yakits

一个轻量级的数据处理管道工具,统一支持同步函数和异步函数,提供多进程与线程池并发、批量处理、自动重试、断点续跑与结构化日志。适合用最少的代码搭建稳定的批处理与流式刷新任务。

核心思想

  • 只需要三个钩子:load_func 产生输入、pipeline_func 处理每条数据、save_funcoutput_path 持久化输出。
  • 同步函数用线程池执行,异步函数在每个进程内启用事件循环执行;两者共享同一套接口。

安装

  • 要求 Python >= 3.12
  • 从源码安装:
pip install -e .

快速上手

下面给出同步与异步的最小可运行示例。

同步示例

from typing import Dict, Iterator, Tuple, List
from async_dag import run_pipeline, Context

def load() -> Iterator[Tuple[int, Dict]]:
    for i, text in enumerate(["a", "b", "c"]):
        yield i, {"text": text}

def pipeline(ctx: Context) -> None:
    ctx.payload["text"] = ctx.payload["text"].upper()

results: List[Context] = []
def save(ctx: Context) -> None:
    results.append(ctx)

run_pipeline(
    load_func=load,
    pipeline_func=pipeline,
    save_func=save,
    process_cnt=1,
    thread_cnt=2,
    batch_size=3,
)

print([c.payload["text"] for c in results])  # ["A", "B", "C"]

异步示例

import asyncio
from typing import Dict, Iterator, Tuple, List
from async_dag import run_pipeline, Context

def load() -> Iterator[Tuple[int, Dict]]:
    for i, text in enumerate(["x", "y"]):
        yield i, {"text": text}

async def pipeline(ctx: Context) -> None:
    await asyncio.sleep(0.1)
    ctx.payload["text"] += "|done"

results: List[Context] = []
def save(ctx: Context) -> None:
    results.append(ctx)

run_pipeline(
    load_func=load,
    pipeline_func=pipeline,
    save_func=save,
    process_cnt=1,
    batch_size=2,
)

print([c.payload["text"] for c in results])  # ["x|done", "y|done"]

文件输出与断点续跑

如果不提供 save_func,可以通过 output_path 直接写出结果,且自动生成同名的 *.task_info 文件用于断点续跑。

from async_dag import run_pipeline, Context

def load():
    for i in range(5):
        yield i, {"value": i}

def pipeline(ctx: Context):
    ctx.payload["value"] *= 2

run_pipeline(
    load_func=load,
    pipeline_func=pipeline,
    output_path="outputs.tsv",
    process_cnt=1,
    thread_cnt=1,
    resume=True,
)

运行后会生成两个文件:

  • outputs.tsv:逐行写入 idx\tJSON(payload) 结果
  • outputs.tsv.task_info:记录已完成的 idx 与耗时,用于下次跳过已处理数据

参数速览

  • load_func:生成器,逐条 yield (idx, payload)
  • pipeline_func:处理逻辑,支持同步函数或 async def 协程
  • save_func / output_path:二选一;未提供 save_func 时必须提供 output_path
  • process_cnt:工作进程数;异步与同步均可按进程并发
  • thread_cnt:每个进程的线程数,仅在同步函数时生效
  • buffer_size:内部队列的缓冲上限,控制内存占用
  • batch_size:单次发送到 worker 的上下文数量;同步函数需满足 batch_size >= thread_cnt
  • max_retries:失败自动重试次数上限
  • resume:为真时会读取 *.task_info 跳过已完成的 idx
  • run_debug:为真时只处理一个小批次,便于本地调试

日志与追踪

  • 统一使用 Context.info / warning / error 记录结构化日志,可在处理完后调用 Context.print_logs() 输出。
  • 若安装了 colorlog,日志会以彩色格式输出;否则退化为标准控制台格式。

代码结构

  • async_dag/context.py:上下文模型与日志辅助
  • async_dag/pipeline.py:生产者/消费者、进程与线程并发、重试与持久化
  • async_dag/logger.py:彩色与普通日志初始化

运行测试

仓库内提供了简单的单元测试,示例参见 tests/。若已安装 pytest,可运行:

pytest -q

设计细节

  • 异步函数在每个进程内通过事件循环批量执行;同步函数通过线程池并发执行。
  • 失败重试在 worker 内部进行,超过 max_retries 的任务会被丢弃并记录日志。
  • 当提供 output_path 时,内置保存器会同时写出 *.task_info 以支持断点续跑。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

yakits-0.1.0.tar.gz (10.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

yakits-0.1.0-py3-none-any.whl (8.4 kB view details)

Uploaded Python 3

File details

Details for the file yakits-0.1.0.tar.gz.

File metadata

  • Download URL: yakits-0.1.0.tar.gz
  • Upload date:
  • Size: 10.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.6.14

File hashes

Hashes for yakits-0.1.0.tar.gz
Algorithm Hash digest
SHA256 58f80fe0fa36273a1c33555e8b4c77630b73bdee51e425d1c5ad80bf289f7b26
MD5 5b12e05ffa3f37dee9970a9036f402aa
BLAKE2b-256 450fc5be7fd34a061f92f9877caa6a552e302691d5ccccba382bd85eb4252c14

See more details on using hashes here.

File details

Details for the file yakits-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: yakits-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 8.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.6.14

File hashes

Hashes for yakits-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4eb2a2cb95e318bdc9300e1c54af8ce8f52775993bcd8c59aea5d73ba707e9b4
MD5 f63a662008070755ecfd9063e856a805
BLAKE2b-256 2e53981be81b7900a24d40d03dae2a6d6ed203b5353add09175b23e66252fadb

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page