Skip to main content

Simple asyncio TCP client and server and UDP node library inspired by fastapi

Project description

netaio

This is designed to be a simple and easy to use asyncio-based TCP client and server library inspired by fastapi but for non-HTTP use cases.

Status

This is currently a work-in-progress. Remaining work before the v0.1.0 release:

  • Authorization plugin system
  • Cipher plugin system
  • Optional authorization plugin using HMAC
  • Optional cipher plugin using simple symmetric stream cipher
  • UDP node with multicast
  • Automatic peer advertisement/discovery/management for UDP node
  • Error/errored message handling system
  • Optional authorization plugin using tapescript
  • Optional cipher plugin using Curve25519 asymmetric encryption
  • Optional authorization plugin using Hashcash/PoW for anti-spam DoS protection
  • Ephemeral handlers (i.e. handlers that are removed after first use)
  • Core daemon to proxy traffic for local apps
  • E2e encrypted chat app example

After that, issues will be tracked here.

Usage

Install with pip install netaio. Brief examples are shown below. For more documentation, see the dox.md file generated by autodox.

TCPServer

from netaio import TCPServer, Body, Message, MessageType, HMACAuthPlugin
import asyncio


server = TCPServer(port=8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"}))

@server.on((MessageType.REQUEST_URI, b'something'))
async def something(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'This is it.', uri=b'something')
    return Message.prepare(body, MessageType.RESPOND_URI)

@server.on(MessageType.SUBSCRIBE_URI)
async def subscribe(msg: Message, writer: asyncio.StreamWriter):
    server.subscribe(msg.body.uri, writer)
    return Message.prepare(Body.prepare(b'', uri=msg.body.uri), MessageType.CONFIRM_SUBSCRIBE)

@server.on(MessageType.UNSUBSCRIBE_URI)
async def unsubscribe(msg: Message, writer: asyncio.StreamWriter):
    server.unsubscribe(msg.body.uri, writer)
    return Message.prepare(Body.prepare(b'', uri=msg.body.uri), MessageType.CONFIRM_UNSUBSCRIBE)

asyncio.run(server.start())

TCPClient

from netaio import TCPClient, Body, Message, MessageType, HMACAuthPlugin
import asyncio


client = TCPClient("127.0.0.1", 8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"}))
received_resources = {}

@client.on(MessageType.RESPOND_URI)
def echo(msg: Message, writer: asyncio.StreamWriter):
    received_resources[msg.body.uri] = msg.body.content

async def run_client():
    request_body = Body.prepare(b'pls gibs me dat', uri=b'something')
    request_message = Message.prepare(request_body, MessageType.REQUEST_URI)
    await client.connect()
    await client.send(request_message)
    await client.receive_once()

asyncio.run(run_client())

print(received_resources)

UDPNode

from netaio import UDPNode, Peer, Body, Message, MessageType, HMACAuthPlugin
from os import urandom
import asyncio

local_peer = Peer(
    addrs={('127.0.0.1', 8888)},
    peer_id=urandom(16),
    peer_data=b''
)

echo_node = UDPNode(
    local_peer=local_peer,
    auth_plugin=HMACAuthPlugin(config={"secret": "test"})
)

@echo_node.on(MessageType.REQUEST_URI)
def request_uri(msg: Message, addr: tuple[str, int]):
    echo_node.logger.info("Sending echo to %s...", addr)
    return Message.prepare(msg.body, MessageType.OK)

@echo_node.on(MessageType.OK)
def echo(msg: Message, addr: tuple[str, int]):
    echo_node.logger.info("Received echo from %s.", addr)

echo_msg = Message.prepare(Body.prepare(b'echo'), MessageType.REQUEST_URI)

async def main(local_addr: tuple[str, int], remote_addr: tuple[str, int]|None = None):
    echo_node.interface = local_addr[0]
    echo_node.port = local_addr[1]
    await echo_node.start()
    await echo_node.manage_peers_automatically(advertise_every=1, peer_timeout=3)
    while True:
        await asyncio.sleep(1)
        if remote_addr:
            echo_node.logger.info("Sending message to %s...", remote_addr)
            echo_node.send(echo_msg, remote_addr)
        else:
            if len(echo_node.peers) > 0:
                echo_node.logger.info("Broadcasting message to all known peers...")
                echo_node.broadcast(echo_msg)
            else:
                echo_node.logger.info("No peers known, waiting to discover peers...")

local_addr = ("0.0.0.0", 8888)
remote_addr = None
asyncio.run(main(local_addr, remote_addr))

Note that to run this example on a single machine, the port must be different in the second node instance, e.g. local_addr = ("127.0.0.1", 8889), and then the remote address must be set to the first node's address, e.g. remote_addr = ("127.0.0.1", 8888). Multicast will not work locally because of the different ports. If the interface is set to "0.0.0.0", multicast will work across the LAN, but this will result in the node hearing its own multicast messages; hence, the request_uri handler ignores messages from the local machine.

(It is technically possible to get multicast to work in one direction on a single machine by changing the .port property after one has started.)

Authentication/Authorization

The server and client support an optional authentication/authorization plugin. Each plugin is instantiated with a dict of configuration parameters, and it must implement the AuthPluginProtocol (i.e. have make, check, and error methods). Once the plugin has been instantiated, it can be passed to the TCPServer and TCPClient constructors or set on the client or server instances themselves. An auth plugin can also be set on a per-handler basis by passing the plugin as a second argument to the on method. Currently, if an auth plugin is set both on the instance and per-handler, both will be checked before the handler function is called, and both will be applied to the response body; the per-handler plugin will be able to overwrite any auth fields set by the instance plugin.

Currently, netaio includes an HMACAuthPlugin that can be used by the server and client to authenticate and authorize requests. This uses a shared secret to generate and check HMACs over message bodies.

Example of additional auth layer
from netaio import TCPServer, TCPClient, HMACAuthPlugin, MessageType, Body, Message
import asyncio

outer_auth_plugin = HMACAuthPlugin(config={"secret": "test"})
inner_auth_plugin = HMACAuthPlugin(config={"secret": "tset", "hmac_field": "camh"})
server = TCPServer(port=8888, auth_plugin=outer_auth_plugin)
client = TCPClient(host="127.0.0.1", port=8888, auth_plugin=outer_auth_plugin)

@server.on(MessageType.CREATE_URI, auth_plugin=inner_auth_plugin)
async def put_uri(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'Resource saved.', uri=msg.body.uri)
    return Message.prepare(body, MessageType.OK)

async def main():
    task = asyncio.create_task(server.start())
    await asyncio.sleep(0.1)
    await client.connect()
    await client.send(
        Message.prepare(Body.prepare(b'test'), MessageType.CREATE_URI),
        auth_plugin=inner_auth_plugin
    )
    result = await client.receive_once()
    await client.close()
    task.cancel()
    return result

response = asyncio.run(main())
print(response)

Cipher (encryption/decryption)

The server and client support an optional cipher plugin. Each plugin is instantiated with a dict of configuration parameters, and it must implement the CipherPluginProtocol (i.e. have encrypt and decrypt methods). Once the plugin has been instantiated, it can be passed to the TCPServer and TCPClient constructors or set on the client or server instances themselves. a cipher plugin can also be set on a per-handler basis by passing the plugin as a third argument to the on method. If a cipher plugin is set both on the instance and per-handler, both will be applied to the message.

Currently, netaio includes a Sha256StreamCipherPlugin that can be used by the server and client to encrypt and decrypt messages using a simple symmetric stream cipher. This uses a shared secret key and per-message IVs. Note that the encrypt_uri config option should be False to prevent the URI from being encrypted when using this as an additional, inner layer of encryption, else the URI will not be usable for routing requests/determining responses when the default key extractor is used (i.e. handlers set on the tuple of MessageType and URI).

Example of additional encryption layer
from netaio import TCPServer, TCPClient, Sha256StreamCipherPlugin, MessageType, Body, Message
import asyncio

outer_cipher_plugin = Sha256StreamCipherPlugin(config={"key": "test"})
inner_cipher_plugin = Sha256StreamCipherPlugin(config={"key": "tset", "iv_field": "iv2"})
server = TCPServer(port=8888, cipher_plugin=outer_cipher_plugin)
client = TCPClient(host="127.0.0.1", port=8888, cipher_plugin=outer_cipher_plugin)

@server.on(MessageType.REQUEST_URI, cipher_plugin=inner_cipher_plugin)
async def request_uri(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'Super secret data.', uri=msg.body.uri)
    return Message.prepare(body, MessageType.RESPOND_URI)

async def main():
    task = asyncio.create_task(server.start())
    await asyncio.sleep(0.1)
    await client.connect()
    await client.send(
        Message.prepare(
            Body.prepare(b'psst gimme the secret', uri=b'something'),
            MessageType.REQUEST_URI
        ),
        cipher_plugin=inner_cipher_plugin
    )
    result = await client.receive_once(cipher_plugin=inner_cipher_plugin)
    await client.close()
    task.cancel()
    return result

response = asyncio.run(main())
print(response)

Encapsulation

The encapsulation model for plugin interactions with messages is as follows:

Send

  1. Per-handler/injected cipher_plugin.encrypt
  2. Per-handler/injected auth_plugin.make
  3. Instance self.cipher_plugin.encrypt
  4. Instance self.auth_plugin.make

Receive

  1. Instance self.auth_plugin.check
  2. Instance self.cipher_plugin.decrypt
  3. Per-handler/injected auth_plugin.check
  4. Per-handler/injected cipher_plugin.decrypt

Testing

To test, clone the repo and run python -m unittest discover -s tests. Or to run the individual tests and see the output separated by test file, instead run find tests/ -name test_*.py -print -exec python {} \;.

Currently, there are 7 unit tests and 9 e2e tests. The unit tests cover the bundled plugins and miscellaneous features. The e2e tests start a server and client, then send messages from the client to the server and receive responses; the UDP e2e test suite starts 2 nodes and treats them like a server and client to make testing a bit simpler and easier to follow, and it tests the automatic peer management system. The bundled plugins are used for the e2e tests, and authentication failure cases are also tested.

License

Copyright (c) 2025 Jonathan Voss (k98kurz)

Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

netaio-0.0.5.tar.gz (36.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

netaio-0.0.5-py3-none-any.whl (28.4 kB view details)

Uploaded Python 3

File details

Details for the file netaio-0.0.5.tar.gz.

File metadata

  • Download URL: netaio-0.0.5.tar.gz
  • Upload date:
  • Size: 36.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.12

File hashes

Hashes for netaio-0.0.5.tar.gz
Algorithm Hash digest
SHA256 da3e3f6170e8c2473269d4139f4b5ad2bf544b652596d90e9ec44dc828e48d13
MD5 bca32f80a85a4b3fb5e4d371e9fd2ef2
BLAKE2b-256 6262509dfa18874138eff77a919239b88d063afbe3992c0c53a934a635a55677

See more details on using hashes here.

File details

Details for the file netaio-0.0.5-py3-none-any.whl.

File metadata

  • Download URL: netaio-0.0.5-py3-none-any.whl
  • Upload date:
  • Size: 28.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.12

File hashes

Hashes for netaio-0.0.5-py3-none-any.whl
Algorithm Hash digest
SHA256 f41fc6070ca19cd652db4028272027d3ded76e9774665c625215dc3ee6c0c336
MD5 214bf541c7e0b1705a2599914c3c0bdf
BLAKE2b-256 218ea684ad8fb78a82dac07920837cf829c9c629521aeffb0afd8940cb8da1c7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page