A powerful Python framework for building declarative, concurrent data processing workflows

Streamlet - Intelligent Streaming Data Processing Framework

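A declarative data-flow processing framework: express business logic as method chains, and the framework automatically handles mixed async/sync execution, parallel scheduling, and retries.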

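  • 🎯 Declarative workflows: build data flows by chaining .then(), .fan_out_to(), .fan_in(), .branch_on() and .repeat()
  • 🤖 Smart async execution: async and sync functions are detected automatically and run with the matching execution strategy, with no manual coordination
  • 🔗 @node decorator: turns any function into a composable node, with built-in pydantic type validation and dependency injection
  • 🛡️ Retry mechanism: configurable exponential-backoff retries based on exception classification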

Quick start

pip install streamlet-py

from streamlet import node

@node
def double(x: int) -> int:
    return x * 2

@node
def add_ten(x: int) -> int:
    return x + 10

result = double.then(add_ten)(5)  # 20
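
To give a feel for what .then() does in the quick-start example, here is a minimal sketch of a chainable wrapper in plain Python. It is not Streamlet's implementation: MiniNode and mini_node are hypothetical names, and the sketch covers only synchronous functions, with no validation, retries, or dependency injection.

```python
from typing import Any, Callable


class MiniNode:
    """Hypothetical stand-in for a node: wraps a function and supports .then()."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def __call__(self, value: Any) -> Any:
        return self.func(value)

    def then(self, next_node: "MiniNode") -> "MiniNode":
        # Compose: run this node first, then feed its output to next_node.
        return MiniNode(lambda value: next_node(self(value)))


def mini_node(func: Callable[[Any], Any]) -> MiniNode:
    """Hypothetical decorator, loosely analogous to @node."""
    return MiniNode(func)


@mini_node
def double(x: int) -> int:
    return x * 2


@mini_node
def add_ten(x: int) -> int:
    return x + 10


pipeline = double.then(add_ten)
print(pipeline(5))  # 20
```

Because .then() returns another MiniNode, chains of any length compose the same way.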

Core API

Method                                    Purpose                      Example
.then(node)                               Sequential chaining          a.then(b)(data)
.fan_out_to([nodes], executor="thread")   Parallel dispatch            a.fan_out_to([b, c])()
.fan_in(aggregator)                       Aggregate parallel results   flow.fan_in(merge)()
.fan_out_in([nodes], agg)                 Fan-out + aggregation        a.fan_out_in([b, c], merge)()
.branch_on({key: node})                   Conditional branching        a.branch_on({True: b, False: c})()
.repeat(times)                            Repeated execution           a.repeat(3)(data)

Examples

Sequential flow: ETL pipeline

from streamlet import node
import asyncio

@node
async def fetch_data(source: str) -> dict:
    await asyncio.sleep(0.1)
    return {"value": 100, "source": source}

@node
def validate(data: dict) -> dict:
    if data["value"] <= 0:
        raise ValueError("invalid value")
    return data

@node
def enrich(data: dict) -> dict:
    return {**data, "doubled": data["value"] * 2}

pipeline = fetch_data.then(validate).then(enrich)

async def main():
    result = await pipeline("db")
    print(result)  # {"value": 100, "source": "db", "doubled": 200}

asyncio.run(main())
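
The pipeline above mixes an async node with sync ones. One common way to support that, shown here as a sketch using only the standard library, is to call each step and await the result only when it is awaitable. The run_steps helper is a hypothetical name, and nothing here is taken from Streamlet's source.

```python
import asyncio
import inspect
from typing import Any, Callable, Sequence


async def run_steps(steps: Sequence[Callable[[Any], Any]], value: Any) -> Any:
    """Run steps in order, awaiting any step whose return value is awaitable."""
    for step in steps:
        value = step(value)
        if inspect.isawaitable(value):
            value = await value
    return value


async def fetch(source: str) -> dict:
    await asyncio.sleep(0)  # stands in for real I/O
    return {"value": 100, "source": source}


def enrich(data: dict) -> dict:
    return {**data, "doubled": data["value"] * 2}


result = asyncio.run(run_steps([fetch, enrich], "db"))
print(result)  # {'value': 100, 'source': 'db', 'doubled': 200}
```

Checking the return value rather than the function covers callables that are not declared with async def but still return a coroutine.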

Parallel flow: fan-out + aggregation

from streamlet import node

@node
def source(x: int) -> dict:
    return {"value": x}

@node
def multiply(data: dict) -> int:
    return data["value"] * 2

@node
def add_ten(data: dict) -> int:
    return data["value"] + 10

@node
def aggregate(results: dict) -> dict:
    values = [r.result for r in results.values() if r.success]
    return {"total": sum(values), "results": values}

workflow = source.fan_out_to([multiply, add_ten], executor="thread").fan_in(aggregate)
result = workflow(5)
print(result)  # {"total": 25, "results": [10, 15]}
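
The aggregator above reads .success and .result from each branch result. The sketch below shows how a thread-based fan-out could produce such a dictionary with concurrent.futures. It is an illustration under assumptions: BranchResult, run_branch, and fan_out are hypothetical names, and keying the results by function name is a choice made for this sketch, not documented Streamlet behaviour.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class BranchResult:
    """Hypothetical per-branch result with the .success/.result fields used above."""
    success: bool
    result: Any = None
    error: Optional[Exception] = None


def run_branch(func: Callable[[Any], Any], data: Any) -> BranchResult:
    try:
        return BranchResult(success=True, result=func(data))
    except Exception as exc:  # record the failure instead of aborting other branches
        return BranchResult(success=False, error=exc)


def fan_out(branches: list[Callable[[Any], Any]], data: Any) -> dict[str, BranchResult]:
    """Run every branch on the same input in a thread pool (keys are an assumption)."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {f.__name__: pool.submit(run_branch, f, data) for f in branches}
        return {name: future.result() for name, future in futures.items()}


def multiply(data: dict) -> int:
    return data["value"] * 2


def add_ten(data: dict) -> int:
    return data["value"] + 10


def broken(data: dict) -> int:
    raise RuntimeError("branch failed")


results = fan_out([multiply, add_ten, broken], {"value": 5})
values = [r.result for r in results.values() if r.success]
print({"total": sum(values), "results": values})  # {'total': 25, 'results': [10, 15]}
```

Capturing each exception inside its own result is what lets the aggregator filter on success rather than lose the whole batch to one failed branch.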

Conditional flow: branch routing + dependency injection

from streamlet import BaseFlowContext, node
from dependency_injector.wiring import Provide

container = BaseFlowContext()

@node
def evaluate(data: dict) -> str:
    return "pass" if data["score"] >= 60 else "fail"

@node
def handle_pass(state: dict = Provide[BaseFlowContext.state]) -> dict:
    return {"result": "pass", "score": state["score"]}

@node
def handle_fail(state: dict = Provide[BaseFlowContext.state]) -> dict:
    return {"result": "fail", "score": state["score"]}

container.wire(modules=[__name__])
container.state()["score"] = 75

flow = evaluate.branch_on({"pass": handle_pass, "fail": handle_fail})
print(flow({"score": 75}))  # {"result": "pass", "score": 75}
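
Stripped of the framework, branch routing amounts to a dictionary lookup on the deciding node's output. The sketch below illustrates that idea only. The route helper is hypothetical, the handlers receive the original input instead of injected state, and raising on an unmatched key is this sketch's choice, since the text above does not say how Streamlet handles that case.

```python
from typing import Any, Callable, Hashable, Mapping


def route(
    decide: Callable[[Any], Hashable],
    handlers: Mapping[Hashable, Callable[[Any], Any]],
    data: Any,
) -> Any:
    """Call decide(data), then dispatch data to the handler registered under that key."""
    key = decide(data)
    if key not in handlers:
        # Assumption: an unmatched key is an error in this sketch.
        raise KeyError(f"no branch registered for {key!r}")
    return handlers[key](data)


def evaluate(data: dict) -> str:
    return "pass" if data["score"] >= 60 else "fail"


def handle_pass(data: dict) -> dict:
    return {"result": "pass", "score": data["score"]}


def handle_fail(data: dict) -> dict:
    return {"result": "fail", "score": data["score"]}


handlers = {"pass": handle_pass, "fail": handle_fail}
print(route(evaluate, handlers, {"score": 75}))  # {'result': 'pass', 'score': 75}
print(route(evaluate, handlers, {"score": 40}))  # {'result': 'fail', 'score': 40}
```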

Retry mechanism

from streamlet import node

@node(retry_count=3, retry_delay=0.5, backoff_factor=2.0, enable_retry=True)
def external_call(x: int) -> int:
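    # On failure the call is retried automatically; the delay grows exponentially: 0.5s → 1.0s → 2.0s
    return call_external_api(x)  # call_external_api is a placeholder for your own function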
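
The delay sequence in the comment above follows from delay_n = retry_delay * backoff_factor ** n. The sketch below works that arithmetic out and wraps it in a small retry loop. It is an illustration, not Streamlet's code: backoff_delays and call_with_retry are hypothetical names, retry_count is assumed to mean retries after the first attempt, and every exception is retried, whereas Streamlet classifies exceptions.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delays(retry_count: int, retry_delay: float, backoff_factor: float) -> list[float]:
    """Delay before retry n (0-based) is retry_delay * backoff_factor ** n."""
    return [retry_delay * backoff_factor**n for n in range(retry_count)]


def call_with_retry(
    func: Callable[[], T],
    retry_count: int = 3,
    retry_delay: float = 0.5,
    backoff_factor: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func once, then retry up to retry_count times on any exception."""
    for delay in backoff_delays(retry_count, retry_delay, backoff_factor):
        try:
            return func()
        except Exception:
            sleep(delay)  # wait before the next attempt
    return func()  # final attempt; its exception propagates to the caller


print(backoff_delays(3, 0.5, 2.0))  # [0.5, 1.0, 2.0]
```

Passing sleep as a parameter keeps the loop testable without real waiting: a test can hand in a list's append method and inspect the recorded delays.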

Development environment

git clone https://github.com/12306hujunjie/Streamlet.git
cd Streamlet

pdm install

pdm run pytest                                   # run the tests
pdm run pytest --cov=src/streamlet               # coverage
pdm run ruff check src/ tests/                   # lint
pdm run mypy src/streamlet/                      # type check

Tech stack

  • Python 3.10+
  • dependency-injector: dependency injection and thread-safe state management
  • pydantic v2: type validation

Core standard-library modules: asyncio | threading | concurrent.futures

Documentation

License

MIT. See LICENSE for details.
