Skip to main content

Roal 框架 - 纯数据流转与任务编排框架

Project description

Roal 框架

框架概述

Roal = Routing + Orchestration + Analytics + Logistics

代表框架的核心能力:数据路由、流程编排、分析处理、物流式流转。

Roal 是一个纯数据流转与任务编排框架,采用 Python 语言实现。

核心信条:框架不关心数据从哪来、到哪去、长什么样,只负责在任务之间传递数据、编排执行顺序、管理任务生命周期。

开发者拥有绝对自由

  • 数据源可以是文件、数据库、消息队列、HTTP 请求、硬件传感器、随机数生成器
  • 数据格式可以是字符串、JSON、Protobuf、Avro、二进制字节流、Pandas DataFrame、NumPy 数组
  • 处理逻辑可以是任何 Python 代码

框架职责边界:开发者写好 Task,框架负责串联执行。

核心概念

Task(任务)

Task 是框架中最基础的抽象,代表一个独立的处理单元。

开发者视角:Task 就是一个可调用对象(函数、lambda、类的 __call__ 方法),接收一个输入,产出一个输出。输入输出的类型完全由开发者决定。

框架视角:Task 是一个黑盒,框架只知道它有输入和输出,不关心内部实现。框架负责在合适的时机调用它,并把输出传递给下一个 Task。

定义方式

  • 普通函数
  • 异步函数(async def)
  • 生成器函数(yield)
  • 实现了 __call__ 的类实例
  • 使用 @task 装饰器增强的任意可调用对象

Pipeline(流水线)

Pipeline 是多个 Task 的有序组合,定义了数据从产生到消费的完整路径。

构成元素

  • 节点:每个 Task 是一个节点
  • :表示数据流向,从上游 Task 的输出指向下游 Task 的输入
  • 路由规则:定义数据如何从上游分发到下游

支持的拓扑结构

  • 串联:A → B → C
  • 分流:A 的输出同时发给 B 和 C(广播)
  • 合并:B 和 C 的输出都发给 D
  • 条件分支:根据数据内容决定发送给哪个下游
  • 扇出扇入:一对多再聚合
  • 哈希路由:根据数据字段的哈希值选择下游

TaskContext(任务上下文)

TaskContext 是框架暴露给开发者的服务接口,让 Task 在执行时能够获取框架提供的能力。

提供的服务

  • 配置获取:获取 Task 的配置参数,支持类型转换和默认值
  • 状态存储:可选的 KV 存储,用于保存跨数据记录的 Task 状态
  • 指标上报:向框架上报自定义指标(计数器、直方图、仪表)
  • 日志记录:自动携带流水线 ID、Task 名称、数据 ID 的结构化日志
  • 元数据访问:获取当前 Pipeline 的名称、运行 ID、启动时间等信息
  • 数据属性:获取当前数据的元数据(来源、时间戳、序列号)

安装

基本安装

pip install roal

安装可选依赖

# 安装 Redis 支持
pip install roal[redis]

# 安装异步文件 I/O 支持
pip install roal[aio]

# 安装 Kafka 支持
pip install roal[kafka]

# 安装 PostgreSQL 支持
pip install roal[postgres]

# 安装所有可选依赖
pip install roal[redis,aio,kafka,postgres]

快速开始

定义 Task

from roal import task, TaskContext
import json

@task(name="parse_json", workers=4)
async def parse_json(data: bytes, ctx: TaskContext) -> dict:
    return json.loads(data)

@task
async def filter_errors(record: dict, ctx: TaskContext) -> dict:
    if record.get("level") == "error":
        return record
    return None  # 过滤非错误记录

@task
async def send_alert(error: dict, ctx: TaskContext) -> None:
    print(f"Alert: {error['message']}")

组装 Pipeline

from roal import Pipeline

pipeline = Pipeline("log_processor")

pipeline.source(read_logs) \  # 假设 read_logs 是一个数据源 Task
    .then(parse_json) \    
    .fork() \
        .to(filter_errors, send_alert) \
        .to(extract_fields, store_to_db) \
    .merge() \
    .then(generate_stats) \
    .sink(write_output)

# 运行 Pipeline
pipeline.run()

核心特性

  1. 数据血缘追踪:框架为每一条数据记录分配全局唯一的 ID,并记录它经过的每一个 Task。
  2. 可观测性:自动采集吞吐量、延迟、队列深度等指标,支持 Prometheus 集成。
  3. 断点续传:框架定期保存每个 Task 的消费位点,支持从故障中恢复。
  4. 动态配置:支持运行时修改 Task 配置,无需重启 Pipeline。
  5. 测试支持:提供内存数据源和数据汇,简化单元测试。

典型应用场景

  • 实时日志分析:实时监控应用日志,错误日志发送告警,访问日志统计 QPS。
  • 数据 ETL 管道:从业务库抽取数据,清洗转换后写入数仓,同时更新缓存。
  • 物联网数据处理:接收设备传感器数据,异常检测,持久化,下发指令。
  • AI 推理流水线:接收图片,预处理,模型推理,后处理,返回结果。
  • 事件驱动工作流:用户操作触发一系列后续处理。

技术选型

  • 核心依赖:asyncio, uvloop, structlog, prometheus_client, PyYAML
  • Python 版本:3.9+
  • 操作系统:Linux / macOS / Windows

部署形态

  • 嵌入式部署:作为 Python 库直接嵌入现有应用。
  • 独立服务:提供可选的 HTTP API 服务层。
  • 容器化部署:提供官方 Docker 镜像。
  • Kubernetes 部署:支持 K8s 部署,利用 Horizontal Pod Autoscaler 实现自动扩缩容。

