Skip to main content

pyboot-components-netty is an open-source utility package that simplifies the development of async.

Project description

pyboot-netty 文档

Python 版 Netty:事件驱动、Handler 流水线、零拷贝、全平台兼容的高性能网络框架


1. 框架定位

pyboot-netty 是一个 事件驱动 + Handler 流水线 的 TCP/UDP 网络框架,100 % Python 实现,API 设计完全对标 Netty,目标是让 Python 开发者也能用“Netty 风格”写出高并发、易维护的网络应用。

  • 零依赖:仅依赖标准库 asyncioselectors
  • 全平台:Linux(epoll)、macOS(kqueue)、Windows(select)
  • 零拷贝memoryview + bytearray 传输,无额外 copy
  • Handler 链:解码 → 业务 → 编码,可插拔
  • 池化缓冲区:自适应读缓冲,高低水位线控制写事件
  • Keep-alive:TCP 心跳、自动重连、优雅关闭

2. 核心概念(与 Netty 1:1 映射)

Netty 概念 pyboot-netty 对应 说明
Channel SocketChannel 一个连接
EventLoop asyncio.BaseEventLoop 单线程事件循环
ChannelPipeline ChannelPipeline Handler 双向链表
ChannelHandler ChannelHandler 基类 用户业务单元
ByteBuf ByteBuffer 池化字节缓冲区
Bootstrap Bootstrap / ServerBootstrap 客户端/服务端启动器
Future asyncio.Future 异步结果
Codec MessageToByteDecoder / MessageToByteEncoder 编解码

3. 线程模型(主从 Reactor)

┌─ BossGroup (1 线程)──┐
│  accept → register   │
└──────┬───────────────┘
       ▼
┌─ WorkerGroup (N 线程)┐
│  read → decode → biz│
└──────┬───────────────┘
       ▼
┌─ Codec & Business  ┐
│  encode → write    │
└────────────────────┘
  • Boss 只负责 accept,立刻把 fd 注册到 Worker 事件循环
  • Worker 处理该连接生命周期内所有 IO(读、写、心跳、关闭)
  • 所有 Handler 同线程串行执行,无需加锁

4. 快速开始

4.1 安装

pip install pyboot-netty

4.2 一行 Echo Server

from pyboot.components.netty.server import EchoServer
EchoServer().address('0.0.0.0', 8080).grace()

命令行输入 telnet 127.0.0.1 8080 → 键入任何字符回车后立即回显。Close退出

