Skip to main content

可扩展的任务调度通知框架:触发器 + 行为插件 + 事件总线,TOML 配置驱动

Project description

anotiflow

可扩展的任务调度通知框架:触发器 + 行为插件 + 事件总线,TOML 配置驱动,基于 UV 管理。

触发器(定时 / 事件)──▶ 任务 ──▶ 多个行为按序执行(飞书 / 钉钉 / 自定义 / 广播事件)
                                            │
                                            └─ bus.publish(...) ──▶ 事件触发其他任务

一个任务可以绑定 N 个触发器 + N 个行为,任意触发器命中即按顺序执行全部行为。通过 publish_event 行为或用户自定义函数向 EventBus 广播事件,实现任务之间的链式联动。

特性

  • 插件式设计 — 顶层 Action / Trigger 抽象基类,派生出通知基类 / 具体渠道;装饰器 @register_action("xxx") 即可注册新类型,TOML 自动识别
  • 两种触发器 — 定时(基于 schedule完整保留其原生灵活性:秒/分/时/天/周、每周一..周日、at 精确时刻、to 随机区间、until 截止时刻)+ 事件(进程内 EventBus 订阅);预留手动触发扩展点
  • 内置通知渠道 — 飞书 / 钉钉(基于 ipush),统一继承 NotifyAction,一个任务可多渠道同步发送
  • 配置即代码 — TOML 管理任务、触发器、行为参数;新增任务零代码
  • 自定义业务逻辑 — 用户写普通 Python 函数(fn(context) -> None),TOML 以 dotted path 引用;通常用于"定时检查 + 满足条件广播事件"的判断层
  • 链式联动EventBus.publish(event, payload)EventTrigger 配对,形成任务间事件链路
  • 工程细节 — loguru 日志、任务启用/禁用、行为级异常捕获、SIGINT/SIGTERM 优雅关闭

安装

需要 UV

git clone <this-repo>
cd anotiflow
uv sync

快速开始

uv run anotiflow --config examples/config.toml
# 可选:--log-level DEBUG

默认示例会每 5 秒随机模拟一次"股价检查",高于阈值即广播 stock.high 事件,触发飞书 / 钉钉通知。将 examples/config.toml 里的 token / secret 替换为真实值就能收到真通知;占位值会发送失败但不会让进程崩溃。

核心概念

Task

Task = name + enabled + [Trigger, ...] + [Action, ...]

任意一个触发器命中 → 按顺序执行所有行为。任一行为抛异常会被记录日志但不影响后续行为 / 其他任务。

Trigger(触发器)

类型 作用 关键字段
interval 定时,基于 schedule unit 必填;every / to / at / until 可选
event 订阅 EventBus 事件 event 事件名

intervalunit 取值覆盖 schedule 的全部灵活性:

  • seconds / secondminutes / minutehours / hourdays / dayweeks / week
  • 星期名:monday / tuesday / wednesday / thursday / friday / saturday / sunday

常见组合:

需求 TOML
每 5 秒 unit="seconds", every=5
每 5~10 秒随机 unit="seconds", every=5, to=10
每分钟的第 23 秒 unit="minute", at=":23"
每天 09:30 unit="day", at="09:30"
每周一 13:15 unit="monday", at="13:15"
每小时执行直到 18:30 unit="hour", until="18:30"

Action(行为)

抽象层次:Action(顶层)→ NotifyAction(通知基类)→ FeishuNotify / DingtalkNotify / ...

内置类型:

type 说明 关键字段
feishu 飞书群机器人 token, secret, message_template
dingtalk 钉钉群机器人 token, secret, title, message_template
publish_event 向 EventBus 广播事件(用于串联任务) event, [tasks.actions.payload]
custom 调用用户自定义函数 path = "module.func"

Context(行为执行时的上下文)

每次任务触发,框架会组装 context 字典传给每个 action。在 message_template / publish_event.payload 里用 {xxx} 引用:

字段 含义
{task_name} 任务名
{trigger_name} 触发来源描述,如 interval(every 5 seconds) / event(stock.high)
{trigger_type} interval / event
{fired_at} 触发时刻 YYYY-MM-DD HH:MM:SS
{trigger_payload} 完整业务载荷 dict
{trigger_payload[symbol]} 载荷中某字段

EventBus

进程内线程安全的发布/订阅单例:

from anotiflow.core.event_bus import bus
bus.publish("stock.high", {"symbol": "AAPL", "price": 107.6})

配置示例

# 任务 1:每 5 秒跑一次自定义业务检查
[[tasks]]
name = "check_stock_price"
enabled = true

[[tasks.triggers]]
type = "interval"
every = 5
unit = "seconds"

[[tasks.actions]]
type = "custom"
path = "examples.user_actions.check_stock_price"

# 任务 2:多触发器(定时 + 两个事件),按序发飞书 + 钉钉
[[tasks]]
name = "notify_with_multi_triggers"
enabled = true

[[tasks.triggers]]
type = "interval"
every = 12
unit = "seconds"

[[tasks.triggers]]
type = "event"
event = "stock.high"

[[tasks.triggers]]
type = "event"
event = "manual.fire"

