A powerful Python framework for building declarative, concurrent data processing workflows
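Streamlet - Intelligent Streaming Data Processing Framework
A declarative data-flow processing framework: express business logic as method chains, and the framework automatically handles mixed async/sync execution, parallel scheduling, and retries.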
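- 🎯 Declarative workflows: build data flows with `.then().fan_out_to().fan_in().branch_on().repeat()` method chains
- 🤖 Smart async execution: automatically detects async and sync functions and chooses the correct execution strategy, with no manual coordination
- 🔗 `@node` decorator: turns any function into a composable node, with built-in pydantic type validation and dependency injection
- 🛡️ Retries: configurable exponential-backoff retries based on exception classification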
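Streamlet's internal dispatch logic is not shown in this README, so the following is only a stdlib sketch of the general idea behind automatic async/sync detection. It assumes a hypothetical `run_step` helper that uses `inspect.iscoroutinefunction` to decide whether to await a function directly or run it in a worker thread with `asyncio.to_thread`; `run_chain`, `fetch`, and `enrich` are likewise illustrative names, not streamlet APIs.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_step(func: Callable[..., Any], value: Any) -> Any:
    """Hypothetical helper: await coroutine functions, offload plain functions."""
    if inspect.iscoroutinefunction(func):
        # Async function: await it directly on the running event loop
        return await func(value)
    # Sync function: run it in a worker thread so it does not block the loop
    return await asyncio.to_thread(func, value)


async def run_chain(funcs: list, value: Any) -> Any:
    """Hypothetical helper: feed each step's output into the next step, in order."""
    for func in funcs:
        value = await run_step(func, value)
    return value


async def fetch(source: str) -> dict:
    await asyncio.sleep(0.01)  # simulate I/O
    return {"value": 100, "source": source}


def enrich(data: dict) -> dict:
    return {**data, "doubled": data["value"] * 2}


print(asyncio.run(run_chain([fetch, enrich], "db")))
# {'value': 100, 'source': 'db', 'doubled': 200}
```

Running sync steps in a thread keeps the event loop responsive while blocking code executes; a real framework may choose differently (for example, calling cheap sync functions inline).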
Quick Start
```bash
pip install streamlet-py
```
```python
from streamlet import node

@node
def double(x: int) -> int:
    return x * 2

@node
def add_ten(x: int) -> int:
    return x + 10

result = double.then(add_ten)(5)  # 20
```
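Conceptually, `.then()` composes two nodes into a new callable that feeds the first node's output into the second. The toy sketch below illustrates that idea with a hypothetical `Node` class and `node` decorator; it is not streamlet's implementation and skips type validation, dependency injection, and async handling.

```python
from typing import Any, Callable


class Node:
    """Hypothetical toy node: wraps a function and supports .then() chaining."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def __call__(self, value: Any) -> Any:
        return self.func(value)

    def then(self, other: "Node") -> "Node":
        # Compose: run self first, then pass its output to `other`
        return Node(lambda value: other(self(value)))


def node(func: Callable[[Any], Any]) -> Node:
    """Toy stand-in for a @node decorator (no validation or injection)."""
    return Node(func)


@node
def double(x: int) -> int:
    return x * 2


@node
def add_ten(x: int) -> int:
    return x + 10


print(double.then(add_ten)(5))  # 20
```

Because `then` returns another `Node`, longer chains such as `double.then(add_ten).then(double)` compose the same way.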
Core API
| Method | Purpose | Example |
|---|---|---|
| `.then(node)` | Sequential chaining | `a.then(b)(data)` |
| `.fan_out_to([nodes], executor="thread")` | Parallel fan-out | `a.fan_out_to([b, c])()` |
| `.fan_in(aggregator)` | Aggregate parallel results | `flow.fan_in(merge)()` |
| `.fan_out_in([nodes], agg)` | Fan-out + aggregation | `a.fan_out_in([b, c], merge)()` |
| `.branch_on({key: node})` | Conditional branching | `a.branch_on({True: b, False: c})()` |
| `.repeat(times)` | Repeated execution | `a.repeat(3)(data)` |
Examples
Sequential flow: ETL pipeline
```python
import asyncio

from streamlet import node

@node
async def fetch_data(source: str) -> dict:
    await asyncio.sleep(0.1)
    return {"value": 100, "source": source}

@node
def validate(data: dict) -> dict:
    if data["value"] <= 0:
        raise ValueError("invalid value")
    return data

@node
def enrich(data: dict) -> dict:
    return {**data, "doubled": data["value"] * 2}

pipeline = fetch_data.then(validate).then(enrich)

async def main():
    result = await pipeline("db")
    print(result)  # {"value": 100, "source": "db", "doubled": 200}

asyncio.run(main())
```
Parallel flow: fan-out + aggregation
```python
from streamlet import node

@node
def source(x: int) -> dict:
    return {"value": x}

@node
def multiply(data: dict) -> int:
    return data["value"] * 2

@node
def add_ten(data: dict) -> int:
    return data["value"] + 10

@node
def aggregate(results: dict) -> dict:
    values = [r.result for r in results.values() if r.success]
    return {"total": sum(values), "results": values}

workflow = source.fan_out_to([multiply, add_ten], executor="thread").fan_in(aggregate)
result = workflow(5)
print(result)  # {"total": 25, "results": [10, 15]}
```
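The fan-out/fan-in pattern can be approximated with the standard library's `concurrent.futures.ThreadPoolExecutor`. In this sketch the `fan_out_in` helper and the `BranchResult` dataclass are hypothetical, and keying results by function name is an assumption chosen only so that the same aggregation logic (which reads `.result` and `.success`) works unchanged. A failing branch is recorded with `success=False` instead of aborting the whole flow.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class BranchResult:
    """Hypothetical per-branch result with .success/.result fields."""
    success: bool
    result: Any = None
    error: Optional[Exception] = None


def fan_out_in(data: Any, branches: list, aggregator: Callable[[dict], Any]) -> Any:
    """Run every branch on the same input in a thread pool, then aggregate."""
    results: dict = {}
    with ThreadPoolExecutor() as pool:
        # Submit all branches first so they run concurrently
        futures = {fn.__name__: pool.submit(fn, data) for fn in branches}
        for name, future in futures.items():
            try:
                results[name] = BranchResult(success=True, result=future.result())
            except Exception as exc:  # record the failure instead of raising
                results[name] = BranchResult(success=False, error=exc)
    return aggregator(results)


def multiply(data: dict) -> int:
    return data["value"] * 2


def add_ten(data: dict) -> int:
    return data["value"] + 10


def aggregate(results: dict) -> dict:
    values = [r.result for r in results.values() if r.success]
    return {"total": sum(values), "results": values}


print(fan_out_in({"value": 5}, [multiply, add_ten], aggregate))
# {'total': 25, 'results': [10, 15]}
```

Results are collected in submission order, so the aggregated list is deterministic even though the branches run concurrently.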
Conditional flow: branch routing + dependency injection
```python
from dependency_injector.wiring import Provide
from streamlet import BaseFlowContext, node

container = BaseFlowContext()

@node
def evaluate(data: dict) -> str:
    return "pass" if data["score"] >= 60 else "fail"

@node
def handle_pass(state: dict = Provide[BaseFlowContext.state]) -> dict:
    return {"result": "pass", "score": state["score"]}

@node
def handle_fail(state: dict = Provide[BaseFlowContext.state]) -> dict:
    return {"result": "fail", "score": state["score"]}

container.wire(modules=[__name__])
container.state()["score"] = 75

flow = evaluate.branch_on({"pass": handle_pass, "fail": handle_fail})
print(flow({"score": 75}))  # {"result": "pass", "score": 75}
```
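At its core, branch routing is dictionary dispatch: a selector produces a key, and the handler registered under that key runs next. The stdlib sketch below assumes a hypothetical `branch_on` function and, unlike the dependency-injection example, passes the original input straight to the chosen handler rather than injecting shared state.

```python
from collections.abc import Callable, Hashable, Mapping
from typing import Any


def branch_on(selector: Callable[[Any], Hashable],
              routes: Mapping) -> Callable[[Any], Any]:
    """Hypothetical sketch: route input to the handler matching selector's key."""
    def run(data: Any) -> Any:
        key = selector(data)
        if key not in routes:
            raise KeyError(f"no branch registered for {key!r}")
        # In this sketch the handler receives the original input, not the key
        return routes[key](data)
    return run


def evaluate(data: dict) -> str:
    return "pass" if data["score"] >= 60 else "fail"


def handle_pass(data: dict) -> dict:
    return {"result": "pass", "score": data["score"]}


def handle_fail(data: dict) -> dict:
    return {"result": "fail", "score": data["score"]}


flow = branch_on(evaluate, {"pass": handle_pass, "fail": handle_fail})
print(flow({"score": 75}))  # {'result': 'pass', 'score': 75}
print(flow({"score": 40}))  # {'result': 'fail', 'score': 40}
```

Raising on an unknown key makes a missing route fail loudly instead of silently returning `None`.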
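Retry mechanism
```python
from streamlet import node

@node(retry_count=3, retry_delay=0.5, backoff_factor=2.0, enable_retry=True)
def external_call(x: int) -> int:
    # Retried automatically on failure; delays grow exponentially: 0.5s → 1.0s → 2.0s
    # call_external_api is a placeholder for your own external service call
    return call_external_api(x)
```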
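The 0.5s → 1.0s → 2.0s schedule follows `retry_delay * backoff_factor ** attempt`. The stdlib sketch below reproduces that schedule with a hypothetical `retry` decorator; the `retry_on` tuple (which exceptions count as transient) and the injectable `sleep` parameter are assumptions for illustration, not streamlet's `@node` options. Only exceptions listed in `retry_on` are retried; anything else propagates immediately, and after `retry_count` retries the final attempt's exception is raised.

```python
import functools
import time
from typing import Any, Callable


def backoff_delays(retry_count: int, retry_delay: float, backoff_factor: float) -> list:
    """Delay before each retry: retry_delay * backoff_factor ** attempt."""
    return [retry_delay * backoff_factor ** attempt for attempt in range(retry_count)]


def retry(retry_count: int = 3, retry_delay: float = 0.5, backoff_factor: float = 2.0,
          retry_on: tuple = (ConnectionError, TimeoutError),
          sleep: Callable[[float], None] = time.sleep):
    """Hypothetical retry decorator: only exceptions in retry_on are retried."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for delay in backoff_delays(retry_count, retry_delay, backoff_factor):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    sleep(delay)  # transient error: wait, then try again
            return func(*args, **kwargs)  # final attempt; its exception propagates
        return wrapper
    return decorator


calls = {"n": 0}
waits: list = []


@retry(sleep=waits.append)  # record delays instead of actually sleeping
def flaky(x: int) -> int:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return x * 2


print(flaky(21), waits)  # 42 [0.5, 1.0]
```

Injecting `sleep` keeps the example fast and makes the backoff schedule observable; in production code the default `time.sleep` applies.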
Development Setup
```bash
git clone https://github.com/12306hujunjie/Streamlet.git
cd Streamlet
pdm install
pdm run pytest                        # run tests
pdm run pytest --cov=src/streamlet    # coverage report
pdm run ruff check src/ tests/        # lint
pdm run mypy src/streamlet/           # type check
```
Tech Stack
- Python 3.10+
- dependency-injector: dependency injection and thread-safe state management
- pydantic v2: type validation
Core modules: asyncio | threading | concurrent.futures
Documentation
License
MIT. See LICENSE for details.
Download files
Source Distribution
streamlet_py-0.0.1.tar.gz (27.6 kB)
Built Distribution
File details
Details for the file streamlet_py-0.0.1.tar.gz.
File metadata
- Download URL: streamlet_py-0.0.1.tar.gz
- Upload date:
- Size: 27.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.18
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `06d72e433d0c63f9d19610c67b4856ebb4733ede766bd22b14696091084b6b56` |
| MD5 | `037dd1edd1c9b96c49633ac8fdec6b25` |
| BLAKE2b-256 | `9ac17fc8826b964abb9691f177379daa15d86593116276ba09c2403594c89ae9` |
File details
Details for the file streamlet_py-0.0.1-py3-none-any.whl.
File metadata
- Download URL: streamlet_py-0.0.1-py3-none-any.whl
- Upload date:
- Size: 15.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.18
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `242e268e92ee09703f9ad37e6bee9232195015ff9db53b982c7d11d59f5f9374` |
| MD5 | `62f4a5e79d9ed8d9f5c0fb4e75cd1c95` |
| BLAKE2b-256 | `4a48dc4dac3b6c927becda44aeae07955443d6179ef79b799f732650b879a25d` |