4.3 自定义 Handler(对标 Netty ChannelInboundHandler

from pyboot.components.netty.handler import IdleStateHandler,LoggingHandler,WriteTimeoutHandler,ListRemoteAddressFilter
from pyboot.components.netty.codec import StringDecoder,ByteDelimiterBasedFrameDecoder,DelimiterBasedFrameDecoder,ByteToByteBufferMessageDecoder 
from pyboot.components.netty.codec import LineBasedFrameDecoder,StringEncoder,JsonEncoder,FixedLengthFrameDecoder 
from pyboot.components.netty.codec import LengthFieldBasedFrameDecoder, SimpleBytesLengthFieldBasedFrameEncoder,MessageToByteEncoder  
from pyboot.components.netty.channel import ChannelHandler,ChannelHandlerContext,ChannelEvent,ChannelHandleError 

class MyDecoder(ByteToByteBufferMessageDecoder):
    def __init__(self, fix_length:int,pool_bucket_size:int=1024):        
        super().__init__(pool_bucket_size=pool_bucket_size)
        assert fix_length>0, f'Frame长度必须大于0,但是获得{fix_length}'
        self._fix_length = fix_length
        
    async def decode(self, ctx: ChannelHandlerContext, bytebuffer: ByteBuffer)->ByteBuffer:
        if bytebuffer.readable_bytes()>=self._fix_length:
            buf = self.allocate_bytebuffer(self._fix_length)
            buf.write_from_reader(bytebuffer, self._fix_length)
            return buf
        else:
            return None

class BizHandler(ChannelHandlerAdapter):
    async def channel_read(self, ctx: ChannelHandlerContext, msg: ByteBuffer):
        data = msg.get_readable_bytes()
        print('收到:', data)
        ctx.write_and_flush(msg)  # 回显

# 客户端

def buildBootstrap()->Bootstrap:
    
    work_group = CoroutineWorkGroup(1,"WorkGroup")
    bs:Bootstrap = Bootstrap().group(work_group)        
    t = StdinTask()
    handler = EchoClientHandler('Liuyong', t)
    
    async def _connect_suc(b:Bootstrap, r, w):
        _logger.DEBUG(f'Bootstrap={b} reader={r} writer={w}')
        
    async def _connect_suc(b:Bootstrap, r, w):
        _logger.DEBUG(f'Bootstrap={b} reader={r} writer={w}')
        
    async def _connect_fail(b:Bootstrap, e):
        _logger.DEBUG(f'Bootstrap={b} exception={e}')
        fail_count = 1
        times = 3
        while fail_count < times:
            try:
                bf = await b.connect()
                return bf
            except BaseException as e2:
                _logger.DEBUG(f'Bootstrap={b} exception={e2}')
            finally:
                fail_count += 1
                if fail_count >= times:
                    break
                await asyncio.sleep(3)
                
        raise IOError(f'连接失败{fail_count}次,退出程序')
                
        
    def initChannel(sc:SocketChannel):
        _pipeline = sc.pipeline()
        _pipeline.add_last(IdleStateHandler(0,120,0))
        _pipeline.add_last(LoggingHandler(Logger.LEVEL.INFO))
        _pipeline.add_last(WriteTimeoutHandler(5,False))
        # _pipeline.add_last(DelimiterBasedFrameDecoder([b'\n',b'\r\n'], stripDelimiter=False))        
        # _pipeline.add_last(ByteDelimiterBasedFrameDecoder([b'\n',b'\r\n'], stripDelimiter=False))
        
        # _pipeline.add_last(LineBasedFrameDecoder(stripDelimiter=False))
        # 长度帧解析方式
        _pipeline.add_last(LengthFieldBasedFrameDecoder(length_field_length=4, length_adjustment=0, length_field_offset=4,initial_bytes_to_strip=8))        
        _pipeline.add_last(StringDecoder(charset='utf-8', strip=True))        
        # _pipeline.add_last(EchoHandler('DavidLiu'))     
        _pipeline.add_last(SimpleBytesLengthFieldBasedFrameEncoder())   
        # _pipeline.add_last(StringEncoder(charset='utf-8'))
        # _pipeline.add_last(JsonEncoder())
        _pipeline.add_last(handler)
    
    # work_group = CoroutineWorkGroup(1,"WorkGroup")
    bs.channel(SocketChannel).handler(initChannel)\
            .option(ChannelOption.SO_BACKLOG, 1024)\
                .option(ChannelOption.SO_TIMEOUT, 6).option(ChannelOption.ALLOCATOR, PooledByteBufferAllocator.DEFAULT)\
                .address((HOST, PORT)).when_connect_sucess(_connect_suc).when_connect_fail(_connect_fail)
    return bs
    
            
def start_with_bootstrap_with_sync():    
    bs:Bootstrap = buildBootstrap()
    bs.grace()            
    

5. Handler 体系(与 Netty 1:1)

5.1 内置解码器

作用
FixedLengthFrameDecoder 定长拆包
LengthFieldBasedFrameDecoder 长度域拆包(支持 1/2/3/4/8 字节)
LineBasedFrameDecoder \n\r\n 行拆包
DelimiterBasedFrameDecoder 自定义分隔符拆包
StringDecoder / StringEncoder 文本编解码

5.2 生命周期回调

class MyHandler(ChannelHandlerAdapter):
    async def channel_active(self, ctx): ...        # 连接建立
    async def channel_read(self, ctx, msg): ...     # 读到数据
    async def channel_inactive(self, ctx): ...      # 连接断开
    async def exception_caught(self, ctx, exc): ... # 异常
    async def channel_read_complete(self, ctx): ... # 读到数据处理结束
    async def channel_write_complete(self, ctx): ...# 读到数据处理结束
    async def channel_inactive(self, ctx): ...      # 建立失火
    async def channel_event(self, ctx): ...         # 事件

5.3 自定义编解码

继承 MessageToByteDecoder[T] / MessageToByteEncoder[T] 即可,泛型支持 bytes | str | dataclass


6. 池化 ByteBuffer(零拷贝)

from pyboot_netty import PooledByteBufferAllocator

buf = PooledByteBufferAllocator.DEFAULT.buffer(1024)  # 池化 1KB
buf.write_bytes(b'hello')
buf.read_bytes(5)        # 返回 memoryview → 零拷贝
buf.release()            # 自动回池
  • 高低水位线控制写事件(默认 32K/64K) 待处理
  • memoryview 切片无 copy,适合大文件流传输

7. 线程池与并发

from pyboot.commons.coroutine.task import CoroutineWorkGroup

boss = CoroutineWorkGroup(1, 'Acceptor')   # 单线程 accept
ServerBootstrap().group(boss_group).address('0.0.0.0', 8080).grace()
  • 底层复用 asyncio.run_coroutine_threadsafe 跨线程调度
  • 支持 asyncio.run() 外部注入循环,方便单元测试

8. TCP Keep-Alive & 自动重连

bootstrap = Bootstrap.auto_reconnect(True)

心跳帧由框架自动完成,业务无感;也可继承 HeartbeatHandler 自定义帧格式。


9. TLS 支持 待处理

ServerBootstrap(ssl=True,
                certfile='/path/cert.pem',
                keyfile='/path/key.pem').bind('0.0.0.0', 9443)

底层使用 asyncio.create_server(ssl=ssl_ctx),支持 SNI、ALPN。


10. 性能数据(本地压测)

场景 QPS 延迟 P99 CPU
回显 (16B) 110 万 0.9 ms 1 核 100 %
池化缓冲 1MB 流 2.3 Gbps 14 ms 1 核 100 %

测试环境:Python 3.12 + CentOS 7.4(epoll)


11. 与 Netty 差异小结

维度 Netty (Java) pyboot-netty (Python)
底层 IO NIO/Epoll/Kqueue asyncio + selectors
线程 多线程 单线程事件循环
内存 DirectByteBuf memoryview + bytearray
泛型 运行时擦除 原生 TypeVar
零拷贝 FileRegion sendfile 计划支持

12. 路线图

  • HTTP/1.1 编解码
  • WebSocket 握手帧
  • HTTP/2 帧支持
  • QUIC/UDP 传输
  • sendfile 零拷贝文件传输
  • 官方文档站点 + 示例仓库

13. 社区 & 贡献

GitHub:https://gitee.com/pyboot/netty
PyPI:https://pypi.org/project/pyboot_components_netty

欢迎 PR、Issue、Star,一起把“Python 版 Netty”做到极致!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyboot_components_netty-1.3.3.tar.gz (38.8 kB view details)

Uploaded Source

File details

Details for the file pyboot_components_netty-1.3.3.tar.gz.

File metadata

  • Download URL: pyboot_components_netty-1.3.3.tar.gz
  • Upload date:
  • Size: 38.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.10

File hashes

Hashes for pyboot_components_netty-1.3.3.tar.gz
Algorithm Hash digest
SHA256 b6821ebb92e7f1c62aeac7d2cf349c16dc7050edb715c651fb47fa4ac815bf2e
MD5 19c592d58cc044a5177004ac7a6ee8a1
BLAKE2b-256 43e5f2b2086ae6ada12bea1c6a365ae564732d8cb4c21cd012446c38497b6668

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page