Skip to main content

A lightweight Python framework for building asynchronous data processing pipelines with stateful nodes

Project description

OoFlow

A lightweight AI-Ready Python framework for building asynchronous data processing pipelines with stateful nodes.

Chinese version description please see below : D

文件下半部分有中文说明哟 : D

Quick Start

import asyncio
import ooflow

@ooflow.Node
async def A(ctx: ooflow.Context):
    count = 0
    while True:
        count = count + 1
        msg = await ctx.fetch()
        await ctx.emit(f"{count} {msg} Hello")

@ooflow.Node
async def B(ctx: ooflow.Context):
    while True:
        msg = await ctx.fetch()
        await ctx.emit(f"{msg} World")

@ooflow.Node
async def C(ctx: ooflow.Context):
    while True:
        msg = await ctx.fetch()
        await ctx.emit(f"{msg}!")

async def main():
    # Create and run the flow
    flow = ooflow.create(
        A.to(B),
        B.to(C)
    )

    flow.run()
    count_down = 3
    while count_down > 0:
        count_down = count_down - 1

        await flow.emit("start")
        print(await flow.fetch())
        await asyncio.sleep(1)
    flow.stop()

if __name__ == "__main__":
    asyncio.run(main())

Installation

Requirements: Python 3.9+

pip install ooflow

Features

What's the biggest difference between OoFlow and other frameworks? Your processing functions/methods are stateful, making logic that depends on previous messages possible.

Other Keypoints:

  • 🎯 Stateful Nodes: Nodes maintain state across messages, enabling complex business logic implementation
  • 📝 Super Easy To Use APIs: Intuitive APIs, no complex Python package dependencies
  • 🤖 AI Ready: Easy to process AI-related streaming messages, such as chat, pic, audio, video
  • 🔄 Asynchronous Processing: Built on Python's asyncio for high-performance concurrent execution
  • 📊 Flexible Topology: Support for complex graph structures including branching and merging and cycling
  • ⚡ Non-blocking Communication: Efficient message passing between nodes
  • 🛡️ Type Safety: Full type hints support with runtime validation

Core Concepts

Nodes

Nodes are the basic processing units in OoFlow. They are defined using the @ooflow.Node decorator:

@ooflow.Node
async def my_processor(ctx: ooflow.Context):
    # Fetch data from predecessor nodes
    while True:
        data = await ctx.fetch()
        # data_from_A = await ctx.fetch(A) / ctx.fetch_nowait(A)
        # data_from_B = await ctx.fetch(B) / ctx.fetch_nowait(B)
        # data_from_A_or_B = await ctx.fetch([A, B]) / ctx.fetch_nowait([A, B])
    
        ##############################
        # Your processing logic here #
        ##############################
    
        # Send result to successor nodes
        await ctx.emit(result)
        # await ctx.emit(result, C) / ctx.emit_nowait(result, C)
        # await ctx.emit(result, D) / ctx.emit_nowait(result, D)
        # await ctx.emit(result, [C, D]) / ctx.emit_nowait(result, [C, D])

Context

Each node receives a Context object that provides methods for communication:

  • await ctx.fetch() - Receive messages from all predecessor nodes
  • await ctx.emit(data) - Send messages to all successor nodes
  • ctx.fetch_nowait() - Non-blocking message retrieval
  • ctx.emit_nowait(data) - Non-blocking message sending

If you want to specify the source / target nodes to fetch from / emit to, you can:

  • ctx.fetch_nowait(A) or await ctx.fetch(A) - Receive messages only from node A
  • ctx.fetch_nowait([A, B]) or await ctx.fetch([A, B]) - Receive messages only from nodes A and B
  • ctx.emit_nowait(data, C) or await ctx.emit(data, C) - Send messages only to node C
  • ctx.emit_nowait(data, [C, D]) or await ctx.emit(data, [C, D]) - Send messages only to nodes C and D

Flow Creation

Connect nodes to create processing pipelines:

"""
Flow topology diagram:
    A


    B
   ╱ ╲
  ▼   ▼
  C   D
   ╲ ╱

    E
"""
flow = ooflow.create(
    A.to(B),           # A → B
    B.to(C, D),        # B → C, D (branching)
    C.to(E),           # C → E
    D.to(E)            # D → E (merging)
)

Advanced Examples

Branching and Merging

@ooflow.Node
async def splitter(ctx: ooflow.Context):
    data = await ctx.fetch()
    # Send to multiple nodes
    await ctx.emit(data, [branch1, branch2])

@ooflow.Node
async def branch1(ctx: ooflow.Context):
    data = await ctx.fetch()
    result = await process_branch1(data)
    await ctx.emit(result)

@ooflow.Node
async def branch2(ctx: ooflow.Context):
    data = await ctx.fetch()
    result = await process_branch2(data)
    await ctx.emit(result)

@ooflow.Node
async def merger(ctx: ooflow.Context):
    # Collect from both branches
    result1 = await ctx.fetch(branch1)
    result2 = await ctx.fetch(branch2)
    combined = combine_results(result1, result2)
    await ctx.emit(combined)

