
SilkLoom Core: minimal stateful batch engine for LLM and VLM workloads


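📚 SilkLoom Core 3.0 API Design and Architecture Guide

1. Core Design Philosophy

SilkLoom Core 3.0 aims to be a minimal, highly available, structured batch-processing engine for multimodal large-model workloads.

  • Outward: low cognitive overhead. "Get it running in one line of code," with a single core API.
  • Inward: it absorbs the engineering pain points. A pipeline that separates computation from storage quietly handles concurrent scheduling, resume-from-checkpoint, image conversion and message assembly, and automatic salvage of malformed JSON via json_repair.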

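2. Internal Architecture and Data Flow

To make execution parallel, interruptible, and async-capable, the engine's executor moves every task through the following stages:

  1. Fingerprinting: the engine iterates over the input sequence and computes a SHA-256 hash of each input dict, which serves as that task's unique Task ID.
  2. Cache Intercept: if a task_name is given, the engine looks up each Task ID in a local SQLite database. Cache hits are marked successful and returned immediately, without calling the model.
  3. Worker Pool: tasks that miss the cache are dispatched to a thread pool (ThreadPoolExecutor) for synchronous calls, or to an asyncio task group (asyncio.gather) for asynchronous calls.
  4. Atomic Persist: as soon as a task succeeds (including after JSON repair), its result is atomically upserted into the SQLite cache, which runs in WAL mode. Completed results therefore survive a crash or interruption of the host process.
  5. Flexible Consumption: results are delivered either through map (blocks until done, then reassembles results in input order) or through stream (yields each result the moment it is ready).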
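The five stages above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not SilkLoom's implementation: the helper names (fingerprint, open_cache, run_batch), the table schema, and the choice to hash a sorted-key JSON encoding are all hypothetical, and `worker` stands in for the real model call.

```python
import hashlib
import json
import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable


def fingerprint(item: dict) -> str:
    """Stage 1: SHA-256 over a canonical JSON encoding of the input dict."""
    # Assumption: keys are sorted so {"a": 1, "b": 2} and {"b": 2, "a": 1} share an ID.
    canonical = json.dumps(item, sort_keys=True, ensure_ascii=False, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def open_cache(path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")  # hypothetical schema below
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results ("
        " task_name TEXT NOT NULL, task_id TEXT NOT NULL, output TEXT NOT NULL,"
        " PRIMARY KEY (task_name, task_id))"
    )
    return conn


def run_batch(conn: sqlite3.Connection, task_name: str, items: list[dict],
              worker: Callable[[dict], str], max_workers: int = 5):
    ids = [fingerprint(item) for item in items]
    results = {}
    cached = set()

    # Stage 2: cache intercept; hits are returned without calling the worker.
    for tid in ids:
        row = conn.execute(
            "SELECT output FROM results WHERE task_name = ? AND task_id = ?",
            (task_name, tid),
        ).fetchone()
        if row is not None:
            results[tid] = row[0]
            cached.add(tid)

    pending = {tid: item for tid, item in zip(ids, items) if tid not in results}

    # Stage 3: worker pool for cache misses only.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(worker, item): tid for tid, item in pending.items()}
        for fut in as_completed(futures):
            tid = futures[fut]
            try:
                output = fut.result()
            except Exception:
                results[tid] = None  # failures are not persisted, so they rerun next time
                continue
            # Stage 4: persist each success immediately in its own transaction (upsert).
            with conn:
                conn.execute(
                    "INSERT INTO results (task_name, task_id, output) VALUES (?, ?, ?) "
                    "ON CONFLICT(task_name, task_id) DO UPDATE SET output = excluded.output",
                    (task_name, tid, output),
                )
            results[tid] = output

    # Stage 5 (map-style consumption): reassemble in input order.
    return [results[tid] for tid in ids], cached
```

Calling run_batch a second time with the same task_name invokes `worker` only for inputs that were not cached, which is the resume-from-checkpoint behavior described in stage 2.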

3. Core API Reference

3.1 Main Engine: TaskLoom

TaskLoom is the system's single entry point. It can be used as a context manager, so the underlying connection resources are released automatically.

from typing import Any, AsyncGenerator, Generator, Generic, Iterable, TypeVar
from pydantic import BaseModel
from silkloom_core import TaskResult, BatchResult

T = TypeVar("T")

class TaskLoom(Generic[T]):
    def __init__(
        self,
        model: str,
        prompt_template: str,
        system_prompt: str | None = None,
        response_model: type[BaseModel] | type[dict] | None = None,
        auto_repair_json: bool = True,
        max_retries: int = 3,
        client: Any | None = None,
        db_path: str = ".silkloom.db",
        **llm_kwargs: Any
    ): ...

    def close(self) -> None: ...

    # Context-manager support
    def __enter__(self) -> "TaskLoom[T]": ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
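The context-manager form matters because `__exit__` releases resources even when the body of the `with` block raises. The sketch below shows that pattern with a hypothetical CacheOwner class holding a sqlite3 connection; what TaskLoom's close() actually releases is an assumption here.

```python
import sqlite3


class CacheOwner:
    """Hypothetical stand-in for the resource-owning part of TaskLoom."""

    def __init__(self, db_path: str = ":memory:") -> None:
        self.conn: sqlite3.Connection | None = sqlite3.connect(db_path)

    def close(self) -> None:
        # Idempotent: calling close() twice is harmless.
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def __enter__(self) -> "CacheOwner":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Runs on normal exit and when the with-body raises; returning None
        # lets any exception propagate after the connection is released.
        self.close()
```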

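Key parameters:

  • response_model: determines the output shape. Pass a Pydantic model to get typed instances, dict to get a plain dictionary, or None to get the raw text.
  • prompt_template: a prompt template in Jinja2 syntax (e.g. "Analyze: {{ text }}").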
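How response_model might select the output shape can be sketched as follows. parse_output is a hypothetical helper: it only strips a Markdown code fence and calls json.loads, whereas the engine additionally relies on json_repair to salvage malformed JSON, which this sketch omits. Pydantic models are detected by duck typing on model_validate_json (the Pydantic v2 method), so the sketch runs without Pydantic installed.

```python
import json
import re
from typing import Any

# Matches a leading ```json (or bare ```) fence and a trailing ``` fence.
_FENCE = re.compile(r"^`{3}(?:json)?\s*|\s*`{3}$")


def parse_output(raw: str, response_model: Any = None) -> Any:
    if response_model is None:
        return raw  # None: return the raw text unchanged
    text = _FENCE.sub("", raw.strip())  # drop a fenced-code wrapper if present
    if response_model is dict:
        return json.loads(text)
    # Assumption: any other response_model is a Pydantic v2 model class.
    return response_model.model_validate_json(text)
```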

3.2 Data Models

**Single-task snapshot: TaskResult[T]**

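class TaskResult(BaseModel, Generic[T]):
    task_id: str              # Unique hash fingerprint derived from the input content
    is_success: bool          # Whether the task's output was parsed successfully
    data: T | None            # Final structured data (Pydantic instance / dict / str)
    error: str | None         # Exception traceback (set only on failure)
    input_data: dict          # The original input dict that triggered this task
    raw_output: str | None    # Raw string returned by the model (for debugging malformed output)
    reasoning: str | None     # Model reasoning trace (e.g. DeepSeek <think> content)
    cached: bool              # Whether this result was served from the SQLite cache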

**Batch collection: BatchResult[T]** (returned only by map/amap)

class BatchResult(Generic[T]):
    results: list[TaskResult[T]]
    
    def successful(self) -> list[TaskResult[T]]: ...
    def failed(self) -> list[TaskResult[T]]: ...
    def to_pandas(self) -> "pd.DataFrame": ...  # Flattens data into columns for export

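3.3 Execution Modes

Every input source (data / sequence) accepts items that are either a str or a dict: a single item for process/aprocess, or an iterable of items for the batch methods. A dict item that contains an images key automatically switches to the multimodal path.

1. Single Execution

Suited to ad-hoc questions or testing a single input.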

def process(self, data: str | dict) -> TaskResult[T]: ...
async def aprocess(self, data: str | dict) -> TaskResult[T]: ...

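2. Blocking Batch

Suited to background scripts or scheduled jobs. Waits for every task to finish, then returns all results at once. By default, results come back in exactly the same order as the input.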

def map(
    self,
    sequence: Iterable[str | dict],
    task_name: str | None = None,
    max_workers: int = 5,
) -> BatchResult[T]: ...

async def amap(
    self,
    sequence: Iterable[str | dict],
    task_name: str | None = None,
    max_workers: int = 5,
) -> BatchResult[T]: ...
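The order guarantee of map/amap does not require running tasks one after another. Both standard-library tools below run work concurrently yet return results in input order. This is a sketch of one way to get that behavior, with hypothetical function names, not SilkLoom's code; the asyncio.Semaphore plays the role of max_workers in the async version.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Awaitable, Callable, Iterable, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


def ordered_map(fn: Callable[[In], Out], items: Iterable[In], max_workers: int = 5) -> list[Out]:
    # Executor.map runs calls concurrently but yields results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, items))


async def ordered_amap(fn: Callable[[In], Awaitable[Out]], items: Iterable[In],
                       max_workers: int = 5) -> list[Out]:
    sem = asyncio.Semaphore(max_workers)  # at most max_workers calls in flight

    async def guarded(item: In) -> Out:
        async with sem:
            return await fn(item)

    # gather returns results in the order its awaitables were passed in,
    # regardless of which one finishes first.
    return await asyncio.gather(*(guarded(item) for item in items))
```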

3. Streaming Batch ✨ (core advanced API)

Suited to front-end UIs (Gradio / Streamlit) or reactive APIs. Memory overhead stays low because each result is yielded as soon as its task completes.

def stream(
    self,
    sequence: Iterable[str | dict],
    task_name: str | None = None,
    max_workers: int = 5,
    ordered: bool = False,
) -> Generator[TaskResult[T], None, None]: ...

async def astream(
    self,
    sequence: Iterable[str | dict],
    task_name: str | None = None,
    max_workers: int = 5,
    ordered: bool = False,
) -> AsyncGenerator[TaskResult[T], None]: ...

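About the ordered parameter:

  • ordered=False (default): results are yielded in completion order, so whichever task finishes first is returned first. This is the fastest option and gives the most responsive UI.
  • ordered=True: completed results are held in an internal buffer and yielded strictly in the original sequence order, so a slow early task holds back the results behind it.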
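The two modes can be sketched with concurrent.futures.as_completed plus a reorder buffer. The function below is hypothetical and yields (index, result) pairs so the ordering is visible; it submits every task up front, which an implementation might avoid for very long sequences.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterator, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


def stream(fn: Callable[[In], Out], items: list[In], max_workers: int = 5,
           ordered: bool = False) -> Iterator[tuple[int, Out]]:
    """Yield (input_index, result) pairs as tasks complete."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fn, item): i for i, item in enumerate(items)}
        buffer = {}       # finished results waiting for their turn (ordered mode)
        next_index = 0    # next input position the caller is owed
        for fut in as_completed(futures):
            i = futures[fut]
            if not ordered:
                yield i, fut.result()  # first finished, first yielded
                continue
            buffer[i] = fut.result()
            # Release the longest contiguous run starting at next_index.
            while next_index in buffer:
                yield next_index, buffer.pop(next_index)
                next_index += 1
```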

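4. Typical Usage Examples (Best Practices)

Scenario 1: Plain-text structured extraction (Pydantic validation as a safety net)

Text inputs are rendered through the Jinja2 template, and validation and retries run automatically under the hood.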

from pydantic import BaseModel
from silkloom_core import TaskLoom

class UserProfile(BaseModel):
    name: str
    skills: list[str]

with TaskLoom(
    model="gpt-4o-mini",
    prompt_template="Extract the information from this CV: {{ text }}",
    response_model=UserProfile,
) as loom:

    # Passing task_name enables the cache and resume-from-checkpoint
    results = loom.map(
        [{"text": "Zhang San is proficient in Python"}, {"text": "Li Si can do design"}],
        task_name="cv_parse_v1",
        max_workers=5,
    )

    print(results.successful()[0].data.skills)  # e.g. ["Python"]

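Scenario 2: Multimodal image batch processing

The engine intercepts the images field, converts local image files to Base64 and handles remote image URLs, then assembles everything into the structured multimodal message format the model expects.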

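from silkloom_core import TaskLoom

loom = TaskLoom(
    model="qwen-vl-max",
    prompt_template="Analyze the image as instructed: {{ instruction }}",
    response_model=dict,
)

result = loom.process({
    "instruction": "Extract the dish names and the total price from the image",
    "images": [
        "./receipt_01.jpg",               # Local file: read and converted to Base64 automatically
        "https://example.com/menu.png",   # URL: passed through as-is
    ]
})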
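What the engine does with the images list in the example above can be approximated as follows, assuming it targets the OpenAI-compatible chat format that many VLM providers accept. The function names are hypothetical, and encoding local files as data URLs is an assumption consistent with the Base64 comment in the example.

```python
import base64
import mimetypes
from pathlib import Path


def image_part(ref: str) -> dict:
    """Turn one entry of `images` into an OpenAI-style image_url content part."""
    if ref.startswith(("http://", "https://", "data:")):
        url = ref  # remote URLs (and existing data URLs) pass through unchanged
    else:
        mime = mimetypes.guess_type(ref)[0] or "application/octet-stream"
        encoded = base64.b64encode(Path(ref).read_bytes()).decode("ascii")
        url = f"data:{mime};base64,{encoded}"
    return {"type": "image_url", "image_url": {"url": url}}


def build_user_message(prompt: str, images: list[str]) -> dict:
    # Text part first, then one content part per image.
    content = [{"type": "text", "text": prompt}]
    content += [image_part(ref) for ref in images]
    return {"role": "user", "content": content}
```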

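Scenario 3: Gradio streaming rendering with resume (UI integration)

Because stream persists each result as soon as it finishes, everything completed before the web page was closed is already safely stored in the database. Running the batch again loads those results from the cache instantly, and the progress bar resumes where it left off.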

import gradio as gr
from silkloom_core import TaskLoom

loom = TaskLoom(
    model="deepseek-chat",
    prompt_template="Summarize the paper's core method: {{ text }}",
    response_model=dict
)

def process_papers(papers_list, progress=gr.Progress()):
    total = len(papers_list)
    results_list = []

    # stream(ordered=False) gives the fastest visual feedback
    generator = loom.stream(
        sequence=papers_list,
        task_name="gradio_paper_batch_v1",  # Enables the crash-safe cache
        max_workers=10
    )

    for i, task in enumerate(generator, 1):
        status = "✅ Success" if task.is_success else "❌ Failed"
        progress(i / total, desc=f"Progress: {i}/{total} | Latest: {status}")

        results_list.append({
            "Status": status,
            "Extracted data": task.data,
            "Cache hit": task.cached
        })

        # Update the UI table row by row in real time
        yield results_list

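Scenario 4: Real-time async SSE push with FastAPI

In a modern async backend, astream keeps the ASGI server's event loop free, so the server can handle many requests concurrently.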

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from silkloom_core import TaskLoom

app = FastAPI()
loom = TaskLoom(
    model="gpt-4o-mini",
    prompt_template="Extract the key invoice fields: {{ text }}",
    response_model=dict
)

@app.post("/api/batch_extract")
async def batch_extract(payload: list[dict]):

    async def event_stream():
        # astream schedules the tasks concurrently on the asyncio event loop
        async for task in loom.astream(payload, task_name="invoice_prod", max_workers=20):
            # One SSE frame per completed task
            yield f"data: {json.dumps(task.model_dump())}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
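The fan-out behind an endpoint like this can be sketched with asyncio: a semaphore caps concurrency at max_workers, and asyncio.as_completed releases results in completion order, matching ordered=False. Both helpers are hypothetical, not SilkLoom's astream; sse_event just formats one Server-Sent Events frame like the f-string in the route above.

```python
import asyncio
import json
from typing import AsyncIterator, Awaitable, Callable


async def astream_unordered(fn: Callable[[dict], Awaitable[dict]], items: list[dict],
                            max_workers: int = 5) -> AsyncIterator[dict]:
    sem = asyncio.Semaphore(max_workers)  # limits in-flight model calls

    async def guarded(item: dict) -> dict:
        async with sem:
            return await fn(item)

    tasks = [asyncio.create_task(guarded(item)) for item in items]
    # as_completed yields awaitables in completion order, not input order.
    for next_done in asyncio.as_completed(tasks):
        yield await next_done


def sse_event(payload: dict) -> str:
    # One SSE frame: a "data:" line terminated by a blank line.
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
```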



Download files


Source Distribution

silkloom_core-3.0.0.tar.gz (18.4 kB)

Uploaded Source

Built Distribution


silkloom_core-3.0.0-py3-none-any.whl (14.4 kB)

Uploaded Python 3

File details

Details for the file silkloom_core-3.0.0.tar.gz.

File metadata

  • Download URL: silkloom_core-3.0.0.tar.gz
  • Upload date:
  • Size: 18.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for silkloom_core-3.0.0.tar.gz
Algorithm Hash digest
SHA256 cdf1cb780a1fadb0ce531cec90889e675343b212347df37251241df85d3845f1
MD5 1e275331eb7b4eeaf20d03f255599de6
BLAKE2b-256 f896251a19807cfec1c39433b362b9d6fa5f0fed6ceec276f41ee7f56499f909


Provenance

The following attestation bundles were made for silkloom_core-3.0.0.tar.gz:

Publisher: publish.yml on LeLiu-GeoAI/silkloom-core

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file silkloom_core-3.0.0-py3-none-any.whl.

File metadata

  • Download URL: silkloom_core-3.0.0-py3-none-any.whl
  • Upload date:
  • Size: 14.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for silkloom_core-3.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 13ede1fe5a0ad86f5f04b22ecb52a03595a123e9ba118867d9fbcd30963e3dd2
MD5 96f2caa32c71f23fff72525ff2025f76
BLAKE2b-256 520ed8121963e3ac20a2ec6ef57e0d9bb3b4c91897211df32cedde4d052103ab


Provenance

The following attestation bundles were made for silkloom_core-3.0.0-py3-none-any.whl:

Publisher: publish.yml on LeLiu-GeoAI/silkloom-core

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
