Simple asyncio TCP client and server and UDP node library inspired by fastapi

netaio

netaio is a simple, easy-to-use asyncio-based library that provides a TCP client, a TCP server, and a UDP node. It is inspired by fastapi but intended for non-HTTP use cases.

Status

This is currently a work-in-progress. Remaining work before the v0.1.0 release:

  • Authorization plugin system
  • Cipher plugin system
  • Optional authorization plugin using HMAC
  • Optional cipher plugin using simple symmetric stream cipher
  • UDP node with multicast
  • Automatic peer advertisement/discovery/management for UDP node
  • Error/errored message handling system
  • Optional authorization plugin using tapescript
  • Optional cipher plugin using Curve25519 asymmetric encryption
  • Optional authorization plugin using Hashcash/PoW for anti-spam DoS protection
  • Ephemeral handlers (i.e. handlers that are removed after first use)
  • IPv6 support
  • Core daemon to proxy traffic for local apps
  • E2e encrypted chat app example

Issues are tracked here. Historical changes can be found in the changelog.

Usage

Install with pip install netaio. To use the optional asymmetric cryptography plugins, install with pip install netaio[asymmetric].

Brief examples are shown below. For more documentation, see the dox.md file generated by autodox.

TCPServer

from netaio import (
    TCPServer, Body, Message, MessageType, HMACAuthPlugin,
    make_respond_uri_msg, make_ok_msg,
    make_error_msg, make_not_found_msg, make_not_permitted_msg
)
import asyncio


server = TCPServer(port=8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"}))
auth_plugin2 = HMACAuthPlugin(config={"secret": "adminpassword", "hmac_field": "camh"})
storage = {}

@server.on(MessageType.CREATE_URI, auth_plugin=auth_plugin2)
async def handle_create(msg: Message, writer: asyncio.StreamWriter):
    if msg.body.uri in storage:
        return make_error_msg('cannot overwrite', msg.body.uri)
    storage[msg.body.uri] = msg.body.content
    server.logger.info(f'CREATED resource: {msg.body.uri}')
    return make_ok_msg(b'stored', msg.body.uri)

@server.on(MessageType.REQUEST_URI)
async def handle_request(msg: Message, writer: asyncio.StreamWriter):
    server.logger.info(f'REQUESTED resource: {msg.body.uri}')
    if msg.body.uri not in storage:
        return make_not_found_msg(uri=msg.body.uri)
    return make_respond_uri_msg(storage[msg.body.uri], msg.body.uri)

@server.on((MessageType.REQUEST_URI, b'echo'))
async def handle_echo(msg: Message, writer: asyncio.StreamWriter):
    return make_respond_uri_msg(msg.body.content, b'echo')

@server.on(MessageType.UPDATE_URI, auth_plugin=auth_plugin2)
async def handle_update(msg: Message, writer: asyncio.StreamWriter):
    if msg.body.uri not in storage:
        return make_not_found_msg(uri=msg.body.uri)
    storage[msg.body.uri] = msg.body.content
    server.logger.info(f'UPDATED resource: {msg.body.uri}')
    return make_ok_msg(b'stored', msg.body.uri)

@server.on(MessageType.DELETE_URI, auth_plugin=auth_plugin2)
async def handle_delete(msg: Message, writer: asyncio.StreamWriter):
    if msg.body.uri not in storage:
        return make_not_found_msg(uri=msg.body.uri)
    storage.pop(msg.body.uri)
    server.logger.info(f'DELETED resource: {msg.body.uri}')
    return make_ok_msg(b'deleted', msg.body.uri)

@server.on(MessageType.SUBSCRIBE_URI)
async def subscribe(msg: Message, writer: asyncio.StreamWriter):
    server.subscribe(msg.body.uri, writer)
    return Message.prepare(
        Body.prepare(b'', uri=msg.body.uri),
        MessageType.CONFIRM_SUBSCRIBE
    )

@server.on(MessageType.UNSUBSCRIBE_URI)
async def unsubscribe(msg: Message, writer: asyncio.StreamWriter):
    server.unsubscribe(msg.body.uri, writer)
    return Message.prepare(
        Body.prepare(b'', uri=msg.body.uri),
        MessageType.CONFIRM_UNSUBSCRIBE
    )

asyncio.run(server.start())

TCPClient

from netaio import TCPClient, Body, Message, MessageType, HMACAuthPlugin
from secrets import token_hex
import asyncio


client = TCPClient(
    "127.0.0.1", 8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"})
)
auth_plugin2 = HMACAuthPlugin(config={"secret": "adminpassword", "hmac_field": "camh"})

async def run_client():
    # connect, then test the echo endpoint
    await client.connect()
    response = await client.request(b'echo', content=b'test')
    print(response)
    # test full CRUD methods: starting with CREATE
    content = ('testing it' + token_hex(4)).encode()
    response = await client.create(b'test1', content, auth_plugin=auth_plugin2)
    print(response)
    # now READ
    response = await client.request(b'test1')
    assert response.body.content == content, (response.body.content, content)
    print(response)
    # now UPDATE
    new_content = ('testing it' + token_hex(4)).encode()
    response = await client.update(b'test1', new_content, auth_plugin=auth_plugin2)
    print(response)
    # now READ again
    response = await client.request(b'test1')
    assert response.body.content == new_content, (response.body.content, new_content)
    print(response)
    # now DELETE
    response = await client.delete(b'test1', auth_plugin=auth_plugin2)
    print(response)
    # now READ to prove deletion
    response = await client.request(b'test1')
    assert response.header.message_type == MessageType.NOT_FOUND, \
        response.header.message_type
    print(response)

asyncio.run(run_client())

UDPNode

from netaio import UDPNode, Peer, Body, Message, MessageType, HMACAuthPlugin
from os import urandom
import asyncio

local_peer = Peer(
    addrs={('127.0.0.1', 8888)},
    id=urandom(16),
    data=b''
)

echo_node = UDPNode(
    local_peer=local_peer,
    auth_plugin=HMACAuthPlugin(config={"secret": "test"})
)

@echo_node.on(MessageType.REQUEST_URI)
def request_uri(msg: Message, addr: tuple[str, int]):
    echo_node.logger.info("Sending echo to %s...", addr)
    return Message.prepare(msg.body, MessageType.OK)

@echo_node.on(MessageType.OK)
def echo(msg: Message, addr: tuple[str, int]):
    echo_node.logger.info("Received echo from %s.", addr)

echo_msg = Message.prepare(Body.prepare(b'echo'), MessageType.REQUEST_URI)

async def main(local_addr: tuple[str, int], remote_addr: tuple[str, int]|None = None):
    echo_node.interface = local_addr[0]
    echo_node.port = local_addr[1]
    await echo_node.start()
    await echo_node.manage_peers_automatically(advertise_every=1, peer_timeout=3)
    while True:
        await asyncio.sleep(1)
        if remote_addr:
            echo_node.logger.info("Sending message to %s...", remote_addr)
            echo_node.send(echo_msg, remote_addr)
        else:
            if len(echo_node.peers) > 0:
                echo_node.logger.info("Broadcasting message to all known peers...")
                echo_node.broadcast(echo_msg)
            else:
                echo_node.logger.info("No peers known, waiting to discover peers...")

local_addr = ("0.0.0.0", 8888)
remote_addr = None
asyncio.run(main(local_addr, remote_addr))

Note that to run this example on a single machine, the second node instance must use a different port, e.g. local_addr = ("127.0.0.1", 8889), and its remote address must be set to the first node's address, e.g. remote_addr = ("127.0.0.1", 8888). Multicast will not work locally because the ports differ. If the interface is set to "0.0.0.0", multicast will work across the LAN, but the node will also hear its own multicast messages, so a handler may need to ignore messages that originate from the local machine; the request_uri handler shown above does not do this.

(It is technically possible to get multicast to work in one direction on a single machine by changing the .port property after one has started.)

Note also that when a peer is removed from the node's peer list, it is also unsubscribed from all URIs.

Custom Message Types

Custom message type classes can be created for protocols. Two helper functions are provided for creating and validating custom message types:

  • make_message_type_class(name: str, new_message_types: dict[str, int]) Creates a new IntEnum that includes all default message types plus any custom types defined.

  • validate_message_type_class(message_type_class: type[IntEnum], suppress_errors: bool = False) Validates a message type class created declaratively. Returns True if valid, False if invalid with suppress_errors=True, or raises an exception otherwise.

Reserved Values: Values 0-30 are reserved for base protocol upgrades. Custom message types must use values >= 31.

Max Value: The maximum allowable value is 255. (Serialized as 1 byte.)

Example using make_message_type_class:

from netaio import make_message_type_class

CustomMessageType = make_message_type_class(
    "CustomMessageType",
    {"CUSTOM_ACTION": 50, "ANOTHER_ACTION": 51}
)

Example using declarative syntax with validation:

from netaio import validate_message_type_class
from enum import IntEnum

class MyMessageType(IntEnum):
    REQUEST_URI = 0
    RESPOND_URI = 1
    # ... all required default types ...
    DISCONNECT = 30
    CUSTOM_ACTION = 50  # Must be >= 31

validate_message_type_class(MyMessageType)  # Raises ValueError if invalid

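Using a reserved value (0-30) for a custom message type causes these functions to raise a ValueError with a message identifying the issue (unless suppress_errors=True is passed to validate_message_type_class, in which case it returns False).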
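The value rules above can be sketched with the standard library alone. This is only an illustration of the stated constraints (0-30 reserved, 255 maximum), not the implementation of make_message_type_class; the helper names check_custom_values and extend_enum are hypothetical.

```python
from enum import IntEnum

RESERVED_MAX = 30   # values 0-30 are reserved, per the rules above
MAX_VALUE = 255     # a message type is serialized as 1 byte


def check_custom_values(new_message_types: dict[str, int]) -> None:
    """Raise ValueError if a custom value is reserved or out of range."""
    for name, value in new_message_types.items():
        if value <= RESERVED_MAX:
            raise ValueError(
                f"{name}={value} uses a reserved value (0-{RESERVED_MAX})"
            )
        if value > MAX_VALUE:
            raise ValueError(f"{name}={value} exceeds the maximum of {MAX_VALUE}")


def extend_enum(
    name: str, base: dict[str, int], extra: dict[str, int]
) -> type[IntEnum]:
    """Build an IntEnum from base members plus validated custom members."""
    check_custom_values(extra)
    return IntEnum(name, {**base, **extra})
```

For example, extend_enum("Custom", {"REQUEST_URI": 0}, {"CUSTOM_ACTION": 50}) yields an enum whose CUSTOM_ACTION member fits in a single byte, while passing {"BAD": 12} raises a ValueError.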

Plugin System

The plugin system is designed to be simple and easy to understand. Each plugin implements a specific protocol, and the TCPServer, TCPClient, and UDPNode classes automatically apply plugins to messages in the correct order to preserve the encapsulation model (see below).

There are three types of plugins defined with typing.Protocol interfaces:

  • AuthPluginProtocol
  • CipherPluginProtocol
  • PeerPluginProtocol

Authentication/Authorization

TCPServer, TCPClient, and UDPNode support an optional authentication/authorization plugin. Each plugin is instantiated with a dict of configuration parameters, and it must implement the AuthPluginProtocol. Once the plugin has been instantiated, it can be passed to the TCPServer, TCPClient, and UDPNode constructors or set on the instances themselves. An auth plugin can also be set on a per-handler basis by passing the plugin as the keyword argument auth_plugin to the on or once decorators, or to the add_handler or add_ephemeral_handler methods (and a handful of others).

Currently, if an auth plugin is set both on the instance and per-handler, both will be checked before the handler function is called, and both will be applied to the response body. The per-handler plugin is able to overwrite any auth fields set by the instance plugin, which may break communication, so each plugin instance should be configured to use its own writeable auth data fields. The nonce and ts fields are safe to reuse, as long as plugins are configured to write those fields only if they are not already populated; see the bundled plugins for example implementations.

Currently, netaio includes an HMACAuthPlugin that can be used by the server and client to authenticate and authorize messages. This uses a shared secret to generate and check HMACs over message bodies.
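As a rough illustration of the idea (not the HMACAuthPlugin implementation), the following standard-library sketch computes and checks an HMAC over a message body together with a nonce and timestamp. The field names hmac, nonce, and ts mirror the plugin's documented defaults; the function names, the exact bytes that are authenticated, and the max_age parameter are assumptions made for this example.

```python
import hashlib
import hmac
import os
import time


def _mac(secret: bytes, nonce: bytes, ts: int, body: bytes) -> bytes:
    # Assumed layout: nonce || 8-byte timestamp || body
    return hmac.new(
        secret, nonce + ts.to_bytes(8, "big") + body, hashlib.sha256
    ).digest()


def make_auth_fields(secret: bytes, body: bytes) -> dict:
    """Produce auth fields for a body using a shared secret."""
    nonce = os.urandom(16)
    ts = int(time.time())
    return {"nonce": nonce, "ts": ts, "hmac": _mac(secret, nonce, ts, body)}


def check_auth_fields(
    secret: bytes, body: bytes, fields: dict, max_age: int = 60
) -> bool:
    """Return True only if the timestamp is fresh and the HMAC matches."""
    if abs(int(time.time()) - fields["ts"]) > max_age:
        return False
    expected = _mac(secret, fields["nonce"], fields["ts"], body)
    # Constant-time comparison avoids leaking how many bytes matched
    return hmac.compare_digest(expected, fields["hmac"])
```

A receiver holding the same secret accepts the fields; a different secret or a modified body causes the check to fail.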

The TapescriptAuthPlugin is included in the optional netaio.asymmetric submodule, which requires tapescript as a dependency and allows for customizable authentication/authorization models using the tapescript DSL for access controls.

Example of additional auth layer

from netaio import TCPServer, TCPClient, HMACAuthPlugin, MessageType, Body, Message
import asyncio

outer_auth_plugin = HMACAuthPlugin(config={"secret": "test"})
inner_auth_plugin = HMACAuthPlugin(config={
    "secret": "tset",
    "hmac_field": "camh" # must be different to avoid overwriting the outer auth field
})
server = TCPServer(port=8888, auth_plugin=outer_auth_plugin)
client = TCPClient(host="127.0.0.1", port=8888, auth_plugin=outer_auth_plugin)

@server.on(MessageType.CREATE_URI, auth_plugin=inner_auth_plugin)
async def put_uri(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'Resource saved.', uri=msg.body.uri)
    return Message.prepare(body, MessageType.OK)

async def main():
    task = asyncio.create_task(server.start())
    await asyncio.sleep(0.1)
    await client.connect()
    await client.send(
        Message.prepare(Body.prepare(b'test'), MessageType.CREATE_URI),
        auth_plugin=inner_auth_plugin
    )
    result = await client.receive_once()
    await client.close()
    task.cancel()
    return result

response = asyncio.run(main())
print(response)

Cipher (encryption/decryption)

TCPServer, TCPClient, and UDPNode support an optional cipher plugin. Each plugin is instantiated with a dict of configuration parameters, and it must implement the CipherPluginProtocol. Once the plugin has been instantiated, it can be passed to the TCPServer, TCPClient, and UDPNode constructors or set on the instances themselves. A cipher plugin can also be set on a per-handler basis by passing the plugin as the keyword argument cipher_plugin to the on or once decorators, or to the add_handler or add_ephemeral_handler methods (and a handful of others). If a cipher plugin is set both on the instance and per-handler, both will be applied to the message, and care must be taken to configure the inner plugin so that it does not overwrite the fields of the outer plugin.

Currently, netaio includes a Sha256StreamCipherPlugin that can be used by the server and client to encrypt and decrypt messages using a simple symmetric stream cipher. This uses a shared secret key and per-message IVs. Note that the encrypt_uri config option should be False to prevent the URI from being encrypted when using this as an additional, inner layer of encryption, else the URI will not be usable for routing requests/determining responses when the default key extractor is used (i.e. handlers set on the tuple of MessageType and URI).
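To make the idea of a hash-based stream cipher with per-message IVs concrete, here is a minimal standard-library sketch. It is an assumed construction (SHA-256 in counter mode, XORed with the data) written for illustration; it is not the Sha256StreamCipherPlugin's actual algorithm, and the function names are hypothetical. It provides no integrity protection on its own, so it would need to be paired with an authentication layer.

```python
import hashlib
import os


def keystream(key: bytes, iv: bytes, length: int) -> bytes:
    """Derive `length` keystream bytes from SHA-256(key || iv || counter)."""
    out = bytearray()
    counter = 0
    while len(out) < length:
        out += hashlib.sha256(key + iv + counter.to_bytes(8, "big")).digest()
        counter += 1
    return bytes(out[:length])


def xor_crypt(key: bytes, iv: bytes, data: bytes) -> bytes:
    """XOR data with the keystream; the same call encrypts and decrypts."""
    return bytes(a ^ b for a, b in zip(data, keystream(key, iv, len(data))))


def encrypt(key: bytes, plaintext: bytes) -> tuple[bytes, bytes]:
    """Encrypt under a fresh random IV and return (iv, ciphertext)."""
    iv = os.urandom(16)
    return iv, xor_crypt(key, iv, plaintext)
```

Because the IV is random per message, encrypting the same plaintext twice gives different ciphertexts, and the receiver recovers the plaintext by calling xor_crypt with the transmitted IV.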

The X25519CipherPlugin is included in the optional netaio.asymmetric submodule, which requires PyNaCl as a dependency and allows for asymmetric encryption using ECDHE (Elliptic Curve Diffie-Hellman Ephemeral key exchange). Note that this plugin should be used as an inner layer of encryption with automatic peer management and local peer data including {'pubkey': SigningKey(seed).verify_key}.

Example of additional encryption layer

from netaio import (
    TCPServer, TCPClient, Sha256StreamCipherPlugin, MessageType, Body, Message
)
import asyncio

outer_cipher_plugin = Sha256StreamCipherPlugin(config={"key": "test"})
inner_cipher_plugin = Sha256StreamCipherPlugin(config={
    "key": "tset",
    "iv_field": "iv2" # must be changed to avoid overwriting the outer cipher's IV field
})
server = TCPServer(port=8888, cipher_plugin=outer_cipher_plugin)
client = TCPClient(host="127.0.0.1", port=8888, cipher_plugin=outer_cipher_plugin)

@server.on(MessageType.REQUEST_URI, cipher_plugin=inner_cipher_plugin)
async def request_uri(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'Super secret data.', uri=msg.body.uri)
    return Message.prepare(body, MessageType.RESPOND_URI)

async def main():
    task = asyncio.create_task(server.start())
    await asyncio.sleep(0.1)
    await client.connect()
    await client.send(
        Message.prepare(
            Body.prepare(b'psst gimme the secret', uri=b'something'),
            MessageType.REQUEST_URI
        ),
        cipher_plugin=inner_cipher_plugin
    )
    result = await client.receive_once(cipher_plugin=inner_cipher_plugin)
    await client.close()
    task.cancel()
    return result

response = asyncio.run(main())
print(response)

Peer Data Encoding/Decoding

To use the automatic peer management system, nodes accept a peer plugin. Each plugin is instantiated with a dict of configuration parameters, and it must implement the PeerPluginProtocol. Once the plugin has been instantiated, it can be passed to the TCPServer, TCPClient, and UDPNode constructors or set on the instances themselves. Unlike the auth and cipher plugins, this is used only for encoding and decoding the data stored in Peer.data, and it is not settable on a per-handler basis.

Currently, netaio includes a DefaultPeerPlugin that is used to encode and decode peer data using the packify library to encode/decode dicts.
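The following sketch shows how a structural typing.Protocol can describe a peer data codec and how a class satisfies it without inheriting from it. The PeerDataCodec interface here is hypothetical: encode_data appears in the examples in this document, but decode_data and both signatures are assumptions, and the real PeerPluginProtocol may differ. JSON with hex-encoded values stands in for packify purely to keep the example dependency-free.

```python
import json
from typing import Protocol, runtime_checkable


@runtime_checkable
class PeerDataCodec(Protocol):
    # Hypothetical interface for illustration only
    def encode_data(self, data: dict[str, bytes]) -> bytes: ...
    def decode_data(self, data: bytes) -> dict[str, bytes]: ...


class JsonPeerCodec:
    """Encodes a dict of bytes values as JSON with hex strings."""

    def encode_data(self, data: dict[str, bytes]) -> bytes:
        hexed = {key: value.hex() for key, value in data.items()}
        return json.dumps(hexed, sort_keys=True).encode()

    def decode_data(self, data: bytes) -> dict[str, bytes]:
        return {key: bytes.fromhex(value) for key, value in json.loads(data).items()}
```

Any object with matching methods passes an isinstance check against the protocol, which is what lets custom plugins be swapped in without subclassing.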

Included Plugins

The following plugins are included for convenience and as references for making custom plugins:

  • HMACAuthPlugin
  • Sha256StreamCipherPlugin
  • DefaultPeerPlugin
  • TapescriptAuthPlugin
  • X25519CipherPlugin

Example instantiation of HMACAuthPlugin

from netaio import HMACAuthPlugin
auth_plugin = HMACAuthPlugin({
    "secret": "we attack at dawn",
    "hmac_field": "hmac", # default; must be changed for inner plugin
    "nonce_field": "nonce", # default; can be used by all plugins at all layers
    "ts_field": "ts", # default; can be used by all plugins at all layers
})

Example instantiation of Sha256StreamCipherPlugin

from netaio import Sha256StreamCipherPlugin
cipher_plugin = Sha256StreamCipherPlugin({
    "key": "the key to success is held by people better than you",
    "iv_field": "iv", # default; must be changed for inner cipher plugins
    "encrypt_uri": True, # default; should be False for inner cipher plugins
})

Example instantiation of TapescriptAuthPlugin

from nacl.signing import SigningKey
from netaio.asymmetric import TapescriptAuthPlugin
import os
import tapescript

seed = os.urandom(32)
vkey = SigningKey(seed).verify_key

# pretending there is an admin key to allow admin auth bypass
if os.path.exists('admin.vkey.hex'):
    with open('admin.vkey.hex', 'r') as f:
        admin_vkey = bytes.fromhex(f.read())
else:
    admin_vkey = vkey

inner_script = tapescript.make_single_sig_lock(admin_vkey)

lock = tapescript.make_taproot_lock(vkey, script=inner_script)
witness_func = lambda seed, sigfields: tapescript.make_taproot_witness_keyspend(
    seed, sigfields, committed_script=inner_script
)
auth_plugin = TapescriptAuthPlugin({
    'seed': seed,
    'lock': lock,
    'nonce_field': 'nonce', # default; can be used by all plugins at all layers
    'ts_field': 'ts', # default; can be used by all plugins at all layers
    'witness_field': 'witness', # default; must be changed for inner plugin
    'witness_func': witness_func,
    'contracts': {}, # default
    'plugins': {}, # default
})

Example instantiation of X25519CipherPlugin

from netaio.asymmetric import X25519CipherPlugin, Peer, DefaultPeerPlugin
import os

seed = os.urandom(32)
cipher_plugin = X25519CipherPlugin({
    'seed': seed,
    'encrypt_uri': False, # default
    # should not be True unless peers are set manually and this is the outer cipher
})

# for use with peer management, the pubkey should be sent to peers
local_peer = Peer(
    addrs={('0.0.0.0', 8888)},
    id=bytes(cipher_plugin.pubk),
    data=DefaultPeerPlugin().encode_data({
        "pubkey": bytes(cipher_plugin.pubk),
        "vkey": bytes(cipher_plugin.vkey), # optional
    }),
)

More documentation on the asymmetric plugins can be found in asymmetric.md.

Encapsulation Model

The encapsulation model for plugin interactions with messages is as follows:

Send
  1. Per-handler/injected cipher_plugin.encrypt
  2. Per-handler/injected auth_plugin.make
  3. Instance self.cipher_plugin.encrypt
  4. Instance self.auth_plugin.make
Receive
  1. Instance self.auth_plugin.check
  2. Instance self.cipher_plugin.decrypt
  3. Per-handler/injected auth_plugin.check
  4. Per-handler/injected cipher_plugin.decrypt
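
The ordering above can be pictured with a toy model in which each step wraps the body in a labelled layer. This is only a sketch of the nesting order: real auth plugins add fields to the message rather than wrapping it, and the names wrap, unwrap, send, and receive are hypothetical.

```python
# Layers in the order they are applied on send (innermost first)
SEND_ORDER = [b"handler_cipher", b"handler_auth", b"instance_cipher", b"instance_auth"]


def wrap(tag: bytes, data: bytes) -> bytes:
    """Apply one layer by enclosing the data in tag(...)."""
    return tag + b"(" + data + b")"


def unwrap(tag: bytes, data: bytes) -> bytes:
    """Remove one layer, failing if the outermost layer is not `tag`."""
    prefix = tag + b"("
    if not (data.startswith(prefix) and data.endswith(b")")):
        raise ValueError(f"expected outermost layer {tag!r}")
    return data[len(prefix):-1]


def send(body: bytes) -> bytes:
    for tag in SEND_ORDER:
        body = wrap(tag, body)
    return body


def receive(wire: bytes) -> bytes:
    # Receiving peels the layers in the reverse of the send order
    for tag in reversed(SEND_ORDER):
        wire = unwrap(tag, wire)
    return wire
```

Here send(b"hi") produces instance_auth(instance_cipher(handler_auth(handler_cipher(hi)))), so the instance-level plugins form the outer envelope and must be processed first on receipt.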

Peer Management

This package includes an optional automatic peer management system, which can be enabled on TCPServer, TCPClient, and UDPNode by awaiting a call to the manage_peers_automatically() method. For TCP, this will cause clients and servers to send ADVERTISE_PEER/PEER_DISCOVERED messages to each other upon connection or upon enabling of peer management for existing connections. For UDP, this will cause nodes to multicast ADVERTISE_PEER messages every 20 seconds by default, and any node that receives such a message will respond to that peer with a PEER_DISCOVERED message that includes its own information. This will populate the local peer lists on each server/client/node, enabling the broadcast method to send messages to all known peers.

The UDPNode.manage_peers_automatically method can accept optional arguments advertise_every: int = 20 and peer_timeout: int = 60. All three node types will accept the following optional arguments:

  • app_id: bytes = b'netaio'
  • auth_plugin: AuthPluginProtocol|None = None
  • cipher_plugin: CipherPluginProtocol|None = None

The auth_plugin and cipher_plugin provided here will be used only for the peer advertisement and response traffic.

Upon calling await client_or_server_or_node.stop_peer_management(), a DISCONNECT message will be sent by TCP nodes or multicast by UDP nodes; any node that receives a DISCONNECT message will remove that peer from the local peer lists and all subscriptions.

Testing

To test, clone the repo, install the dependencies (preferably within a virtual environment) using pip install -r requirements.txt, and run python -m unittest discover -s tests. Alternatively, to run the individual tests and see the output separated by test file, run find tests/ -name 'test_*.py' -print -exec python {} \; (the pattern is quoted so that the shell does not expand it before find sees it).

Currently, there are 20 unit tests and 17 e2e tests. The unit tests cover the bundled plugins and miscellaneous features. The e2e tests start a server and client (or 2 clients), then send messages from the client to the server and receive responses; the UDP e2e test suite starts 2 nodes and treats them like a server and client to make testing a bit simpler and easier to follow. The automatic peer management system is also tested in both TCP and UDP. The bundled plugins are used for the e2e tests, and authentication failure cases are also tested.

License

Copyright (c) 2026 Jonathan Voss (k98kurz)

Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
