Infoman base library - A comprehensive toolkit for modern Python applications with dual ORM support
Project description
infomankit
现代化 Python/AI 服务脚手架与工具箱。封装了配置加载、日志、FastAPI 服务、LLM 调用、缓存、消息队列、加解密等常用能力,帮助你快速把 idea 变成可部署的生产级服务。
特性亮点
- 统一配置体系:
.env+config.py支持多环境加载,覆盖应用、数据库、缓存、LLM、MQ、向量库等关键参数。 - FastAPI 微服务基线:开箱即可运行的
infoman.service.app,内置 CORS、GZip、链路日志、中英文错误码、请求 ID、健康/监控接口。 - 灵活的 ORM 选择:支持
Tortoise ORM(简单易用)和SQLAlchemy 2.0(强大性能),可单独或同时使用。 - 异步基础设施:MySQL/PostgreSQL、Redis 缓存、Litellm、NATS、Qdrant/Milvus 的集成入口,易于按需扩展。
- AI/LLM 辅助:
infoman.llm.LLM提供问答、对话、流式输出、翻译、总结、代码审查等常用封装。 - 性能测试工具:内置标准化性能测试模块,支持定制化接口测试、精美 HTML 报告生成、多种接口类型评估标准。
- 实用工具集:日志系统、缓存/重试/计时装饰器、AES/RSA、异步 HTTP、文本结构化提取、Feishu Bot 等常用基建。
- 细粒度模块化:可单独安装
web、database、database-alchemy、llm、vector、messaging等 extra,仅引入所需依赖。
目录速览
infoman/
├── config/ # 环境变量加载与全局配置
├── llm/ # Litellm 包装,提供 Chat/Stream/API
├── performance/ # 性能测试模块(新增)
│ ├── config.py # 测试配置管理
│ ├── runner.py # 测试运行器
│ ├── reporter.py # HTML 报告生成
│ ├── standards.py # 性能标准定义
│ └── cli.py # 命令行工具
├── service/
│ ├── app.py # FastAPI Application 入口
│ ├── routers/ # 健康检查与监控 API
│ ├── core/ # 事件、响应、认证
│ ├── infrastructure/ # 数据库,消息队列
│ ├── exception/ # 错误码、异常处理
│ ├── middleware/ # Logging、RequestID、RateLimit、中间件基类
│ ├── models/ # Tortoise 模型基类 & Embedding 配置
│ └── utils/ # redis 缓存装饰器、解析/转换
└── utils/
├── log/ # Loguru 配置与上下文
├── decorators/ # cache、retry、timing 等装饰器
├── encryption/ # AES/RSA/ECC
├── http/ # aiohttp 客户端、请求信息提取
├── notification/ # 飞书机器人通知
└── text/ # JSON 结构提取等
快速开始
一键创建项目
# 安装 infomankit
pip install -U infomankit
# 创建新项目(自动生成标准目录结构)
infomancli init my-awesome-project
# 进入项目
cd my-awesome-project
# 安装依赖并运行
pip install -e .
cp .env.example .env
infoman-serve run main:app --reload
访问 http://localhost:8000 查看运行效果!
手动安装
# Python >= 3.11
pip install -U infomankit
# 基础 Web 服务
pip install -U "infomankit[web]"
# 完整功能(使用 Tortoise ORM,100% 向前兼容)
pip install -U "infomankit[full]"
# 完整功能增强版(同时支持 Tortoise + SQLAlchemy)
pip install -U "infomankit[full-enhanced]"
常用 extra 组合:
| Extra | 说明 |
|---|---|
web |
FastAPI/Granian/orjson |
database |
Tortoise ORM(默认) |
database-pro |
SQLAlchemy 2.0(高性能) |
cache |
Redis + fastapi-cache2 |
llm |
Litellm |
vector |
Qdrant |
messaging |
NATS |
full |
完整功能(使用 Tortoise) |
full-pro |
完整功能增强版(Tortoise + SQLAlchemy) |
本地开发推荐:
git clone https://github.com/yourusername/infoman-pykit.git
cd infomankit
pip install -e ".[dev,full]" # 安装所有依赖和 lint/test 工具
快速上手
1. 配置环境变量
创建 .env.dev,并设置 ENV=dev (默认 dev)。可根据 infoman/config/config.py 填写常用变量:
APP_NAME=Infoman Service
APP_HOST=0.0.0.0
APP_PORT=8808
MYSQL_HOST=127.0.0.1
MYSQL_DB=infoman
MYSQL_USER=root
MYSQL_PASSWORD=secret
REDIS_HOST=127.0.0.1
QDRANT_HOST=127.0.0.1
LLM_PROXY=litellm_proxy
JWT_SECRET_KEY=change-me
运行时会依次加载 .env 与 .env.{ENV},缺省值可在 config.py 中找到。
2. 启动 FastAPI 服务
export ENV=dev
uvicorn infoman.service.app:application --host ${APP_HOST:-0.0.0.0} --port ${APP_PORT:-8808} --reload
# or
python -m infoman.service.launch --mode gunicorn
应用启动后默认提供:
/api/health:健康检查,返回{code:200}。/api/monitor:进程 & 系统指标、环境信息。- 启动事件中会自动注册 MySQL、Redis 缓存、NATS、Qdrant 等(根据配置是否填写)。
3. 调用 LLM
import asyncio
from infoman.llm import LLM
async def main():
resp = await LLM.ask(
model="gpt-4o-mini",
prompt="请用一句话介绍 infoman-pykit。",
system_prompt="You are a concise assistant."
)
if resp.success:
print(resp.content, resp.total_tokens)
asyncio.run(main())
LLM.ask/chat/stream会自动补全LLM_PROXY前缀并返回 token 统计。LLM.quick_*返回字符串,LLM.translate/summarize/code_review内置常用 system prompt。
4. 使用 Redis 缓存装饰器
from pydantic import BaseModel
from infoman.service.utils.cache import redis_cache
class ConfigSchema(BaseModel):
key: str
value: str
class ConfigService:
@redis_cache(prefix="config", ttl=600)
async def get_config(self, request, key: str) -> ConfigSchema:
# request.app.state.redis_client 将被装饰器自动读取
...
返回值可以是 BaseModel、list[BaseModel] 或普通 dict,装饰器会自动序列化/反序列化。
5. 消息队列与事件路由
from infoman.service.infrastructure.mq.nats import event_router
@event_router.on("topic.user.created", queue="worker")
async def handle_user_created(msg, nats_cli):
payload = msg.data.decode()
...
# 启动时在 startup 事件中执行:
# await event_router.register(app.state.nats_client)
NATSClient 支持 publish/request/subscribe/close,并在 events.startup 中自动连接(配置 NATS_SERVER 后生效)。
日志与中间件
infoman.utils.log.logger基于 Loguru,自动创建多种文件(all/info/error/debug)并支持 JSON 日志、请求上下文(RequestID)。LoggingMiddleware:记录请求耗时、客户端信息;RequestIDMiddleware为每次请求注入X-Request-ID。RateLimitMiddleware:IP/用户/路径多策略限流,内存或 Redis 持久化。BaseMiddleware为自定义中间件提供 session / 处理耗时写入示例。
统一错误与响应
infoman.service.exception.error定义系统、请求、数据库、业务、安全、外部服务等错误码枚举,可中英文提示。AppException+handler.py将数据库、Pydantic、HTTP 异常统一转换为{code, message, details}。infoman.service.core.response.success/failed提供标准响应结构。
更多工具箱
- 装饰器:
retry(支持 async/sync 指数退避)、cache(内存缓存)、timing(执行耗时)。 - 加密:AES(自动填充/随机 IV)、RSA(4096/自定义序列化)。
- HTTP Client:
HttpAsyncClient支持表单/JSON/文件上传,返回HttpResult。 - 文本处理:
utils.text.extractor.extract_json_from_string可从非结构化文本中提取 JSON。 - 通知:
notification.feishu.RobotManager发送飞书机器人消息。 - Embedding 配置:
service.models.type.embed统一管理不同向量模型的维度/长度、集合命名。
配置清单速查
| 分类 | 重点变量 |
|---|---|
| 应用 | APP_NAME, APP_HOST, APP_PORT, APP_BASE_URI, APP_DEBUG |
| 安全 | JWT_SECRET_KEY, JWT_ALGORITHM, JWT_ACCESS_TOKEN_EXPIRE_MINUTES, OAUTH2_REDIRECT_URL |
| 数据库 | MYSQL_HOST, MYSQL_PORT, MYSQL_DB, MYSQL_USER, MYSQL_PASSWORD, MYSQL_TABLE_MODELS |
| 缓存 / Redis | REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD |
| 向量数据库 | QDRANT_HOST/API_KEY/HTTP_PORT/GRPC_PORT、MILVUS_HOST 等(Milvus 需实现 AsyncMilvusClient) |
| MQ | NATS_SERVER(逗号分隔多实例), NATS_NAME |
| LLM | LLM_PROXY(litellm 代理地址) |
| 日志 | LOG_LEVEL, LOG_FORMAT, LOG_DIR, LOG_RETENTION, LOG_ENABLE_* |
开发 & 测试
# Lint / 格式化
ruff check infoman
black infoman
isort infoman
# 类型检查
mypy infoman
# 测试
pytest
🔀 ORM 选择指南
从 v0.3.0 开始,infomankit 支持两种 ORM:
Tortoise ORM(默认)
适合:简单 CRUD、快速开发、学习成本低
from infoman.service.models.base import TimestampMixin
from tortoise import fields
class User(TimestampMixin):
name = fields.CharField(max_length=100)
# 直接使用
user = await User.create(name="Alice")
SQLAlchemy 2.0(高性能)
适合:复杂查询、高性能需求、工业级项目
from infoman.service.models.base import AlchemyBase, AlchemyTimestampMixin
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
class User(AlchemyBase, AlchemyTimestampMixin):
__tablename__ = "users"
name: Mapped[str] = mapped_column(String(100))
# 使用仓储模式
from infoman.service.models.base import create_repository
user_repo = create_repository(User)
user = await user_repo.create(name="Alice")
详细迁移指南: 👉 doc/MIGRATION_TO_SQLALCHEMY.md
📊 性能测试模块
infomankit 内置了标准化的性能测试工具,支持定制化接口测试和精美的 HTML 报告生成。
核心特性
- 标准化评估:内置 4 种接口类型(fast/normal/complex/heavy)的性能标准
- 定制化配置:支持 YAML 配置文件,灵活定义测试用例
- 高并发测试:基于 asyncio 的异步并发执行
- 详细统计:P50/P95/P99 响应时间、吞吐量、成功率等指标
- 精美报告:自动生成响应式 HTML 报告,色彩分级展示
- 认证支持:Bearer Token、Basic Auth 等多种认证方式
快速开始
1. 创建配置文件
# performance-test.yaml
project_name: "My API"
base_url: "http://localhost:8000"
# 并发配置
concurrent_users: 50
duration: 60 # 秒
# 测试用例
test_cases:
- name: "健康检查"
url: "/api/health"
method: "GET"
interface_type: "fast"
- name: "用户列表"
url: "/api/v1/users"
method: "GET"
interface_type: "normal"
params:
page: 1
page_size: 20
2. 运行测试
# 使用 Python 代码
python -c "
import asyncio
from infoman.performance import TestConfig, PerformanceTestRunner, HTMLReporter
async def test():
config = TestConfig.from_yaml('performance-test.yaml')
runner = PerformanceTestRunner(config)
results = await runner.run()
reporter = HTMLReporter(config)
reporter.generate(results)
asyncio.run(test())
"
# 或使用 Makefile
make perf-test
make perf-test-api
make perf-test-stress
3. 查看报告
测试完成后会生成精美的 HTML 报告,包含:
- 汇总指标(总请求数、成功率、平均响应时间、吞吐量)
- 每个接口的详细统计
- 响应时间百分位(P50/P95/P99)
- 性能评级和优化建议
- 错误信息汇总
性能标准
模块内置 4 种接口类型的标准:
| 接口类型 | 优秀 | 良好 | 可接受 | 较差 |
|---|---|---|---|---|
| 快速接口 (fast) | <10ms | <30ms | <50ms | <100ms |
| 一般接口 (normal) | <50ms | <100ms | <200ms | <500ms |
| 复杂接口 (complex) | <100ms | <200ms | <500ms | <1s |
| 重型接口 (heavy) | <200ms | <500ms | <1s | <3s |
更多文档
- 完整文档:infoman/performance/README.md
- 配置示例:examples/performance/
- 高级用法:examples/performance/advanced_example.py
🛠️ CLI 脚手架工具
infomankit 提供了 infomancli 命令行工具,帮助你快速生成标准化的项目结构。
基本用法
# 交互式创建项目
infomancli init
# 直接指定项目名
infomancli init my-project
# 在指定目录创建
infomancli init my-project --dir /path/to/workspace
生成的项目结构
生成的项目遵循 infoman/service 的标准架构:
my-project/
├── .env.example # 环境变量模板
├── .gitignore
├── README.md
├── main.py # FastAPI 应用入口
├── pyproject.toml
│
├── core/ # 核心业务逻辑
│ ├── auth.py # 认证授权
│ └── response.py # 标准响应模型
├── routers/ # API 路由
├── models/ # 数据模型
│ ├── entity/ # 数据库实体 (ORM)
│ ├── dto/ # 数据传输对象
│ └── schemas/ # Pydantic 验证模式
├── repository/ # 数据访问层
├── services/ # 业务逻辑服务
├── exception/ # 自定义异常
├── middleware/ # 自定义中间件
├── infrastructure/ # 基础设施
│ ├── database/ # 数据库连接
│ └── cache/ # 缓存管理
└── utils/ # 工具函数
├── cache/
└── parse/
快速体验
# 1. 创建项目
infomancli init demo-api
# 2. 进入并安装
cd demo-api
pip install -e .
# 3. 启动服务
cp .env.example .env
infoman-serve run main:app --reload
# 4. 访问 API 文档
open http://localhost:8000/docs
生成的项目包含:
- ✅ 完整的项目结构
- ✅ FastAPI 应用框架
- ✅ 环境变量配置
- ✅ 健康检查端点
- ✅ Git 配置
- ✅ 开发文档
License
MIT License © Infoman Contributors
📖 详细指南
本项目提供了详细的使用指南,帮助你快速上手各个功能模块。
指南目录
NATS 事件系统使用指南
📖 简介
infomankit 提供了基于 NATS 的事件驱动系统,支持自动事件处理器发现、装饰器注册和分布式部署。适用于微服务间通信、异步任务处理和事件驱动架构。
🚀 快速开始
1. 安装依赖
pip install "infomankit[messaging]"
2. 配置 NATS
在 .env 或 config/.env.{env} 中配置:
# NATS 服务器地址(支持多个)
NATS_SERVERS=["nats://localhost:4222"]
# 应用名称(可选)
APP_NAME=my_service
# 事件处理器目录(可选,默认 app.events)
MQ_EVENT_PACKAGE=app.events
MQ_EVENT_PATH=./app/events
3. 创建事件处理器目录
mkdir -p app/events
touch app/events/__init__.py
4. 定义事件处理器
创建 app/events/user_events.py:
from loguru import logger
from infoman.service.infrastructure.mq.nats.nats_event_router import event_router
@event_router.on("user.created")
async def on_user_created(msg, nats_cli):
"""处理用户创建事件"""
import json
data = json.loads(msg.data.decode())
user_id = data.get("user_id")
username = data.get("username")
logger.info(f"新用户创建: {username} (ID: {user_id})")
# 执行业务逻辑
# - 发送欢迎邮件
# - 初始化用户配置
# - 发布后续事件
# 发布后续事件
welcome_data = json.dumps({
"user_id": user_id,
"type": "welcome"
})
await nats_cli.publish("email.send", welcome_data.encode())
@event_router.on("user.updated", queue="user-processors")
async def on_user_updated(msg, nats_cli):
"""处理用户更新事件(使用队列组实现负载均衡)"""
import json
data = json.loads(msg.data.decode())
user_id = data.get("user_id")
logger.info(f"用户更新: ID={user_id}")
# 处理更新逻辑
5. 启动应用
from fastapi import FastAPI
from infoman.service.core.lifespan import lifespan
app = FastAPI(lifespan=lifespan)
# 事件处理器会自动加载和注册
启动后,系统会:
- 自动连接 NATS 服务器
- 扫描
app/events/目录 - 注册所有事件处理器
- 开始监听事件
6. 发布事件
from fastapi import APIRouter, Request
import json
router = APIRouter()
@router.post("/users")
async def create_user(request: Request):
"""创建用户并发布事件"""
# 创建用户逻辑
user_data = {
"user_id": 123,
"username": "alice"
}
# 发布事件
nats_client = request.app.state.nats_manager.client
await nats_client.publish(
"user.created",
json.dumps(user_data).encode()
)
return {"status": "ok", "user": user_data}
📚 核心概念
事件路由器 (EventRouter)
事件路由器负责管理事件订阅和分发。
from infoman.service.infrastructure.mq.nats.nats_event_router import event_router
@event_router.on(
subject="event.topic", # 事件主题
queue="worker-group" # 队列组(可选)
)
async def handler(msg, nats_cli):
"""事件处理函数
Args:
msg: NATS 消息对象
- msg.data: 消息内容 (bytes)
- msg.subject: 消息主题
- msg.reply: 回复主题(用于 request-reply)
nats_cli: NATS 客户端实例
"""
pass
队列组 (Queue Group)
队列组实现负载均衡,同一队列组内的多个订阅者会平均分配消息。
# 实例 A
@event_router.on("order.process", queue="order-workers")
async def process_order_a(msg, nats_cli):
pass
# 实例 B(相同队列组)
@event_router.on("order.process", queue="order-workers")
async def process_order_b(msg, nats_cli):
pass
# 每条消息只会被 A 或 B 其中一个处理
主题模式 (Subject Pattern)
NATS 支持通配符订阅:
# 精确匹配
@event_router.on("user.created")
# 单层通配符(*)
@event_router.on("user.*") # 匹配 user.created, user.updated 等
# 多层通配符(>)
@event_router.on("user.>") # 匹配 user.created, user.profile.updated 等
🎯 使用场景
1. 微服务间通信
用户服务发布事件:
# user_service/app/routes/users.py
@router.post("/users")
async def create_user(request: Request):
user = await create_user_in_db(...)
# 发布用户创建事件
await request.app.state.nats_manager.client.publish(
"user.created",
json.dumps({"user_id": user.id}).encode()
)
订单服务监听事件:
# order_service/app/events/user_events.py
@event_router.on("user.created")
async def init_user_wallet(msg, nats_cli):
"""为新用户初始化钱包"""
data = json.loads(msg.data.decode())
await create_wallet_for_user(data["user_id"])
2. 异步任务处理
# app/events/task_events.py
@event_router.on("task.heavy", queue="task-workers")
async def process_heavy_task(msg, nats_cli):
"""处理耗时任务"""
data = json.loads(msg.data.decode())
# 执行耗时操作
result = await process_video(data["video_url"])
# 发布完成事件
await nats_cli.publish(
"task.completed",
json.dumps({"task_id": data["task_id"], "result": result}).encode()
)
3. 事件链式处理
# app/events/order_events.py
@event_router.on("order.created")
async def handle_order_created(msg, nats_cli):
"""订单创建 -> 扣减库存"""
data = json.loads(msg.data.decode())
# 扣减库存
await reduce_inventory(data["items"])
# 发布库存扣减成功事件
await nats_cli.publish("inventory.reduced", msg.data)
@event_router.on("inventory.reduced")
async def handle_inventory_reduced(msg, nats_cli):
"""库存扣减 -> 创建支付订单"""
data = json.loads(msg.data.decode())
# 创建支付订单
payment = await create_payment(data["order_id"])
# 发布支付创建事件
await nats_cli.publish(
"payment.created",
json.dumps({"order_id": data["order_id"], "payment_id": payment.id}).encode()
)
@event_router.on("payment.created")
async def handle_payment_created(msg, nats_cli):
"""支付创建 -> 发送通知"""
data = json.loads(msg.data.decode())
# 发送支付通知
await send_payment_notification(data["order_id"])
4. Request-Reply 模式
# 请求端
@router.get("/user/{user_id}/profile")
async def get_user_profile(request: Request, user_id: int):
"""通过 NATS 请求用户信息"""
nats_client = request.app.state.nats_manager.client
# 发送请求并等待响应(超时 5 秒)
response = await nats_client.request(
"user.get",
json.dumps({"user_id": user_id}).encode(),
timeout=5
)
user_data = json.loads(response.data.decode())
return user_data
# 响应端
@event_router.on("user.get")
async def handle_user_get(msg, nats_cli):
"""处理用户查询请求"""
data = json.loads(msg.data.decode())
user = await get_user_from_db(data["user_id"])
# 回复请求
if msg.reply:
await nats_cli.publish(
msg.reply,
json.dumps(user).encode()
)
🔧 高级用法
自定义事件目录
# 方式1: 通过配置
# .env
MQ_EVENT_PACKAGE=myapp.custom_events
MQ_EVENT_PATH=./myapp/custom_events
# 方式2: 手动注册
from infoman.service.utils.module_loader import register_event_handlers
from infoman.service.infrastructure.mq.nats.nats_event_router import event_router
async def custom_startup(app):
# 加载自定义事件处理器
register_event_handlers(
package="myapp.custom_events",
folder="./myapp/custom_events"
)
# 注册到 NATS
nats_client = app.state.nats_manager.client
await event_router.register(nats_client=nats_client)
错误处理
@event_router.on("order.process")
async def handle_order(msg, nats_cli):
"""带错误处理的事件处理器"""
try:
data = json.loads(msg.data.decode())
await process_order(data)
except json.JSONDecodeError as e:
logger.error(f"JSON 解析失败: {e}")
# 发布错误事件
await nats_cli.publish(
"order.error",
json.dumps({"error": "invalid_json", "raw": msg.data.decode()}).encode()
)
except Exception as e:
logger.error(f"订单处理失败: {e}", exc_info=True)
# 发布失败事件用于重试
await nats_cli.publish("order.retry", msg.data)
消息验证
from pydantic import BaseModel, ValidationError
class UserCreatedEvent(BaseModel):
user_id: int
username: str
email: str
@event_router.on("user.created")
async def handle_user_created(msg, nats_cli):
"""使用 Pydantic 验证消息"""
try:
data = json.loads(msg.data.decode())
event = UserCreatedEvent(**data)
# 使用验证后的数据
logger.info(f"新用户: {event.username} ({event.email})")
except ValidationError as e:
logger.error(f"事件数据验证失败: {e}")
except json.JSONDecodeError as e:
logger.error(f"JSON 解析失败: {e}")
📊 监控和调试
健康检查
from fastapi import APIRouter
router = APIRouter()
@router.get("/health/nats")
async def nats_health(request: Request):
"""NATS 健康检查"""
manager = request.app.state.nats_manager
if not manager.is_available:
return {"status": "unhealthy", "error": "NATS 未连接"}
return {
"status": "healthy",
"connected": manager.client.connected,
"servers": manager.client.servers
}
事件日志
@event_router.on("important.event")
async def handle_important_event(msg, nats_cli):
"""记录详细的事件处理日志"""
logger.info(
f"收到事件",
extra={
"event": "important.event",
"subject": msg.subject,
"size": len(msg.data),
"has_reply": msg.reply is not None
}
)
# 处理逻辑...
logger.info("事件处理完成", extra={"event": "important.event"})
🎨 最佳实践
1. 事件命名规范
# ✅ 推荐:使用层级结构
"user.created"
"user.updated"
"user.deleted"
"order.created"
"order.paid"
"order.cancelled"
# ❌ 不推荐:扁平命名
"user_created"
"create_user"
2. 目录组织
app/
├── events/
│ ├── __init__.py
│ ├── user_events.py # 用户相关事件
│ ├── order_events.py # 订单相关事件
│ ├── payment_events.py # 支付相关事件
│ └── _base.py # 基类和工具(不会被加载)
3. 消息格式
# ✅ 推荐:使用 JSON 并包含元数据
{
"event_id": "evt_123",
"timestamp": "2025-01-28T10:00:00Z",
"source": "user-service",
"data": {
"user_id": 123,
"username": "alice"
}
}
# 发布事件
event = {
"event_id": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"source": settings.APP_NAME,
"data": user_data
}
await nats_cli.publish("user.created", json.dumps(event).encode())
4. 幂等性
@event_router.on("payment.process")
async def process_payment(msg, nats_cli):
"""确保幂等性"""
data = json.loads(msg.data.decode())
payment_id = data["payment_id"]
# 检查是否已处理
if await is_payment_processed(payment_id):
logger.info(f"支付 {payment_id} 已处理,跳过")
return
# 处理支付
await do_process_payment(payment_id)
# 标记已处理
await mark_payment_processed(payment_id)
🐛 故障排查
问题:事件处理器未被加载
检查清单:
- 确认目录结构正确:
app/events/__init__.py - 文件名不要以下划线开头
- 确认 NATS 配置正确:
NATS_SERVERS - 查看启动日志中的加载信息
问题:事件未被触发
检查清单:
- 确认 NATS 服务器运行正常
- 检查 subject 名称是否正确
- 确认消息已成功发布
- 查看事件处理器是否有异常
问题:多实例重复处理
解决方案: 使用队列组实现负载均衡:
@event_router.on("task.process", queue="task-workers")
async def process_task(msg, nats_cli):
# 同一队列组内只有一个实例处理
pass
📖 参考资料
快速参考
# 基础事件处理器
@event_router.on("event.topic")
async def handler(msg, nats_cli):
data = json.loads(msg.data.decode())
# 处理逻辑
# 队列组(负载均衡)
@event_router.on("event.topic", queue="workers")
# 发布事件
await nats_cli.publish("event.topic", json.dumps(data).encode())
# Request-Reply
response = await nats_cli.request("service.query", data, timeout=5)
# 通配符订阅
@event_router.on("user.*") # 单层通配符
@event_router.on("user.>") # 多层通配符
ORM 数据库操作指南
infomankit 提供了灵活的 ORM 抽象层,支持 Tortoise ORM 和 SQLAlchemy 2.0 两种后端,可无缝切换,统一的 API 设计让代码迁移零成本。
📋 目录
快速开始
1. 安装依赖
# Tortoise ORM (默认)
pip install infomankit[database]
# SQLAlchemy 2.0
pip install infomankit[database-alchemy]
2. 配置数据库
创建 .env 文件:
# 选择 ORM 后端
USE_PRO_ORM=false # false=Tortoise, true=SQLAlchemy
# MySQL 配置
MYSQL_ENABLED=true
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_DB=myapp
MYSQL_USER=root
MYSQL_PASSWORD=secret
MYSQL_MODELS=user,product # 模型名称(逗号分隔)
3. 定义模型
创建 infoman/models/user.py:
from infoman.service.models.base import TimestampMixin
from tortoise import fields
class User(TimestampMixin):
"""用户模型"""
class Meta:
table = "users"
name = fields.CharField(max_length=100, description="用户名")
email = fields.CharField(max_length=255, unique=True, description="邮箱")
is_active = fields.BooleanField(default=True, description="是否激活")
4. 使用模型
# 创建用户
user = await User.create(name="Alice", email="alice@example.com")
# 查询用户
user = await User.get(id=1)
users = await User.filter(is_active=True).all()
# 更新用户
user.name = "Alice Wang"
await user.save()
# 删除用户
await user.delete()
安装
基础安装
# Tortoise ORM (轻量级,推荐新项目)
pip install infomankit[database]
# SQLAlchemy 2.0 (企业级,适合复杂查询)
pip install infomankit[database-alchemy]
# 同时安装两者(支持运行时切换)
pip install infomankit[database,database-alchemy]
数据库驱动
根据使用的数据库安装对应驱动:
# MySQL
pip install asyncmy aiomysql
# PostgreSQL
pip install asyncpg
# SQLite (Python 内置,无需额外安装)
核心概念
双 ORM 支持
infomankit 支持两种 ORM 后端,可通过配置自由切换:
| 特性 | Tortoise ORM | SQLAlchemy 2.0 |
|---|---|---|
| 性能 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 易用性 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 功能完整性 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 生态系统 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 适用场景 | 新项目、轻量级应用 | 企业级、复杂查询 |
抽象层设计
用户代码
↓
BaseRepository (抽象接口)
↓
├── TortoiseRepository (Tortoise 实现)
└── SQLAlchemyRepository (SQLAlchemy 实现)
优势:
- 统一 API,切换 ORM 无需修改业务代码
- 支持多数据库连接(MySQL + PostgreSQL + SQLite)
- 自动健康检查和连接池管理
模型定义
Tortoise ORM 模型
from infoman.service.models.base import TimestampMixin
from tortoise import fields
class User(TimestampMixin):
"""
用户模型 (Tortoise ORM)
继承 TimestampMixin 自动获得:
- id: 主键
- created_at: 创建时间
- updated_at: 更新时间
"""
class Meta:
table = "users"
app = "mysql_models" # 对应数据库连接
# 基础字段
name = fields.CharField(max_length=100, description="用户名")
email = fields.CharField(max_length=255, unique=True, description="邮箱")
age = fields.IntField(null=True, description="年龄")
# 布尔字段
is_active = fields.BooleanField(default=True, description="是否激活")
is_superuser = fields.BooleanField(default=False, description="是否管理员")
# JSON 字段
metadata = fields.JSONField(default={}, description="元数据")
# 文本字段
bio = fields.TextField(null=True, description="个人简介")
class Product(TimestampMixin):
"""产品模型"""
class Meta:
table = "products"
name = fields.CharField(max_length=200, description="产品名")
price = fields.DecimalField(max_digits=10, decimal_places=2, description="价格")
stock = fields.IntField(default=0, description="库存")
# 外键关系
category_id = fields.IntField(description="分类ID")
SQLAlchemy 2.0 模型
from infoman.service.models.base import AlchemyBase, AlchemyTimestampMixin
from sqlalchemy import String, Integer, Boolean, Text, JSON
from sqlalchemy.orm import Mapped, mapped_column
class User(AlchemyBase, AlchemyTimestampMixin):
"""
用户模型 (SQLAlchemy 2.0)
使用 Mapped 类型注解 (Python 3.9+)
"""
__tablename__ = "users"
# 必填字段
name: Mapped[str] = mapped_column(String(100), comment="用户名")
email: Mapped[str] = mapped_column(String(255), unique=True, comment="邮箱")
# 可选字段
age: Mapped[int | None] = mapped_column(Integer, comment="年龄")
bio: Mapped[str | None] = mapped_column(Text, comment="个人简介")
# 默认值
is_active: Mapped[bool] = mapped_column(Boolean, default=True, comment="是否激活")
# JSON 字段
metadata: Mapped[dict] = mapped_column(JSON, default={}, comment="元数据")
class Product(AlchemyBase, AlchemyTimestampMixin):
"""产品模型"""
__tablename__ = "products"
name: Mapped[str] = mapped_column(String(200), comment="产品名")
price: Mapped[float] = mapped_column(comment="价格")
stock: Mapped[int] = mapped_column(default=0, comment="库存")
category_id: Mapped[int] = mapped_column(comment="分类ID")
字段类型对照表
| 数据类型 | Tortoise ORM | SQLAlchemy 2.0 |
|---|---|---|
| 整数 | IntField() |
Mapped[int] |
| 字符串 | CharField(max_length=N) |
Mapped[str] = mapped_column(String(N)) |
| 文本 | TextField() |
Mapped[str] = mapped_column(Text) |
| 布尔 | BooleanField() |
Mapped[bool] |
| 浮点数 | FloatField() |
Mapped[float] |
| 小数 | DecimalField() |
Mapped[Decimal] |
| JSON | JSONField() |
Mapped[dict] = mapped_column(JSON) |
| 日期时间 | DatetimeField() |
Mapped[datetime] |
仓储模式
基础 CRUD 操作
使用仓储模式统一管理数据库操作:
from infoman.service.models.base import create_repository
# 创建仓储实例(自动检测 ORM 后端)
user_repo = create_repository(User)
# 1. 创建 (Create)
user = await user_repo.create(
name="Alice",
email="alice@example.com",
age=25
)
print(f"创建成功: User(id={user.id})")
# 2. 读取 (Read)
# 根据 ID 获取
user = await user_repo.get(1)
# 根据条件筛选
active_users = await user_repo.filter(is_active=True)
# 获取所有记录
all_users = await user_repo.all()
# 3. 更新 (Update)
updated_user = await user_repo.update(
id=1,
name="Alice Wang",
age=26
)
# 4. 删除 (Delete)
success = await user_repo.delete(1)
# 5. 统计 (Count)
count = await user_repo.count(is_active=True)
print(f"活跃用户数: {count}")
直接使用模型(Tortoise ORM)
from infoman.models.user import User
# 创建
user = await User.create(name="Bob", email="bob@example.com")
# 查询单条
user = await User.get(id=1)
user = await User.get_or_none(email="bob@example.com")
# 查询多条
users = await User.filter(is_active=True).all()
users = await User.filter(age__gte=18).order_by('-created_at').limit(10)
# 更新
user.name = "Bob Smith"
await user.save()
# 批量更新
await User.filter(is_active=False).update(is_active=True)
# 删除
await user.delete()
# 批量删除
await User.filter(age__lt=18).delete()
# 统计
count = await User.filter(is_active=True).count()
# 聚合
from tortoise.functions import Count, Sum
result = await User.annotate(count=Count('id')).values('is_active', 'count')
直接使用 Session(SQLAlchemy 2.0)
from infoman.service.infrastructure.db_relation.manager_pro import get_db_session
from sqlalchemy import select, update, delete
from fastapi import Depends
@app.get("/users")
async def get_users(session = Depends(get_db_session)):
"""FastAPI 路由中使用数据库会话"""
# 查询
stmt = select(User).where(User.is_active == True)
result = await session.execute(stmt)
users = result.scalars().all()
# 创建
new_user = User(name="Charlie", email="charlie@example.com")
session.add(new_user)
await session.commit()
await session.refresh(new_user)
# 更新
stmt = update(User).where(User.id == 1).values(name="Charlie Brown")
await session.execute(stmt)
await session.commit()
# 删除
stmt = delete(User).where(User.id == 1)
await session.execute(stmt)
await session.commit()
return {"users": [u.name for u in users]}
数据库管理
配置多数据库
# .env 文件
# MySQL 主数据库
MYSQL_ENABLED=true
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_DB=app
MYSQL_USER=root
MYSQL_PASSWORD=secret
MYSQL_MODELS=user,product
# PostgreSQL 分析数据库
POSTGRES_ENABLED=true
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=analytics
POSTGRES_USER=postgres
POSTGRES_PASSWORD=secret
POSTGRES_MODELS=log,metric
# SQLite 缓存数据库
SQLITE_ENABLED=true
SQLITE_DB=./cache.db
SQLITE_MODELS=cache
启动数据库管理器
from fastapi import FastAPI
from infoman.service.infrastructure.db_relation.manager import db_manager
# 或使用 SQLAlchemy 版本
# from infoman.service.infrastructure.db_relation.manager_pro import db_manager
# 方式 1: 使用 lifespan (推荐)
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时初始化数据库
await db_manager.startup(app)
yield
# 关闭时清理连接
await db_manager.shutdown()
app = FastAPI(lifespan=lifespan)
# 方式 2: 使用事件钩子
@app.on_event("startup")
async def startup_event():
await db_manager.startup(app)
@app.on_event("shutdown")
async def shutdown_event():
await db_manager.shutdown()
健康检查
from fastapi import FastAPI, Request
@app.get("/health/database")
async def check_database(request: Request):
"""数据库健康检查"""
# 检查所有数据库
health = await db_manager.health_check()
# 检查指定数据库
mysql_health = await db_manager.health_check("mysql")
return health
# 响应示例:
# {
# "status": "healthy",
# "name": "database",
# "details": {
# "connections": {
# "mysql": {
# "status": "healthy",
# "type": "mysql",
# "database": "app",
# "pool": {"size": 5, "free": 3}
# }
# },
# "count": 1
# }
# }
获取数据库统计
@app.get("/stats/database")
async def get_database_stats():
"""获取数据库连接统计"""
stats = await db_manager.get_stats()
return stats
高级用法
事务处理
Tortoise ORM:
from tortoise.transactions import in_transaction
async def transfer_money(from_user_id: int, to_user_id: int, amount: float):
"""转账操作(事务)"""
async with in_transaction() as conn:
# 扣款
from_user = await User.get(id=from_user_id).using_db(conn)
from_user.balance -= amount
await from_user.save(using_db=conn)
# 加款
to_user = await User.get(id=to_user_id).using_db(conn)
to_user.balance += amount
await to_user.save(using_db=conn)
# 记录日志
await TransferLog.create(
from_user_id=from_user_id,
to_user_id=to_user_id,
amount=amount,
using_db=conn
)
SQLAlchemy 2.0:
async def transfer_money(session, from_user_id: int, to_user_id: int, amount: float):
"""转账操作(事务)"""
try:
# SQLAlchemy session 自动管理事务
from_user = await session.get(User, from_user_id)
from_user.balance -= amount
to_user = await session.get(User, to_user_id)
to_user.balance += amount
log = TransferLog(
from_user_id=from_user_id,
to_user_id=to_user_id,
amount=amount
)
session.add(log)
await session.commit()
except Exception:
await session.rollback()
raise
复杂查询
Tortoise ORM:
from tortoise.expressions import Q, F
from tortoise.functions import Count, Avg
# Q 对象(OR 查询)
users = await User.filter(
Q(age__gte=18) | Q(is_superuser=True)
).all()
# F 对象(字段比较)
users = await User.filter(
age__gte=F('min_age')
).all()
# 关联查询
users = await User.filter(
orders__total__gte=1000
).prefetch_related('orders')
# 聚合查询
stats = await User.annotate(
order_count=Count('orders')
).filter(
order_count__gte=10
).values('name', 'order_count')
# 分组统计
avg_age = await User.all().annotate(
avg_age=Avg('age')
).group_by('is_active').values('is_active', 'avg_age')
SQLAlchemy 2.0:
from sqlalchemy import select, func, and_, or_
# OR 查询
stmt = select(User).where(
or_(User.age >= 18, User.is_superuser == True)
)
result = await session.execute(stmt)
users = result.scalars().all()
# 聚合查询
stmt = select(
User.is_active,
func.count(User.id).label('count'),
func.avg(User.age).label('avg_age')
).group_by(User.is_active)
result = await session.execute(stmt)
stats = result.all()
# JOIN 查询
stmt = select(User, Order).join(Order).where(Order.total >= 1000)
result = await session.execute(stmt)
原生 SQL
Tortoise ORM:
from tortoise import Tortoise
# 执行原生 SQL
conn = Tortoise.get_connection("default")
results = await conn.execute_query_dict(
"SELECT * FROM users WHERE age > ?",
[18]
)
SQLAlchemy 2.0:
from sqlalchemy import text
# 执行原生 SQL
stmt = text("SELECT * FROM users WHERE age > :age")
result = await session.execute(stmt, {"age": 18})
rows = result.fetchall()
批量操作
# 批量创建(Tortoise)
users = await User.bulk_create([
User(name="User1", email="user1@example.com"),
User(name="User2", email="user2@example.com"),
])
# 批量创建(SQLAlchemy)
session.add_all([
User(name="User1", email="user1@example.com"),
User(name="User2", email="user2@example.com"),
])
await session.commit()
# 批量更新(Tortoise)
await User.filter(is_active=False).update(is_active=True)
# 批量更新(SQLAlchemy)
stmt = update(User).where(User.is_active == False).values(is_active=True)
await session.execute(stmt)
await session.commit()
最佳实践
1. 选择合适的 ORM
# ✅ 新项目、简单 CRUD:使用 Tortoise ORM
USE_PRO_ORM=false
# ✅ 企业级、复杂查询:使用 SQLAlchemy
USE_PRO_ORM=true
2. 模型设计
# ✅ 推荐:清晰的字段注释
class User(TimestampMixin):
name = fields.CharField(max_length=100, description="用户名")
email = fields.CharField(max_length=255, unique=True, description="邮箱")
class Meta:
table = "users"
indexes = [("email",)] # 添加索引
# ❌ 不推荐:没有注释和索引
class User(TimestampMixin):
name = fields.CharField(max_length=100)
email = fields.CharField(max_length=255)
3. 使用仓储模式
# ✅ 推荐:使用仓储模式(便于测试和切换 ORM)
user_repo = create_repository(User)
user = await user_repo.get(1)
# ⚠️ 可用:直接使用模型(紧耦合到 ORM)
user = await User.get(id=1)
4. 错误处理
from tortoise.exceptions import DoesNotExist, IntegrityError
async def get_user_safe(user_id: int):
"""安全的用户查询"""
try:
user = await User.get(id=user_id)
return user
except DoesNotExist:
logger.warning(f"用户不存在: {user_id}")
return None
except Exception as e:
logger.error(f"查询失败: {e}")
raise
5. 性能优化
# ✅ 推荐:使用 prefetch_related 避免 N+1 查询
users = await User.all().prefetch_related('orders')
# ❌ 不推荐:N+1 查询
users = await User.all()
for user in users:
orders = await user.orders.all() # 每次都查询
# ✅ 推荐:使用分页
users = await User.all().limit(20).offset(0)
# ✅ 推荐:只查询需要的字段
users = await User.all().values('id', 'name', 'email')
6. 数据库迁移
使用 Aerich(Tortoise ORM)或 Alembic(SQLAlchemy)管理数据库迁移:
# Tortoise ORM - Aerich
pip install aerich
aerich init -t infoman.config.TORTOISE_ORM
aerich init-db
aerich migrate
aerich upgrade
# SQLAlchemy - Alembic
pip install alembic
alembic init migrations
alembic revision --autogenerate -m "create users table"
alembic upgrade head
常见问题
Q: 如何在 Tortoise 和 SQLAlchemy 之间切换?
A: 修改配置文件中的 USE_PRO_ORM 参数:
# 使用 Tortoise ORM
USE_PRO_ORM=false
# 使用 SQLAlchemy 2.0
USE_PRO_ORM=true
如果使用仓储模式,业务代码无需修改。
Q: 如何连接多个数据库?
A: 在 .env 中启用多个数据库配置,每个模型指定对应的连接:
# Tortoise ORM
class User(TimestampMixin):
class Meta:
app = "mysql_models" # 使用 MySQL
class Log(TimestampMixin):
class Meta:
app = "postgres_models" # 使用 PostgreSQL
Q: 性能如何优化?
A: 主要优化点:
- 使用连接池(自动配置)
- 添加数据库索引
- 避免 N+1 查询(使用 prefetch_related)
- 使用批量操作
- 只查询需要的字段
Q: 如何进行数据库迁移?
A:
- Tortoise ORM: 使用 Aerich
- SQLAlchemy: 使用 Alembic
详见上面的"数据库迁移"章节。
参考资料
定时任务调度器使用指南
infomankit 提供了完整的定时任务调度功能,基于 APScheduler 实现,支持自动任务发现、分布式锁和多种触发器。
📋 目录
快速开始
1. 安装依赖
pip install infomankit[scheduler]
2. 创建任务目录
mkdir -p app/tasks
touch app/tasks/__init__.py
3. 定义任务
创建 app/tasks/daily_tasks.py:
from loguru import logger
from infoman.service.infrastructure.scheduler import cron, distributed_task
@cron(hour=0, minute=0) # 每天 00:00 执行
@distributed_task(timeout=600) # 使用分布式锁
async def daily_cleanup():
"""每日清理任务"""
logger.info("开始每日清理...")
# 你的业务逻辑
logger.success("清理完成")
4. 启动应用
from fastapi import FastAPI
from infoman.service.core.lifespan import lifespan
app = FastAPI(lifespan=lifespan) # 自动加载并启动调度器
安装
基础安装
# 只安装调度器
pip install infomankit[scheduler]
# 完整安装(包含所有功能)
pip install infomankit[full]
可选依赖
如果需要分布式锁功能,还需要安装 Redis:
pip install infomankit[scheduler,cache]
基础用法
Cron 表达式任务
from infoman.service.infrastructure.scheduler import cron
# 每天特定时间
@cron(hour=8, minute=30)
async def morning_task():
"""每天 08:30 执行"""
pass
# 每小时执行
@cron(minute=0)
async def hourly_task():
"""每小时整点执行"""
pass
# 每 N 分钟
@cron(minute='*/15')
async def frequent_task():
"""每 15 分钟执行"""
pass
# 工作日任务
@cron(day_of_week='mon-fri', hour=9, minute=0)
async def workday_task():
"""工作日 09:00 执行"""
pass
# 每月第一天
@cron(day=1, hour=0, minute=0)
async def monthly_task():
"""每月 1 号 00:00 执行"""
pass
间隔任务
from infoman.service.infrastructure.scheduler import interval
# 每 N 秒/分钟/小时
@interval(seconds=30)
async def frequent_check():
"""每 30 秒执行"""
pass
@interval(minutes=5)
async def regular_check():
"""每 5 分钟执行"""
pass
@interval(hours=1)
async def hourly_sync():
"""每小时执行"""
pass
一次性任务
from infoman.service.infrastructure.scheduler import scheduled_task
@scheduled_task(trigger='date', run_date='2025-12-31 23:59:59')
async def new_year_task():
"""指定时间执行一次"""
pass
装饰器详解
@cron - Cron 表达式任务
@cron(
job_id='my_task', # 任务ID(唯一标识)
name='我的任务', # 任务名称(用于展示)
year=None, # 年份
month=None, # 月份 (1-12)
day=None, # 日期 (1-31)
week=None, # ISO 周数 (1-53)
day_of_week=None, # 星期 (0-6 或 mon,tue,wed,thu,fri,sat,sun)
hour=None, # 小时 (0-23)
minute=None, # 分钟 (0-59)
second=None, # 秒 (0-59)
)
示例:
# 每天 8:30
@cron(hour=8, minute=30)
# 每小时的第 0 和 30 分
@cron(minute='0,30')
# 每 10 分钟
@cron(minute='*/10')
# 工作日 9-17 点,每小时
@cron(day_of_week='mon-fri', hour='9-17', minute=0)
# 每月1号和15号的 00:00
@cron(day='1,15', hour=0, minute=0)
@interval - 间隔任务
@interval(
job_id='my_interval_task',
name='间隔任务',
weeks=0, # 周
days=0, # 天
hours=0, # 小时
minutes=0, # 分钟
seconds=0, # 秒
)
示例:
# 每 30 秒
@interval(seconds=30)
# 每 5 分钟
@interval(minutes=5)
# 每 2 小时 30 分钟
@interval(hours=2, minutes=30)
# 每天
@interval(days=1)
@scheduled_task - 通用任务
@scheduled_task(
trigger='cron', # 触发器类型: cron, interval, date
job_id='task_id', # 任务ID
name='任务名称', # 任务名称
**trigger_args # 触发器参数
)
示例:
# Cron 任务
@scheduled_task(trigger='cron', hour=0, minute=0)
# Interval 任务
@scheduled_task(trigger='interval', minutes=5)
# Date 任务(一次性)
@scheduled_task(trigger='date', run_date='2025-12-31 23:59:59')
分布式锁
在多实例部署时,使用分布式锁确保任务只在一个实例上执行。
@distributed_task 装饰器
from infoman.service.infrastructure.scheduler import cron, distributed_task
@cron(hour=0, minute=0)
@distributed_task(
lock_key='my_unique_task', # 锁的唯一键(默认使用函数名)
timeout=600, # 锁超时时间(秒),默认 300
skip_locked=True # 锁被占用时跳过执行,默认 True
)
async def my_task():
"""多实例环境下只有一个实例会执行"""
pass
手动使用锁
from infoman.service.infrastructure.scheduler import RedisDistributedLock
from infoman.service.app import application
@cron(hour=2, minute=0)
async def manual_lock_task():
redis = application.state.redis
lock = RedisDistributedLock(redis, "my_task", timeout=300)
if await lock.acquire():
try:
# 执行任务
logger.info("执行任务...")
# 如果任务执行时间长,可以延期锁
await lock.extend(additional_time=300)
finally:
await lock.release()
else:
logger.info("锁被其他实例占用,跳过执行")
上下文管理器方式
@cron(hour=3, minute=0)
async def context_lock_task():
from infoman.service.app import application
redis = application.state.redis
async with RedisDistributedLock(redis, "context_task", timeout=300):
# 自动获取和释放锁
logger.info("执行任务...")
任务管理
动态添加任务
from fastapi import Request
@app.post("/scheduler/jobs")
async def add_job(request: Request):
scheduler = request.app.state.scheduler_manager.scheduler
async def dynamic_task():
logger.info("动态任务执行中...")
# 添加 Cron 任务
job = scheduler.add_job(
func=dynamic_task,
trigger='cron',
job_id='dynamic_task_1',
hour=10,
minute=30
)
return {"status": "ok", "job_id": job.id}
移除任务
@app.delete("/scheduler/jobs/{job_id}")
async def remove_job(request: Request, job_id: str):
scheduler = request.app.state.scheduler_manager.scheduler
success = scheduler.remove_job(job_id)
return {"status": "ok" if success else "error"}
暂停/恢复任务
# 暂停
@app.post("/scheduler/jobs/{job_id}/pause")
async def pause_job(request: Request, job_id: str):
scheduler = request.app.state.scheduler_manager.scheduler
scheduler.pause_job(job_id)
return {"status": "ok"}
# 恢复
@app.post("/scheduler/jobs/{job_id}/resume")
async def resume_job(request: Request, job_id: str):
scheduler = request.app.state.scheduler_manager.scheduler
scheduler.resume_job(job_id)
return {"status": "ok"}
查询任务
@app.get("/scheduler/jobs")
async def list_jobs(request: Request):
scheduler = request.app.state.scheduler_manager.scheduler
jobs = scheduler.get_jobs()
return {
"jobs": [
{
"id": job.id,
"name": job.name,
"next_run": job.next_run_time.isoformat() if job.next_run_time else None,
}
for job in jobs
]
}
配置选项
环境变量配置
在 .env 或 config/.env.{env} 文件中配置:
# 是否启用调度器
SCHEDULER_ENABLED=true
# 时区
SCHEDULER_TIMEZONE=Asia/Shanghai
# 自定义任务目录
SCHEDULER_PACKAGE=app.tasks
SCHEDULER_PATH=./app/tasks
代码配置
from infoman.config import settings
settings.SCHEDULER_ENABLED = True
settings.SCHEDULER_TIMEZONE = 'Asia/Shanghai'
settings.SCHEDULER_PACKAGE = 'app.tasks'
settings.SCHEDULER_PATH = './app/tasks'
最佳实践
1. 目录结构
推荐的项目结构:
app/
├── schedulers/ # 定时任务目录
│ ├── __init__.py
│ ├── daily_tasks.py # 每日任务
│ ├── hourly_tasks.py # 每小时任务
│ ├── monitoring.py # 监控任务
│ └── _helpers.py # 辅助函数(不会被自动加载)
├── main.py
└── .env
2. 任务命名
# ✅ 推荐:清晰的任务ID和名称
@cron(
job_id='daily_data_cleanup',
name='每日数据清理',
hour=0, minute=0
)
async def daily_data_cleanup():
pass
# ❌ 不推荐:没有指定ID和名称
@cron(hour=0, minute=0)
async def task1():
pass
3. 分布式锁使用
# ✅ 推荐:关键任务使用分布式锁
@cron(hour=0, minute=0)
@distributed_task(lock_key='daily_backup', timeout=1800)
async def daily_backup():
"""数据备份(确保只执行一次)"""
pass
# ✅ 推荐:监控任务不使用锁(每个实例都执行)
@interval(minutes=5)
async def health_check():
"""健康检查(每个实例独立检查)"""
pass
4. 错误处理
@cron(hour=0, minute=0)
@distributed_task(timeout=600)
async def robust_task():
"""健壮的任务实现"""
logger.info("任务开始...")
try:
# 业务逻辑
await do_something()
logger.success("任务完成")
except Exception as e:
logger.error(f"任务失败: {e}", exc_info=True)
# 发送告警
# await send_alert(f"任务失败: {e}")
raise # 重新抛出异常,让调度器记录
5. 长时间运行的任务
@cron(hour=1, minute=0)
@distributed_task(timeout=3600)
async def long_running_task():
"""长时间运行的任务"""
from infoman.service.app import application
redis = application.state.redis
lock = RedisDistributedLock(redis, "long_task", timeout=1800)
if await lock.acquire():
try:
# 每 10 分钟延期一次锁
for i in range(6):
await process_batch(i)
await lock.extend(additional_time=600)
finally:
await lock.release()
6. 日志记录
@cron(hour=0, minute=0)
@distributed_task(timeout=600)
async def well_logged_task():
"""良好的日志记录"""
logger.info("📊 开始生成报表...")
try:
logger.info(" 步骤1: 收集数据...")
data = await collect_data()
logger.info(" 步骤2: 处理数据...")
result = await process_data(data)
logger.info(" 步骤3: 生成报表...")
await generate_report(result)
logger.success("✅ 报表生成完成")
except Exception as e:
logger.error(f"❌ 报表生成失败: {e}")
raise
常见问题
Q: 任务没有被自动加载?
A: 检查以下几点:
- 任务目录是否正确:
app/tasks/ __init__.py文件是否存在- 文件名不要以下划线开头
- 配置中
SCHEDULER_ENABLED=true
Q: 多实例部署时任务重复执行?
A: 使用 @distributed_task 装饰器,并确保 Redis 已正确配置。
Q: 如何调试任务?
A: 使用日志和手动触发:
# 在开发环境手动触发任务
@app.post("/debug/run-task/{task_name}")
async def debug_task(request: Request, task_name: str):
scheduler = request.app.state.scheduler_manager.scheduler
job = scheduler.get_job(task_name)
if job:
# 手动执行
await job.func()
return {"status": "ok"}
else:
return {"status": "error", "message": "任务不存在"}
参考资料
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file infomankit-0.3.28.tar.gz.
File metadata
- Download URL: infomankit-0.3.28.tar.gz
- Upload date:
- Size: 155.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3443ade227937558679c47bd3c47f080c83a6b9b24bccc9c1476b7ea625fff2c
|
|
| MD5 |
7ca8642b583f8a9d281892f49bb31680
|
|
| BLAKE2b-256 |
8a7deff70346bccebbcb71204f5b610d2094a45c6c1570e370f06305a070da0b
|
File details
Details for the file infomankit-0.3.28-py3-none-any.whl.
File metadata
- Download URL: infomankit-0.3.28-py3-none-any.whl
- Upload date:
- Size: 187.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7ae9acfebea03eeedd6863295926a4e92ae0ad30e06c03e751c1ed1a1a408100
|
|
| MD5 |
bc28db67211d8eef94629ec477b8c809
|
|
| BLAKE2b-256 |
1bf126b1aada49bf097b6e7d80e38ae2447c9eb31c826269c6a1b6b8618f63af
|