Skip to main content

high level socket and pipe wrappers

Project description

socketwrapper

This package provides high level wrappers for sockets and pipes:

  • Thread-safe within both threads and asyncio realms, and between them.
  • Managed sync recv and send operations with timeouts.
  • Native asyncio recv_async and send_async operations.
  • Pluggable message protocol (headered variable length data) supporting variable-length header parsing, payload serialization and deserialization.
  • Pluggable I/O protocol (OS file descriptor still required).

Builtin message protocols (framing):

  • socketwrapper.framing.VarIntBytes: varint-headered bytes (default with framing=True).
  • socketwrapper.framing.MultiprocessingBytes (if platform is supported): multiprocessing.connection.Connection bytes.
  • socketwrapper.framing.Multiprocessing (if platform is supported): multiprocessing.connection.Connection pickled data.
  • socketwrapper.framing.MsgPack (if msgpack is available): unheadered msgpack data stream.

Motivation

  • There aren't a ton of high level asyncio socket wrappers out there providing all we need for IPC: header/payload message logic and support for both sockets and pipes.
  • Most implementations got either hardcoded messaging protocols or require a fixed-size header.
  • No implementation was thread-safe between both asyncio and threading realms.
  • No implementation directly supported wrapping multiprocessing Connections into an asyncio-native interface.

Installation

uv pip install socketwrapper

Or with optional msgpack support.

uv pip install 'socketwrapper[msgpack]'

Changelog

0.0.3 - 2025.11.08

  • New: optional support for recv_into in socket-like objects.
  • Optimization: recv operations will now use sock.recv_into (if available) to reduce memory allocation overhead.

0.0.2 - 2025.11.05

  • Breaking: MessageFraming.frames and MessageFraming.loads now receive io.BytesIO instead of bytearray.
  • New: optional msgpack support (msgpack extra).
  • Fix: busy reads no longer blocking asyncio event loop.
  • Typing: expose SizedBuffer and deprecate SendPayload (removal expected in 0.1.0).
  • Typing: SocketWriter.send and SocketWriter.send_async data is now SizedBuffer.

Documentation

None other than this README, life's too short and I'm too busy with real life, if you need better documentation consider donating to my ko-fi stating that as a tip message, check out how my docs look like at mstache docs and uactor docs.

Puggable I/O: SocketLike protocol

The protocols.SocketLike protocol, a small subset the socket interface, is all what's required for any object to be wrap-able by socketwrapper.

Additional methods defined in protocols.OptionalSocketLike can be optionally implemented enabling optimized code paths.

@typing.runtime_checkable
class SocketLike(typing.Protocol):
    """Protocol for socket-like objects accepted by socketwrapper socket classes."""

    def fileno(self) -> int: ...
    def send(self, data: collections.abc.Buffer, /) -> int: ...
    def recv(self, bufsize: int, /) -> bytes: ...
    def settimeout(self, timeout: float | None, /) -> None: ...
    def close(self) -> None: ...


@typing.runtime_checkable
class OptionalSocketLike(SocketLike, typing.Protocol):
    """Protocol for optional socket-like object methods accepted by socketwrapper socket classes."""

    def recv_into(self, buffer: collections.abc.Buffer, /) -> bytes: ...

Special attention to:

Usage

Simple IPC with pipe

import os
import socketwrapper

with socketwrapper.pipe(framing=True) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        print(f'Message {parent_reader.recv()!r} received')
    else:
        child_writer.send(b'Hello world!')
Message b'Hello world!' received

Simple IPC with pipe using msgpack

import os
import socketwrapper
import socketwrapper.framing as framing

with socketwrapper.pipe(framing=framing.MsgPack()) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        print(f'Message {parent_reader.recv()!r} received')
    else:
        child_writer.send({'data': b'Hello world!'})
Message {'data': b'Hello world!'} received

Simple asyncio IPC with pipe

import asyncio
import os
import socketwrapper

async def parent(readable: socketwrapper.MessageReader) -> None:
    print(f'Message {await readable.recv_async()!r} received')

async def child(writable: socketwrapper.MessageWriter) -> None:
    await writable.send_async(b'Hello world!')

with socketwrapper.pipe(framing=True) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!

    asyncio.run(parent(parent_reader) if child_pid else child(child_writer))
Message b'Hello world!' received

Simple bidirectional IPC with socketpair

import os
import socketwrapper

with socketwrapper.socketpair(framing=True) as (parent_duplex, child_duplex):
    child_duplex.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_duplex.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        parent_duplex.send(b'Hello child!')
        print(f'Message {parent_duplex.recv()!r} received in parent')

    else:
        print(f'Message {child_duplex.recv()!r} received in child')
        child_duplex.send(b'Hello parent!')
Message b'Hello child!' received in child
Message b'Hello parent!' received in parent

