Skip to main content

lsyzwm-work-core - 简化work任务分发与执行的核心库

Project description

lsyzwmworkcore

简化 Worker 任务分发与执行的核心库,基于 ZooKeeper 实现分布式任务调度。

特性

  • 🚀 简单易用 - 继承 WorkerBase,实现 process_task 方法即可
  • 🔄 生命周期钩子 - 提供 on_task_startedon_task_completedon_task_failed 钩子方法
  • 📡 自动监听 - 基于 ZooKeeper Watch 机制自动监听任务变化
  • 🔧 增量处理 - 内置 NodeChangeTracker 避免重复处理
  • 定时任务 - 支持延时任务、Cron 任务、间隔任务
  • 💾 缓存支持 - Worker 级别和实例级别的缓存节点管理

安装

pip install lsyzwmworkcore

快速开始

1. 定义 Worker

from lsyzwmworkcore.base import WorkerBase
from lsyzwm_master_sdk import MasterZooClient


class MyWorker(WorkerBase):
    @property
    def worker_name(self) -> str:
        return "my_worker"

    def process_task(self, task_id: str) -> None:
        """处理单个任务(必须实现)"""
        # 获取任务数据
        data = self.get_task_value(task_id, as_json=True)
        
        # 处理业务逻辑
        print(f"处理任务: {task_id}, 数据: {data}")

    # 可选:重载钩子方法
    def on_task_started(self, task_id: str) -> None:
        super().on_task_started(task_id)
        print(f"任务开始: {task_id}")

    def on_task_completed(self, task_id: str) -> None:
        super().on_task_completed(task_id)
        print(f"任务完成: {task_id}")

    def on_task_failed(self, task_id: str, exception: Exception = None) -> None:
        super().on_task_failed(task_id, exception)
        print(f"任务失败: {task_id}, 错误: {exception}")

2. 启动 Worker

from lsyzwm_master_sdk import MasterZooClient

# 初始化 ZooKeeper 客户端
zk_client = MasterZooClient(hosts="localhost:2181")
zk_client.start()

# 创建并启动 Worker
worker = MyWorker(sid=1, zk_client=zk_client)
worker.register()      # 注册 Worker 实例
worker.watch_tasks()   # 开始监听任务

3. 使用 WorkerManager(可选)

from lsyzwmworkcore.worker_manager import worker_manager

# 注册 Worker 类
worker_manager.register_worker_class(MyWorker)

# 获取 Worker 实例
worker = worker_manager.get_worker("my_worker")

# 获取所有 Worker
all_workers = worker_manager.get_all_workers()

任务生命周期

on_task_started(task_id)      # 钩子:任务开始,默认标记为处理中
        ↓
process_task(task_id)         # 核心:子类实现业务逻辑
        ↓
on_task_completed(task_id)    # 钩子:任务成功,默认标记完成并删除节点
        或
on_task_failed(task_id, ex)   # 钩子:任务失败,默认标记失败并删除节点

核心组件

WorkerBase

Worker 抽象基类,提供:

方法 说明
process_task(task_id) 抽象方法,子类必须实现
on_task_started(task_id) 钩子:任务开始
on_task_completed(task_id) 钩子:任务完成
on_task_failed(task_id, exception) 钩子:任务失败
register() 注册 Worker 实例
watch_tasks() 监听任务节点变化
get_task_value(task_id, as_json) 获取任务数据
add_task_node(worker_name, payload, ...) 添加任务到其他 Worker
add_self_task_node(payload, ...) 添加任务到当前实例

NodeChangeTracker

节点变更跟踪器,用于增量检测:

tracker = NodeChangeTracker(worker_name="my_worker")

# 获取新增节点
new_nodes = tracker.get_new_nodes(current_node_ids)

# 标记状态
tracker.mark_processing(node_id)
tracker.mark_completed(node_id)
tracker.mark_failed(node_id, remove_from_processing=True)

WorkerManager

Worker 管理器(单例模式):

from lsyzwmworkcore.worker_manager import worker_manager

worker_manager.register_worker_class(MyWorker)
worker = worker_manager.get_worker("my_worker")
names = worker_manager.get_worker_names()

定时任务

# 延时任务
worker.create_delay_job_node(
    job_id="job_001",
    worker_name="target_worker",
    payload={"action": "process"},
    delay_ts=int(time.time()) + 3600,  # 1小时后执行
    who=worker.worker_id
)

# Cron 任务
worker.create_cron_job_node(
    job_id="job_002",
    worker_name="target_worker",
    payload={"action": "daily_report"},
    cron="0 9 * * *",  # 每天9点执行
    who=worker.worker_id
)

# 间隔任务
worker.create_interval_job_node(
    job_id="job_003",
    worker_name="target_worker",
    payload={"action": "heartbeat"},
    who=worker.worker_id,
    minutes=5  # 每5分钟执行
)

缓存管理

# Worker 级别缓存(所有实例共享)
worker.set_worker_cache_value("config", {"key": "value"})
config = worker.get_worker_cache_value("config", as_json=True)

# 实例级别缓存(仅当前实例)
worker.set_worker_instance_cache_value("state", {"status": "running"})
state = worker.get_worker_instance_cache_value("state", as_json=True)

依赖

  • Python >= 3.8
  • Twisted == 24.11.0
  • lsyzwm-master-sdk == 0.0.9

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lsyzwmworkcore-0.0.2.tar.gz (12.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

lsyzwmworkcore-0.0.2-py3-none-any.whl (9.5 kB view details)

Uploaded Python 3

File details

Details for the file lsyzwmworkcore-0.0.2.tar.gz.

File metadata

  • Download URL: lsyzwmworkcore-0.0.2.tar.gz
  • Upload date:
  • Size: 12.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.11

File hashes

Hashes for lsyzwmworkcore-0.0.2.tar.gz
Algorithm Hash digest
SHA256 8fa634423d055c4b6fb464b779192eb3797eaa92ace82fd9aafebf6f8414da53
MD5 861093ec29eaa90a94f9dad04a96c4fe
BLAKE2b-256 364096d32650de293408c95d1874010a1b03e7696223d2318867f6e04e59a3fb

See more details on using hashes here.

File details

Details for the file lsyzwmworkcore-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: lsyzwmworkcore-0.0.2-py3-none-any.whl
  • Upload date:
  • Size: 9.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.11

File hashes

Hashes for lsyzwmworkcore-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 6de9999cae97a98e9f0f7a8897da6a88fec949489d15801dde3bc9ded2f3253c
MD5 51c3e3b358fc72cb578be334d7d8c76e
BLAKE2b-256 01ec05449136fa54d80e07f1f18a58c7e380b4e786069a2b89d92d4a8551f6d5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page