KCP for asyncio and socketserver, based on kcp
Project description
aiokcp
kcp的python实践, 提供了类似python中TCP相关的标准库相同的编程接口(asyncio, socket, socketserver),原tcp代码使用修改导入的方式可以轻松实现从tcp到kcp的平移替换。
什么是KCP?
KCP是一个致力于低延时的基于UDP自动重传的可靠传输协议。本身不包含任何网络传输的功能,使用回调的方式处理udp数据包的传输。详情见kcp
如何安装
aiokcp
是基于cython绑定调用的kcp库,打包可能需要安装平台的编译工具。windows系统需要安装vsbuilder
,linux系统需要安装gcc
。安装完成后,可以通过pip
安装
pip install aiokcp
如果需要数据包加密,可以选择安装cryptography
pip install aiokcp[crypto]
例子
例子详见aiokcp/examples
目录
asyncio 低级接口
这里实现了类似loop.create_connection
和loop.create_server
的功能
import asyncio
import time
from aiokcp import create_connection, create_server
# copy from document of asyncio.Protocol
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('server: At {} Data received: {!r}'.format(time.time(), message))
print('server: At {} Send: {!r}'.format(time.time(), message))
self.transport.write(data)
print('server: Close the client socket')
self.transport.close()
# copy from document of asyncio.Protocol
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
print('client: At {} Data sent: {!r}'.format(time.time(), self.message))
def data_received(self, data):
print('client: At {} Data received: {!r}'.format(time.time(), data.decode()))
def connection_lost(self, exc):
print('client: The server closed the connection at {}'.format(time.time()))
self.on_con_lost.set_result(True)
async def server():
server = await create_server(EchoServerProtocol, '127.0.0.1', 8888,
kcp_kwargs={ # optional
# ...
})
async with server:
await server.serve_forever()
print('server done')
async def client():
on_con_lost = asyncio.Future()
transport, protocol = await create_connection(
lambda: EchoClientProtocol('Hello World!', on_con_lost),
'127.0.0.1', 8888,
kcp_kwargs={ # optional
# ...
}
)
try:
await on_con_lost
finally:
transport.close()
async def delay_client(delay = 1):
await asyncio.sleep(delay)
await client()
if __name__ == '__main__':
async def main():
await asyncio.gather(server(), client(), delay_client(1))
asyncio.run(main())
asyncio高级接口
import asyncio
import time
from aiokcp import open_connection, start_server
async def handle_echo(reader, writer):
n = 21000
while n:
data = await reader.read(100)
n -= len(data)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"server: At {time.time()} Received {message!r} from {addr!r}")
print(f"server: At {time.time()} Send: {message!r}")
writer.write(data)
await writer.drain()
print(f"server: At {time.time()} Close the connection")
await asyncio.sleep(10)
writer.close()
await writer.wait_closed()
print(f'server: At {time.time()} Done')
async def kcp_echo_client(message):
reader, writer = await open_connection(
'127.0.0.1', 8888, kcp_kwargs={
# ...
})
print(f'client: At {time.time()} Send: {message!r}', len(message))
writer.write(message.encode())
await writer.drain()
n = len(message)
while n > 0:
data = await reader.read(1000)
print(f'client: At {time.time()} Received: {data.decode()!r}', len(data), n)
n -= len(data)
print(f'client: At {time.time()} Close the connection', '#' * 20)
writer.close()
await writer.wait_closed()
async def server():
server = await start_server(
handle_echo, '127.0.0.1', 8888, kcp_kwargs={
# ...
})
async with server:
await server.serve_forever()
print('server done')
async def main():
await asyncio.gather(server(), kcp_echo_client('Hello World!'))
if __name__ == '__main__':
asyncio.run(main())
同步的kcp socketpair
from aiokcp.sync import KCPSocket
sock1, sock2 = KCPSocket.socket_pair()
sock1.send(b'123')
print(sock2.recv(100))
sock2.send(b'234')
print(sock1.recv(100))
同步的kcp 简单服务器-客户端
from aiokcp.sync import KCPSocket
sock1 = KCPSocket.create_server(('127.0.0.1', 18586))
sock2 = KCPSocket.create_connection(('127.0.0.1', 18586))
server_sock, _ = sock1.accept()
server_sock.send(b'123')
print(sock2.recv(100))
sock2.send(b'234')
print(server_sock.recv(100))
同步的kcp socketserver
import os
import threading
import time
from aiokcp.sync import (BaseRequestHandler, KCPSocket, KCPThreadingServer,
StreamRequestHandler)
class Handler(BaseRequestHandler):
def handle(self):
nbytes = 0
while True:
# self.request is the KCP socket connected to the client
data = self.request.recv(1024)
print("Received from {}:{}".format(*self.client_address))
# print("Data: {}".format(data))
# just send back the same data
# there is no mechanism to check if the connection is broken in kcp, but timeout.
# when timeout occurs, the connection will be closed, recv will return empty bytes
if not data:
break
nbytes += self.request.send(data)
print('server recved: {} sent: {}'.format(nbytes, len(data)))
print('server handle end ##################')
def server_thread(port):
kw = {
'kcp_kwargs': {
# ...
}
}
server = KCPThreadingServer(('127.0.0.1', port), Handler, **kw)
thread = threading.Thread(target=server.serve_forever)
thread.start()
def client_thread(port):
kw = {
'kcp_kwargs': {
# ...
}
}
sock = KCPSocket.create_connection(('127.0.0.1', port), **kw)
sent_buf = b'abc'
sock.send(sent_buf)
for _ in range(1):
b = os.urandom(7 * 1000)
sent_buf += b
sock.send(b)
print('###########', len(sent_buf), '###########')
n = len(sent_buf)
buf = b''
while n > 0:
data = sock.recv(1024)
buf += data
if data:
n -= len(data)
else:
break
print('client recv', len(buf), len(data)) # print(buf, len(data))
if buf[:7003-n] != sent_buf[:7003-n]:
# ensure sent in order
print('error')
print(buf[:7003-n])
print(sent_buf[:7003-n])
break
print('client handle end', '###############')
time.sleep(1)
sock.close()
if __name__ == '__main__':
def thread_test():
from random import randint
port = randint(10000, 20000)
server_thread(port)
client_thread(port)
thread_test()
同步的kcp socketserver 流处理
import os
import threading
import time
from aiokcp.sync import (BaseRequestHandler, KCPSocket, KCPThreadingServer,
StreamRequestHandler)
class StreamHandler(StreamRequestHandler):
def handle(self):
print('handling')
n = 0
while True:
# self.rfile is a file-like object created by the handler;
# we can now use e.g. readline() instead of raw recv() calls
data = self.rfile.readline().strip()
if not data or data == 'end':
break
n += len(data)
print('server recved: {} sent: {}'.format(n, len(data)))
# Likewise, self.wfile is a file-like object used to write back
# to the client
self.wfile.write(data)
self.wfile.flush()
print('server handle end ##################')
time.sleep(1)
self.wfile.close()
self.request.close()
def server_thread(port):
kw = {
'kcp_kwargs': {
# ...
},
'stream': 1
}
server = KCPThreadingServer(('127.0.0.1', port), StreamHandler, **kw)
thread = threading.Thread(target=server.serve_forever)
thread.start()
def client_thread(port):
kw = {
'kcp_kwargs': {
# ...
},
'stream': 1
}
sock = KCPSocket.create_connection(('127.0.0.1', port), **kw)
sent_buf = b'abc\ndef\nghi\njkl\nmno\npqr\nstu\nvwx\nyza\nend\n'
sock.send(sent_buf)
buf = b''
while len(buf) < 27:
buf += sock.recv(27)
print('client recv', buf)
if __name__ == '__main__':
def thread_test():
from random import randint
port = randint(10000, 20000)
server_thread(port)
client_thread(port)
thread_test()
可选的udp数据包加密
默认数据包不加密,但提供加密的方法和参数。内置的加密方法需要安装cryptography
, 采用的aes+cbc模式加密+hmac校验。也可以自定义加密对象, 只需要实现encrypt
和decrypt
方法即可
from aiokcp import (create_connection, create_server, open_connection,
start_server)
from aiokcp.crypto import get_crypto
from aiokcp.sync import KCPSocket
# need cryptography installed
key = b'12345678901234567890123456789012'
salt = b'1234567890123456'
crypto = get_crypto(key, salt)
# or
class Crypto:
# need to implement encrypt and decrypt method
def encrypt(self, data):
pass
def decrypt(self, data):
pass
crypto = Crypto()
create_connection(..., crypto=crypto)
create_server(..., crypto=crypto)
open_connection(..., crypto=crypto)
start_server(..., crypto=crypto)
KCPSocket(..., crypto=crypto)
KCPSocket.create_connection(..., crypto=crypto)
KCPSocket.create_server(..., crypto=crypto)
KCPSocket.socket_pair(crypto=crypto)
相关配置
kcp默认配置如下,可以通过传递kcp_kwargs
参数到相应的方法,改变相关配置,kcp_kwargs
不用每个参数都设置,没有设置的使用默认值
default_update_interval = 100 # ms
default_kcp_kwargs = {
'max_transmission': 1400,
'no_delay' : True,
'update_interval' : default_update_interval,
'resend_count' : 2,
'no_congestion_control': False,
'send_window_size': 32,
'receive_window_size': 128,
'stream': 0
}
default_timeout = 600
相关配置修改
KCPServer
, KCPSteamTransport
, sync.KCPServer
, sync.KCPSocket
均提供下面的方法修改相关的配置
def set_nodelay(self, no_delay: bool, update_interval: int, resend_count: int, no_congestion_control: bool):
pass
def set_wndsize(self, send: int, receive: int):
pass
def set_mtu(self, max_transmission: int):
pass
def set_stream(self, stream: bool):
pass
功能
- asyncio低级接口: Protocol
- asyncio高级接口: Stream
- 同步的kcp socket的实现
- 同步的kcp socketserver的实现
- 可选的udp数据包加密
- close时通知对方关闭socket
- 支持tls/ssl
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file aiokcp-0.0.4.tar.gz
.
File metadata
- Download URL: aiokcp-0.0.4.tar.gz
- Upload date:
- Size: 123.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.10.1
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | e1f8455d8e87bd711e993cead22191896336f8be1bd37212f515a26146bf152e |
|
MD5 | 3e1c4a6c8cbc87963a179429636e6311 |
|
BLAKE2b-256 | 2c160e0d03a4302569e3c6c917e441a2dcf6c3c155227d6c6fac6b509ea23e1a |
File details
Details for the file aiokcp-0.0.4-cp310-cp310-win_amd64.whl
.
File metadata
- Download URL: aiokcp-0.0.4-cp310-cp310-win_amd64.whl
- Upload date:
- Size: 176.6 kB
- Tags: CPython 3.10, Windows x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.10.1
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 4b3fa0aa67c7589637bd8432165cee087e9e3478af1bf7ee2fd64832d47778ce |
|
MD5 | 41032ea6ec86dc622d96d87a629fb580 |
|
BLAKE2b-256 | e691ea8aec253eaf86fdd92086d83de12fba31a1399cbc5ae8f952ed0a38d7a0 |