A powerful Python framework for building declarative, concurrent data processing workflows
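Streamlet - Intelligent streaming data processing framework
A declarative dataflow processing framework: you express business logic as method chains, and the framework automatically handles mixed async/sync execution, parallel scheduling, and retries.
- 🎯 Declarative workflows: build data flows with `.then()`, `.fan_out_to()`, `.fan_in()`, `.branch_on()`, and `.repeat()` method chains
- 🤖 Smart async execution: automatically detects async/sync functions and picks the right execution strategy, with no manual coordination
- 🔗 @node decorator: turns any function into a composable node, with built-in pydantic type validation and dependency injection
- 🛡️ Retry mechanism: configurable exponential-backoff retries based on exception classification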
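To make the "smart async execution" bullet concrete, here is a minimal sketch of how a runner might tell async functions from sync ones. It uses only the standard library; `run_step` is a hypothetical helper for illustration, not part of Streamlet's API, and Streamlet's actual strategy may differ.

```python
import asyncio
import inspect


def run_step(func, value):
    """Call func(value), driving it on an event loop if it is a coroutine function."""
    if inspect.iscoroutinefunction(func):
        # Async function: run the coroutine to completion on a fresh event loop.
        return asyncio.run(func(value))
    # Plain function: call it directly.
    return func(value)


async def fetch(x):
    await asyncio.sleep(0)
    return x + 1


def double(x):
    return x * 2


print(run_step(fetch, 1))   # 2
print(run_step(double, 2))  # 4
```

Note that `asyncio.run` cannot be called from inside an already running event loop, so a real framework would need a different path for that case.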
Quick start
pip install streamlet-py
from streamlet import node

@node
def double(x: int) -> int:
    return x * 2

@node
def add_ten(x: int) -> int:
    return x + 10

result = double.then(add_ten)(5)  # 20
Core API
| Method | Purpose | Example |
|---|---|---|
| `.then(node)` | Sequential chaining | `a.then(b)(data)` |
| `.fan_out_to([nodes], executor="thread")` | Parallel dispatch | `a.fan_out_to([b, c])()` |
| `.fan_in(aggregator)` | Aggregate parallel results | `flow.fan_in(merge)()` |
| `.fan_out_in([nodes], agg)` | Fan-out + aggregation | `a.fan_out_in([b, c], merge)()` |
| `.branch_on({key: node})` | Conditional branching | `a.branch_on({True: b, False: c})()` |
| `.repeat(times)` | Repeated execution | `a.repeat(3)(data)` |
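As a rough mental model for the chaining methods in the table, the sketch below implements `then` and `repeat` on a tiny stand-in class. `MiniNode` is hypothetical and is not Streamlet's node type; in particular, it assumes `repeat` feeds each output back in as the next input, which the table does not confirm.

```python
class MiniNode:
    """Hypothetical stand-in for a composable node; not Streamlet's class."""

    def __init__(self, func):
        self.func = func

    def __call__(self, value):
        return self.func(value)

    def then(self, other):
        # Sequential composition: feed this node's output into `other`.
        return MiniNode(lambda value: other(self(value)))

    def repeat(self, times):
        # Assumption: apply this node `times` times, chaining output to input.
        def run(value):
            for _ in range(times):
                value = self(value)
            return value
        return MiniNode(run)


double = MiniNode(lambda x: x * 2)
add_ten = MiniNode(lambda x: x + 10)

print(double.then(add_ten)(5))  # 20
print(double.repeat(3)(1))      # 8
```

Because every method returns a new `MiniNode`, chains of any length compose without mutating the original nodes.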
Examples
Sequential flow: ETL pipeline
from streamlet import node
import asyncio

@node
async def fetch_data(source: str) -> dict:
    await asyncio.sleep(0.1)
    return {"value": 100, "source": source}

@node
def validate(data: dict) -> dict:
    if data["value"] <= 0:
        raise ValueError("invalid value")
    return data

@node
def enrich(data: dict) -> dict:
    return {**data, "doubled": data["value"] * 2}

pipeline = fetch_data.then(validate).then(enrich)

async def main():
    result = await pipeline("db")
    print(result)  # {"value": 100, "source": "db", "doubled": 200}

asyncio.run(main())
Parallel flow: fan-out + aggregation
from streamlet import node

@node
def source(x: int) -> dict:
    return {"value": x}

@node
def multiply(data: dict) -> int:
    return data["value"] * 2

@node
def add_ten(data: dict) -> int:
    return data["value"] + 10

@node
def aggregate(results: dict) -> dict:
    values = [r.result for r in results.values() if r.success]
    return {"total": sum(values), "results": values}

workflow = source.fan_out_to([multiply, add_ten], executor="thread").fan_in(aggregate)
result = workflow(5)
print(result)  # {"total": 25, "results": [10, 15]}
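For readers curious what a thread-based fan-out and fan-in involves, the following standard-library sketch runs several branches in a `ThreadPoolExecutor` and hands the aggregator a dict of per-branch records. `BranchResult` and `fan_out_in` are hypothetical names that only mirror the `.success` / `.result` fields used in the example above; this is not Streamlet's implementation.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class BranchResult:
    """Hypothetical per-branch record with success flag, value, and error."""
    success: bool
    result: Any = None
    error: Optional[Exception] = None


def fan_out_in(value, branches, aggregator):
    """Run every branch on `value` in a thread pool, then aggregate the records."""
    def guarded(func):
        # Capture failures so one broken branch does not abort the others.
        try:
            return BranchResult(True, func(value))
        except Exception as exc:
            return BranchResult(False, error=exc)

    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(guarded, func) for name, func in branches.items()}
        results = {name: future.result() for name, future in futures.items()}
    return aggregator(results)


def aggregate(results):
    values = [r.result for r in results.values() if r.success]
    return {"total": sum(values), "results": values}


branches = {
    "multiply": lambda d: d["value"] * 2,
    "add_ten": lambda d: d["value"] + 10,
    "broken": lambda d: d["missing"],  # raises KeyError and is skipped by aggregate
}

print(fan_out_in({"value": 5}, branches, aggregate))
# {'total': 25, 'results': [10, 15]}
```

Results are collected in the order the branches were declared, not the order they finish, which keeps the aggregated output deterministic.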
Conditional flow: branch routing + dependency injection
from streamlet import BaseFlowContext, node
from dependency_injector.wiring import Provide

container = BaseFlowContext()

@node
def evaluate(data: dict) -> str:
    return "pass" if data["score"] >= 60 else "fail"

@node
def handle_pass(state: dict = Provide[BaseFlowContext.state]) -> dict:
    return {"result": "pass", "score": state["score"]}

@node
def handle_fail(state: dict = Provide[BaseFlowContext.state]) -> dict:
    return {"result": "fail", "score": state["score"]}

container.wire(modules=[__name__])
container.state()["score"] = 75

flow = evaluate.branch_on({"pass": handle_pass, "fail": handle_fail})
print(flow({"score": 75}))  # {"result": "pass", "score": 75}
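The routing step itself can be pictured as a dictionary lookup on the selector's return value. The sketch below is a plain-Python illustration with no dependency injection; `branch_on` here is a hypothetical free function, and it assumes the chosen handler receives the original input, which may not match how Streamlet passes data to branches.

```python
def branch_on(selector, routes):
    """Return a callable that routes its input to routes[selector(input)]."""
    def run(data):
        key = selector(data)
        if key not in routes:
            raise KeyError(f"no branch registered for key {key!r}")
        # Assumption: the handler gets the original input, not the key.
        return routes[key](data)
    return run


def evaluate(data):
    return "pass" if data["score"] >= 60 else "fail"


flow = branch_on(evaluate, {
    "pass": lambda d: {"result": "pass", "score": d["score"]},
    "fail": lambda d: {"result": "fail", "score": d["score"]},
})

print(flow({"score": 75}))  # {'result': 'pass', 'score': 75}
print(flow({"score": 40}))  # {'result': 'fail', 'score': 40}
```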
Retry mechanism
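from streamlet import node

@node(retry_count=3, retry_delay=0.5, backoff_factor=2.0, enable_retry=True)
def external_call(x: int) -> int:
    # On failure the call is retried automatically; the delay grows
    # exponentially: 0.5s → 1.0s → 2.0s.
    # call_external_api is a placeholder for your own function.
    return call_external_api(x)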
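The delay schedule in the comment above follows from `retry_delay * backoff_factor ** attempt`. The sketch below works through that arithmetic and shows one way a retry loop could use it. `backoff_delays` and `call_with_retry` are hypothetical helpers, not Streamlet functions, and for simplicity the loop retries on any exception rather than classifying exceptions as the framework is described as doing.

```python
import time


def backoff_delays(retry_count, retry_delay, backoff_factor):
    """Delay before each retry: retry_delay * backoff_factor ** attempt."""
    return [retry_delay * backoff_factor ** attempt for attempt in range(retry_count)]


def call_with_retry(func, *args, retry_count=3, retry_delay=0.5,
                    backoff_factor=2.0, sleep=time.sleep):
    """Call func; on any exception, wait and retry up to retry_count times."""
    for delay in backoff_delays(retry_count, retry_delay, backoff_factor):
        try:
            return func(*args)
        except Exception:
            sleep(delay)
    # Final attempt: let any exception propagate to the caller.
    return func(*args)


print(backoff_delays(3, 0.5, 2.0))  # [0.5, 1.0, 2.0]
```

The `sleep` parameter is injectable so that the waiting can be replaced with a recorder in tests instead of actually pausing.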
Development setup
git clone https://github.com/12306hujunjie/Streamlet.git
cd Streamlet
pdm install
pdm run pytest                      # run the tests
pdm run pytest --cov=src/streamlet  # coverage
pdm run ruff check src/ tests/      # linting
pdm run mypy src/streamlet/         # type checking
Tech stack
- Python 3.10+
- dependency-injector: dependency injection and thread-safe state management
- pydantic v2: type validation
Core modules: asyncio | threading | concurrent.futures
Documentation
License
MIT. See LICENSE for details.