Skip to main content

lsyzwm-work-core - 简化work任务分发与执行的核心库

Project description

lsyzwmworkcore

简化 Worker 任务分发与执行的核心库,基于 ZooKeeper 实现分布式任务调度。

特性

  • 🚀 简单易用 - 继承 WorkerBase,实现 process_task 方法即可
  • 🔄 生命周期钩子 - 提供 on_task_startedon_task_completedon_task_failed 钩子方法
  • 📡 自动监听 - 基于 ZooKeeper Watch 机制自动监听任务变化
  • 🔧 增量处理 - 内置 NodeChangeTracker 避免重复处理
  • 定时任务 - 支持延时任务、Cron 任务、间隔任务
  • 💾 缓存支持 - Worker 级别和实例级别的缓存节点管理

安装

pip install lsyzwmworkcore

快速开始

1. 定义 Worker

from lsyzwmworkcore.base import WorkerBase
from lsyzwm_master_sdk import MasterZooClient


class MyWorker(WorkerBase):
    @property
    def worker_name(self) -> str:
        return "my_worker"

    def process_task(self, task_id: str) -> None:
        """处理单个任务(必须实现)"""
        # 获取任务数据
        data = self.get_task_value(task_id, as_json=True)
        
        # 处理业务逻辑
        print(f"处理任务: {task_id}, 数据: {data}")

    # 可选:重载钩子方法
    def on_task_started(self, task_id: str) -> None:
        super().on_task_started(task_id)
        print(f"任务开始: {task_id}")

    def on_task_completed(self, task_id: str) -> None:
        super().on_task_completed(task_id)
        print(f"任务完成: {task_id}")

    def on_task_failed(self, task_id: str, exception: Exception = None) -> None:
        super().on_task_failed(task_id, exception)
        print(f"任务失败: {task_id}, 错误: {exception}")

2. 启动 Worker

from lsyzwm_master_sdk import MasterZooClient

# 初始化 ZooKeeper 客户端
zk_client = MasterZooClient(hosts="localhost:2181")
zk_client.start()

# 创建并启动 Worker
worker = MyWorker(sid=1, zk_client=zk_client)
worker.register()      # 注册 Worker 实例
worker.watch_tasks()   # 开始监听任务

3. 使用 WorkerManager(可选)

from lsyzwmworkcore.worker_manager import worker_manager

# 注册 Worker 类
worker_manager.register_worker_class(MyWorker)

# 获取 Worker 实例
worker = worker_manager.get_worker("my_worker")

# 获取所有 Worker
all_workers = worker_manager.get_all_workers()

任务生命周期

on_task_started(task_id)      # 钩子:任务开始,默认标记为处理中
        ↓
process_task(task_id)         # 核心:子类实现业务逻辑
        ↓
on_task_completed(task_id)    # 钩子:任务成功,默认标记完成并删除节点
        或
on_task_failed(task_id, ex)   # 钩子:任务失败,默认标记失败并删除节点

核心组件

WorkerBase

Worker 抽象基类,提供:

方法 说明
process_task(task_id) 抽象方法,子类必须实现
on_task_started(task_id) 钩子:任务开始
on_task_completed(task_id) 钩子:任务完成
on_task_failed(task_id, exception) 钩子:任务失败
register() 注册 Worker 实例
watch_tasks() 监听任务节点变化
get_task_value(task_id, as_json) 获取任务数据
add_task_node(worker_name, payload, ...) 添加任务到其他 Worker
add_self_task_node(payload, ...) 添加任务到当前实例

NodeChangeTracker

节点变更跟踪器,用于增量检测:

tracker = NodeChangeTracker(worker_name="my_worker")

# 获取新增节点
new_nodes = tracker.get_new_nodes(current_node_ids)

# 标记状态
tracker.mark_processing(node_id)
tracker.mark_completed(node_id)
tracker.mark_failed(node_id, remove_from_processing=True)

WorkerManager

Worker 管理器(单例模式):

from lsyzwmworkcore.worker_manager import worker_manager

worker_manager.register_worker_class(MyWorker)
worker = worker_manager.get_worker("my_worker")
names = worker_manager.get_worker_names()

定时任务

# 延时任务
worker.create_delay_job_node(
    job_id="job_001",
    worker_name="target_worker",
    payload={"action": "process"},
    delay_ts=int(time.time()) + 3600,  # 1小时后执行
    who=worker.worker_id
)

# Cron 任务
worker.create_cron_job_node(
    job_id="job_002",
    worker_name="target_worker",
    payload={"action": "daily_report"},
    cron="0 9 * * *",  # 每天9点执行
    who=worker.worker_id
)

# 间隔任务
worker.create_interval_job_node(
    job_id="job_003",
    worker_name="target_worker",
    payload={"action": "heartbeat"},
    who=worker.worker_id,
    minutes=5  # 每5分钟执行
)

缓存管理

# Worker 级别缓存(所有实例共享)
worker.set_worker_cache_value("config", {"key": "value"})
config = worker.get_worker_cache_value("config", as_json=True)

# 实例级别缓存(仅当前实例)
worker.set_worker_instance_cache_value("state", {"status": "running"})
state = worker.get_worker_instance_cache_value("state", as_json=True)

依赖

  • Python >= 3.8
  • Twisted == 24.11.0
  • lsyzwm-master-sdk == 0.0.9

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lsyzwmworkcore-0.0.3.tar.gz (12.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

lsyzwmworkcore-0.0.3-py3-none-any.whl (9.8 kB view details)

Uploaded Python 3

File details

Details for the file lsyzwmworkcore-0.0.3.tar.gz.

File metadata

  • Download URL: lsyzwmworkcore-0.0.3.tar.gz
  • Upload date:
  • Size: 12.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.11

File hashes

Hashes for lsyzwmworkcore-0.0.3.tar.gz
Algorithm Hash digest
SHA256 9def07f888381a2749d2eb2cfa2b2a7818a601f4c551043b1459ba8105f00ccf
MD5 c8bdfc60ec8b6268712319abb8389ea2
BLAKE2b-256 29da7a8c7921570b57e31612a1c44088b91cd4d117f4bfc0fc98b4a6781ab508

See more details on using hashes here.

File details

Details for the file lsyzwmworkcore-0.0.3-py3-none-any.whl.

File metadata

  • Download URL: lsyzwmworkcore-0.0.3-py3-none-any.whl
  • Upload date:
  • Size: 9.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.11

File hashes

Hashes for lsyzwmworkcore-0.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 3f27ad5dcf655895413747bb306584d2e1146be10dac184623e5135fad61c00b
MD5 d764be40cc12948a43f62c9f2c0fa912
BLAKE2b-256 98aa5e5c8a85550b34840eb75d5f9e82ef5dae3a7bee7ed6a3402004946affae

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page