Skip to main content

A unified server management framework for data processing component

Project description

数据处理平台组件基类

统一的数据处理组件开发框架,标准化入参、出参和执行入口。

功能特性

  • 统一的接口规范(继承 BaseService,实现 process 方法)
  • 配置加载(本地 JSON / HTTP 远程 JSON / base64 编码 URL)
  • 动态 pip 包安装(runtime_env.pip
  • 结构化日志(自动包含服务名称和版本,兼容 Ray)
  • Ray 分布式业务日志(header/worker 按 key 拆分文件,9 列竖线格式,自动上传 OSS 并清理本地,带指数退避重试)
  • 结果投递(上传 OSS + MNS 通知,自动重试)
  • 日志上传 OSS(上传后自动删除本地文件)
  • 数据跟踪(track 模块,两段式上报到 MNS 顺序队列)
  • 工具函数(tools 模块,环境变量提取、bag 路径解析)

安装

pip install hello-datap-component-base

快速上手

详见 快速开始 | 用户手册

from hello_datap_component_base import BaseService

class MyService(BaseService):
    async def process(self, data: dict) -> dict:
        return {"result": data.get("value", 0) * 2}
component_manager start config.json

配置文件

{
  "name": "my-service",
  "version": "1.0.0",
  "work_flow_id": 123,
  "work_flow_instance_id": 456,
  "task_id": "task-001",
  "runtime_env": {
    "pip": ["requests>=2.25.0"],
    "env_vars": {
      "LOG_LEVEL": "INFO",
      "MNS_ENDPOINT": "https://your-account.mns.cn-shanghai.aliyuncs.com",
      "MNS_ACCESS_KEY_ID": "your-key-id",
      "MNS_ACCESS_KEY_SECRET": "your-key-secret",
      "OSS_ENDPOINT_FOR_LOG": "https://oss-cn-shanghai.aliyuncs.com",
      "OSS_ACCESS_KEY_ID_FOR_LOG": "your-key-id",
      "OSS_ACCESS_KEY_SECRET_FOR_LOG": "your-key-secret",
      "OSS_BUCKET_NAME_FOR_LOG": "your-bucket"
    }
  },
  "params": {
    "field1": "value1"
  }
}
字段 类型 必需 说明
name string 服务名称
version string 服务版本
work_flow_id int 工作流 ID
work_flow_instance_id int 工作流实例 ID
task_id string 任务 ID
runtime_env.pip list 启动前自动安装的 pip 包
runtime_env.env_vars dict 注入到进程的环境变量
params dict 传递给 process 方法的输入数据

返回结果格式

{
    "code": 0,
    "message": "success",
    "processing_time": 0.123,
    "data": {
        "work_flow_id": 123,
        "work_flow_instance_id": 456,
        "task_id": "task-001",
        "out_put": { "...用户返回的结果..." }
    }
}

异常时 code=-1message 为错误信息,out_putnull

结果投递(MNS + OSS)

配置 MNS 和 OSS 环境变量后,框架自动完成:

  1. 完整结果 JSON → 上传到 oss://infra-hads-artifacts/data-process-platform/task-result/{task_id}.json
  2. OSS 地址 → 发送到 MNS 队列

避免 MNS 64KB 消息限制。未配置则静默跳过。

数据跟踪(track)

from hello_datap_component_base import track

track.log(data=bag_path)                  # 任务开始
track.log(data=bag_path, status=1)        # 任务成功(status: 1=成功 2=失败)

通过 data + data_type + pipeline_instance_id + node_task_id 四元组判断插入/更新。

环境变量:DATA_TRACK_PIPELINE_IDDATA_TRACK_PIPELINE_INSTANCE_IDDATA_TRACK_NODE_IDDATA_TRACK_NODE_TASK_IDDATA_TRACK_COMPONENT_VERSIONDATA_TRACK_PRIORITYDATA_TRACK_EXECUTORDATA_TRACK_RUN_TYPEDATA_TRACK_MNS_QUEUE_NAME。MNS 认证复用 MNS_ENDPOINT 等。

工具函数(tools)

from hello_datap_component_base import tools

env_vars = tools.extract_specific_env_vars()           # 提取框架相关环境变量
vehicle  = tools.get_vehicle_id_from_bag(bag_path)     # "1_002"
date     = tools.get_date_from_bag(bag_path)           # "20260119"
package  = tools.get_package_name_from_bag(bag_path)   # "1_002_20260119-151708"

日志

from hello_datap_component_base import logger

logger.info("处理中...")
logger.error(f"失败: {e}")

或在服务类内使用 self.logger。日志自动包含服务名称和版本,兼容 Ray。

命令行

component_manager start <config_path>       # 启动服务
component_manager list                       # 列出服务类
component_manager validate <config_path>     # 验证配置
component_manager test <config_path> [data]  # 测试服务

也可用 python -m hello_datap_component_base.cli 替代 component_manager

项目结构

hello_datap_component_base/
├── __init__.py        # 模块导出
├── base.py            # BaseService 基类
├── config.py          # 配置管理
├── runner.py          # 服务运行器
├── cli.py             # 命令行工具
├── discover.py        # 服务发现
├── logger.py          # 日志管理
├── mns_client.py      # MNS 队列客户端
├── oss_client.py      # OSS 客户端
├── track.py           # 数据跟踪
├── tools.py           # 工具函数
└── data/
    └── bag_data_service.py  # Bag 数据服务

许可证

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

hello_datap_component_base-0.4.4.tar.gz (48.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

hello_datap_component_base-0.4.4-py3-none-any.whl (47.7 kB view details)

Uploaded Python 3

File details

Details for the file hello_datap_component_base-0.4.4.tar.gz.

File metadata

File hashes

Hashes for hello_datap_component_base-0.4.4.tar.gz
Algorithm Hash digest
SHA256 e361e5d1b6aa502854b918a067afcbfe12ac2d7a5859c65ee4fb074bd8977307
MD5 612ee9b733331bef089cad983168031a
BLAKE2b-256 6a0367136b0b653716aff5d122af322fe8e103f5b06df624ef61e38eb4ae3c65

See more details on using hashes here.

File details

Details for the file hello_datap_component_base-0.4.4-py3-none-any.whl.

File metadata

File hashes

Hashes for hello_datap_component_base-0.4.4-py3-none-any.whl
Algorithm Hash digest
SHA256 dd36fa97cc79a1709487686caa6db86e05448f4a31e88013b5ffe0849b32b920
MD5 6b3adeb801cde5864a1a20123d698356
BLAKE2b-256 1f764666c095648cf0ac8887846312bc1f8aedc803f8e073d9057d005614bdcc

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page