Skip to main content

high level socket and pipe wrappers

Project description

socketwrapper

High level wrappers for socket and pipe IO, thread-safe, asyncio-native and supporting multiprocessing.

Features:

  • Thread-safe within both threads and asyncio realms, and between them.
  • Managed sync recv and send operations with timeouts.
  • Native asyncio recv_async and send_async operations.
  • Pluggable message frame and serialization protocols for stream sockets, and serialization for pipes, datagram sockets and Windows message pipes.
  • Pluggable I/O (backed by supported OS descriptors).

Built-in message protocols (framing):

  • socketwrapper.framing.VarIntBytes: varint-headered bytes (default with framing=True).
  • socketwrapper.framing.MultiprocessingBytes: multiprocessing.connection.Connection bytes.
  • socketwrapper.framing.Multiprocessing: multiprocessing.connection.Connection pickled data.
  • socketwrapper.framing.MultiprocessingPipeBytes: framing for undocumented windows-only multiprocessing.connection.PipeConnection bytes, or alias of socketwrapper.framing.MultiprocessingBytes in other platforms.
  • socketwrapper.framing.MultiprocessingPipe: framing for undocumented windows-only multiprocessing.connection.PipeConnection pickled data, or alias of socketwrapper.framing.Multiprocessing in other platforms.
  • socketwrapper.framing.MsgPack (if msgpack is available): unheadered msgpack data.

Limitations on Windows

🛈 Due platform limitations, sockerwrapper has to resort on polling for non-overlapped pipes, and also overlapped named pipes when not using asyncio.ProactorEventLoop. See Python documentation about asyncio Platform Support for details.

⚠ Due performance considerations, avoid:

For your convenience, socketwrapper.pipe will create overlapped named pipes on Windows.

asyncio.ProactorEventLoop (default on Windows) works with both sockets and overlapped named pipes. Non-overlapped pipes will use polling.

asyncio.SelectorEventLoop only natively supports wrapping sockets when manually configured as event loop, all pipes will use polling.

Motivation

  • I just wanted to send/recv from sockets (and pipes) for IPC without having to dig up 40 years worth of quirks, with and without asyncio.
  • There aren't a ton of high level asyncio socket wrappers out there providing all we need for IPC: header/payload message logic and support for both sockets and pipes.
  • Most implementations got either hardcoded messaging protocols or require a fixed-size header, or just hardcode their own socket initialization like asyncio.streams.
  • No implementation was thread-safe between both asyncio and threading realms.
  • No implementation directly supported wrapping multiprocessing Connections into an asyncio-native interface.

Installation

uv pip install socketwrapper

Or with optional msgpack support.

uv pip install 'socketwrapper[msgpack]'

Changelog

0.1.0 - 2026.01.22

Breaking

  • Type SendPayload is no longer exposed.
  • SizedBuffer protocol is removed, as it was causing typing annoyances.
  • SocketLike protocol now requires recv_into, get_inheritable and set_inheritable implementations.

Features

  • Added support for Windows sockets and pipes.
  • Added support for datagram sockets.
  • Added support for blind recv operations without explicit size (omitted or None), returning the first chunk of data for stream sockets, or a whole datagram for datagram sockets whatever its size.
  • Added asyncio event loop feature detection with multiple strategies.
  • Added StreamEOFError exception (inheriting and replacing EOFError), with a data attribute exposing partial result.
  • Added StreamTimeoutError exception (inheriting and replacing TimeoutError), with a data attribute exposing partial result.
  • Added StreamCancelledError: exception (inheriting and replacing asyncio.CancelledError), with data attribute exposing partial result.

Changes

  • For datagram socket-likes (protocols.ExtendedSocketLike.type defined as anything other than socket.SOCK_STREAM):
    • Protocol protocols.SocketLike.recv requires handing new flags=socket.MSG_PEEK parameter.
    • Protocol protocols.SocketLike.recv_into requires handing both new bufsize=0 and flags=socket.MSG_PEEK parameters.
  • Helper socketwrapper.pipecreates overlapped named pipes on Windows instead of os.pipe anonymous pipes to support asynchronous IO.
  • Improved compatibility for third party asyncio event loops (uvloop tested).
  • SocketWrappers are now serializable by multiprocessing.
  • Optimize framing.MsgPack headless stream read operation logic.

Bugfixes

  • SocketWriter.send and SocketWriter.send_async now return its written byte count.
  • Synchronous recv zero timeout recv(..., timeout=0) is now handled consistently.
  • socketwrapper.socketpair now always returns full duplex socketwrappers.

0.0.3 - 2025.11.08

