Simple asyncio TCP client and server and UDP node library inspired by fastapi
Project description
netaio
This is designed to be a simple and easy to use asyncio-based TCP client and server and UDP node library inspired by fastapi but for non-HTTP use cases.
Status
This is currently a work-in-progress. Remaining work before the v0.1.0 release:
- Authorization plugin system
- Cipher plugin system
- Optional authorization plugin using HMAC
- Optional cipher plugin using simple symmetric stream cipher
- UDP node with multicast
- Automatic peer advertisement/discovery/management for UDP node
- Error/errored message handling system
- Optional authorization plugin using tapescript
- Optional cipher plugin using Curve25519 asymmetric encryption
- Optional authorization plugin using Hashcash/PoW for anti-spam DoS protection
- Ephemeral handlers (i.e. handlers that are removed after first use)
- IPv6 support
- Core daemon to proxy traffic for local apps
- E2e encrypted chat app example
Issues are tracked here. Historical changes can be found in the changelog.
Usage
Install with pip install netaio. To use the optional asymmetric cryptography
plugins, install with pip install netaio[asymmetric].
Brief examples are shown below. For more documentation, see the dox.md file generated by autodox.
TCPServer
from netaio import (
TCPServer, Body, Message, MessageType, HMACAuthPlugin,
make_respond_uri_msg, make_ok_msg,
make_error_msg, make_not_found_msg, make_not_permitted_msg
)
import asyncio
server = TCPServer(port=8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"}))
auth_plugin2 = HMACAuthPlugin(config={"secret": "adminpassword", "hmac_field": "camh"})
storage = {}
@server.on(MessageType.CREATE_URI, auth_plugin=auth_plugin2)
async def handle_create(msg: Message, writer: asyncio.StreamWriter):
if msg.body.uri in storage:
return make_error_msg('cannot overwrite', msg.body.uri)
storage[msg.body.uri] = msg.body.content
server.logger.info(f'CREATED resource: {msg.body.uri}')
return make_ok_msg(b'stored', msg.body.uri)
@server.on(MessageType.REQUEST_URI)
async def handle_request(msg: Message, writer: asyncio.StreamWriter):
server.logger.info(f'REQUESTED resource: {msg.body.uri}')
if msg.body.uri not in storage:
return make_not_found_msg(uri=msg.body.uri)
return make_respond_uri_msg(storage[msg.body.uri], msg.body.uri)
@server.on((MessageType.REQUEST_URI, b'echo'))
async def handle_echo(msg: Message, writer: asyncio.StreamWriter):
return make_respond_uri_msg(msg.body.content, b'echo')
@server.on(MessageType.UPDATE_URI, auth_plugin=auth_plugin2)
async def handle_update(msg: Message, writer: asyncio.StreamWriter):
if msg.body.uri not in storage:
return make_not_found_msg(uri=msg.body.uri)
storage[msg.body.uri] = msg.body.content
server.logger.info(f'UPDATED resource: {msg.body.uri}')
return make_ok_msg(b'stored', msg.body.uri)
@server.on(MessageType.DELETE_URI, auth_plugin=auth_plugin2)
async def handle_delete(msg: Message, writer: asyncio.StreamWriter):
if msg.body.uri not in storage:
return make_not_found_msg(uri=msg.body.uri)
storage.pop(msg.body.uri)
server.logger.info(f'DELETED resource: {msg.body.uri}')
return make_ok_msg(b'deleted', msg.body.uri)
@server.on(MessageType.SUBSCRIBE_URI)
async def subscribe(msg: Message, writer: asyncio.StreamWriter):
server.subscribe(msg.body.uri, writer)
return Message.prepare(
Body.prepare(b'', uri=msg.body.uri),
MessageType.CONFIRM_SUBSCRIBE
)
@server.on(MessageType.UNSUBSCRIBE_URI)
async def unsubscribe(msg: Message, writer: asyncio.StreamWriter):
server.unsubscribe(msg.body.uri, writer)
return Message.prepare(
Body.prepare(b'', uri=msg.body.uri),
MessageType.CONFIRM_UNSUBSCRIBE
)
asyncio.run(server.start())
TCPClient
from netaio import TCPClient, Body, Message, MessageType, HMACAuthPlugin
from secrets import token_hex
import asyncio
client = TCPClient(
"127.0.0.1", 8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"})
)
auth_plugin2 = HMACAuthPlugin(config={"secret": "adminpassword", "hmac_field": "camh"})
async def run_client():
# connect, then test the echo endpoint
await client.connect()
response = await client.request(b'echo', content=b'test')
print(response)
# test full CRUD methods: starting with CREATE
content = ('testing it' + token_hex(4)).encode()
response = await client.create(b'test1', content, auth_plugin=auth_plugin2)
print(response)
# now READ
response = await client.request(b'test1')
assert response.body.content == content, (response.body.content, content)
print(response)
# now UPDATE
new_content = ('testing it' + token_hex(4)).encode()
response = await client.update(b'test1', new_content, auth_plugin=auth_plugin2)
print(response)
# now READ again
response = await client.request(b'test1')
assert response.body.content == new_content, (response.body.content, new_content)
print(response)
# now DELETE
response = await client.delete(b'test1', auth_plugin=auth_plugin2)
print(response)
# now READ to prove deletion
response = await client.request(b'test1')
assert response.header.message_type == MessageType.NOT_FOUND, \
response.header.message_type
print(response)
asyncio.run(run_client())
UDPNode
from netaio import UDPNode, Peer, Body, Message, MessageType, HMACAuthPlugin
from os import urandom
import asyncio
local_peer = Peer(
addrs={('127.0.0.1', 8888)},
id=urandom(16),
data=b''
)
echo_node = UDPNode(
local_peer=local_peer,
auth_plugin=HMACAuthPlugin(config={"secret": "test"})
)
@echo_node.on(MessageType.REQUEST_URI)
def request_uri(msg: Message, addr: tuple[str, int]):
echo_node.logger.info("Sending echo to %s...", addr)
return Message.prepare(msg.body, MessageType.OK)
@echo_node.on(MessageType.OK)
def echo(msg: Message, addr: tuple[str, int]):
echo_node.logger.info("Received echo from %s.", addr)
echo_msg = Message.prepare(Body.prepare(b'echo'), MessageType.REQUEST_URI)
async def main(local_addr: tuple[str, int], remote_addr: tuple[str, int]|None = None):
echo_node.interface = local_addr[0]
echo_node.port = local_addr[1]
await echo_node.start()
await echo_node.manage_peers_automatically(advertise_every=1, peer_timeout=3)
while True:
await asyncio.sleep(1)
if remote_addr:
echo_node.logger.info("Sending message to %s...", remote_addr)
echo_node.send(echo_msg, remote_addr)
else:
if len(echo_node.peers) > 0:
echo_node.logger.info("Broadcasting message to all known peers...")
echo_node.broadcast(echo_msg)
else:
echo_node.logger.info("No peers known, waiting to discover peers...")
local_addr = ("0.0.0.0", 8888)
remote_addr = None
asyncio.run(main(local_addr, remote_addr))
Note that to run this example on a single machine, the port must be different
in the second node instance, e.g. local_addr = ("127.0.0.1", 8889), and then
the remote address must be set to the first node's address, e.g.
remote_addr = ("127.0.0.1", 8888). Multicast will not work locally because of
the different ports. If the interface is set to "0.0.0.0", multicast will work
across the LAN, but this will result in the node hearing its own multicast
messages; hence, the request_uri handler ignores messages from the local
machine.
(It is technically possible to get multicast to work in one direction on a
single machine by changing the .port property after one has started.)
Note also that when a peer is removed from the node's peer list, it is also unsubscribed from all URIs.
Custom Message Types
Custom message type classes can be created for protocols. Two helper functions are provided for creating and validating custom message types:
- make_message_type_class(name: str, new_message_types: dict[str, int]): Creates a new IntEnum that includes all default message types plus any custom types defined.
- validate_message_type_class(message_type_class: type[IntEnum], suppress_errors: bool = False): Validates a message type class created declaratively. Returns True if valid, False if invalid with suppress_errors=True, or raises an exception otherwise.
Reserved Values: Values 0-30 are reserved for base protocol upgrades. Custom message types must use values >= 31.
Max Value: The maximum allowable value is 255. (Serialized as 1 byte.)
Example using make_message_type_class:
from netaio import make_message_type_class
CustomMessageType = make_message_type_class(
"CustomMessageType",
{"CUSTOM_ACTION": 50, "ANOTHER_ACTION": 51}
)
Example using declarative syntax with validation:
from netaio import validate_message_type_class
from enum import IntEnum
class MyMessageType(IntEnum):
REQUEST_URI = 0
RESPOND_URI = 1
# ... all required default types ...
DISCONNECT = 30
CUSTOM_ACTION = 50 # Must be >= 31
validate_message_type_class(MyMessageType) # Raises ValueError if invalid
Using a reserved value (0-30) for a custom message type causes
functions to raise a ValueError with a clear message indicating the
issue.
Plugin System
The plugin system is designed to be simple and easy to understand. Each plugin
implements a specific protocol, and the TCPServer, TCPClient, and
UDPNode classes automatically apply plugins to messages in the correct order
to preserve the encapsulation model (see below).
There are three types of plugins defined with typing.Protocol interfaces:
- AuthPluginProtocol
- CipherPluginProtocol
- PeerPluginProtocol
Authentication/Authorization
TCPServer, TCPClient, and UDPNode support an optional
authentication/authorization plugin.
Each plugin is instantiated with a dict of configuration parameters, and it must
implement the AuthPluginProtocol. Once the plugin has been instantiated, it
can be passed to the TCPServer, TCPClient, and UDPNode constructors or set
on the instances themselves. An auth plugin can also be set on a per-handler
basis by passing the plugin as a keyword argument auth_plugin to the on or
once decorators, or the add_handler or add_ephemeral_handler methods (and
a handful of others). Currently, if an auth plugin is set both on the instance
and per-handler, both will be checked before the handler function is called, and
both will be applied to the response body; the per-handler plugin will be able to
overwrite any auth fields set by the instance plugin, which may break
communication -- each plugin instantiation should be configured to use its own
writeable auth data fields. Fields that are safe to reuse are nonce and ts,
as long as plugins are configured to only write those fields if they are not
already populated; see the bundled plugins for example implementations.
Currently, netaio includes an HMACAuthPlugin that can be used by the server
and client to authenticate and authorize messages. This uses a shared secret to
generate and check HMACs over message bodies.
The TapescriptAuthPlugin is included in the optional netaio.asymmetric
submodule, which requires tapescript
as a dependency and allows for customizable authentication/authorization models
using the tapescript DSL for access controls.
Example of additional auth layer
from netaio import TCPServer, TCPClient, HMACAuthPlugin, MessageType, Body, Message
import asyncio
outer_auth_plugin = HMACAuthPlugin(config={"secret": "test"})
inner_auth_plugin = HMACAuthPlugin(config={
"secret": "tset",
"hmac_field": "camh" # must be different to avoid overwriting the outer auth field
})
server = TCPServer(port=8888, auth_plugin=outer_auth_plugin)
client = TCPClient(host="127.0.0.1", port=8888, auth_plugin=outer_auth_plugin)
@server.on(MessageType.CREATE_URI, auth_plugin=inner_auth_plugin)
async def put_uri(msg: Message, writer: asyncio.StreamWriter):
body = Body.prepare(b'Resource saved.', uri=msg.body.uri)
return Message.prepare(body, MessageType.OK)
async def main():
task = asyncio.create_task(server.start())
await asyncio.sleep(0.1)
await client.connect()
await client.send(
Message.prepare(Body.prepare(b'test'), MessageType.CREATE_URI),
auth_plugin=inner_auth_plugin
)
result = await client.receive_once()
await client.close()
task.cancel()
return result
response = asyncio.run(main())
print(response)
Cipher (encryption/decryption)
TCPServer, TCPClient, and UDPNode support an optional cipher plugin. Each
plugin is instantiated with a dict of configuration parameters, and it must
implement the CipherPluginProtocol. Once the plugin has been instantiated, it
can be passed to the TCPServer, TCPClient, and UDPNode constructors or set
on the instances themselves. A cipher plugin can also be set on a per-handler
basis by passing the plugin as a keyword argument cipher_plugin to the on or
once decorators, or the add_handler or add_ephemeral_handler methods (and
a handful of others). If a cipher plugin is set both on the instance and
per-handler, both will be applied to the message, and care must be taken to
configure the inner plugin so that it does not overwrite the fields of the outer
plugin.
Currently, netaio includes a Sha256StreamCipherPlugin that can be used by
the server and client to encrypt and decrypt messages using a simple symmetric
stream cipher. This uses a shared secret key and per-message IVs. Note that the
encrypt_uri config option should be False to prevent the URI from being
encrypted when using this as an additional, inner layer of encryption, else the
URI will not be usable for routing requests/determining responses when the
default key extractor is used (i.e. handlers set on the tuple of MessageType
and URI).
The X25519CipherPlugin is included in the optional netaio.asymmetric
submodule, which requires PyNaCl as a
dependency and allows for asymmetric encryption using ECDHE (Elliptic Curve
Diffie-Hellman Ephemeral). Note that this plugin should be used as an inner
layer of encryption with automatic peer management and local peer data including
{'pubkey': SigningKey(seed).verify_key}.
Example of additional encryption layer
from netaio import (
TCPServer, TCPClient, Sha256StreamCipherPlugin, MessageType, Body, Message
)
import asyncio
outer_cipher_plugin = Sha256StreamCipherPlugin(config={"key": "test"})
inner_cipher_plugin = Sha256StreamCipherPlugin(config={
"key": "tset",
"iv_field": "iv2" # must be changed to avoid overwriting the outer iv field
})
server = TCPServer(port=8888, cipher_plugin=outer_cipher_plugin)
client = TCPClient(host="127.0.0.1", port=8888, cipher_plugin=outer_cipher_plugin)
@server.on(MessageType.REQUEST_URI, cipher_plugin=inner_cipher_plugin)
async def request_uri(msg: Message, writer: asyncio.StreamWriter):
body = Body.prepare(b'Super secret data.', uri=msg.body.uri)
return Message.prepare(body, MessageType.RESPOND_URI)
async def main():
task = asyncio.create_task(server.start())
await asyncio.sleep(0.1)
await client.connect()
await client.send(
Message.prepare(
Body.prepare(b'psst gimme the secret', uri=b'something'),
MessageType.REQUEST_URI
),
cipher_plugin=inner_cipher_plugin
)
result = await client.receive_once(cipher_plugin=inner_cipher_plugin)
await client.close()
task.cancel()
return result
response = asyncio.run(main())
print(response)
Peer Data Encoding/Decoding
To use the automatic peer management system, nodes accept a peer plugin. Each
plugin is instantiated with a dict of configuration parameters, and it must
implement the PeerPluginProtocol. Once the plugin has been instantiated, it
can be passed to the TCPServer, TCPClient, and UDPNode constructors or set
on the instances themselves. Unlike the auth and cipher plugins, this is used
only for encoding and decoding the data stored in Peer.data, and it is not
settable on a per-handler basis.
Currently, netaio includes a DefaultPeerPlugin that is used to encode and
decode peer data using the packify library
to encode/decode dicts.
Included Plugins
The following plugins are included for convenience and as references for making custom plugins:
- HMACAuthPlugin
- Sha256StreamCipherPlugin
- DefaultPeerPlugin
- TapescriptAuthPlugin
- X25519CipherPlugin
Example instantiation of HMACAuthPlugin
from netaio import HMACAuthPlugin
auth_plugin = HMACAuthPlugin({
"secret": "we attack at dawn",
"hmac_field": "hmac", # default; must be changed for inner plugin
"nonce_field": "nonce", # default; can be used by all plugins at all layers
"ts_field": "ts", # default; can be used by all plugins at all layers
})
Example instantiation of Sha256StreamCipherPlugin
from netaio import Sha256StreamCipherPlugin
cipher_plugin = Sha256StreamCipherPlugin({
"key": "the key to success is held by people better than you",
"iv_field": "iv", # default; must be changed for inner cipher plugins
"encrypt_uri": True, # default; should be False for inner cipher plugins
})
Example instantiation of TapescriptAuthPlugin
from nacl.signing import SigningKey
from netaio.asymmetric import TapescriptAuthPlugin
import os
import tapescript
seed = os.urandom(32)
vkey = SigningKey(seed).verify_key
# pretending there is an admin key to allow admin auth bypass
if os.path.exists('admin.vkey.hex'):
with open('admin.vkey.hex', 'r') as f:
admin_vkey = bytes.fromhex(f.read())
else:
admin_vkey = vkey
inner_script = tapescript.make_single_sig_lock(admin_vkey)
lock = tapescript.make_taproot_lock(vkey, script=inner_script)
witness_func = lambda seed, sigfields: tapescript.make_taproot_witness_keyspend(
seed, sigfields, committed_script=inner_script
)
auth_plugin = TapescriptAuthPlugin({
'seed': seed,
'lock': lock,
'nonce_field': 'nonce', # default; can be used by all plugins at all layers
'ts_field': 'ts', # default; can be used by all plugins at all layers
'witness_field': 'witness', # default; must be changed for inner plugin
'witness_func': witness_func,
'contracts': {}, # default
'plugins': {}, # default
})
Example instantiation of X25519CipherPlugin
from netaio.asymmetric import X25519CipherPlugin, Peer, DefaultPeerPlugin
import os
seed = os.urandom(32)
cipher_plugin = X25519CipherPlugin({
'seed': seed,
'encrypt_uri': False, # default
# should not be True unless peers are set manually and this is the outer cipher
})
# for use with peer management, the pubkey should be sent to peers
local_peer = Peer(
addrs={('0.0.0.0', 8888)},
id=bytes(cipher_plugin.pubk),
data=DefaultPeerPlugin().encode_data({
"pubkey": bytes(cipher_plugin.pubk),
"vkey": bytes(cipher_plugin.vkey), # optional
}),
)
More documentation on the asymmetric plugins can be found in asymmetric.md.
Encapsulation Model
The encapsulation model for plugin interactions with messages is as follows:
Send
- Per-handler/injected cipher_plugin.encrypt
- Per-handler/injected auth_plugin.make
- Instance self.cipher_plugin.encrypt
- Instance self.auth_plugin.make
Receive
- Instance self.auth_plugin.check
- Instance self.cipher_plugin.decrypt
- Per-handler/injected auth_plugin.check
- Per-handler/injected cipher_plugin.decrypt
Peer Management
This package includes an optional automatic peer management system, which can be
enabled on TCPServer, TCPClient, and UDPNode by awaiting a call to the
manage_peers_automatically() method. For TCP, this will cause clients and
servers to send ADVERTISE_PEER/PEER_DISCOVERED messages to each other upon
connection or upon enabling of peer management for existing connections. For UDP,
this will cause nodes to multicast ADVERTISE_PEER messages every 20 seconds by
default, and any node that receives such a message will respond to that peer
with a PEER_DISCOVERED message that includes its own information. This will
populate the local peer lists on each server/client/node, enabling the
broadcast method to send messages to all known peers.
The UDPNode.manage_peers_automatically method can accept optional arguments
advertise_every: int = 20 and peer_timeout: int = 60. All three node types
will accept the following optional arguments:
- app_id: bytes = b'netaio'
- auth_plugin: AuthPluginProtocol|None = None
- cipher_plugin: CipherPluginProtocol|None = None
The auth_plugin and cipher_plugin provided here will be used only for the
peer advertisement and response traffic.
Upon calling await client_or_server_or_node.stop_peer_management(), a
DISCONNECT message will be sent by TCP nodes or multicast by UDP nodes; any
node that receives a DISCONNECT message will remove that peer from the local
peer lists and all subscriptions.
Testing
To test, clone the repo, install the dependencies (preferably within a virtual
environment) using pip install -r requirements.txt, and run
python -m unittest discover -s tests. Or to run the individual tests and see
the output separated by test file, instead run
find tests/ -name test_*.py -print -exec python {} \;.
Currently, there are 20 unit tests and 17 e2e tests. The unit tests cover the bundled plugins and miscellaneous features. The e2e tests start a server and client (or 2 clients), then send messages from the client to the server and receive responses; the UDP e2e test suite starts 2 nodes and treats them like a server and client to make testing a bit simpler and easier to follow. The automatic peer management system is also tested in both TCP and UDP. The bundled plugins are used for the e2e tests, and authentication failure cases are also tested.
License
Copyright (c) 2026 Jonathan Voss (k98kurz)
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file netaio-0.0.9.tar.gz.
File metadata
- Download URL: netaio-0.0.9.tar.gz
- Upload date:
- Size: 73.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 | e85f1e20925ee0a78b143c421332c3582f173591c9644c5653577f7cef8a91d8 | |
| MD5 | 0ae08682b47b714974e44085952aa3ad | |
| BLAKE2b-256 | 337b2fea8f367818f5f905eacf4d5deb10d099f7fc8f1be148408431b582bb0a | |
File details
Details for the file netaio-0.0.9-py3-none-any.whl.
File metadata
- Download URL: netaio-0.0.9-py3-none-any.whl
- Upload date:
- Size: 48.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 | f0c33e397fe2d12976b0e5df0e77dd6a5da9b62befcf5f244e4bc51b2550695c | |
| MD5 | 43d5c5219b2cb1c902630f1a8eef547e | |
| BLAKE2b-256 | 2ba79bc5defe7d6c3e34b2e75faf0d3bf6cc91f69b2096b06501f42aadf6addf | |