Skip to main content

AI 榨汁机 (AIJuicer) Agent SDK —— 5 分钟接入流水线的一个步骤

Project description

AI 榨汁机 · Python SDK 使用文档

aijuicer_sdk 是 AI 榨汁机(AIJuicer)的 Python Agent SDK。 你用它写出的小脚本,就能作为流水线里某一 step 的执行者:从任务队列里拉任务、处理、写产物、上报结果——调度、重试、心跳、幂等、跨步读产物这些琐事 SDK 全都替你处理好。

适用人群:想把自己的 agent 接入 AI 榨汁机流水线的 Python 开发者。


目录


它是什么

AI 榨汁机的流水线是 固定 6 步(按顺序):

idea → requirement → plan → design → devtest → deploy

每一步都需要一个专门的 agent 来做实际工作。SDK 帮你写出这样一个 agent——一个进程,以装饰器风格注册 handler,然后常驻 → 拉任务 → 处理 → 上报。你只需要关心"拿到输入、产出结果",其他都是 SDK 的事。

SDK 负责的事:

  • 向 scheduler 注册(POST /api/agents/register
  • 从 Redis Streams 拉任务(XREADGROUP tasks:<step> agents:<step>
  • 并发控制(asyncio.Semaphore
  • 执行期间自动心跳(默认每 30 秒)
  • handler 成功 → 调 PUT /api/tasks/<id>/complete
  • handler 抛异常 → 按异常类型分类上报 /fail(决定是否重试)
  • XACK 消息消费完成
  • 产物原子落盘(.tmp → fsync → rename)+ sha256 + POST /api/artifacts 注册元数据
  • SIGTERM 优雅退出:停止拉新任务,等当前任务跑完

你负责的事:

  • 写一个 async def handle(ctx, task): ... 函数
  • 里面爱咋处理咋处理(调 LLM、爬数据、写代码、画 UI 原型 …)
  • ctx.save_artifact(...) 写产物
  • 成功就 return dict;恢复不了就 raise FatalError;暂时挂了就 raise RetryableError

安装

SDK 当前内嵌在 AI 榨汁机主仓库,随仓库一起装:

git clone <AIJuicer 仓库>
cd AIJuicer
python3.12 -m venv .venv && source .venv/bin/activate
pip install -e '.[dev]'

装完你就能 from aijuicer_sdk import Agent

未来 SDK 会独立发布到 PyPI(pip install aijuicer-sdk),敬请期待。


5 分钟上手

写一个 my_agent.py

from aijuicer_sdk import Agent

agent = Agent(
    name="my-requirement-agent",   # 这个 agent 实例的名字,允许同 step 有多实例
    step="requirement",            # 负责哪一步(6 选 1)
    server="http://127.0.0.1:8000",
    redis_url="redis://127.0.0.1:6379/0",
    concurrency=1,
)


@agent.handler
async def handle(ctx, task):
    topic = task["input"]["topic"]
    await ctx.heartbeat("正在写需求文档")

    # 读上一步 idea 的产出
    idea = ctx.load_artifact("idea", "idea.md").decode("utf-8")

    # 做你的活儿
    req_md = f"# 需求文档\n\n源自 idea:\n```\n{idea[:300]}\n```\n..."

    # 写产物(原子落盘 + 自动注册到 scheduler)
    await ctx.save_artifact("requirements.md", req_md, content_type="text/markdown")

    # 返回的 dict 会写进 step_executions.output,供审计与后续步骤查看
    return {"features": 8, "stories": 5}


if __name__ == "__main__":
    agent.run()        # 阻塞跑到 SIGTERM

启动:

python my_agent.py

这个进程会持续活着,每当有新的 requirement 任务进来就处理一次。Ctrl+C 退出时 SDK 会等当前任务跑完再停。


核心概念

Agent — 一个 agent 进程。由 namestep 唯一标识这一类消费者,concurrency 控制同一进程内能并发处理几个任务。同一个 step 可以起多个进程(扩容),它们会通过 Redis consumer group 均衡拉任务。

@agent.handler — 业务逻辑入口。一个 Agent 只能有一个 handler。它会被每个拉到的任务调用一次。

AgentContext(ctx) — 每次 handler 被调用时 SDK 传给你的上下文对象。提供:

  • 只读属性:task_id / workflow_id / step / attempt / input / artifact_root / request_id
  • 副作用方法:save_artifactload_artifactheartbeat
  • 结构化日志:ctx.log.info/warn/error("event.name", k1=v1, ...)

task(第二个参数) — Redis Streams 里的 task payload(dict),包含:

{
  "task_id": "uuid...",
  "workflow_id": "uuid...",
  "step": "requirement",
  "attempt": 1,
  "input": {"topic": "..."},         # workflow 创建时传入的 input
  "artifact_root": "var/.../<wf>",
  "request_id": "req_...",
}

大部分业务只需要用 task["input"];其它字段 SDK 已经帮你放进 ctx 了。


完整 API

Agent 构造参数

Agent(
    *,
    name: str,                        # 必填,agent 实例名
    step: str,                        # 必填,流水线的哪一步
    server: str | None = None,        # scheduler 的 HTTP base URL;默认读 env AIJUICER_SERVER
    redis_url: str | None = None,     # Redis URL;默认读 env AIJUICER_REDIS_URL
    concurrency: int = 1,             # 同进程内的最大并发任务数
    block_ms: int = 5000,             # XREADGROUP 阻塞的毫秒数
    heartbeat_interval: float = 30.0, # handler 执行期间自动心跳的间隔(秒)
    configure_logging: bool = True,   # 是否让 SDK 帮你配置 structlog JSON 输出
)

参数对应 6 个 step 的合法取值:

idea / requirement / plan / design / devtest / deploy

@agent.handler 装饰器

@agent.handler
async def handle(ctx: AgentContext, task: dict) -> dict | None:
    ...

契约:

  • 必须是 async def(SDK 基于 asyncio)
  • 一个 Agent 只能注册一个 handler;重复注册会 RuntimeError
  • 返回一个 dict(会作为 step_executions.output 入库);返回 None 等同于 {}
  • 抛出 FatalError → scheduler 不重试、workflow 进入 AWAITING_MANUAL_ACTION
  • 抛出 RetryableError → 在 max_retries 内自动重试
  • 抛出其它 Exception → 保守按 RetryableError 处理(避免 bug 导致任务丢失)

agent.run()

阻塞运行主循环直到收到 SIGTERMSIGINT。内部做的事:

  1. POST /api/agents/register 拿到 agent_id
  2. 确保自己的 Redis consumer group 存在(XGROUP CREATE ... MKSTREAMBUSYGROUP 视为已存在)
  3. 进入 while 循环:XREADGROUP 拉一批消息 → 每条 spawn 一个 worker task(受 concurrency 信号量限制)
  4. 每个 worker:
    • PUT /api/tasks/<id>/start(收到 started=False 说明重复投递,直接跳过)
    • 启动自动心跳 sibling task
    • 调你的 handler
    • 成功:PUT /complete + XACK
    • FatalErrorPUT /fail?retryable=false + XACK
    • RetryableError / 其它:PUT /fail?retryable=true + XACK
    • 无论成功失败:取消心跳 task + XACK + 释放信号量
  5. 收到信号:设置 _shutdown event → 停止拉新任务 → asyncio.gather 等当前 inflight 跑完

AgentContext

class AgentContext:
    # 只读属性
    task_id: str              # 这次任务的 UUID
    workflow_id: str          # 所属 workflow 的 UUID
    step: str                 # 你负责的 step(= Agent 构造时的 step)
    attempt: int              # 本次是第几次尝试(从 1 开始;失败重试会递增)
    input: dict               # workflow 的原始 input(不随 step 变)
    artifact_root: Path       # 本 workflow 的产物根目录
    request_id: str           # 全链路 request_id
    log: structlog.BoundLogger  # 预绑定好上下文的日志器

    # 方法
    async def save_artifact(key: str, data: str | bytes, *,
                            content_type: str | None = None) -> ArtifactRef: ...
    def     load_artifact(step: str, key: str) -> bytes: ...
    async def heartbeat(message: str | None = None) -> None: ...

ArtifactRefsave_artifact 返回):

@dataclass
class ArtifactRef:
    key: str         # 你传的 key
    path: Path       # 磁盘上的绝对路径
    size_bytes: int  # 字节数
    sha256: str      # 内容 sha256

异常类

from aijuicer_sdk import RetryableError, FatalError

raise RetryableError("LLM rate limit, 稍后重试")
raise FatalError("input 缺少 topic 字段,人工检查")

产物(artifact)规则

每个 workflow 有一个独立的产物根目录 artifact_root,结构按 step 前缀分目录:

var/aijuicer/artifacts/workflows/<workflow_id>/
├── 01_idea/
│   ├── idea.md
│   └── candidates.json
├── 02_requirement/
│   └── requirements.md
├── 03_plan/
│   ├── plan.md
│   └── plan.json
├── 04_design/
│   ├── wireframe.svg
│   └── prototype.html
├── 05_devtest/
│   ├── app.py
│   └── test_report.json
└── 06_deploy/
    ├── artifact.tar.gz
    └── deploy.md

save_artifact 的保证:

  1. 原子写:先写到 <key>.tmpfsyncrename 到最终路径,不会出现"写了一半"的残文件。
  2. sha256:自动计算并随元数据一起发给 scheduler。
  3. 元数据自动注册:调用 POST /api/artifacts,把 (workflow_id, step, key, path, size_bytes, sha256, content_type) 入库,供 Web UI 列表与下载。
  4. 幂等:同一 (workflow_id, step, key) 再次调用会 UPSERT(ON CONFLICT DO UPDATE),重试场景不会留下孤儿行。
  5. content_type 推断:不传 content_type 时按 key 的后缀用 mimetypes.guess_type 推断。
# 字符串或字节都行
await ctx.save_artifact("idea.md", "# hello")
await ctx.save_artifact("bundle.tar.gz", raw_bytes, content_type="application/gzip")

# 返回值可以追溯路径
ref = await ctx.save_artifact("plan.json", json.dumps(plan))
ctx.log.info("saved", path=str(ref.path), size=ref.size_bytes)

跨步骤读取上游产物

# 在 requirement agent 里读 idea 那一步写的 idea.md
raw = ctx.load_artifact("idea", "idea.md")   # 返回 bytes
text = raw.decode("utf-8")

约束:第一版假设 agent 和 scheduler 共享文件系统(同机或挂了 NFS)。未来如果 agent 跑在远程无共享 FS 的机器,SDK 会自动 fallback 到 GET /api/artifacts/<id>/content HTTP 读取(M5+ 规划);当前还是走本地文件系统。


错误处理与重试

三种结局:

抛出 SDK 上报 调度器行为
return dict PUT /complete step 标 succeeded,按 policy 推进到下一步
raise RetryableError(...) PUT /fail?retryable=true 如果 attempt < max_retries 自动重试(新 attempt 入队);否则转 AWAITING_MANUAL_ACTION
raise FatalError(...) PUT /fail?retryable=false 不重试,直接转 AWAITING_MANUAL_ACTION
raise <其它 Exception> 当作 Retryable 处理 RetryableError(保守策略,避免 bug 静默丢任务)

max_retries 由 scheduler 侧配置(默认 3,读自 AIJUICER_MAX_RETRIES)。

典型用法:

@agent.handler
async def handle(ctx, task):
    try:
        resp = await call_llm(task["input"]["topic"])
    except RateLimitError as e:
        raise RetryableError(f"LLM 限流:{e}") from e
    except BadInputError as e:
        raise FatalError(f"输入有问题,人工看一下:{e}") from e

    await ctx.save_artifact("result.md", resp.text)
    return {"tokens": resp.usage.total_tokens}

心跳与超时

  • 自动心跳:handler 执行期间,SDK 自动起一个 sibling task 每 heartbeat_interval(默认 30s)秒调一次 PUT /api/tasks/<id>/heartbeat。handler 返回或抛异常时自动 cancel。
  • 手动心跳:可以随时 await ctx.heartbeat("当前进度描述") 上报进度消息(会写进 step_executions.heartbeat_message,UI 可见)。
  • 超时判定在 scheduler 侧heartbeat_monitorheartbeat_interval_sec // 2 秒扫一次,对 status='running' AND last_heartbeat_at < now() - heartbeat_timeout_sec(默认 90s)的 step,按 retryable=TrueTaskService.fail——要么进入下一个 attempt,要么转人工介入。

所以只要你的 handler 不卡死超过 90s 不心跳就安全。LLM 调用、长下载等"慢但还在跑"的场景,SDK 的自动心跳会自动照顾到;特殊情况(比如 handler 里有一个 asyncio.sleep(600) 的长等待)你也不用额外做什么——心跳 task 是同级 asyncio task,不受 handler 阻塞影响。


幂等与重复投递

分布式系统中,任务被投递多次是常态(scheduler 启动恢复、agent 挂掉后心跳超时等)。SDK 对此的处理:

  1. PUT /start 时 scheduler 检查 step 当前状态。如果 step 已经不是 pending(说明别人已经接走了),API 返回 {"started": false},SDK 跳过 handler,只 XACK,不会二次执行。
  2. 你自己也最好让产物生成路径内容可重现(相同 input → 相同产出)。save_artifact 的 UPSERT 会兜底,但 handler 里不要有"第一次跑就消费一次配额"的副作用。

如果你的 handler 本身有强副作用(调付费 API、发邮件等),可以额外用 ctx.task_id 作为幂等 key 自己做去重。


日志与链路追踪

SDK 默认启用 structlog + JSON 输出:

ctx.log.info("fetching.llm", model="gpt-4.1", tokens_in=1200)
ctx.log.warning("rate.limited", retry_after_sec=30)

输出示例:

{
  "timestamp": "2026-04-24T02:53:14.123Z",
  "level": "info",
  "message": "fetching.llm",
  "request_id": "req_a1b2c3",
  "workflow_id": "uuid...",
  "step": "requirement",
  "attempt": 1,
  "task_id": "uuid...",
  "model": "gpt-4.1",
  "tokens_in": 1200
}

request_id 全链路贯通:从提交 workflow 的 HTTP 请求 → scheduler 日志 → Redis payload → SDK handler 日志 → 你调回 scheduler 的 HTTP header → scheduler 二次日志。用 grep req_a1b2c3 *.log 能看到完整链路。

不喜欢 SDK 默认日志格式?构造时传 configure_logging=False,自己配 structlog.configure(...) 即可。


环境变量

SDK 会读取(构造参数优先,环境变量兜底):

变量 默认 说明
AIJUICER_SERVER http://localhost:8000 scheduler HTTP base URL
AIJUICER_REDIS_URL redis://localhost:6379/0 Redis URL

典型 .env 片段:

export AIJUICER_SERVER=http://127.0.0.1:8000
export AIJUICER_REDIS_URL=redis://127.0.0.1:6379/0

常见问题

Q: 同一个 step 可以跑多个 agent 进程吗? A: 可以。它们通过同一个 Redis consumer group 均衡拉任务(XREADGROUP 原生语义)。这是水平扩容的主要方式。

Q: name 必须唯一吗? A: 不需要。name 只是人类可读标识,agent_id 是 scheduler 分配的 UUID。多进程用同一 name 没问题。

Q: handler 里可以启动别的 asyncio task 吗? A: 可以,但建议在 handler 返回前 await 掉它们。handler 返回就意味着任务完成,scheduler 会把 step 标 succeeded,再启动的 task 不会影响状态但可能被进程退出 cancel。

Q: 能处理同步函数吗? A: handler 本身必须 async def,但里面可以 await asyncio.to_thread(sync_fn, ...) 跑同步代码。

Q: 产物很大(几 GB)怎么办? A: save_artifact 当前是一次性写整块 bytes,适合中小文件(< 几十 MB)。巨大产物建议你自己流式写到 ctx.artifact_root / <step_dir> / key,然后手动 ctx._client.create_artifact(...)(未来 SDK 会提供更正式的流式接口)。

Q: 我的 agent 起不来,卡在 agent.registered 之前? A: 99% 概率是 scheduler 或 Redis 没连上。检查:

  • curl $AIJUICER_SERVER/health 应该返回 {"status":"ok"}
  • redis-cli -u $AIJUICER_REDIS_URL ping 应该返回 PONG

Q: 如何本地测试我的 handler 逻辑,不想真的连 scheduler / Redis? A: 直接单元测试:用 AsyncMock 替换 SchedulerClient,手动构造 AgentContext

from unittest.mock import AsyncMock
from aijuicer_sdk.context import AgentContext

async def test_my_handler(tmp_path):
    client = AsyncMock()
    ctx = AgentContext(
        task_id="t", workflow_id="w", step="requirement", attempt=1,
        input={"topic": "hi"}, artifact_root=str(tmp_path),
        request_id="req_test", client=client,
    )
    out = await handle(ctx, {"input": {"topic": "hi"}})
    assert out["features"] > 0
    assert (tmp_path / "02_requirement" / "requirements.md").exists()

SDK 自己的单测(sdk/tests/)就是这个模式。


完整示例:把 AI Idea 展开成需求

"""ai_requirement.py —— 流水线第二步 agent。"""
from __future__ import annotations

import json

from aijuicer_sdk import Agent, FatalError, RetryableError

agent = Agent(
    name="ai-requirement",
    step="requirement",
    concurrency=2,                 # 允许同时处理 2 个任务
    heartbeat_interval=20.0,       # 每 20 秒自动心跳
)


async def call_llm(prompt: str) -> str:
    """占位:真实场景改成你自己的 LLM 调用。"""
    import asyncio
    await asyncio.sleep(0.1)
    return f"# 需求文档\n\n输入:\n{prompt}\n\n..."


@agent.handler
async def handle(ctx, task):
    inp = task.get("input") or {}
    if "topic" not in inp:
        raise FatalError("input.topic 缺失,人工介入")

    await ctx.heartbeat("读取上游 idea")
    try:
        idea = ctx.load_artifact("idea", "idea.md").decode("utf-8")
    except FileNotFoundError as e:
        raise RetryableError(f"上游 idea 产物还没准备好: {e}") from e

    await ctx.heartbeat("调用 LLM 展开需求")
    try:
        md = await call_llm(f"topic={inp['topic']}\n\nidea:\n{idea}")
    except TimeoutError as e:
        raise RetryableError(f"LLM 超时: {e}") from e

    await ctx.save_artifact("requirements.md", md, content_type="text/markdown")
    await ctx.save_artifact(
        "summary.json",
        json.dumps({"source": "idea.md", "length": len(md)}, ensure_ascii=False),
        content_type="application/json",
    )
    ctx.log.info("requirement.done", length=len(md))
    return {"bytes": len(md)}


if __name__ == "__main__":
    agent.run()

启动:

AIJUICER_SERVER=http://127.0.0.1:8000 \
AIJUICER_REDIS_URL=redis://127.0.0.1:6379/0 \
python ai_requirement.py

触发一次完整流水线(需要 6 个 step 的 agent 都在跑):

# AIFinder 作为 "投喂者"——不是 SDK 的消费者,而是通过 HTTP 直接提交 workflow
python -m sdk.examples.ai_finder --topic "AI 代码审查助手" --auto
# 🧃 AIJuicer 接单:<wf_id> ← topic: AI 代码审查助手

然后在 Web UI http://127.0.0.1:3000 看流水线 6 个 step 依次变绿,每步的产物都出现在详情页。


目前的限制

限制 备注
只有 Python SDK Go / TypeScript SDK 是 v2 任务
依赖共享文件系统读产物 load_artifact 第一版走本地 FS;多机场景 M5+ 加 HTTP fallback
没有 pull 式 Admin API 想动态列出自己注册过的 agent 实例,只能查 /api/agents
单进程代理不做 PEL 扫描 Redis PEL 里被 claim 没 ack 的消息依赖 scheduler 的启动恢复兜底;未来在 SDK 侧加主动 XPENDING + XCLAIM
产物流式写尚未封装 巨大产物请手动写文件再调 create_artifact
不支持 Python 3.11 及以下 要求 Python 3.12+(用了 StrEnum、新版类型语法等)

欢迎在主仓库提 issue / PR。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aijuicer_sdk-0.1.0.tar.gz (27.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aijuicer_sdk-0.1.0-py3-none-any.whl (19.5 kB view details)

Uploaded Python 3

File details

Details for the file aijuicer_sdk-0.1.0.tar.gz.

File metadata

  • Download URL: aijuicer_sdk-0.1.0.tar.gz
  • Upload date:
  • Size: 27.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for aijuicer_sdk-0.1.0.tar.gz
Algorithm Hash digest
SHA256 f82de3fbf9fb97eac6098e527a838f927615539ffe2f25ca24b02f84a4aedb83
MD5 55c58855876c5ce3c519ad8cf28da68c
BLAKE2b-256 d886004f9d346fef3aed641a6883726d1f84e44d91cd8df1555947dd39619015

See more details on using hashes here.

File details

Details for the file aijuicer_sdk-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: aijuicer_sdk-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 19.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for aijuicer_sdk-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0cea7d7dd54fce99fb8ee1e06f4ccb6970b5d10444aae15c8a1b1e7bef2726cd
MD5 09d6b260b9fd2481e21cde4a4d069592
BLAKE2b-256 00ffaa8f366672af672c91ff6b620dc1f1fa24194164aa1b39388bd2535e90b9

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page