Skip to main content

A distributed job execution framework

Project description

snail-job-Logo

🔥🔥🔥 灵活,可靠和快速的分布式任务重试和分布式任务调度平台

✅️ 可重放,可管控、为提高分布式业务系统一致性的分布式任务重试平台
✅️ 支持秒级、可中断、可编排的高性能分布式任务调度平台

简介

SnailJob 是一个灵活、可靠且高效的分布式任务重试和任务调度平台。其核心采用分区模式实现,具备高度可伸缩性和容错性的分布式系统。拥有完善的权限管理、强大的告警监控功能和友好的界面交互。欢迎大家接入并使用。

snail-job-python

snail-job 项目的 python 客户端。snail-job项目 java 后端

Snail Job Python客户端主打的是“原汁原昧”, 无需嵌套在其他语言环境中; 具备与SnailJob的Java客户端Job模块一样的能力包括(集群、广播、静态分片、Map、MapReuce、DAG工作流、实时日志等功能),而 xxl-jobPowerJob 等其他的任务调度系统都是通过 Java 客户端使用 Runtime 执行 Python 脚本, 那么会有如下几个问题:

  1. 需要运行在Java环境中,即耗内存和又显得笨重
  2. 不方便编写复杂的 Python 脚本
  3. Java 客户端通过 Python 命令执行脚本,需要系统全局安装脚本的第三方依赖
  4. 代码可维护性和可调试比较差

Snail Job Python 客户端可以直接对接 SnailJob 服务器,实现定时任务调度,并上报日志。Python 客户端当前仍不支持重试任务,也没有支持计划。

开始使用

基于源码开发

git clone https://gitee.com/opensnail/snail-job-python.git && cd snail-job-python
# 参考项目的 .env.example 文件创建 .env
cp .env.example .env
# 安装依赖
pip install -e .
# 参考 example 目录示例程序编写客户端业务代码
cd example/
# 启动程序
python main.py

tip: 可以使用 uv run --with 的方式运行:

uv run --with=snail-job-python main.py

登录后台,能看到对应host-id 为 py-xxxxxx 的客户端

注意: snail-job-python 支持 pip 包安装,包名为snail-job-python

示例

定时任务

import snailjob as sj

@sj.job("testJobExecutor")                                      # 1. testJobExecutor 为执行器名称
def test_job_executor(args: sj.JobArgs) -> sj.ExecuteResult:
    sj.SnailLog.REMOTE.info(f"job_params: {args.job_params}")
    return sj.ExecuteResult.success()                           # 2. 返回执行结果

if __name__ == "__main__":
    sj.ExecutorManager.register(test_job_executor)              # 3. 注册执行器
    sj.client_main()                                            # 4. 执行客户端主函数

新建定时任务, 执行器类型选择【Python】,执行器名称填入【testJobExecutor】

动态分片

import snailjob as sj

testMyMapExecutor = sj.MapExecutor("testMyMapExecutor")         # 1. 定义 MapExecutor 变量

@testMyMapExecutor.map()                                        # 2. 定义 ROOT_MAP 阶段任务
def testMyMapExecutor_rootMap(args: sj.MapArgs):
    assert args.task_name == sj.ROOT_MAP
    return sj.ExecuteResult.success("TWO_MAP")


@testMyMapExecutor.map("TWO_MAP")                               # 3. 定义 TWO_MAP 阶段任务
def testMyMapExecutor_twoMap(args: sj.MapArgs):
    return sj.ExecuteResult.success(args.map_result)


if __name__ == "__main__":
    sj.ExecutorManager.register(testMyMapExecutor)              # 4. 注册执行器
    sj.client_main()     

MapReduce

import snailjob as sj

testMapReduceJobExecutor = sj.MapReduceExecutor("testMapReduceJobExecutor")     # 1. 定义 MapReduceExecutor 变量


@testMapReduceJobExecutor.map()                                                 # 2. 定义 ROOT_MAP 阶段任务
def testMapReduceJobExecutor_rootMap(args: sj.MapArgs):
    return sj.ExecuteResult.success("MONTH_MAP")                                # 3. 上报分片信息


@testMapReduceJobExecutor.map("MONTH_MAP")                                      # 4. 定义 ROOT_MAP 阶段任务
def testMapReduceJobExecutor_monthMap(args: sj.MapArgs):
    return sj.ExecuteResult.success(int(args.map_result) * 2)


@testMapReduceJobExecutor.reduce()                                              # 5. 定义 reduce 阶段任务
def testMapReduceJobExecutor_reduce(args: sj.ReduceArgs):
    return sj.ExecuteResult.success(sum([int(x) for x in args.map_result]))


@testMapReduceJobExecutor.merge()                                               # 6. 定义 merge 阶段任务
def testMapReduceJobExecutor_merge(args: sj.MergeReduceArgs):
    return sj.ExecuteResult.success(sum([int(x) for x in args.reduces]))


if __name__ == "__main__":
    sj.ExecutorManager.register(testMapReduceJobExecutor)                       # 7. 注册执行器
    sj.client_main()   

响应停止事件

@sj.job("testJobExecutor")
def test_job_executor(args: sj.JobArgs) -> sj.ExecuteResult:
    for i in range(40):
        if sj.ThreadPoolCache.event_is_set(args.task_batch_id):     # 1. 判断当前任务批次是否被终止
            sj.SnailLog.REMOTE.info("任务已经被中断,立即返回")
            return sj.ExecuteResult.failure()
        time.sleep(1)

    return sj.ExecuteResult.success()

工作流、静态分片与普通定时任务类似,不做赘述

gRPC

开发者工具

python -m grpc_tools.protoc \
    --python_out=. \
    --grpc_python_out=. \
    --proto_path=. \
    snailjob/grpc/snailjob.proto

开发环境

pre-commit

项目使用 ruff-pre-commit

通过如下命令安装hook:

uv sync
uv run pre-commit install

本地开发验证

cd example && uv run --with=.. main.py

基于 Docker 开发环境

详见 Dockerfile.dev 文件

配置

详见 CONFIGURATION.md

Change Log

详见 CHANGELOG.md

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

snail_job_python-0.1.2.tar.gz (88.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

snail_job_python-0.1.2-py3-none-any.whl (35.0 kB view details)

Uploaded Python 3

File details

Details for the file snail_job_python-0.1.2.tar.gz.

File metadata

  • Download URL: snail_job_python-0.1.2.tar.gz
  • Upload date:
  • Size: 88.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.22

File hashes

Hashes for snail_job_python-0.1.2.tar.gz
Algorithm Hash digest
SHA256 b6fe704ae86ceab8e7650ad1b7ccf498c3e1da42a50c046e4cc7ea935ffe0042
MD5 d1704f917b2858be94c93f8a42b9a71b
BLAKE2b-256 af13582b381407f7643ef44e22570d8b93dea01cd3896504ef99790419f57ba7

See more details on using hashes here.

File details

Details for the file snail_job_python-0.1.2-py3-none-any.whl.

File metadata

File hashes

Hashes for snail_job_python-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 b1c5ac1ff17943fe4e3b90d56411a06245e0352222ddc1f905f1511203569919
MD5 52e572c9e5c29ac88e58ea70e569bfd3
BLAKE2b-256 d2c265b78c86c16fe970dfb5dd2aa193f9c1e0a6e02ed9262d0d33de964f7282

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page