可扩展的任务调度通知框架:触发器 + 行为插件 + 事件总线 + Web 控制台 + 远程动作 SDK
Project description
anotiflow
可扩展的任务调度通知框架:触发器 + 行为插件 + 事件总线 + Web 控制台 + 远程动作 SDK,TOML 配置驱动,基于 UV 管理。
触发器(定时 / 事件 / API)──▶ 任务 ──▶ 多个行为按序执行(飞书 / 钉钉 / 自定义本地 / 自定义远程 / 广播事件)
│
├─ bus.publish(...) ──▶ 事件触发其他任务
└─ WebSocket ──▶ 远端 SDK handler 执行
一个任务可以绑定 N 个触发器 + N 个行为,任意触发器命中即按顺序执行全部行为。通过 publish_event 行为或用户自定义函数向 EventBus 广播事件,实现任务之间的链式联动。
特性
- 插件式设计 — 顶层
Action/Trigger抽象基类,装饰器@register_action("xxx")即可注册新类型,TOML 自动识别 - 三种触发器 — 定时(基于
schedule,完整保留其原生灵活性)+ 事件(进程内 EventBus 订阅)+ API webhook(自动签发 token,POST /trigger/<token>携带 payload 即触发) - 内置通知渠道 — 飞书 / 钉钉(基于
ipush) - 本地 + 远程自定义动作 — 本地走 dotted-path Python 函数;远程通过 WebSocket SDK,业务依赖跑在客户端 venv 里,与宿主完全解耦
- Web 控制台 — 单文件 HTML,可视化管理任务/触发器/动作;改动落盘 TOML,文件改动实时生效,两路等价
- 配置即代码 + 实时同步 — TOML 是事实源,watchdog 监听文件变化自动 rebind;UI 与文件双向同步、hash 抑制回环
- 链式联动 —
EventBus.publish(event, payload)与EventTrigger配对;远程动作回包成功还可经publish_on_success串入事件总线 - 统一令牌系统 — admin / action / trigger 三类 scope;
<config_dir>/.anotiflow/tokens.json持久化;为后续 MCP / Skill / CLI 接入预留 scope 扩展点 - 工程细节 — loguru 日志、任务启用/禁用、行为级异常捕获、SIGINT/SIGTERM 优雅关闭、UI 改动原子落盘、远程客户端断线指数回退重连
安装
需要 UV 与 Python ≥ 3.9。
git clone <this-repo>
cd anotiflow
uv sync
快速开始
# 任意空目录启动;不存在 config.toml 时会自动生成空模板与 admin token
mkdir my-anotiflow && cd my-anotiflow
uv run --project /path/to/anotiflow anotiflow
# 控制台输出会包含:
# auto-issued admin token: adm_xxxxxxxxxxxxxxxxxx
# web server starting at http://127.0.0.1:8765
打开浏览器访问 http://127.0.0.1:8765,登录页填入 admin token,即可可视化管理任务、触发器、动作。
也可以照旧用现成示例:
uv run anotiflow --config examples/config.toml
# 可选:--log-level DEBUG / --port 9000 / --no-web(仅引擎,不起 Web)
核心概念
Task
Task = name + enabled + [Trigger, ...] + [Action, ...]
任意一个触发器命中 → 按顺序执行所有行为。任一行为抛异常会被记录日志但不影响后续行为 / 其他任务。
Trigger(触发器)
| 类型 | 作用 | 关键字段 |
|---|---|---|
interval |
定时,基于 schedule |
unit 必填;every / to / at / until 可选 |
event |
订阅 EventBus 事件 | event 事件名 |
api |
webhook 接入,外部 POST 触发 | token(缺失时自动签发) |
interval 的 unit 取值覆盖 schedule 的全部灵活性:
seconds/second、minutes/minute、hours/hour、days/day、weeks/week- 星期名:
monday/tuesday/wednesday/thursday/friday/saturday/sunday
常见组合:
| 需求 | TOML |
|---|---|
| 每 5 秒 | unit="seconds", every=5 |
| 每 5~10 秒随机 | unit="seconds", every=5, to=10 |
| 每分钟的第 23 秒 | unit="minute", at=":23" |
| 每天 09:30 | unit="day", at="09:30" |
| 每周一 13:15 | unit="monday", at="13:15" |
| 每小时执行直到 18:30 | unit="hour", until="18:30" |
Action(行为)
抽象层次:Action(顶层)→ NotifyAction(通知基类)→ FeishuNotify / DingtalkNotify / RemoteActionProxy / ...
内置类型:
| type | 说明 | 关键字段 |
|---|---|---|
feishu |
飞书群机器人 | token, secret, message_template |
dingtalk |
钉钉群机器人 | token, secret, title, message_template |
publish_event |
向 EventBus 广播事件(用于串联任务) | event, [tasks.actions.payload] |
custom (本地) |
调用宿主 venv 内的 Python 函数 | path = "module.func" |
custom (远程) |
WebSocket 派发到远端 SDK 客户端 | remote = true, token, wait, timeout_seconds, offline_policy, publish_on_success |
Context(行为执行时的上下文)
行为模板可访问的命名空间(在 message_template / publish_event.payload 中用 {...} 引用):
| 字段 | 含义 |
|---|---|
{task.name} |
任务名 |
{task.config[xxx]} |
当前任务任意字段 |
{trigger.config[xxx]} |
命中触发器任意字段 |
{trigger.kind} |
interval / event / api |
{trigger.fired_at} |
命中时刻 YYYY-MM-DD HH:MM:SS |
{trigger.payload[xxx]} |
业务载荷字段(API/事件触发时来自 body/发布者;定时触发为 {}) |
Web 控制台
启动后默认监听 http://127.0.0.1:8765(可在 [server] 段或 --host/--port 覆盖)。
- 首次访问:日志中找到
auto-issued admin token: adm_xxx,登录页粘贴。 - 左栏:任务列表,点击 + 新建,圆点表示 enabled。
- 中栏:当前任务的触发器 / 动作表单编辑;动作类型选
custom+remote=true时会自动展示客户端 SDK 代码片段。 - 右栏:实时只读 TOML 预览。
- 顶部 Tokens:列出全部令牌、可吊销。
UI 改动通过 PUT /api/config 落盘;外部直接编辑 config.toml 也会被 watchdog 捕获并实时 rebind。两路等价。
⚠️ TOML 写回不保留注释(这是
tomli_w的限制)。如果你严重依赖配置文件里的注释,建议把它们拆到一个独立的README或保持只用 UI 编辑。
API 触发(webhook)
[[tasks.triggers]]
name = "incoming"
type = "api"
# token 不写时会在首次启动时自动签发并回写到本文件
外部触发:
curl -X POST http://127.0.0.1:8765/trigger/<token> \
-H 'Content-Type: application/json' \
-d '{"symbol":"AAPL","price":107.6}'
body 即 trigger.payload;模板 {trigger.payload[symbol]} 直接可用。
自定义动作
本地(与原版一致)
# user_actions.py
from anotiflow.core.event_bus import bus
def check_stock_price(context):
task, trigger = context["task"], context["trigger"]
...
bus.publish("stock.high", {...})
[[tasks.actions]]
type = "custom"
path = "user_actions.check_stock_price"
模块解析路径:CWD 与 config 文件所在目录都自动加入 sys.path。
远程(推荐:业务依赖跑在客户端进程)
宿主侧配置(token 不写就自动签发):
[[tasks.actions]]
type = "custom"
remote = true
wait = true # 阻塞等待客户端回包;false 即 fire-and-forget
timeout_seconds = 30
offline_policy = "queue" # queue 堆积 / drop 丢弃 / error 抛错
publish_on_success = "task.done" # 可选:回包 ok=true 时把 value 串入事件总线
客户端(任意机器、任意 venv,只需 pip install anotiflow):
from anotiflow import RemoteAction
action = RemoteAction("ws://anotiflow-host:8765", token="<上面 UI 复制的 token>")
@action.handler
def handle(ctx):
# ctx.task.name / ctx.task.config
# ctx.trigger.name / ctx.trigger.kind / ctx.trigger.config
# ctx.trigger.fired_at / ctx.trigger.payload
return {"ok": True} # 任意 JSON-friendly 值;通过 publish_on_success 流回事件总线
action.run() # 阻塞;自动断线指数回退重连
完整示例:examples/remote_client.py。
EventBus
进程内线程安全的发布/订阅单例:
from anotiflow.core.event_bus import bus
bus.publish("stock.high", {"symbol": "AAPL", "price": 107.6})
配置示例
完整示例见 examples/config.toml 与 examples/config.example.toml。
扩展新渠道 / 新触发器
新增企业微信通知:
# src/anotiflow/actions/wecom.py
from ipush import WeCom
from anotiflow.actions.notify_base import NotifyAction
from anotiflow.core.registry import register_action
@register_action("wecom")
class WeComNotify(NotifyAction):
def __init__(self, token: str, message_template: str = "") -> None:
super().__init__(message_template=message_template)
self.name = "wecom"
self._client = WeCom(token=token)
def _send(self, message: str) -> None:
self._client.send(message)
在 src/anotiflow/actions/init.py 里 import 该模块触发 @register_action 装饰器副作用。
新增触发器同理:继承 Trigger + @register_trigger("your_type"),在 src/anotiflow/triggers/init.py 里 import。
项目结构
anotiflow/
├── pyproject.toml
├── src/anotiflow/
│ ├── cli.py # uv run anotiflow 入口
│ ├── task.py # Task 数据类
│ ├── core/
│ │ ├── event_bus.py # EventBus 单例
│ │ ├── registry.py # 类型注册表
│ │ ├── loader.py # parse_raw + build_tasks(TOML → Task[])
│ │ ├── scheduler.py # 旧版主循环(已被 Engine 取代,文件保留)
│ │ ├── engine.py # 运行时核心:装配/绑定/热重载
│ │ ├── config_store.py # 配置单一事实源 + 文件 watcher + 原子落盘
│ │ ├── token_registry.py # 统一令牌签发/校验/吊销
│ │ ├── api_trigger_hub.py # API 触发器注册中心
│ │ └── remote_broker.py # 远程动作分发(WS 通道 + 派发表)
│ ├── triggers/
│ │ ├── base.py / interval.py / event.py
│ │ └── api.py # APITrigger
│ ├── actions/
│ │ ├── base.py / notify_base.py / feishu.py / dingtalk.py / publish_event.py
│ │ └── remote.py # RemoteActionProxy
│ ├── server/
│ │ ├── app.py # FastAPI 装配
│ │ ├── auth.py # admin token 中间件
│ │ ├── routes_config.py # GET/PUT 配置;细粒度 task CRUD
│ │ ├── routes_tokens.py # GET/POST/DELETE token
│ │ ├── routes_trigger_api.py # POST /trigger/{token}
│ │ ├── ws_remote_action.py # WS /ws/{scope}/{token}
│ │ └── ui/
│ │ ├── login.html # 极简登录页
│ │ └── index.html # 控制台单文件 UI
│ └── sdk/
│ ├── __init__.py # re-export RemoteAction
│ └── remote_action.py # 客户端 SDK
└── examples/
├── config.toml # 配置示例
├── user_actions.py # 本地自定义动作示例
└── remote_client.py # 远程自定义动作客户端示例
运行
uv run anotiflow # 当前目录 ./config.toml;不存在自动生成
uv run anotiflow --config /path/to/your.toml # 指定配置文件
uv run anotiflow --no-web # 仅引擎模式(兼容 0.2.x 老用法)
uv run anotiflow --host 0.0.0.0 --port 9000 # 覆盖 [server].host/port
uv run anotiflow --log-level DEBUG
# 等价写法
uv run python -m anotiflow
Ctrl-C 或 SIGTERM 会触发引擎解绑所有触发器、停掉 Web、并优雅退出。
依赖
- schedule — 定时调度
- ipush — 飞书 / 钉钉等推送渠道封装
- loguru — 日志
- fastapi + uvicorn — Web 服务
- websockets — 远程动作传输
- watchdog — 配置文件 watcher
- tomli_w — TOML 写回
升级 0.2 → 0.3 注意事项
- Python 下限:从 3.8 抬到 3.9(FastAPI 要求)。
- 配置兼容:旧的
interval/event触发器、feishu/dingtalk/publish_event/custom (path)动作配置完全兼容,无需改动即可升级。 - 新增:
[server]段(自动生成)、[[tasks.triggers]] type="api"、[[tasks.actions]] type="custom" remote=true。 - 令牌:admin / action / trigger token 都存放在
<config_dir>/.anotiflow/tokens.json,不要将该文件提交版本库。 - TOML 注释:UI 改动会通过
tomli_w写回,注释会丢失。建议把长注释放到 README 或者只通过 UI 编辑配置。
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file anotiflow-0.2.0.tar.gz.
File metadata
- Download URL: anotiflow-0.2.0.tar.gz
- Upload date:
- Size: 46.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
348344a09cb15d1b17451b6bfb11462f9deef55d3e4face270d333fc8c2acd12
|
|
| MD5 |
ba87d214777a21c17c0e7ab34f1ae763
|
|
| BLAKE2b-256 |
f867aeaee3a2e72070f2e9b94b12f3a37eaf295e6e70c5fab416ee1263505be9
|
File details
Details for the file anotiflow-0.2.0-py3-none-any.whl.
File metadata
- Download URL: anotiflow-0.2.0-py3-none-any.whl
- Upload date:
- Size: 64.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4b3699986f4204185ccd912c0450630ca78ec9836c2a2085d8514f6678999da3
|
|
| MD5 |
e02cc164441d77d4b266f9962fb3b72e
|
|
| BLAKE2b-256 |
8fc153c1daeb15cfae9447eb2bf90100b04c513b5984d541a0ab58040595280b
|