Features

  • Optional protocol protocols.ExtendedSocketLike.recv_into.
  • recv operations can now use sock.recv_into (if available) to reduce memory allocation overhead.

0.0.2 - 2025.11.05

Breaking

  • MessageFraming.frames and MessageFraming.loads now receive io.BytesIO instead of bytearray.

Features

  • Optional msgpack support (msgpack extra).

Changes

  • SizedBuffer type is now exposed and deprecate SendPayload (removal expected in 0.1.0).
  • SocketWriter.send and SocketWriter.send_async parameter data type is now SizedBuffer.

Bugfixes

  • Busy reads no longer blocking asyncio event loop.

Documentation

None other than this README, life's too short and I'm too busy with real life, if you need better documentation consider donating to my ko-fi stating that as a tip message, check out how my docs look like at mstache docs and uactor docs.

Puggable I/O: SocketLike protocol

The protocols.SocketLike protocol, a small subset the socket interface, is all what's required for any object to be wrap-able by socketwrapper.

@typing.runtime_checkable
class SocketLike(typing.Protocol):
    """Protocol for socket-like objects accepted by socketwrapper socket classes."""

    def close(self) -> None: ...
    def fileno(self) -> int: ...
    def recv(self, bufsize: int, flags: int = 0, /) -> bytes: ...
    def recv_into(self, buffer: collections.abc.Buffer, nbytes: int = 0, flags: int = 0, /) -> int: ...
    def send(self, data: collections.abc.Buffer, /) -> int: ...
    def settimeout(self, timeout: float | None, /) -> None: ...
    def get_inheritable(self) -> bool: ...
    def set_inheritable(self, inheritable: bool, /) -> None: ...

Special attention to:

Usage

Simple IPC with pipe

import os
import socketwrapper

with socketwrapper.pipe(framing=True) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        print(f'Message {parent_reader.recv()!r} received')
    else:
        child_writer.send(b'Hello world!')
Message b'Hello world!' received

Simple IPC with pipe using msgpack

import os
import socketwrapper
import socketwrapper.framing as framing

with socketwrapper.pipe(framing=framing.MsgPack()) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        print(f'Message {parent_reader.recv()!r} received')
    else:
        child_writer.send({'data': b'Hello world!'})
Message {'data': b'Hello world!'} received

Simple asyncio IPC with pipe

import asyncio
import os
import socketwrapper

async def parent(readable: socketwrapper.MessageReader) -> None:
    print(f'Message {await readable.recv_async()!r} received')

async def child(writable: socketwrapper.MessageWriter) -> None:
    await writable.send_async(b'Hello world!')

with socketwrapper.pipe(framing=True) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!

    asyncio.run(parent(parent_reader) if child_pid else child(child_writer))
Message b'Hello world!' received

Simple bidirectional IPC with socketpair

import os
import socketwrapper

with socketwrapper.socketpair(framing=True) as (parent_duplex, child_duplex):
    child_duplex.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_duplex.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        parent_duplex.send(b'Hello child!')
        print(f'Message {parent_duplex.recv()!r} received in parent')

    else:
        print(f'Message {child_duplex.recv()!r} received in child')
        child_duplex.send(b'Hello parent!')
Message b'Hello child!' received in child
Message b'Hello parent!' received in parent

Socketwrapper with multiprocessing.Pipe and asyncio

import asyncio
import multiprocessing
import multiprocessing.connection
import socketwrapper
import socketwrapper.framing

def child(conn: multiprocessing.connection.Connection) -> None:

    async def main() -> None:
        with socketwrapper.MessageDuplex(conn, framing=socketwrapper.framing.MultiprocessingBytes) as child_duplex:
            print(f'Message {await child_duplex.recv_async()!r} received in child')
            await child_duplex.send_async(b'Hello parent!')

    asyncio.run(main())

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    with parent_conn, child_conn:
        child_process = multiprocessing.Process(target=child, args=(child_conn,))
        child_process.start()

        parent_conn.send_bytes(b'Hello child!')
        print(f'Message {parent_conn.recv_bytes()!r} received in parent')
        child_process.join(1)
Message b'Hello child!' received in child
Message b'Hello parent!' received in parent

Socketwrapper for cross-interpreter communication

import concurrent.futures
import socketwrapper

def child(child_fileno: int) -> None:
    child_writer = socketwrapper.MessageWriter(child_fileno)
    child_writer.send(b'Hello World')

if __name__ == '__main__':
    with (socketwrapper.pipe(framing=True) as (parent_reader, child_writer),
          concurrent.futures.InterpreterPoolExecutor() as pool):
        pool.submit(child, child_writer.fileno())
        print(f'Message {parent_reader.recv()!r} received')
Message b'Hello World' received

