Rx-Rust — Reactive Extensions for Python, powered by Rust (PyO3). A high-performance reactive programming library with Observable/Observer/Subject/Scheduler support.
Project description
RxPY
Reactive Extensions for Python — powered by Rust 🦀
RxPY 是一个用于组合异步和基于事件的程序的 Python 库,灵感来自微软的 Reactive Extensions (Rx)。 它通过 PyO3 绑定高性能 Rust 实现,提供零开销的响应式编程体验。
✨ 特性
核心类型
Observable— 在未来可能发射 0 个或多个值的惰性序列Subscription— 订阅句柄,支持dispose()取消订阅Observer— 订阅者回调(on_next/on_completed)
Subjects(广播器)
PublishSubject— 广播型主题,向所有订阅者发送值BehaviorSubject— 带"当前值"的主题,新订阅者会立即收到最新值ReplaySubject— 重放历史值的主题,缓存最近 N 个值
调度器
CurrentThreadScheduler— 在当前线程同步执行ImmediateScheduler— 立即执行,不做调度ThreadPoolScheduler— 在线程池中并发执行AsyncScheduler— 异步执行
操作符(全部方法链式可组合)
| 类别 | 操作符 |
|---|---|
| 创建 | of, from_iter, range, repeat, empty, never |
| 转换 | map, flat_map, scan, reduce |
| 过滤 | filter, take, skip, first, take_while, skip_while, distinct, distinct_until_changed, element_at, default_if_empty, ignore_elements |
| 组合 | merge, concat, zip, combine_latest, switch_map |
| 数学 | count, sum, min, max, average |
| 时间 | debounce, throttle, timeout |
| 错误 | catch_error, on_error_resume_next, retry, retry_when |
| 调度 | subscribe_on, observe_on |
🚀 快速开始
安装
# 从 PyPI 安装(推荐)
pip install rxpy
# 或者从源码构建(需要 Rust 工具链)
git clone https://gitcode.com/VictorTop/Rx-Rust.git
cd Rx-Rust/rxpy
pip install maturin
maturin develop
三分钟第一个程序
import rxpy
# 1. 创建: 从 1 到 5 的序列
observable = rxpy.Observable.range(1, 5)
# 2. 管道: 过滤偶数 + 平方
processed = (
observable
.filter(lambda x: x % 2 == 0)
.map(lambda x: x * x)
)
# 3. 订阅并打印
sub = processed.subscribe(
on_next=lambda v: print(f"收到: {v}"),
on_completed=lambda: print("完成!"),
)
# 也可以用上下文管理器自动取消
# with processed.subscribe(on_next=print) as sub:
# pass # 退出时自动 sub.dispose()
运行结果:
收到: 4
收到: 16
完成!
📚 使用指南
1. Subject 示例:事件总线
import rxpy
# 创建一个发布主题
subject = rxpy.PublishSubject()
# 订阅者 A
result_a = []
subject.subscribe(
on_next=lambda v: result_a.append(("A", v)),
on_completed=lambda: result_a.append("A done"),
)
# 发射值
subject.on_next(1) # A 收到 1
subject.on_next(2) # A 收到 2
# 订阅者 B(迟到的订阅者)
result_b = []
subject.subscribe(
on_next=lambda v: result_b.append(("B", v)),
on_completed=lambda: result_b.append("B done"),
)
subject.on_next(3) # A 和 B 都收到 3
subject.on_completed() # A 和 B 都收到完成信号
print(result_a) # [("A", 1), ("A", 2), ("A", 3), "A done"]
print(result_b) # [("B", 3), "B done"]
2. BehaviorSubject:带状态的主题
import rxpy
# 初始值为 0
subject = rxpy.BehaviorSubject(0)
# 订阅者 A 立即收到 0
result_a = []
subject.subscribe(on_next=lambda v: result_a.append(v))
subject.on_next(1) # A 收到 1
subject.on_next(2) # A 收到 2
# 订阅者 B 立即收到当前值 2,以及后续值
result_b = []
subject.subscribe(on_next=lambda v: result_b.append(v))
subject.on_next(3) # A 和 B 都收到 3
print(result_a) # [0, 1, 2, 3]
print(result_b) # [2, 3]
3. ReplaySubject:重放历史
import rxpy
# 缓存最近 2 个值
subject = rxpy.ReplaySubject(2)
subject.on_next(1)
subject.on_next(2)
subject.on_next(3)
# 订阅者会收到缓存的 2 和 3
result = []
subject.subscribe(on_next=lambda v: result.append(v))
subject.on_completed()
print(result) # [2, 3]
4. 数学与聚合
import rxpy
# 求和
result = (
rxpy.Observable.from_iter([1, 2, 3, 4, 5])
.reduce(0, lambda acc, x: acc + x)
.collect()
)
print(result) # [15]
# 使用 collect() 直接收集所有值到列表
values = rxpy.Observable.range(1, 5).collect()
print(values) # [1, 2, 3, 4, 5]
5. 过滤与转换管道
import rxpy
result = (
rxpy.Observable.from_iter(range(1, 11)) # 1..10
.filter(lambda x: x % 2 == 0) # 只保留偶数: 2, 4, 6, 8, 10
.map(lambda x: x * 2) # 翻倍: 4, 8, 12, 16, 20
.take(3) # 只取前 3 个: 4, 8, 12
.collect()
)
print(result) # [4, 8, 12]
🏗️ 架构
+-------------------+
| Python API |
| (rxpy/__init__.py)|
+---------+---------+
|
绑定
|
+---------v---------+
| PyO3 FFI |
| (Cargo.toml + |
| src/lib.rs) |
+---------+---------+
|
调用
|
+------------------------v------------------------+
| rx-rust (Rust 核心库) |
| src/lib.rs + src/observable + src/operators/ |
| + src/subject + src/scheduler + src/observer |
+------------------------+------------------------+
|
62 个单元测试
|
+--------v--------+
| cargo test ✅ |
+-----------------+
RxPY 构建在 Rust 库 rx-rust 之上,所有核心逻辑(Observable、操作符、Subject、调度器)均由 Rust 实现,并通过 PyO3 暴露给 Python。
📖 更多文档
🔧 开发与测试
环境要求
- Rust 1.75+
- Python 3.8+
- Maturin
构建
cd rxpy
pip install maturin
maturin develop # 开发模式安装
测试
# Rust 测试(在 rx-rust 目录下)
cd ../rx-rust
cargo test
# Python 测试(在 rxpy 目录下)
cd ../rxpy
python -c "
import rxpy
# 基本功能测试
result = rxpy.Observable.range(1, 5).filter(lambda x: x % 2 == 0).map(lambda x: x * 10).collect()
print('PASS:', result == [20, 40])
"
📄 许可证
MIT License © RxPY Contributors
请查看 LICENSE 文件以获取完整的许可证文本。
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rx_rust-0.1.0.tar.gz.
File metadata
- Download URL: rx_rust-0.1.0.tar.gz
- Upload date:
- Size: 48.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
aaa92efb5a7b616568af2ae240d17b0545a8c9d4d731edc31da02abb68693cfb
|
|
| MD5 |
aae291bcda1b23ca30c47e1f0da48f2a
|
|
| BLAKE2b-256 |
7a79f356fdbf6d1cdb4c5f5945a2450f475bbe3201a2a0afd6a9cab59b49e7ec
|
File details
Details for the file rx_rust-0.1.0-py3-none-any.whl.
File metadata
- Download URL: rx_rust-0.1.0-py3-none-any.whl
- Upload date:
- Size: 8.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3afebd346e7b1228d9fc8db24c461c532dd62963eddb37606105c097934a2cef
|
|
| MD5 |
2167e1ac5a7ffff45ac461753eac4d43
|
|
| BLAKE2b-256 |
64efef69e0418dccb21229d393b923e6b204c05d7e48805be800f2f09e70d55e
|