Roal 框架 - 纯数据流转与任务编排框架
Project description
Roal 框架
框架概述
Roal = Routing + Orchestration + Analytics + Logistics
代表框架的核心能力:数据路由、流程编排、分析处理、物流式流转。
Roal 是一个纯数据流转与任务编排框架,采用 Python 语言实现。
核心信条:框架不关心数据从哪来、到哪去、长什么样,只负责在任务之间传递数据、编排执行顺序、管理任务生命周期。
开发者拥有绝对自由:
- 数据源可以是文件、数据库、消息队列、HTTP 请求、硬件传感器、随机数生成器
- 数据格式可以是字符串、JSON、Protobuf、Avro、二进制字节流、Pandas DataFrame、NumPy 数组
- 处理逻辑可以是任何 Python 代码
框架职责边界:开发者写好 Task,框架负责串联执行。
核心概念
Task(任务)
Task 是框架中最基础的抽象,代表一个独立的处理单元。
开发者视角:Task 就是一个可调用对象(函数、lambda、类的 __call__ 方法),接收一个输入,产出一个输出。输入输出的类型完全由开发者决定。
框架视角:Task 是一个黑盒,框架只知道它有输入和输出,不关心内部实现。框架负责在合适的时机调用它,并把输出传递给下一个 Task。
定义方式:
- 普通函数
- 异步函数(async def)
- 生成器函数(yield)
- 实现了
__call__的类实例 - 使用
@task装饰器增强的任意可调用对象
Pipeline(流水线)
Pipeline 是多个 Task 的有序组合,定义了数据从产生到消费的完整路径。
构成元素:
- 节点:每个 Task 是一个节点
- 边:表示数据流向,从上游 Task 的输出指向下游 Task 的输入
- 路由规则:定义数据如何从上游分发到下游
支持的拓扑结构:
- 串联:A → B → C
- 分流:A 的输出同时发给 B 和 C(广播)
- 合并:B 和 C 的输出都发给 D
- 条件分支:根据数据内容决定发送给哪个下游
- 扇出扇入:一对多再聚合
- 哈希路由:根据数据字段的哈希值选择下游
TaskContext(任务上下文)
TaskContext 是框架暴露给开发者的服务接口,让 Task 在执行时能够获取框架提供的能力。
提供的服务:
- 配置获取:获取 Task 的配置参数,支持类型转换和默认值
- 状态存储:可选的 KV 存储,用于保存跨数据记录的 Task 状态
- 指标上报:向框架上报自定义指标(计数器、直方图、仪表)
- 日志记录:自动携带流水线 ID、Task 名称、数据 ID 的结构化日志
- 元数据访问:获取当前 Pipeline 的名称、运行 ID、启动时间等信息
- 数据属性:获取当前数据的元数据(来源、时间戳、序列号)
安装
基本安装
pip install roal
安装可选依赖
# 安装 Redis 支持
pip install roal[redis]
# 安装异步文件 I/O 支持
pip install roal[aio]
# 安装 Kafka 支持
pip install roal[kafka]
# 安装 PostgreSQL 支持
pip install roal[postgres]
# 安装所有可选依赖
pip install roal[redis,aio,kafka,postgres]
快速开始
定义 Task
from roal import task, TaskContext
import json
@task(name="parse_json", workers=4)
async def parse_json(data: bytes, ctx: TaskContext) -> dict:
return json.loads(data)
@task
async def filter_errors(record: dict, ctx: TaskContext) -> dict:
if record.get("level") == "error":
return record
return None # 过滤非错误记录
@task
async def send_alert(error: dict, ctx: TaskContext) -> None:
print(f"Alert: {error['message']}")
组装 Pipeline
from roal import Pipeline
pipeline = Pipeline("log_processor")
pipeline.source(read_logs) \ # 假设 read_logs 是一个数据源 Task
.then(parse_json) \
.fork() \
.to(filter_errors, send_alert) \
.to(extract_fields, store_to_db) \
.merge() \
.then(generate_stats) \
.sink(write_output)
# 运行 Pipeline
pipeline.run()
核心特性
- 数据血缘追踪:框架为每一条数据记录分配全局唯一的 ID,并记录它经过的每一个 Task。
- 可观测性:自动采集吞吐量、延迟、队列深度等指标,支持 Prometheus 集成。
- 断点续传:框架定期保存每个 Task 的消费位点,支持从故障中恢复。
- 动态配置:支持运行时修改 Task 配置,无需重启 Pipeline。
- 测试支持:提供内存数据源和数据汇,简化单元测试。
典型应用场景
- 实时日志分析:实时监控应用日志,错误日志发送告警,访问日志统计 QPS。
- 数据 ETL 管道:从业务库抽取数据,清洗转换后写入数仓,同时更新缓存。
- 物联网数据处理:接收设备传感器数据,异常检测,持久化,下发指令。
- AI 推理流水线:接收图片,预处理,模型推理,后处理,返回结果。
- 事件驱动工作流:用户操作触发一系列后续处理。
技术选型
- 核心依赖:asyncio, uvloop, structlog, prometheus_client, PyYAML
- Python 版本:3.9+
- 操作系统:Linux / macOS / Windows
部署形态
- 嵌入式部署:作为 Python 库直接嵌入现有应用。
- 独立服务:提供可选的 HTTP API 服务层。
- 容器化部署:提供官方 Docker 镜像。
- Kubernetes 部署:支持 K8s 部署,利用 Horizontal Pod Autoscaler 实现自动扩缩容。
与现有框架对比
| 特性 | Roal | Apache Flink | Apache Airflow | Prefect | Bytewax |
|---|---|---|---|---|---|
| 语言 | Python | Java/Python | Python | Python | Python |
| 编程模型 | 纯函数式 Task | 富算子 | DAG 定义 | 函数式 Flow | 数据流 |
| 部署复杂度 | 极低,pip install | 高,需要集群 | 中,需要调度器 | 低 | 低 |
| 学习曲线 | 几分钟 | 数周 | 数天 | 数小时 | 数小时 |
| 实时处理 | 原生支持 | 原生支持 | 分钟级调度 | 支持 | 原生支持 |
| 状态管理 | 可选,开发者控制 | 强制,框架管理 | 有限(XCom) | 支持 | 支持 |
| Python 生态集成 | 无缝 | 通过 PyFlink | 一般 | 良好 | 良好 |
| 适用场景 | 通用数据编排 | 大规模流计算 | 定时任务调度 | 数据工程 | 流处理 |
设计原则
- 简单至上:API 应该让开发者在 5 分钟内理解并开始使用
- 零侵入:不强制继承基类,普通函数即可成为 Task
- 显式优于隐式:框架行为可预测,避免魔法
- 渐进式复杂度:简单场景只需几行代码,复杂场景提供完整扩展能力
- Pythonic:遵循 Python 社区惯例,利用语言特性简化设计
- 测试友好:提供一流测试支持,鼓励 TDD
项目结构
roal/
├── pyproject.toml # 项目配置和依赖
├── README.md
├── docs/ # 文档
│ ├── getting-started.md
│ ├── core-concepts.md
│ ├── advanced/
│ └── api-reference/
├── roal/ # 核心源码
│ ├── __init__.py
│ ├── api/ # 公开 API
│ │ ├── __init__.py
│ │ ├── task.py # Task 协议和装饰器
│ │ ├── pipeline.py # Pipeline 类
│ │ └── context.py # TaskContext 类
│ ├── core/ # 内部实现
│ │ ├── dag.py # DAG 构建和拓扑排序
│ │ ├── router.py # 路由策略实现
│ │ └── executor.py # 执行引擎
│ ├── runtime/ # 运行时
│ │ ├── worker.py # Worker 协程管理
│ │ ├── queue.py # 异步队列封装
│ │ └── backpressure.py # 背压控制
│ ├── state/ # 状态管理
│ │ ├── base.py # StateStore 抽象
│ │ ├── memory.py # 内存存储
│ │ ├── redis.py # Redis 存储
│ │ └── file.py # 文件存储
│ ├── checkpoint/ # 断点续传
│ │ ├── manager.py
│ │ └── stores.py
│ ├── monitoring/ # 监控指标
│ │ ├── metrics.py
│ │ ├── tracing.py
│ │ └── prometheus.py
│ └── testing/ # 测试工具
│ ├── harness.py
│ └── fixtures.py
├── examples/ # 示例代码
│ ├── wordcount/
│ ├── log_analysis/
│ ├── etl_pipeline/
│ └── iot_processing/
└── tests/ # 测试
├── unit/
├── integration/
└── performance/
版本路线图
| 版本 | 里程碑 | 核心功能 |
|---|---|---|
| 0.1.0 | MVP | 基础 Task 和 Pipeline,支持串联和分支,asyncio 执行器 |
| 0.2.0 | 路由增强 | 条件路由、哈希路由、广播路由,Worker 并发 |
| 0.3.0 | 状态管理 | StateStore 抽象,内存/Redis 实现 |
| 0.4.0 | 可观测性 | 指标收集,Prometheus 集成,结构化日志 |
| 0.5.0 | 容错能力 | 断点续传,重试策略,死信队列 |
| 0.6.0 | 动态能力 | 运行时配置更新,动态扩缩容 |
| 0.7.0 | 进程扩展 | 多进程支持,CPU 密集型任务隔离 |
| 0.8.0 | API 服务 | HTTP API,Web UI 基础 |
| 0.9.0 | 生态集成 | 官方连接器(Kafka、Redis、PostgreSQL) |
| 1.0.0 | 稳定版 | API 稳定,生产就绪,完整文档 |
贡献指南
Roal 采用开放治理模式,欢迎社区贡献:
- 代码贡献:Fork → Feature Branch → Pull Request
- 文档改进:直接提交 PR 到 docs 目录
- Bug 报告:使用 GitHub Issues,附带最小复现示例
- 功能建议:先在 Discussions 中讨论,达成共识后再实现
开发环境搭建:
git clone https://github.com/RestRegular/roal.git
cd roal
python -m venv venv
source venv/bin/activate
pip install -e ".[dev]"
pytest
Roal —— 让数据流转像呼吸一样自然
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file roal-0.7.0.tar.gz.
File metadata
- Download URL: roal-0.7.0.tar.gz
- Upload date:
- Size: 44.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
78d6fa9b955574f44b2dceea16c61109d870784a51880534972fee1475ac86f9
|
|
| MD5 |
5e8bd7de64ad2be3d5c5b06374cb3efe
|
|
| BLAKE2b-256 |
a96d53b255c15741c741ff52dda5fb54dff9ffb9693fcf367a65eb815f1d1f2c
|
File details
Details for the file roal-0.7.0-py3-none-any.whl.
File metadata
- Download URL: roal-0.7.0-py3-none-any.whl
- Upload date:
- Size: 54.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1b5d25614f9fd55db2e5a41ba6a6307fb201d6061ff9bef80daa1c0cfeae6297
|
|
| MD5 |
91fedb18e76a718a2ea0314dc64cb8d7
|
|
| BLAKE2b-256 |
d3cb823445a9051ce513f66869fedce434391d2802dc1c5457bfec7705b8bf13
|