# Create the flow
flow = ooflow.create(
    splitter.to(branch1, branch2),
    branch1.to(merger),
    branch2.to(merger)
)

Method Decoration

class DataProcessor:
    def __init__(self, multiplier=2):
        self.multiplier = multiplier
        self.processed_count = 0
    
    @ooflow.Node
    async def instance_method(self, ctx: ooflow.Context):
        """Instance method as a node - can access instance state"""
        while True:
            data = await ctx.fetch()
            result = data * self.multiplier
            self.processed_count += 1
            await ctx.emit({"result": result, "count": self.processed_count})
    
    @classmethod
    @ooflow.Node
    async def class_method(cls, ctx: ooflow.Context):
        """Class method as a node - can access class-level information"""
        while True:
            data = await ctx.fetch()
            result = {"processed_by": cls.__name__, "data": data}
            await ctx.emit(result)
    
    @staticmethod
    @ooflow.Node
    async def static_method(ctx: ooflow.Context):
        """Static method as a node - pure function behavior"""
        while True:
            data = await ctx.fetch()
            result = data.upper() if isinstance(data, str) else str(data).upper()
            await ctx.emit(result)

async def main():
    # Create processor instance
    processor = DataProcessor(multiplier=3)

    # Create flow using different method types
    flow = ooflow.create(
        processor.instance_method.to(processor.class_method),
        processor.class_method.to(processor.static_method)
    )

    flow.run()
    count_down = 3
    while count_down > 0:
        count_down = count_down - 1
        await flow.emit("Hello")
        print(await flow.fetch())
        await asyncio.sleep(1)
    flow.stop()

if __name__ == "__main__":
    asyncio.run(main())

License

MIT License - see LICENSE file for details. If you use OoFlow in your project, please cite this repo.


OoFlow

一个轻量级的 Python 框架,用于构建有状态节点的数据处理图。

快速开始

import asyncio
import ooflow

@ooflow.Node
async def A(ctx: ooflow.Context):
    count = 0
    while True:
        count = count + 1
        msg = await ctx.fetch()
        await ctx.emit(f"{count} {msg} Hello")

@ooflow.Node
async def B(ctx: ooflow.Context):
    while True:
        msg = await ctx.fetch()
        await ctx.emit(f"{msg} World")

@ooflow.Node
async def C(ctx: ooflow.Context):
    while True:
        msg = await ctx.fetch()
        await ctx.emit(f"{msg}!")

async def main():
    # 创建并运行流程
    flow = ooflow.create(
        A.to(B),
        B.to(C)
    )

    flow.run()
    count_down = 3
    while count_down > 0:
        count_down = count_down - 1

        await flow.emit("start")
        print(await flow.fetch())
        await asyncio.sleep(1)
    flow.stop()

if __name__ == "__main__":
    asyncio.run(main())

安装

pip install ooflow

特性

OoFlow 与其它框架最大的区别是什么? 你的函数/方法节点是有状态的,可以做跨消息逻辑的实现。

其他要点:

  • 🎯 状态节点: 带状态的处理节点,支持你写更丰富的业务逻辑
  • 📝 超级易用的 API: 极其直观的使用方法,没有复杂 Python 包依赖
  • 🤖 AI Ready: 易于处理 AI 相关的流式消息,如聊天、图片、音频、视频
  • 🔄 异步处理: 基于 Python asyncio 构建,支持高性能并发执行
  • 📊 灵活拓扑: 支持复杂的图结构,包括分支、合并和循环
  • ⚡ 非阻塞通信: 节点间高效的消息传递
  • 🛡️ 类型安全: 完整的类型提示支持和运行时验证

核心概念

节点

节点是 OoFlow 中的基本处理单元,使用 @ooflow.Node 装饰器定义:

@ooflow.Node
async def my_processor(ctx: ooflow.Context):
    # 从前驱节点获取数据
    while True:
        data = await ctx.fetch()
        # data_from_A = await ctx.fetch(A) / ctx.fetch_nowait(A)
        # data_from_B = await ctx.fetch(B) / ctx.fetch_nowait(B)
        # data_from_A_or_B = await ctx.fetch([A, B]) / ctx.fetch_nowait([A, B])
    
        ##############################
        #     你的处理逻辑写在这儿      #
        ##############################
    
        # 将结果发送到后继节点
        await ctx.emit(result)
        # await ctx.emit(result, C) / ctx.emit_nowait(result, C)
        # await ctx.emit(result, D) / ctx.emit_nowait(result, D)
        # await ctx.emit(result, [C, D]) / ctx.emit_nowait(result, [C, D])

上下文

每个节点接收一个 Context 对象,提供通信方法:

  • await ctx.fetch() - 从所有前驱节点接收消息
  • await ctx.emit(data) - 向所有后继节点发送消息
  • ctx.fetch_nowait() - 非阻塞消息检索
  • ctx.emit_nowait(data) - 非阻塞消息发送

