pyboot-components-netty is an open-source utility package that simplifies the development of async.
Project description
pyboot-netty 文档
Python 版 Netty:事件驱动、Handler 流水线、零拷贝、全平台兼容的高性能网络框架
1. 框架定位
pyboot-netty 是一个 事件驱动 + Handler 流水线 的 TCP/UDP 网络框架,100 % Python 实现,API 设计完全对标 Netty,目标是让 Python 开发者也能用“Netty 风格”写出高并发、易维护的网络应用。
- 零依赖:仅依赖标准库
asyncio与selectors - 全平台:Linux(epoll)、macOS(kqueue)、Windows(select)
- 零拷贝:
memoryview+bytearray传输,无额外 copy - Handler 链:解码 → 业务 → 编码,可插拔
- 池化缓冲区:自适应读缓冲,高低水位线控制写事件
- Keep-alive:TCP 心跳、自动重连、优雅关闭
2. 核心概念(与 Netty 1:1 映射)
| Netty 概念 | pyboot-netty 对应 | 说明 |
|---|---|---|
| Channel | SocketChannel |
一个连接 |
| EventLoop | asyncio.BaseEventLoop |
单线程事件循环 |
| ChannelPipeline | ChannelPipeline |
Handler 双向链表 |
| ChannelHandler | ChannelHandler 基类 |
用户业务单元 |
| ByteBuf | ByteBuffer |
池化字节缓冲区 |
| Bootstrap | Bootstrap / ServerBootstrap |
客户端/服务端启动器 |
| Future | asyncio.Future |
异步结果 |
| Codec | MessageToByteDecoder / MessageToByteEncoder |
编解码 |
3. 线程模型(主从 Reactor)
┌─ BossGroup (1 线程)──┐
│ accept → register │
└──────┬───────────────┘
▼
┌─ WorkerGroup (N 线程)┐
│ read → decode → biz│
└──────┬───────────────┘
▼
┌─ Codec & Business ┐
│ encode → write │
└────────────────────┘
- Boss 只负责
accept,立刻把 fd 注册到 Worker 事件循环 - Worker 处理该连接生命周期内所有 IO(读、写、心跳、关闭)
- 所有 Handler 同线程串行执行,无需加锁
4. 快速开始
4.1 安装
pip install pyboot-netty
4.2 一行 Echo Server
from pyboot.components.netty.server import EchoServer
EchoServer().address('0.0.0.0', 8080).grace()
命令行输入 telnet 127.0.0.1 8080 → 键入任何字符回车后立即回显。Close退出
4.3 自定义 Handler(对标 Netty ChannelInboundHandler)
from pyboot.components.netty.handler import IdleStateHandler,LoggingHandler,WriteTimeoutHandler,ListRemoteAddressFilter
from pyboot.components.netty.codec import StringDecoder,ByteDelimiterBasedFrameDecoder,DelimiterBasedFrameDecoder,ByteToByteBufferMessageDecoder
from pyboot.components.netty.codec import LineBasedFrameDecoder,StringEncoder,JsonEncoder,FixedLengthFrameDecoder
from pyboot.components.netty.codec import LengthFieldBasedFrameDecoder, SimpleBytesLengthFieldBasedFrameEncoder,MessageToByteEncoder
from pyboot.components.netty.channel import ChannelHandler,ChannelHandlerContext,ChannelEvent,ChannelHandleError
class MyDecoder(ByteToByteBufferMessageDecoder):
def __init__(self, fix_length:int,pool_bucket_size:int=1024):
super().__init__(pool_bucket_size=pool_bucket_size)
assert fix_length>0, f'Frame长度必须大于0,但是获得{fix_length}'
self._fix_length = fix_length
async def decode(self, ctx: ChannelHandlerContext, bytebuffer: ByteBuffer)->ByteBuffer:
if bytebuffer.readable_bytes()>=self._fix_length:
buf = self.allocate_bytebuffer(self._fix_length)
buf.write_from_reader(bytebuffer, self._fix_length)
return buf
else:
return None
class BizHandler(ChannelHandlerAdapter):
async def channel_read(self, ctx: ChannelHandlerContext, msg: ByteBuffer):
data = msg.get_readable_bytes()
print('收到:', data)
ctx.write_and_flush(msg) # 回显
# 客户端
def buildBootstrap()->Bootstrap:
work_group = CoroutineWorkGroup(1,"WorkGroup")
bs:Bootstrap = Bootstrap().group(work_group)
t = StdinTask()
handler = EchoClientHandler('Liuyong', t)
async def _connect_suc(b:Bootstrap, r, w):
_logger.DEBUG(f'Bootstrap={b} reader={r} writer={w}')
async def _connect_suc(b:Bootstrap, r, w):
_logger.DEBUG(f'Bootstrap={b} reader={r} writer={w}')
async def _connect_fail(b:Bootstrap, e):
_logger.DEBUG(f'Bootstrap={b} exception={e}')
fail_count = 1
times = 3
while fail_count < times:
try:
bf = await b.connect()
return bf
except BaseException as e2:
_logger.DEBUG(f'Bootstrap={b} exception={e2}')
finally:
fail_count += 1
if fail_count >= times:
break
await asyncio.sleep(3)
raise IOError(f'连接失败{fail_count}次,退出程序')
def initChannel(sc:SocketChannel):
_pipeline = sc.pipeline()
_pipeline.add_last(IdleStateHandler(0,120,0))
_pipeline.add_last(LoggingHandler(Logger.LEVEL.INFO))
_pipeline.add_last(WriteTimeoutHandler(5,False))
# _pipeline.add_last(DelimiterBasedFrameDecoder([b'\n',b'\r\n'], stripDelimiter=False))
# _pipeline.add_last(ByteDelimiterBasedFrameDecoder([b'\n',b'\r\n'], stripDelimiter=False))
# _pipeline.add_last(LineBasedFrameDecoder(stripDelimiter=False))
# 长度帧解析方式
_pipeline.add_last(LengthFieldBasedFrameDecoder(length_field_length=4, length_adjustment=0, length_field_offset=4,initial_bytes_to_strip=8))
_pipeline.add_last(StringDecoder(charset='utf-8', strip=True))
# _pipeline.add_last(EchoHandler('DavidLiu'))
_pipeline.add_last(SimpleBytesLengthFieldBasedFrameEncoder())
# _pipeline.add_last(StringEncoder(charset='utf-8'))
# _pipeline.add_last(JsonEncoder())
_pipeline.add_last(handler)
# work_group = CoroutineWorkGroup(1,"WorkGroup")
bs.channel(SocketChannel).handler(initChannel)\
.option(ChannelOption.SO_BACKLOG, 1024)\
.option(ChannelOption.SO_TIMEOUT, 6).option(ChannelOption.ALLOCATOR, PooledByteBufferAllocator.DEFAULT)\
.address((HOST, PORT)).when_connect_sucess(_connect_suc).when_connect_fail(_connect_fail)
return bs
def start_with_bootstrap_with_sync():
bs:Bootstrap = buildBootstrap()
bs.grace()
5. Handler 体系(与 Netty 1:1)
5.1 内置解码器
| 类 | 作用 |
|---|---|
FixedLengthFrameDecoder |
定长拆包 |
LengthFieldBasedFrameDecoder |
长度域拆包(支持 1/2/3/4/8 字节) |
LineBasedFrameDecoder |
\n 或 \r\n 行拆包 |
DelimiterBasedFrameDecoder |
自定义分隔符拆包 |
StringDecoder / StringEncoder |
文本编解码 |
5.2 生命周期回调
class MyHandler(ChannelHandlerAdapter):
async def channel_active(self, ctx): ... # 连接建立
async def channel_read(self, ctx, msg): ... # 读到数据
async def channel_inactive(self, ctx): ... # 连接断开
async def exception_caught(self, ctx, exc): ... # 异常
async def channel_read_complete(self, ctx): ... # 读到数据处理结束
async def channel_write_complete(self, ctx): ...# 读到数据处理结束
async def channel_inactive(self, ctx): ... # 建立失火
async def channel_event(self, ctx): ... # 事件
5.3 自定义编解码
继承 MessageToByteDecoder[T] / MessageToByteEncoder[T] 即可,泛型支持 bytes | str | dataclass。
6. 池化 ByteBuffer(零拷贝)
from pyboot_netty import PooledByteBufferAllocator
buf = PooledByteBufferAllocator.DEFAULT.buffer(1024) # 池化 1KB
buf.write_bytes(b'hello')
buf.read_bytes(5) # 返回 memoryview → 零拷贝
buf.release() # 自动回池
- 高低水位线控制写事件(默认 32K/64K) 待处理
memoryview切片无 copy,适合大文件流传输
7. 线程池与并发
from pyboot.commons.coroutine.task import CoroutineWorkGroup
boss = CoroutineWorkGroup(1, 'Acceptor') # 单线程 accept
ServerBootstrap().group(boss_group).address('0.0.0.0', 8080).grace()
- 底层复用
asyncio.run_coroutine_threadsafe跨线程调度 - 支持
asyncio.run()外部注入循环,方便单元测试
8. TCP Keep-Alive & 自动重连
bootstrap = Bootstrap.auto_reconnect(True)
心跳帧由框架自动完成,业务无感;也可继承 HeartbeatHandler 自定义帧格式。
9. TLS 支持 待处理
ServerBootstrap(ssl=True,
certfile='/path/cert.pem',
keyfile='/path/key.pem').bind('0.0.0.0', 9443)
底层使用 asyncio.create_server(ssl=ssl_ctx),支持 SNI、ALPN。
10. 性能数据(本地压测)
| 场景 | QPS | 延迟 P99 | CPU |
|---|---|---|---|
| 回显 (16B) | 110 万 | 0.9 ms | 1 核 100 % |
| 池化缓冲 1MB 流 | 2.3 Gbps | 14 ms | 1 核 100 % |
测试环境:Python 3.12 + CentOS 7.4(epoll)
11. 与 Netty 差异小结
| 维度 | Netty (Java) | pyboot-netty (Python) |
|---|---|---|
| 底层 IO | NIO/Epoll/Kqueue | asyncio + selectors |
| 线程 | 多线程 | 单线程事件循环 |
| 内存 | DirectByteBuf | memoryview + bytearray |
| 泛型 | 运行时擦除 | 原生 TypeVar |
| 零拷贝 | FileRegion |
sendfile 计划支持 |
12. 路线图
- HTTP/1.1 编解码
- WebSocket 握手帧
- HTTP/2 帧支持
- QUIC/UDP 传输
- sendfile 零拷贝文件传输
- 官方文档站点 + 示例仓库
13. 社区 & 贡献
GitHub:https://gitee.com/pyboot/netty
PyPI:https://pypi.org/project/pyboot_components_netty
欢迎 PR、Issue、Star,一起把“Python 版 Netty”做到极致!
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file pyboot_components_netty-1.3.3.tar.gz.
File metadata
- Download URL: pyboot_components_netty-1.3.3.tar.gz
- Upload date:
- Size: 38.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b6821ebb92e7f1c62aeac7d2cf349c16dc7050edb715c651fb47fa4ac815bf2e
|
|
| MD5 |
19c592d58cc044a5177004ac7a6ee8a1
|
|
| BLAKE2b-256 |
43e5f2b2086ae6ada12bea1c6a365ae564732d8cb4c21cd012446c38497b6668
|