Custom socketwrapper framing with progress

import collections.abc
import io
import itertools
import os
import socketwrapper
import socketwrapper.framing

def progress(arrow: str, size: int, min_chunk: int = 1024) -> collections.abc.Generator[int, None, None]:
    part_size = max(min_chunk, size // 100)
    full_parts, last_size = divmod(size, part_size)
    percent = 100 / (full_parts + 1 if last_size else full_parts)

    for i in range(full_parts):
        print(f'{arrow} {i * percent:6.2f}%')
        yield part_size

    if last_size:
        print(f'{arrow} {full_parts * percent:6.2f}%')
        yield last_size

    print(f'{arrow} 100%')

class ProgressFraming(socketwrapper.framing.VarIntBytes):

    @classmethod
    def frames(cls, buffer: io.BytesIO) -> collections.abc.Generator[int, None, None]:
        frames = super().frames(buffer)
        yield from itertools.islice(frames, 2)
        yield from progress('>', next(frames))

    @classmethod
    def dumps(cls, data: bytes) -> collections.abc.Generator[memoryview, None, None]:
        buffer = memoryview(b''.join(super().dumps(data)))
        for size in progress('<', len(buffer)):
            chunk, buffer = buffer[:size], buffer[size:]
            yield chunk

with socketwrapper.socketpair(framing=ProgressFraming) as (parent_duplex, child_duplex):
    child_duplex.inheritable = True
    child_pid = os.fork()  # replace with your own multiprocessing fork logic
    child_duplex.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        payload = os.urandom(1024) * 1024
        print(f'Sending {len(payload)} bytes!')
        parent_duplex.send(payload)
    else:
        print(f'Received {len(child_duplex.recv())} bytes!')
Sending 1048576 bytes!
<   0.00%
<   0.99%
<   1.98%
<   2.97%
...
>   0.99%
<  13.86%
>   1.98%
<  14.85%
...
>  91.09%
<  99.01%
>  92.08%
< 100%
...
>  98.02%
>  99.01%
> 100%
Received 1048576 bytes!

Connection handling (socket.accept)

The connection concept is purposely left out of this library (may change in the future), reasoning:

  • Already existing methods for accepting connections: socket.accept and loop.sock_accept, both simple and standard.
  • socketwrapper can be already used to wrap the connected socket (thread safe and way simpler than asyncio Streaming Protocols).
  • Server sockets aren't usually shared outside the connection loop between sync and async contexts (unnecessary CrossLock).

So just wrap accepted connection sockets with socketwrapper.SocketDuplex.

import asyncio
import socket
import socketwrapper

async def listen(server: socket.socket) -> None:
    loop = asyncio.get_running_loop()
    while True:
        sock, addr = await loop.sock_accept(server)
        with socketwrapper.SocketDuplex(sock) as sock:
            await sock.send_async(b'message')

async def main() -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 8080))
    server.listen(2)
    server.setblocking(False)  # asyncio requires a non-blocking server
    listener = asyncio.create_task(listen(server))

    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('localhost', 8080))
    with socketwrapper.SocketDuplex(client) as client:
        print(await client.recv_async())

    listener.cancel()  # stop connection handler
    await asyncio.gather(listener, return_exceptions=True)
    server.close()

if __name__ == '__main__':
    asyncio.run(main())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

socketwrapper-0.1.0.tar.gz (34.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

socketwrapper-0.1.0-py3-none-any.whl (31.3 kB view details)

Uploaded Python 3

File details

Details for the file socketwrapper-0.1.0.tar.gz.

File metadata

  • Download URL: socketwrapper-0.1.0.tar.gz
  • Upload date:
  • Size: 34.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.26 {"installer":{"name":"uv","version":"0.9.26","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"EndeavourOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for socketwrapper-0.1.0.tar.gz
Algorithm Hash digest
SHA256 d0752eb53409979914e7d21cca71dc896f1943e015c8f960bcc08077c47ee925
MD5 9669a47e7a5a5fc9444f168462b7168c
BLAKE2b-256 9527410cf5e8909a7159b919ab3eb627a622a4a1f9b5ad8eb597827498cc1ff3

See more details on using hashes here.

File details

Details for the file socketwrapper-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: socketwrapper-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 31.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.26 {"installer":{"name":"uv","version":"0.9.26","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"EndeavourOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for socketwrapper-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e0add7d944648eed97ec6c028af46fefaa7061e566ed46528b65847a218acc5d
MD5 7267f174ac56fe3e105553fc09fdc1aa
BLAKE2b-256 a8d3ad3c51505fc8ff8b8d81e67ff378d4a3fcea95572b8cb9663594b94ee9d7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page