如果你想指定从哪些源节点获取数据或向哪些目标节点发送数据,你可以:

  • ctx.fetch_nowait(A)await ctx.fetch(A) - 仅从节点 A 接收消息
  • ctx.fetch_nowait([A, B])await ctx.fetch([A, B]) - 仅从节点 A 和 B 接收消息
  • ctx.emit_nowait(data, C)await ctx.emit(data, C) - 仅向节点 C 发送消息
  • ctx.emit_nowait(data, [C, D])await ctx.emit(data, [C, D]) - 仅向节点 C 和 D 发送消息

流程创建

连接节点以创建处理管道:

"""
流程拓扑图:
    A


    B
   ╱ ╲
  ▼   ▼
  C   D
   ╲ ╱

    E
"""
flow = ooflow.create(
    A.to(B),           # A → B
    B.to(C, D),        # B → C, D (分支)
    C.to(E),           # C → E
    D.to(E)            # D → E (合并)
)

高级示例

分支和合并

@ooflow.Node
async def splitter(ctx: ooflow.Context):
    data = await ctx.fetch()
    # 发送到多个节点
    await ctx.emit(data, [branch1, branch2])

@ooflow.Node
async def branch1(ctx: ooflow.Context):
    data = await ctx.fetch()
    result = await process_branch1(data)
    await ctx.emit(result)

@ooflow.Node
async def branch2(ctx: ooflow.Context):
    data = await ctx.fetch()
    result = await process_branch2(data)
    await ctx.emit(result)

@ooflow.Node
async def merger(ctx: ooflow.Context):
    # 从两个分支收集数据
    result1 = await ctx.fetch(branch1)
    result2 = await ctx.fetch(branch2)
    combined = combine_results(result1, result2)
    await ctx.emit(combined)

# 创建流程
flow = ooflow.create(
    splitter.to(branch1, branch2),
    branch1.to(merger),
    branch2.to(merger)
)

方法装饰

class DataProcessor:
    def __init__(self, multiplier=2):
        self.multiplier = multiplier
        self.processed_count = 0
    
    @ooflow.Node
    async def instance_method(self, ctx: ooflow.Context):
        """实例方法作为节点 - 可以访问实例状态"""
        while True:
            data = await ctx.fetch()
            result = data * self.multiplier
            self.processed_count += 1
            await ctx.emit({"result": result, "count": self.processed_count})
    
    @classmethod
    @ooflow.Node
    async def class_method(cls, ctx: ooflow.Context):
        """类方法作为节点 - 可以访问类级别信息"""
        while True:
            data = await ctx.fetch()
            result = {"processed_by": cls.__name__, "data": data}
            await ctx.emit(result)
    
    @staticmethod
    @ooflow.Node
    async def static_method(ctx: ooflow.Context):
        """静态方法作为节点 - 纯函数行为"""
        while True:
            data = await ctx.fetch()
            result = data.upper() if isinstance(data, str) else str(data).upper()
            await ctx.emit(result)

async def main():
    # 创建处理器实例
    processor = DataProcessor(multiplier=3)

    # 使用不同方法类型创建流程
    flow = ooflow.create(
        processor.instance_method.to(processor.class_method),
        processor.class_method.to(processor.static_method)
    )

    flow.run()
    count_down = 3
    while count_down > 0:
        count_down = count_down - 1
        await flow.emit("Hello")
        print(await flow.fetch())
        await asyncio.sleep(1)
    flow.stop()

if __name__ == "__main__":
    asyncio.run(main())

许可证

MIT 许可证 - 详见 LICENSE 文件。 如果你在项目中用了 OoFlow,麻烦也注明、引用下我的仓库,谢谢!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ooflow-0.1.1.tar.gz (20.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ooflow-0.1.1-py3-none-any.whl (10.3 kB view details)

Uploaded Python 3

File details

Details for the file ooflow-0.1.1.tar.gz.

File metadata

  • Download URL: ooflow-0.1.1.tar.gz
  • Upload date:
  • Size: 20.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ooflow-0.1.1.tar.gz
Algorithm Hash digest
SHA256 16140a86ff3f656964f44e2431b6fba7bbc1d7b16f5927d41ec56e234135b460
MD5 47423ae5c993446314072759585dc981
BLAKE2b-256 44c09bb325efcae10148b953651daac359b50a5cc7daf26cb4e81b3e7f197e9a

See more details on using hashes here.

Provenance

The following attestation bundles were made for ooflow-0.1.1.tar.gz:

Publisher: publish.yml on fanfank/ooflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file ooflow-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: ooflow-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 10.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ooflow-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 82c46b2cb6cbb052a2b45b9e0bb74940e565880201696368193eeeb977262b53
MD5 d34317b9607c50343a6e5e7061588fa1
BLAKE2b-256 7618c8be344d3fe9291d833f5dca50ff6dff83329173a7cab933ca87cb57ed84

See more details on using hashes here.

Provenance

The following attestation bundles were made for ooflow-0.1.1-py3-none-any.whl:

Publisher: publish.yml on fanfank/ooflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page