Skip to main content

high level socket and pipe wrapper

Project description

socketwrapper

This package provides high level wrappers for sockets and pipes:

  • Thread-safe within both and between threads and asyncio realms.
  • Managed sync recv and send operations with timeouts.
  • Native asyncio recv_async and send_async operations.
  • Pluggable message protocol (headered variable length data) supporting header parsing, serialization and deserialization.
  • Pluggable I/O protocol (file descriptor still required due asyncio).

Motivation

There aren't a ton of high level asyncio socket wrappers out there providing all we need for IPC: header/payload logic and support for both sockets and pipes.

Most implementations either don't have pluggable messaging protocols or it doesn't support variable-size headers.

No other implementation is thread-safe across both asyncio and threading realms.

No other implementation directly supports wrapping multiprocessing Connections.

Installation

uv pip install socketwrapper

Documentation

None other than this README, life's too short and I'm too busy with real life, if you need better documentation consider donating to my ko-fi stating that as the tip message, check out how my docs look like at mstache docs and uactor docs.

Puggable I/O: SocketLike protocol

The socketwrapper.SocketLike protocol, an small subset the socket interface, is all what's required for any object to be wrap-able by socketwrapper.

@typing.runtime_checkable
class SocketLike(typing.Protocol):
    """Protocol for socket-like objects accepted by socketwrapper socket classes."""

    def fileno(self) -> int: ...
    def send(self, data: collections.abc.Buffer, /) -> int: ...
    def recv(self, bufsize: int, /) -> bytes: ...
    def settimeout(self, timeout: float | None, /) -> None: ...
    def close(self) -> None: ...

Special attention to:

Usage

Simple IPC with pipe

import os
import socketwrapper

with socketwrapper.pipe(framing=True) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        print(f'Message {parent_reader.recv()!r} received')
    else:
        child_writer.send(b'Hello world!')
Message b'Hello world!' received

Simple asyncio IPC with pipe

import asyncio
import os
import socketwrapper

async def parent(readable: socketwrapper.MessageReader) -> None:
    print(f'Message {await readable.recv_async()!r} received')

async def child(writable: socketwrapper.MessageWriter) -> None:
    await writable.send_async(b'Hello world!')

with socketwrapper.pipe(framing=True) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!

    asyncio.run(parent(parent_reader) if child_pid else child(child_writer))
Message b'Hello world!' received

Simple bidirectional IPC with socketpair

import os
import socketwrapper

with socketwrapper.socketpair(framing=True) as (parent_duplex, child_duplex):
    child_duplex.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_duplex.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        parent_duplex.send(b'Hello child!')
        print(f'Message {parent_duplex.recv()!r} received in parent')

    else:
        print(f'Message {child_duplex.recv()!r} received in child')
        child_duplex.send(b'Hello parent!')
Message b'Hello child!' received in child
Message b'Hello parent!' received in parent

Socketwrapper with multiprocessing.Pipe and asyncio

import asyncio
import multiprocessing
import multiprocessing.connection
import socketwrapper
import socketwrapper.framing

def child(conn: multiprocessing.connection.Connection) -> None:

    async def main() -> None:
        with socketwrapper.MessageDuplex(conn, framing=socketwrapper.framing.MultiprocessingBytes) as child_duplex:
            print(f'Message {await child_duplex.recv_async()!r} received in child')
            await child_duplex.send_async(b'Hello parent!')

    asyncio.run(main())

parent_conn, child_conn = multiprocessing.Pipe()
with parent_conn, child_conn:
    child_process = multiprocessing.Process(target=child, args=(child_conn,))
    child_process.start()

    parent_conn.send_bytes(b'Hello child!')
    print(f'Message {parent_conn.recv_bytes()!r} received in parent')
    child_process.join(1)
Message b'Hello child!' received in child
Message b'Hello parent!' received in parent

Custom socketwrapper framing with progress

import collections.abc
import itertools
import os
import socketwrapper
import socketwrapper.framing

def progress(arrow: str, size: int, min_chunk: int = 1024) -> collections.abc.Generator[int, None, None]:
    part_size = max(min_chunk, size // 100)
    full_parts, last_size = divmod(size, part_size)
    percent = 100 / (full_parts + 1 if last_size else full_parts)

    for i in range(full_parts):
        print(f'{arrow} {i * percent:6.2f}%')
        yield part_size

    if last_size:
        print(f'{arrow} {full_parts * percent:6.2f}%')
        yield last_size

    print(f'{arrow} 100%')

class ProgressFraming(socketwrapper.framing.VarIntBytes):

    @classmethod
    def frames(cls, buffer: bytearray) -> collections.abc.Generator[int, None, None]:
        frames = super().frames(buffer)
        yield from itertools.islice(frames, 2)
        yield from progress('>', next(frames))

    @classmethod
    def dumps(cls, data: bytes) -> collections.abc.Generator[memoryview, None, None]:
        buffer = memoryview(b''.join(super().dumps(data)))
        for size in progress('<', len(buffer)):
            chunk, buffer = buffer[:size], buffer[size:]
            yield chunk

with socketwrapper.socketpair(framing=ProgressFraming) as (parent_duplex, child_duplex):
    child_duplex.inheritable = True
    child_pid = os.fork()  # replace with your own multiprocessing fork logic
    child_duplex.inheritable = False  # important, prevent socket leaks!

    if child_pid:
        payload = os.urandom(1024) * 1024
        print(f'Sending {len(payload)} bytes!')
        parent_duplex.send(payload)
    else:
        print(f'Received {len(child_duplex.recv())} bytes!')
Sending 1048576 bytes!
<   0.00%
<   0.99%
<   1.98%
<   2.97%
...
>   0.99%
<  13.86%
>   1.98%
<  14.85%
...
>  91.09%
<  99.01%
>  92.08%
< 100%
...
>  98.02%
>  99.01%
> 100%
Received 1048576 bytes!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

socketwrapper-0.0.1.tar.gz (17.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

socketwrapper-0.0.1-py3-none-any.whl (15.0 kB view details)

Uploaded Python 3

File details

Details for the file socketwrapper-0.0.1.tar.gz.

File metadata

  • Download URL: socketwrapper-0.0.1.tar.gz
  • Upload date:
  • Size: 17.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for socketwrapper-0.0.1.tar.gz
Algorithm Hash digest
SHA256 a5c7691d8c168420bf1a43dd8ffb83623e8afcf5493bb3c5b0709459955e7314
MD5 820a71e08817adf718c733604e42970b
BLAKE2b-256 03f669b5d5426f1428e59c3cb03a7eb083650f6f6cc33aee09ca0b6f4cfdb73f

See more details on using hashes here.

File details

Details for the file socketwrapper-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: socketwrapper-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 15.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for socketwrapper-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 3222cb79b41218183d1577f6be004fa3b34ff6be23babf92b96d797a1e7d2c48
MD5 063514a2fd9fbd02cb5b5b9091209a35
BLAKE2b-256 592f99c0c0d8f07ef061fadf3dbddbd6748a170479a0b5f78f15a6be08947e37

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page