Socketwrapper with multiprocessing.Pipe and asyncio

import asyncio
import multiprocessing
import multiprocessing.connection
import socketwrapper
import socketwrapper.framing

def child(conn: multiprocessing.connection.Connection) -> None:

    async def main() -> None:
        with socketwrapper.MessageDuplex(conn, framing=socketwrapper.framing.MultiprocessingBytes) as child_duplex:
            print(f'Message {await child_duplex.recv_async()!r} received in child')
            await child_duplex.send_async(b'Hello parent!')

    asyncio.run(main())

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    with parent_conn, child_conn:
        child_process = multiprocessing.Process(target=child, args=(child_conn,))
        child_process.start()

        parent_conn.send_bytes(b'Hello child!')
        print(f'Message {parent_conn.recv_bytes()!r} received in parent')
        child_process.join(1)
Message b'Hello child!' received in child
Message b'Hello parent!' received in parent

Socketwrapper for cross-interpreter communication

import concurrent.futures
import socketwrapper

def child(child_fileno: int) -> None:
    child_writer = socketwrapper.MessageWriter(child_fileno)
    child_writer.send(b'Hello World')

if __name__ == '__main__':
    with (socketwrapper.pipe(framing=True) as (parent_reader, child_writer),
          concurrent.futures.InterpreterPoolExecutor() as pool):
        pool.submit(child, child_writer.fileno())
        print(f'Message {parent_reader.recv()!r} received')
Message b'Hello World' received

Custom socketwrapper framing with progress

import collections.abc
import io
import itertools
import os
import socketwrapper
import socketwrapper.framing

def progress(arrow: str, size: int, min_chunk: int = 1024) -> collections.abc.Generator[int, None, None]:
    part_size = max(min_chunk, size // 100)
    full_parts, last_size = divmod(size, part_size)
    percent = 100 / (full_parts + 1 if last_size else full_parts)

    for i in range(full_parts):
        print(f'{arrow} {i * percent:6.2f}%')
        yield part_size

    if last_size:
        print(f'{arrow} {full_parts * percent:6.2f}%')
        yield last_size

    print(f'{arrow} 100%')

class ProgressFraming(socketwrapper.framing.VarIntBytes):

    @classmethod
    def frames(cls, buffer: io.BytesIO) -> collections.abc.Generator[int, None, None]:
        frames = super().frames(buffer)
        yield from itertools.islice(frames, 2)
        yield from progress('>', next(frames))

    @classmethod
    def dumps(cls, data: bytes) -> collections.abc.Generator[memoryview, None, None]:
        buffer = memoryview(b''.join(super().dumps(data)))
        for size in progress('<', len(buffer)):
            chunk, buffer = buffer[:size], buffer[size:]
            yield chunk

with socketwrapper.socketpair(framing=ProgressFraming) as (parent_duplex, child_duplex):
    child_duplex.inheritable = True
    child_pid = os.fork()  # replace with your own multiprocessing fork logic
    child_duplex.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        payload = os.urandom(1024) * 1024
        print(f'Sending {len(payload)} bytes!')
        parent_duplex.send(payload)
    else:
        print(f'Received {len(child_duplex.recv())} bytes!')
Sending 1048576 bytes!
<   0.00%
<   0.99%
<   1.98%
<   2.97%
...
>   0.99%
<  13.86%
>   1.98%
<  14.85%
...
>  91.09%
<  99.01%
>  92.08%
< 100%
...
>  98.02%
>  99.01%
> 100%
Received 1048576 bytes!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

socketwrapper-0.0.3.tar.gz (20.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

socketwrapper-0.0.3-py3-none-any.whl (17.8 kB view details)

Uploaded Python 3

File details

Details for the file socketwrapper-0.0.3.tar.gz.

File metadata

  • Download URL: socketwrapper-0.0.3.tar.gz
  • Upload date:
  • Size: 20.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for socketwrapper-0.0.3.tar.gz
Algorithm Hash digest
SHA256 72e41bc96e0193efdee6e09cb3452c2c24564df74c469ef122896b772222e4e3
MD5 7179333dd259ca80b9e27e095cef63a2
BLAKE2b-256 580ee6c4f81ebe019ee738e3d1b27b4d1ca43b9a4ba062a370db8fc1dbb178ff

See more details on using hashes here.

File details

Details for the file socketwrapper-0.0.3-py3-none-any.whl.

File metadata

  • Download URL: socketwrapper-0.0.3-py3-none-any.whl
  • Upload date:
  • Size: 17.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for socketwrapper-0.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 8c1f84299e44057ca7a90ad5ee55735796df76c7e8b2375b754d3036826d3701
MD5 b58c6a564c58ad9ba66ce28d34ec1edd
BLAKE2b-256 db469e00a05afc696ba8cd08aec2f1c1e9afd51843c70cc265c322646b6ee80c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page