lsyzwm-work-core - 简化work任务分发与执行的核心库
Project description
lsyzwmworkcore
简化 Worker 任务分发与执行的核心库,基于 ZooKeeper 实现分布式任务调度。
特性
- 🚀 简单易用 - 继承
WorkerBase,实现process_task方法即可 - 🔄 生命周期钩子 - 提供
on_task_started、on_task_completed、on_task_failed钩子方法 - 📡 自动监听 - 基于 ZooKeeper Watch 机制自动监听任务变化
- 🔧 增量处理 - 内置
NodeChangeTracker避免重复处理 - ⏰ 定时任务 - 支持延时任务、Cron 任务、间隔任务
- 💾 缓存支持 - Worker 级别和实例级别的缓存节点管理
安装
pip install lsyzwmworkcore
快速开始
1. 定义 Worker
from lsyzwmworkcore.base import WorkerBase
from lsyzwm_master_sdk import MasterZooClient
class MyWorker(WorkerBase):
@property
def worker_name(self) -> str:
return "my_worker"
def process_task(self, task_id: str) -> None:
"""处理单个任务(必须实现)"""
# 获取任务数据
data = self.get_task_value(task_id, as_json=True)
# 处理业务逻辑
print(f"处理任务: {task_id}, 数据: {data}")
# 可选:重载钩子方法
def on_task_started(self, task_id: str) -> None:
super().on_task_started(task_id)
print(f"任务开始: {task_id}")
def on_task_completed(self, task_id: str) -> None:
super().on_task_completed(task_id)
print(f"任务完成: {task_id}")
def on_task_failed(self, task_id: str, exception: Exception = None) -> None:
super().on_task_failed(task_id, exception)
print(f"任务失败: {task_id}, 错误: {exception}")
2. 启动 Worker
from lsyzwm_master_sdk import MasterZooClient
# 初始化 ZooKeeper 客户端
zk_client = MasterZooClient(hosts="localhost:2181")
zk_client.start()
# 创建并启动 Worker
worker = MyWorker(sid=1, zk_client=zk_client)
worker.register() # 注册 Worker 实例
worker.watch_tasks() # 开始监听任务
3. 使用 WorkerManager(可选)
from lsyzwmworkcore.worker_manager import worker_manager
# 注册 Worker 类
worker_manager.register_worker_class(MyWorker)
# 获取 Worker 实例
worker = worker_manager.get_worker("my_worker")
# 获取所有 Worker
all_workers = worker_manager.get_all_workers()
任务生命周期
on_task_started(task_id) # 钩子:任务开始,默认标记为处理中
↓
process_task(task_id) # 核心:子类实现业务逻辑
↓
on_task_completed(task_id) # 钩子:任务成功,默认标记完成并删除节点
或
on_task_failed(task_id, ex) # 钩子:任务失败,默认标记失败并删除节点
核心组件
WorkerBase
Worker 抽象基类,提供:
| 方法 | 说明 |
|---|---|
process_task(task_id) |
抽象方法,子类必须实现 |
on_task_started(task_id) |
钩子:任务开始 |
on_task_completed(task_id) |
钩子:任务完成 |
on_task_failed(task_id, exception) |
钩子:任务失败 |
register() |
注册 Worker 实例 |
watch_tasks() |
监听任务节点变化 |
get_task_value(task_id, as_json) |
获取任务数据 |
add_task_node(worker_name, payload, ...) |
添加任务到其他 Worker |
add_self_task_node(payload, ...) |
添加任务到当前实例 |
NodeChangeTracker
节点变更跟踪器,用于增量检测:
tracker = NodeChangeTracker(worker_name="my_worker")
# 获取新增节点
new_nodes = tracker.get_new_nodes(current_node_ids)
# 标记状态
tracker.mark_processing(node_id)
tracker.mark_completed(node_id)
tracker.mark_failed(node_id, remove_from_processing=True)
WorkerManager
Worker 管理器(单例模式):
from lsyzwmworkcore.worker_manager import worker_manager
worker_manager.register_worker_class(MyWorker)
worker = worker_manager.get_worker("my_worker")
names = worker_manager.get_worker_names()
定时任务
# 延时任务
worker.create_delay_job_node(
job_id="job_001",
worker_name="target_worker",
payload={"action": "process"},
delay_ts=int(time.time()) + 3600, # 1小时后执行
who=worker.worker_id
)
# Cron 任务
worker.create_cron_job_node(
job_id="job_002",
worker_name="target_worker",
payload={"action": "daily_report"},
cron="0 9 * * *", # 每天9点执行
who=worker.worker_id
)
# 间隔任务
worker.create_interval_job_node(
job_id="job_003",
worker_name="target_worker",
payload={"action": "heartbeat"},
who=worker.worker_id,
minutes=5 # 每5分钟执行
)
缓存管理
# Worker 级别缓存(所有实例共享)
worker.set_worker_cache_value("config", {"key": "value"})
config = worker.get_worker_cache_value("config", as_json=True)
# 实例级别缓存(仅当前实例)
worker.set_worker_instance_cache_value("state", {"status": "running"})
state = worker.get_worker_instance_cache_value("state", as_json=True)
依赖
- Python >= 3.8
- Twisted == 24.11.0
- lsyzwm-master-sdk == 0.0.9
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
lsyzwmworkcore-0.0.4.tar.gz
(13.6 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file lsyzwmworkcore-0.0.4.tar.gz.
File metadata
- Download URL: lsyzwmworkcore-0.0.4.tar.gz
- Upload date:
- Size: 13.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
780da20a859871666ddd01dae7a0cc4b842d50c71db8b73e2828f2c3859dc747
|
|
| MD5 |
17ecd810f85a202faa9ade27a39e2cac
|
|
| BLAKE2b-256 |
265b95ff9d592e2182306160dc9e1bf9617b16475b7c93cdacc3d89be38022e6
|
File details
Details for the file lsyzwmworkcore-0.0.4-py3-none-any.whl.
File metadata
- Download URL: lsyzwmworkcore-0.0.4-py3-none-any.whl
- Upload date:
- Size: 12.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bab33df05ce8e37710476fe4ca3a76a3245d8d3c31a40875ac1eaa57210cfd7f
|
|
| MD5 |
b8cea30eef2734b18570357a0ab2d01e
|
|
| BLAKE2b-256 |
18997fed4c2ae1a129c15b993938fc29e80dde8640b56bf5838b5823fd002e0f
|