OoFlow
A lightweight AI-Ready Python framework for building asynchronous data processing pipelines with stateful nodes.
Quick Start
import asyncio
import ooflow

@ooflow.Node
async def A(ctx: ooflow.Context):
    count = 0  # state kept across messages
    while True:
        count = count + 1
        msg = await ctx.fetch()
        await ctx.emit(f"{count} {msg} Hello")

@ooflow.Node
async def B(ctx: ooflow.Context):
    while True:
        msg = await ctx.fetch()
        await ctx.emit(f"{msg} World")

@ooflow.Node
async def C(ctx: ooflow.Context):
    while True:
        msg = await ctx.fetch()
        await ctx.emit(f"{msg}!")

async def main():
    # Create and run the flow
    flow = ooflow.create(
        A.to(B),
        B.to(C)
    )
    flow.run()

    count_down = 3
    while count_down > 0:
        count_down = count_down - 1
        await flow.emit("start")
        print(await flow.fetch())
        await asyncio.sleep(1)

    flow.stop()

if __name__ == "__main__":
    asyncio.run(main())
Installation
Requirements: Python 3.9+
pip install ooflow
Features
What sets OoFlow apart from other frameworks? Your processing functions and methods are stateful, so a node's logic can depend on the messages it has already seen.
Other key points:
- 🎯 Stateful Nodes: Nodes maintain state across messages, enabling complex business logic implementation
- 📝 Easy-to-Use APIs: Intuitive APIs with no complex Python package dependencies
- 🤖 AI Ready: Easy to process AI-related streaming messages, such as chat, images, audio, and video
- 🔄 Asynchronous Processing: Built on Python's asyncio for high-performance concurrent execution
- 📊 Flexible Topology: Support for complex graph structures, including branching, merging, and cycles
- ⚡ Non-blocking Communication: Efficient message passing between nodes
- 🛡️ Type Safety: Full type-hint support with runtime validation
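To make the "stateful" point concrete, here is a minimal sketch that uses only the standard library's asyncio.Queue, not OoFlow itself. The names running_average, inbox, outbox, and the None sentinel are assumptions made for this illustration; the idea it shows is simply that a long-running coroutine keeps its local variables alive between messages.

```python
import asyncio


async def running_average(inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
    """Hypothetical stand-in for a stateful node: state lives in local variables."""
    total = 0.0
    seen = 0
    while True:
        value = await inbox.get()
        if value is None:  # sentinel chosen for this sketch to end the loop
            break
        total += value
        seen += 1
        # Each output depends on every message received so far.
        await outbox.put(total / seen)


async def demo() -> list:
    inbox, outbox = asyncio.Queue(), asyncio.Queue()
    worker = asyncio.create_task(running_average(inbox, outbox))
    for value in (2, 4, 9):
        await inbox.put(value)
    await inbox.put(None)
    await worker
    return [outbox.get_nowait() for _ in range(outbox.qsize())]


averages = asyncio.run(demo())
print(averages)  # [2.0, 3.0, 5.0]
```

An OoFlow node written as a `while True` loop around `ctx.fetch()` and `ctx.emit()` follows the same shape, with the counter or accumulator declared above the loop.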
Core Concepts
Nodes
Nodes are the basic processing units in OoFlow. They are defined using the @ooflow.Node decorator:
@ooflow.Node
async def my_processor(ctx: ooflow.Context):
    while True:
        # Fetch data from predecessor nodes
        data = await ctx.fetch()
        # data_from_A = await ctx.fetch(A) / ctx.fetch_nowait(A)
        # data_from_B = await ctx.fetch(B) / ctx.fetch_nowait(B)
        # data_from_A_or_B = await ctx.fetch([A, B]) / ctx.fetch_nowait([A, B])

        ##############################
        # Your processing logic here #
        ##############################
        result = data  # placeholder: pass the message through unchanged

        # Send result to successor nodes
        await ctx.emit(result)
        # await ctx.emit(result, C) / ctx.emit_nowait(result, C)
        # await ctx.emit(result, D) / ctx.emit_nowait(result, D)
        # await ctx.emit(result, [C, D]) / ctx.emit_nowait(result, [C, D])
Context
Each node receives a Context object that provides methods for communication:
- await ctx.fetch() - Receive messages from all predecessor nodes
- await ctx.emit(data) - Send messages to all successor nodes
- ctx.fetch_nowait() - Non-blocking message retrieval
- ctx.emit_nowait(data) - Non-blocking message sending

To fetch from or emit to specific nodes, pass them as an argument:
- ctx.fetch_nowait(A) or await ctx.fetch(A) - Receive messages only from node A
- ctx.fetch_nowait([A, B]) or await ctx.fetch([A, B]) - Receive messages only from nodes A and B
- ctx.emit_nowait(data, C) or await ctx.emit(data, C) - Send messages only to node C
- ctx.emit_nowait(data, [C, D]) or await ctx.emit(data, [C, D]) - Send messages only to nodes C and D
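The difference between the awaitable and the nowait variants is the usual one for asyncio queues. The sketch below demonstrates it with asyncio.Queue from the standard library rather than with OoFlow; how OoFlow's own fetch_nowait and emit_nowait report an empty or full channel is not stated here, so treat the exceptions shown as asyncio behavior only. The maxsize of 1 and the event strings are choices made for this example.

```python
import asyncio


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    events = []

    # A non-blocking receive on an empty queue fails immediately.
    try:
        queue.get_nowait()
    except asyncio.QueueEmpty:
        events.append("nowait receive: nothing available")

    # A non-blocking send succeeds while there is room...
    queue.put_nowait("first")
    events.append("nowait send: sent first")

    # ...and fails immediately once the queue is full.
    try:
        queue.put_nowait("second")
    except asyncio.QueueFull:
        events.append("nowait send: queue full")

    # The awaitable receive suspends the coroutine until a message is available.
    events.append(f"await receive: got {await queue.get()}")
    return events


events = asyncio.run(demo())
for line in events:
    print(line)
```

The awaitable form is the natural default inside a node loop; the nowait form suits cases where the node has other work to do when no message is ready.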
Flow Creation
Connect nodes to create processing pipelines:
"""
Flow topology diagram:
      A
      │
      ▼
      B
     ╱ ╲
    ▼   ▼
    C   D
     ╲ ╱
      ▼
      E
"""
flow = ooflow.create(
    A.to(B),     # A → B
    B.to(C, D),  # B → C, D (branching)
    C.to(E),     # C → E
    D.to(E)      # D → E (merging)
)
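One way to reason about a topology like this is as an adjacency map. The following sketch is plain Python and does not reflect how OoFlow stores edges internally; the edges dictionary and the predecessors helper are names invented for the illustration. It derives each node's predecessors and finds the entry and exit nodes of the diamond above.

```python
from collections import defaultdict

# The diamond topology from the diagram, written as source -> targets.
edges = {"A": ["B"], "B": ["C", "D"], "C": ["E"], "D": ["E"]}


def predecessors(edges: dict) -> dict:
    """Invert the edge map so each node lists the nodes that feed it."""
    preds = defaultdict(list)
    for source, targets in edges.items():
        for target in targets:
            preds[target].append(source)
    return dict(preds)


preds = predecessors(edges)
# Entry nodes have no predecessors; exit nodes have no outgoing edges.
sources = [node for node in edges if node not in preds]
sinks = sorted({t for targets in edges.values() for t in targets} - set(edges))

print(preds)    # {'B': ['A'], 'C': ['B'], 'D': ['B'], 'E': ['C', 'D']}
print(sources)  # ['A']
print(sinks)    # ['E']
```

Node E has two predecessors, which is why a merging node may want to fetch from C and D separately.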
Advanced Examples
Branching and Merging
# process_branch1, process_branch2, and combine_results are placeholders
# for your own functions.

@ooflow.Node
async def splitter(ctx: ooflow.Context):
    data = await ctx.fetch()
    # Send to multiple nodes
    await ctx.emit(data, [branch1, branch2])

@ooflow.Node
async def branch1(ctx: ooflow.Context):
    data = await ctx.fetch()
    result = await process_branch1(data)
    await ctx.emit(result)

@ooflow.Node
async def branch2(ctx: ooflow.Context):
    data = await ctx.fetch()
    result = await process_branch2(data)
    await ctx.emit(result)

@ooflow.Node
async def merger(ctx: ooflow.Context):
    # Collect from both branches
    result1 = await ctx.fetch(branch1)
    result2 = await ctx.fetch(branch2)
    combined = combine_results(result1, result2)
    await ctx.emit(combined)

# Create the flow
flow = ooflow.create(
    splitter.to(branch1, branch2),
    branch1.to(merger),
    branch2.to(merger)
)
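The split-and-merge pattern can be tried out without OoFlow using one asyncio.Queue per edge. This is a sketch of the pattern only, not of OoFlow's internals: the function signatures, the queue names, and the two lambda transforms are assumptions made for the example. Because the merger reads from a dedicated queue per branch, it pairs the results correctly whichever branch finishes first.

```python
import asyncio


async def splitter(inbox: asyncio.Queue, outs: list) -> None:
    item = await inbox.get()
    for out in outs:  # fan the same message out to every branch
        await out.put(item)


async def branch(inbox: asyncio.Queue, outbox: asyncio.Queue, transform) -> None:
    item = await inbox.get()
    await outbox.put(transform(item))


async def merger(in1: asyncio.Queue, in2: asyncio.Queue, outbox: asyncio.Queue) -> None:
    # Reading each branch from its own queue mirrors fetching from a named node.
    left = await in1.get()
    right = await in2.get()
    await outbox.put((left, right))


async def demo() -> tuple:
    source, to_b1, to_b2, from_b1, from_b2, sink = (asyncio.Queue() for _ in range(6))
    await source.put(10)
    await asyncio.gather(
        splitter(source, [to_b1, to_b2]),
        branch(to_b1, from_b1, lambda x: x + 1),
        branch(to_b2, from_b2, lambda x: x * 2),
        merger(from_b1, from_b2, sink),
    )
    return await sink.get()


combined = asyncio.run(demo())
print(combined)  # (11, 20)
```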
Method Decoration
import asyncio
import ooflow

class DataProcessor:
    def __init__(self, multiplier=2):
        self.multiplier = multiplier
        self.processed_count = 0

    @ooflow.Node
    async def instance_method(self, ctx: ooflow.Context):
        """Instance method as a node - can access instance state"""
        while True:
            data = await ctx.fetch()
            result = data * self.multiplier
            self.processed_count += 1
            await ctx.emit({"result": result, "count": self.processed_count})

    @classmethod
    @ooflow.Node
    async def class_method(cls, ctx: ooflow.Context):
        """Class method as a node - can access class-level information"""
        while True:
            data = await ctx.fetch()
            result = {"processed_by": cls.__name__, "data": data}
            await ctx.emit(result)

    @staticmethod
    @ooflow.Node
    async def static_method(ctx: ooflow.Context):
        """Static method as a node - pure function behavior"""
        while True:
            data = await ctx.fetch()
            result = data.upper() if isinstance(data, str) else str(data).upper()
            await ctx.emit(result)

async def main():
    # Create processor instance
    processor = DataProcessor(multiplier=3)

    # Create flow using different method types
    flow = ooflow.create(
        processor.instance_method.to(processor.class_method),
        processor.class_method.to(processor.static_method)
    )
    flow.run()

    count_down = 3
    while count_down > 0:
        count_down = count_down - 1
        await flow.emit("Hello")
        print(await flow.fetch())
        await asyncio.sleep(1)

    flow.stop()

if __name__ == "__main__":
    asyncio.run(main())
License
MIT License - see LICENSE file for details. If you use OoFlow in your project, please cite this repo.