[[tasks.actions]]
type = "feishu"
token = "xxxx"
secret = "yyyy"
message_template = """[{task_name}] 触发={trigger_name} @ {fired_at}
载荷={trigger_payload}"""

[[tasks.actions]]
type = "dingtalk"
token = "xxxx"
secret = "yyyy"
title = "anotiflow 通知"
message_template = "{trigger_payload[symbol]} = {trigger_payload[price]}"

# 任务 3:每天 09:30 广播事件(用 publish_event 串联)
[[tasks]]
name = "daily_morning_fire"
enabled = true

[[tasks.triggers]]
type = "interval"
unit = "day"
at = "09:30"

[[tasks.actions]]
type = "publish_event"
event = "manual.fire"

[tasks.actions.payload]
reason = "morning_cron@{fired_at}"

完整示例见 examples/config.toml

自定义业务行为

写一个普通 Python 函数,签名 fn(context: dict) -> None

# examples/user_actions.py
from anotiflow.core.event_bus import bus
from loguru import logger

def check_stock_price(context: dict) -> None:
    price = fetch_price("AAPL")
    logger.info(f"[{context['task_name']}] price={price} at {context['fired_at']}")
    if price > 100:
        bus.publish("stock.high", {"symbol": "AAPL", "price": price})

TOML 引用:

[[tasks.actions]]
type = "custom"
path = "examples.user_actions.check_stock_price"

模块解析路径:框架会把 CWD 与 config 文件所在目录都加入 sys.path,所以 examples.user_actions.check_stock_price 在项目根目录执行 uv run anotiflow 时能被正确加载。

扩展新渠道 / 新触发器

以新增企业微信通知为例:

# src/anotiflow/actions/wecom.py
from ipush import WeCom
from anotiflow.actions.notify_base import NotifyAction
from anotiflow.core.registry import register_action

@register_action("wecom")
class WeComNotify(NotifyAction):
    def __init__(self, token: str, message_template: str = "") -> None:
        super().__init__(message_template=message_template)
        self.name = "wecom"
        self._client = WeCom(token=token)

    def _send(self, message: str) -> None:
        self._client.send(message)

src/anotiflow/actions/init.pyimport 该模块触发 @register_action 装饰器副作用,之后 TOML 里 type = "wecom" 即可使用。

新增触发器同理:继承 Trigger + @register_trigger("your_type"),在 src/anotiflow/triggers/init.pyimport

项目结构

anotiflow/
├── pyproject.toml
├── src/anotiflow/
│   ├── cli.py                     # uv run anotiflow 入口
│   ├── __main__.py                # python -m anotiflow
│   ├── task.py                    # Task 数据类
│   ├── logging_setup.py           # loguru 初始化
│   ├── core/
│   │   ├── event_bus.py           # EventBus 单例
│   │   ├── registry.py            # 类型注册表
│   │   ├── loader.py              # TOML → Task[]
│   │   └── scheduler.py           # 主循环 + 优雅关闭
│   ├── triggers/
│   │   ├── base.py                # Trigger ABC
│   │   ├── interval.py            # 定时触发(schedule)
│   │   └── event.py               # 事件触发(EventBus 订阅)
│   └── actions/
│       ├── base.py                # Action ABC + CallableAction
│       ├── notify_base.py         # NotifyAction
│       ├── feishu.py              # 飞书通知
│       ├── dingtalk.py            # 钉钉通知
│       └── publish_event.py       # 广播事件行为
└── examples/
    ├── config.toml                # 配置示例(含 4 个任务)
    └── user_actions.py            # 自定义业务行为示例

运行

uv run anotiflow --config examples/config.toml
uv run anotiflow --config /path/to/your.toml --log-level DEBUG
# 等价写法
uv run python -m anotiflow --config examples/config.toml

Ctrl-CSIGTERM 会触发 Scheduler 解绑所有触发器并优雅退出。

依赖

  • schedule — 定时调度
  • ipush — 飞书 / 钉钉等推送渠道统一封装
  • loguru — 日志

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

anotiflow-0.1.2.tar.gz (13.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

anotiflow-0.1.2-py3-none-any.whl (21.8 kB view details)

Uploaded Python 3

File details

Details for the file anotiflow-0.1.2.tar.gz.

File metadata

  • Download URL: anotiflow-0.1.2.tar.gz
  • Upload date:
  • Size: 13.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for anotiflow-0.1.2.tar.gz
Algorithm Hash digest
SHA256 6be10d2ec7500d0af60ee0f757d9951de94dbefa599d7d01c4d0511f7119e41c
MD5 e8106cc6a3fca1402d27c56a52360415
BLAKE2b-256 203176a68f123a5afea9736d4ac38d01c01f53b46b56e4813beae612eaa20a18

See more details on using hashes here.

File details

Details for the file anotiflow-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: anotiflow-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 21.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for anotiflow-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 886cfa7c8681fa523987041951485bbae2dc3ef8077c9cd77e3c544061dbab89
MD5 221f47a332811459ff11fd1b78dad971
BLAKE2b-256 f0fcafe578b97fa8855d5928064a9c8a76f81b24b593e9a97bdc4f964e167363

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page