Data-Oriented Async Task Orchestrator
Project description
Piko: Enterprise-Grade Async Orchestrator
Piko 专为数据工程设计,旨在解决高并发 I/O 与复杂资源管理之间的矛盾,通过微内核设计与依赖注入机制,让开发者能够轻松构建支撑数万 QPS 的流水线。
🔥 Piko 0.1.6 新特性 (New in v0.1.6)
- App 实例模式:
app = PikoApp(),彻底告别隐式全局变量,支持多实例运行。 - 动态系统配置: 支持在不重启服务的情况下,通过数据库动态调整轮询间隔等系统参数。
- 增强的连接池: 自动探活与断线重连(Pre-ping),适应不稳定的网络环境。
- 智能回填策略: 支持
SKIP(跳过) 和CATCH_UP(追赶) 等多种回溯策略。
前置要求 (Prerequisites)
Piko 依赖 MySQL (5.7 或 8.0+) 作为核心组件。
# 创建数据库
CREATE DATABASE piko_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
安装 (Installation)
pip install piko-cucc
快速开始 (Quick Start)
1. 定义应用 (app.py)
Piko 强制要求显式实例化 App 对象。
from piko.app import PikoApp
# 实例化 App
app = PikoApp(name="my_crawler_service")
2. 编写任务 (jobs.py)
使用 @app.job 装饰器注册任务。
from .app import app
@app.job(
job_id="hello_world",
cron="* * * * *" # 每分钟执行
)
async def hello_handler(ctx, scheduled_time):
print(f"Hello Piko ! Time: {scheduled_time}")
3. 启动入口 (main.py)
from my_project.app import app
import my_project.jobs # 必须导入任务模块以触发注册
if __name__ == "__main__":
# 一键启动:自动处理 DB 初始化、Leader 选举、Worker 启动
app.run()
核心模式 (Core Patterns)
场景一:高并发网络 I/O (The Async IO Pattern)
利用 asyncio 原生并发能力,单节点轻松支撑数千并发。
import aiohttp
from .app import app
@app.job(job_id="fetch_price", cron="*/1 * * * *")
async def fetch_handler(ctx, scheduled_time):
# 纯异步非阻塞
async with aiohttp.ClientSession() as session:
async with session.get("[https://api.example.com/data](https://api.example.com/data)") as resp:
data = await resp.json()
print(f"Data: {data}")
场景二:资源依赖注入 (Dependency Injection)
通过 resources 参数注入连接池,自动管理生命周期,防止资源泄露。
from piko.core.resource import Resource
from contextlib import asynccontextmanager
class DBPool(Resource):
@asynccontextmanager
async def acquire(self, ctx):
# 模拟借出连接
yield "db_connection_obj"
@app.job(
job_id="db_task",
cron="0 * * * *",
resources={"db": DBPool} # 注入资源
)
async def db_handler(ctx, scheduled_time, db):
# db 参数由框架自动注入
print(f"Using connection: {db}")
场景三:动态系统调优 (Dynamic Configuration)
Piko 允许你通过数据库动态调整系统行为。
在 job_config 表中插入特殊 ID piko_system_settings:
// job_id: piko_system_settings
{
"poll_interval_s": 5, // 将轮询间隔调整为 5 秒
"log_level": "DEBUG"
}
Watcher 会自动感知并应用新配置,无需重启服务。
配置 (Configuration)
推荐使用 settings.toml 或环境变量。格式如下:
[default]
# ============================================================================
# General - 通用配置
# ============================================================================
version = "0.1.6"
# ============================================================================
# Startup Strategy - 启动策略
# ============================================================================
# startup_mode 控制服务启动时的容错行为:
# - "fail_closed": 严格模式,任何配置或依赖检查失败都立即终止启动(生产推荐)
# - "fail_open_snapshot": 宽容模式,允许使用快照数据降级启动(仅用于开发/测试)
startup_mode = "fail_closed"
# debug 模式会启用更详细的日志和调试端点,生产环境必须设为 false
debug = false
# ============================================================================
# Database - 数据库连接配置 (Piko 核心)
# ============================================================================
# [注意] 库内部默认留空,配合 init_db 的检查逻辑。
# 这样用户如果没配置环境变量,启动时会看到友好的 FATAL ERROR 提示
#
# ⚠️ 必须使用异步驱动!
# 推荐使用 mysql+asyncmy (速度最快) 或 mysql+aiomysql (兼容性好)
# ❌ 不要使用 pymysql (这是同步驱动,会阻塞 Event Loop)
#
# 格式示例:
# "mysql+asyncmy://user:pass@host:port/dbname?charset=utf8mb4"
mysql_dsn = ""
# mysql_pool_size: 连接池初始大小,影响并发能力
# 设为 10-20 是基于典型中小型服务的经验值
mysql_pool_size = 20
# mysql_max_overflow: 连接池最大溢出连接数
# 允许在高峰期临时创建额外连接,避免请求阻塞
mysql_max_overflow = 10
# mysql_pool_recycle_s: 连接回收时间(秒)
# 强制每 3600 秒回收一次连接,防止连接存在太久被防火墙/MySQL服务端切断
# 这配合代码中的 pool_pre_ping=True 共同解决了 "Lost connection" 问题
mysql_pool_recycle_s = 3600
# ============================================================================
# Scheduler Tuning - 调度器调优参数
# ============================================================================
# poll_interval_s: 主轮询间隔(秒)
# [Piko 新特性] 这是"默认"间隔。
# 系统启动后,ConfigWatcher 会优先读取数据库中的 piko_system_settings 配置。
# 如果数据库没配置,才使用此值。
poll_interval_s = 10
# poll_jitter_s: 轮询抖动时间(秒)
# 随机化轮询时刻,避免多实例在同一时刻同时轮询造成数据库压力峰值
poll_jitter_s = 2
# debounce_s: 防抖窗口(秒)
# 在此时间窗口内重复触发的相同任务会被合并,减少无效调度
debounce_s = 2
# timezone: 时区设置
# 影响 cron 表达式的解析和任务触发时间计算,必须与业务时区一致
timezone = "Asia/Shanghai"
# ap_misfire_grace_s_default: APScheduler misfire 容忍时间(秒)
# 任务错过预定时间后,在此时间窗口内仍会执行;超出则跳过
# 300 秒(5分钟)是平衡及时性与系统压力的经验值
ap_misfire_grace_s_default = 300
# ap_max_instances_default: APScheduler 单任务最大并发实例数
# 设为 1 防止同一任务的多个实例并发执行,避免数据竞争
ap_max_instances_default = 1
# ============================================================================
# Leader Election - 分布式Leader选举
# ============================================================================
# leader_enabled: 是否启用Leader选举
# 多实例部署时必须启用,确保只有一个实例执行调度逻辑
leader_enabled = true
# leader_name: Leader名称/组名
# 同一组内的实例会竞争同一个Leader锁,不同组互不干扰
leader_name = "default"
# leader_lease_s: Leader租约时长(秒)
# Leader必须在此时间内续租,否则被视为失效,其他实例可接管
leader_lease_s = 30
# leader_renew_interval_s: 续租间隔(秒)
# Leader多久续租一次,必须小于 lease 时长且留有余量(当前为 1/3)
# 这样即使某次续租失败,仍有时间在租约到期前重试
leader_renew_interval_s = 10
# ============================================================================
# Concurrency & Compute - 并发与计算资源
# ============================================================================
# cpu_workers: CPU 密集型任务的工作线程数
# 0 表示自动检测(通常为 CPU 核心数),可根据任务特性手动调整
cpu_workers = 0
# per_job_cpu_max: 单任务最大 CPU 并发数
# 限制单个任务可使用的最大线程数,防止某个任务耗尽所有资源
per_job_cpu_max = 8
# ============================================================================
# Persistence - 持久化与缓冲配置
# ============================================================================
# persist_queue_max: 持久化队列最大长度
# 控制内存中待持久化对象的最大数量,防止内存溢出
persist_queue_max = 200
# persist_flush_timeout_s: 强制刷盘超时(秒)
# 即使队列未满,也会在此时间后强制写入,平衡数据丢失风险与 I/O 效率
persist_flush_timeout_s = 60
# persist_disk_fallback_path: 磁盘降级备份路径
# 当主存储(如数据库)不可用时,临时写入本地文件,防止数据丢失
# 注意:此路径应在容器/主机重启后仍可访问(如挂载卷)
persist_disk_fallback_path = "/tmp/piko_fallback.bin"
# ============================================================================
# Observability & Logging - 可观测性与日志
# ============================================================================
# metrics_enabled: 是否启用 Prometheus 等指标采集
# 生产环境强烈建议启用,用于监控调度器健康状况
metrics_enabled = true
# health_port: 健康检查和指标暴露端口
# K8s/Docker 可通过此端口进行 liveness/readiness probe
health_port = 8080
# log_level: 日志级别
# 可选: DEBUG, INFO, WARNING, ERROR, CRITICAL
# 生产环境建议 INFO,排查问题时临时调为 DEBUG
log_level = "INFO"
# log_json: 日志格式开关
# - true: 生产模式,输出结构化 JSON 日志,便于日志聚合平台解析
# - false: 开发模式,输出彩色文本日志,提升本地调试体验
log_json = false
⚙️ 进阶:动态系统配置 (Dynamic System Settings)
除了前面的配置文件或环境变量方式,Piko 还引入了 “控制面与数据面分离” 的设计理念。你还可以通过数据库实时调整系统的运行时行为,实现 无停机调优 (Hot Reload)。
这是通过在 job_config 表中插入一个特殊的保留任务 ID piko_system_settings 来实现的。
1. 静态 vs 动态配置对照表
并不是所有配置都能放入数据库。请务必遵循以下规则,否则配置将无效:
| 配置分类 | 典型参数 | 存放位置 | 生效时机 | 备注 |
|---|---|---|---|---|
| 基础设施 (Infrastructure) | mysql_dsnmysql_pool_sizeleader_namestartup_modetimezone |
仅 settings.toml或 环境变量 |
启动时 | 涉及连接池初始化、线程启动或时区基准,修改后必须重启服务。 |
| 运行时策略 (Runtime Policy) | poll_interval_spoll_jitter_slog_levelper_job_cpu_max |
settings.toml (默认值)+ 数据库 (覆盖值) |
即时生效 (< 1分钟) |
Watcher 会在下一次心跳时自动加载新值。 |
2. 如何使用动态配置
你不需要重启服务,只需在数据库中插入或更新以下 JSON:
INSERT INTO job_config (job_id, schema_version, config_json, version, updated_at)
VALUES (
'piko_system_settings', -- ⚠️ 系统保留 ID,不可更改
1,
'{
"poll_interval_s": 5, -- [调优] 将轮询间隔加速到 5秒 (默认10s)
"poll_jitter_s": 1, -- [调优] 减少抖动范围
"log_level": "DEBUG" -- [排查] 临时开启调试日志,查完改回 INFO
}',
1,
NOW()
)
ON DUPLICATE KEY UPDATE
config_json = VALUES(config_json),
version = version + 1;
3. 常见误区 (Common Pitfalls)
- ❌ 错误做法:在数据库里修改
mysql_pool_size想扩大连接池。- 后果:无效。连接池在进程启动的第一秒就已经固定了,数据库里的配置会被忽略。
- ❌ 错误做法:在数据库里修改
timezone。- 后果:极度危险。调度器可能产生逻辑分裂(部分任务按旧时区跑,部分按新时区计算),导致任务漏跑或重跑。
- ✅ 正确做法:仅在数据库中调整
poll_interval_s(控制节奏) 和log_level(控制日志量)。
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file piko_cucc-0.1.7.tar.gz.
File metadata
- Download URL: piko_cucc-0.1.7.tar.gz
- Upload date:
- Size: 102.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
35516dd5bd957924256277a238b89b69dae80bda6705c0cb5991ec5f4743f463
|
|
| MD5 |
070a8b333cf9162f22cdadf1e7ebf2a0
|
|
| BLAKE2b-256 |
b293f178404e50379cf7af254b605fe30570acee5a9575b7eb3d74b28f642f2b
|
File details
Details for the file piko_cucc-0.1.7-py3-none-any.whl.
File metadata
- Download URL: piko_cucc-0.1.7-py3-none-any.whl
- Upload date:
- Size: 81.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b0799602b7a45e8c68be0d58b2b60f1bb040910d4df5db6063752487999b8b5b
|
|
| MD5 |
7e38c93082ae40a307ace3a839c3c9a4
|
|
| BLAKE2b-256 |
b9397f884b3b44bdb704e2e31f194cf9917efad907d3fec7b2ed249654808220
|