与现有框架对比

特性 Roal Apache Flink Apache Airflow Prefect Bytewax
语言 Python Java/Python Python Python Python
编程模型 纯函数式 Task 富算子 DAG 定义 函数式 Flow 数据流
部署复杂度 极低,pip install 高,需要集群 中,需要调度器
学习曲线 几分钟 数周 数天 数小时 数小时
实时处理 原生支持 原生支持 分钟级调度 支持 原生支持
状态管理 可选,开发者控制 强制,框架管理 有限(XCom) 支持 支持
Python 生态集成 无缝 通过 PyFlink 一般 良好 良好
适用场景 通用数据编排 大规模流计算 定时任务调度 数据工程 流处理

设计原则

  1. 简单至上:API 应该让开发者在 5 分钟内理解并开始使用
  2. 零侵入:不强制继承基类,普通函数即可成为 Task
  3. 显式优于隐式:框架行为可预测,避免魔法
  4. 渐进式复杂度:简单场景只需几行代码,复杂场景提供完整扩展能力
  5. Pythonic:遵循 Python 社区惯例,利用语言特性简化设计
  6. 测试友好:提供一流测试支持,鼓励 TDD

项目结构

roal/
├── pyproject.toml                 # 项目配置和依赖
├── README.md
├── docs/                          # 文档
│   ├── getting-started.md
│   ├── core-concepts.md
│   ├── advanced/
│   └── api-reference/
├── roal/                          # 核心源码
│   ├── __init__.py
│   ├── api/                       # 公开 API
│   │   ├── __init__.py
│   │   ├── task.py               # Task 协议和装饰器
│   │   ├── pipeline.py           # Pipeline 类
│   │   └── context.py            # TaskContext 类
│   ├── core/                      # 内部实现
│   │   ├── dag.py                # DAG 构建和拓扑排序
│   │   ├── router.py             # 路由策略实现
│   │   └── executor.py           # 执行引擎
│   ├── runtime/                   # 运行时
│   │   ├── worker.py             # Worker 协程管理
│   │   ├── queue.py              # 异步队列封装
│   │   └── backpressure.py       # 背压控制
│   ├── state/                     # 状态管理
│   │   ├── base.py               # StateStore 抽象
│   │   ├── memory.py             # 内存存储
│   │   ├── redis.py              # Redis 存储
│   │   └── file.py               # 文件存储
│   ├── checkpoint/                # 断点续传
│   │   ├── manager.py
│   │   └── stores.py
│   ├── monitoring/                # 监控指标
│   │   ├── metrics.py
│   │   ├── tracing.py
│   │   └── prometheus.py
│   └── testing/                   # 测试工具
│       ├── harness.py
│       └── fixtures.py
├── examples/                      # 示例代码
│   ├── wordcount/
│   ├── log_analysis/
│   ├── etl_pipeline/
│   └── iot_processing/
└── tests/                         # 测试
    ├── unit/
    ├── integration/
    └── performance/

版本路线图

版本 里程碑 核心功能
0.1.0 MVP 基础 Task 和 Pipeline,支持串联和分支,asyncio 执行器
0.2.0 路由增强 条件路由、哈希路由、广播路由,Worker 并发
0.3.0 状态管理 StateStore 抽象,内存/Redis 实现
0.4.0 可观测性 指标收集,Prometheus 集成,结构化日志
0.5.0 容错能力 断点续传,重试策略,死信队列
0.6.0 动态能力 运行时配置更新,动态扩缩容
0.7.0 进程扩展 多进程支持,CPU 密集型任务隔离
0.8.0 API 服务 HTTP API,Web UI 基础
0.9.0 生态集成 官方连接器(Kafka、Redis、PostgreSQL)
1.0.0 稳定版 API 稳定,生产就绪,完整文档

贡献指南

Roal 采用开放治理模式,欢迎社区贡献:

  • 代码贡献:Fork → Feature Branch → Pull Request
  • 文档改进:直接提交 PR 到 docs 目录
  • Bug 报告:使用 GitHub Issues,附带最小复现示例
  • 功能建议:先在 Discussions 中讨论,达成共识后再实现

开发环境搭建

git clone https://github.com/RestRegular/roal.git
cd roal
python -m venv venv
source venv/bin/activate
pip install -e ".[dev]"
pytest

Roal —— 让数据流转像呼吸一样自然

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

roal-0.7.0.tar.gz (44.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

roal-0.7.0-py3-none-any.whl (54.9 kB view details)

Uploaded Python 3

File details

Details for the file roal-0.7.0.tar.gz.

File metadata

  • Download URL: roal-0.7.0.tar.gz
  • Upload date:
  • Size: 44.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.12

File hashes

Hashes for roal-0.7.0.tar.gz
Algorithm Hash digest
SHA256 78d6fa9b955574f44b2dceea16c61109d870784a51880534972fee1475ac86f9
MD5 5e8bd7de64ad2be3d5c5b06374cb3efe
BLAKE2b-256 a96d53b255c15741c741ff52dda5fb54dff9ffb9693fcf367a65eb815f1d1f2c

See more details on using hashes here.

File details

Details for the file roal-0.7.0-py3-none-any.whl.

File metadata

  • Download URL: roal-0.7.0-py3-none-any.whl
  • Upload date:
  • Size: 54.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.12

File hashes

Hashes for roal-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1b5d25614f9fd55db2e5a41ba6a6307fb201d6061ff9bef80daa1c0cfeae6297
MD5 91fedb18e76a718a2ea0314dc64cb8d7
BLAKE2b-256 d3cb823445a9051ce513f66869fedce434391d2802dc1c5457bfec7705b8bf13

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page