high level socket and pipe wrappers
socketwrapper
High level wrappers for socket and pipe IO, thread-safe, asyncio-native and supporting multiprocessing.
Features:
- Thread-safe within both threads and asyncio realms, and between them.
- Managed sync recv and send operations with timeouts.
- Native asyncio recv_async and send_async operations.
- Pluggable message framing and serialization protocols for stream sockets, and serialization for pipes, datagram sockets and Windows message pipes.
- Pluggable I/O (backed by supported OS descriptors).
Built-in message protocols (framing):
- socketwrapper.framing.VarIntBytes: varint-headered bytes (default with framing=True).
- socketwrapper.framing.MultiprocessingBytes: multiprocessing.connection.Connection bytes.
- socketwrapper.framing.Multiprocessing: multiprocessing.connection.Connection pickled data.
- socketwrapper.framing.MultiprocessingPipeBytes: framing for undocumented Windows-only multiprocessing.connection.PipeConnection bytes, or alias of socketwrapper.framing.MultiprocessingBytes on other platforms.
- socketwrapper.framing.MultiprocessingPipe: framing for undocumented Windows-only multiprocessing.connection.PipeConnection pickled data, or alias of socketwrapper.framing.Multiprocessing on other platforms.
- socketwrapper.framing.MsgPack (if msgpack is available): unheadered msgpack data.
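To illustrate what "varint-headered bytes" means, here is a minimal standalone sketch of length-prefixed framing using the common unsigned LEB128 varint encoding. This README does not specify the exact wire format of VarIntBytes, so the encoding and the helper names (encode_varint, decode_varint, frame, unframe) are assumptions for illustration, not the library's implementation.

```python
import io


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as an unsigned LEB128 varint."""
    if value < 0:
        raise ValueError('varint values must be non-negative')
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(stream: io.BytesIO) -> int:
    """Read one unsigned LEB128 varint from the stream."""
    result = shift = 0
    while True:
        raw = stream.read(1)
        if not raw:
            raise EOFError('stream ended inside a varint header')
        result |= (raw[0] & 0x7F) << shift
        if not raw[0] & 0x80:  # high bit clear: last header byte
            return result
        shift += 7


def frame(payload: bytes) -> bytes:
    """Prefix the payload with its length as a varint header."""
    return encode_varint(len(payload)) + payload


def unframe(stream: io.BytesIO) -> bytes:
    """Read one header and then exactly that many payload bytes."""
    size = decode_varint(stream)
    payload = stream.read(size)
    if len(payload) != size:
        raise EOFError('stream ended inside a message payload')
    return payload
```

A variable-length header keeps small messages cheap (one header byte for payloads under 128 bytes) without capping the maximum message size the way a fixed-size header does.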
Limitations on Windows
🛈 Due to platform limitations, socketwrapper has to resort to polling for non-overlapped pipes, and also for overlapped named pipes when not using asyncio.ProactorEventLoop. See the Python documentation on asyncio Platform Support for details.
⚠ Due to performance considerations, avoid:
- Wrapping anonymous pipes (as returned by os.pipe) whenever possible.
- Asynchronous operations on overlapped named pipes when using asyncio.SelectorEventLoop.
For your convenience, socketwrapper.pipe will create overlapped named pipes on Windows.
asyncio.ProactorEventLoop (default on Windows) works with both sockets and overlapped named pipes. Non-overlapped pipes will use polling.
asyncio.SelectorEventLoop, when manually configured as the event loop, only natively supports wrapping sockets; all pipes will use polling.
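If you want to check which of the two cases above applies to your program, a small standard-library sketch like the following can report the running loop type. The helper names uses_proactor and report are hypothetical, and this is not how socketwrapper performs its own event loop feature detection.

```python
import asyncio
import sys


def uses_proactor(loop: asyncio.AbstractEventLoop) -> bool:
    """Return True when the loop is a Windows ProactorEventLoop."""
    # asyncio.ProactorEventLoop only exists on Windows builds of Python.
    proactor_cls = getattr(asyncio, 'ProactorEventLoop', None)
    return proactor_cls is not None and isinstance(loop, proactor_cls)


async def report() -> str:
    """Describe the currently running event loop."""
    loop = asyncio.get_running_loop()
    kind = 'proactor' if uses_proactor(loop) else 'non-proactor'
    return f'{type(loop).__name__} ({kind}) on {sys.platform}'
```

Calling asyncio.run(report()) returns a short description that tells you whether overlapped named pipes can be handled natively or will fall back to polling.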
Motivation
- I just wanted to send/recv from sockets (and pipes) for IPC without having to dig up 40 years worth of quirks, with and without asyncio.
- There aren't a ton of high level asyncio socket wrappers out there providing all we need for IPC: header/payload message logic and support for both sockets and pipes.
- Most implementations either hardcode their messaging protocol, require a fixed-size header, or hardcode their own socket initialization, like asyncio.streams.
- No implementation was thread-safe between both asyncio and threading realms.
- No implementation directly supported wrapping multiprocessing Connections into an asyncio-native interface.
Installation
uv pip install socketwrapper
Or with optional msgpack support.
uv pip install 'socketwrapper[msgpack]'
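Since msgpack support is optional, code that wants to use it conditionally can first check whether the package is importable. This is a standard-library sketch; the has_msgpack helper name is hypothetical and not part of socketwrapper.

```python
import importlib.util


def has_msgpack() -> bool:
    """Return True when the optional msgpack package can be imported."""
    return importlib.util.find_spec('msgpack') is not None
```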
Changelog
0.1.0 - 2026.01.22
Breaking
- Type SendPayload is no longer exposed.
- SizedBuffer protocol is removed, as it was causing typing annoyances.
- SocketLike protocol now requires recv_into, get_inheritable and set_inheritable implementations.
Features
- Added support for Windows sockets and pipes.
- Added support for datagram sockets.
- Added support for blind recv operations without explicit size (omitted or None), returning the first chunk of data for stream sockets, or a whole datagram for datagram sockets whatever its size.
- Added asyncio event loop feature detection with multiple strategies.
- Added StreamEOFError exception (inheriting and replacing EOFError), with a data attribute exposing partial result.
- Added StreamTimeoutError exception (inheriting and replacing TimeoutError), with a data attribute exposing partial result.
- Added StreamCancelledError exception (inheriting and replacing asyncio.CancelledError), with a data attribute exposing partial result.
  - For Python 3.12 this is an alias of asyncio.CancelledError due to cpython#113848.
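The pattern behind these exceptions (subclassing a builtin error and attaching the partial result) can be sketched with the standard library alone. PartialEOFError, recv_exact and demo_partial below are hypothetical names for illustration; they are not socketwrapper's classes, and the library's actual constructor signatures are not documented here.

```python
import socket


class PartialEOFError(EOFError):
    """Hypothetical EOFError subclass carrying the bytes read before EOF."""

    def __init__(self, message: str, data: bytes = b'') -> None:
        super().__init__(message)
        self.data = data


def recv_exact(sock: socket.socket, size: int) -> bytes:
    """Read exactly size bytes, or raise PartialEOFError with what arrived."""
    buffer = bytearray()
    while len(buffer) < size:
        chunk = sock.recv(size - len(buffer))
        if not chunk:  # empty read: the peer closed the connection
            raise PartialEOFError(f'expected {size} bytes, got {len(buffer)}', bytes(buffer))
        buffer += chunk
    return bytes(buffer)


def demo_partial() -> bytes:
    """Close the sender early and recover the partial data from the error."""
    left, right = socket.socketpair()
    with left, right:
        left.sendall(b'abc')
        left.close()  # EOF after three bytes
        try:
            recv_exact(right, 8)
        except PartialEOFError as error:  # also matched by `except EOFError`
            return error.data
    return b''
```

Because the exception inherits from EOFError, existing handlers keep working, while new code can read the data attribute to salvage whatever was received.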
Changes
- For datagram socket-likes (protocols.ExtendedSocketLike.type defined as anything other than socket.SOCK_STREAM):
  - Protocol protocols.SocketLike.recv requires handling the new flags=socket.MSG_PEEK parameter.
  - Protocol protocols.SocketLike.recv_into requires handling both the new bufsize=0 and flags=socket.MSG_PEEK parameters.
- Helper socketwrapper.pipe creates overlapped named pipes on Windows instead of os.pipe anonymous pipes to support asynchronous IO.
- Improved compatibility for third party asyncio event loops (uvloop tested).
- SocketWrappers are now serializable by multiprocessing.
- Optimized framing.MsgPack headless stream read operation logic.
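For reference, this is what socket.MSG_PEEK does on a plain datagram socket: the datagram is returned but stays queued, so a later recv sees it again. The sketch uses only the standard library; peek_then_read is a hypothetical helper, and how socketwrapper uses the flag internally is not described in this README.

```python
import socket


def peek_then_read(payload: bytes) -> tuple[bytes, bytes]:
    """Peek at a queued datagram, then consume it with a normal recv."""
    with (socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as receiver,
          socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender):
        receiver.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
        receiver.settimeout(5)  # avoid hanging forever if the datagram is lost
        sender.sendto(payload, receiver.getsockname())
        peeked = receiver.recv(65535, socket.MSG_PEEK)  # datagram stays queued
        consumed = receiver.recv(65535)  # datagram is removed from the queue
    return peeked, consumed
```

A custom datagram socket-like therefore needs to accept the flags argument and honor it, rather than ignoring it and consuming the data on the first call.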
Bugfixes
- SocketWriter.send and SocketWriter.send_async now return their written byte count.
- Synchronous recv with zero timeout, recv(..., timeout=0), is now handled consistently.
- socketwrapper.socketpair now always returns full duplex socketwrappers.
0.0.3 - 2025.11.08
Features
- Optional protocol protocols.ExtendedSocketLike.recv_into.
- recv operations can now use sock.recv_into (if available) to reduce memory allocation overhead.
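The allocation savings come from filling one preallocated buffer in place instead of creating and joining a new bytes object per chunk. A minimal standard-library sketch of that technique follows; recv_exact_into and demo_recv_into are hypothetical names, and this is not the library's actual read loop.

```python
import socket


def recv_exact_into(sock: socket.socket, size: int) -> bytearray:
    """Fill one preallocated buffer in place instead of joining chunks."""
    buffer = bytearray(size)
    view = memoryview(buffer)  # slicing a memoryview does not copy
    received = 0
    while received < size:
        count = sock.recv_into(view[received:], size - received)
        if not count:  # zero bytes read: the peer closed the connection
            raise EOFError(f'expected {size} bytes, got {received}')
        received += count
    return buffer


def demo_recv_into() -> bytearray:
    """Send ten bytes over a socketpair and read them into one buffer."""
    left, right = socket.socketpair()
    with left, right:
        left.sendall(b'0123456789')
        return recv_exact_into(right, 10)
```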
0.0.2 - 2025.11.05
Breaking
- MessageFraming.frames and MessageFraming.loads now receive io.BytesIO instead of bytearray.
Features
- Optional msgpack support (msgpack extra).
Changes
- SizedBuffer type is now exposed and deprecates SendPayload (removal expected in 0.1.0).
- SocketWriter.send and SocketWriter.send_async parameter data type is now SizedBuffer.
Bugfixes
- Busy reads no longer block the asyncio event loop.
Documentation
None other than this README. Life's too short and I'm too busy with real life; if you need better documentation, consider donating to my ko-fi stating that as a tip message. Check out what my docs look like at mstache docs and uactor docs.
Pluggable I/O: SocketLike protocol
The protocols.SocketLike protocol, a small subset of the socket interface, is all that's required for any object to be wrappable by socketwrapper.
import collections.abc
import typing

@typing.runtime_checkable
class SocketLike(typing.Protocol):
    """Protocol for socket-like objects accepted by socketwrapper socket classes."""

    def close(self) -> None: ...
    def fileno(self) -> int: ...
    def recv(self, bufsize: int, flags: int = 0, /) -> bytes: ...
    def recv_into(self, buffer: collections.abc.Buffer, nbytes: int = 0, flags: int = 0, /) -> int: ...
    def send(self, data: collections.abc.Buffer, /) -> int: ...
    def settimeout(self, timeout: float | None, /) -> None: ...
    def get_inheritable(self) -> bool: ...
    def set_inheritable(self, inheritable: bool, /) -> None: ...
Special attention to:
- fileno has to be a valid OS file descriptor.
- settimeout must support settimeout(.0) (non-blocking semantics). Raising ValueError for any other value is handled by relying on selectors.DefaultSelector for synchronous operations.
- Unless SocketLike.type is socket.SOCK_STREAM (or missing), recv and recv_into flags must support socket.MSG_PEEK.
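The combination of settimeout(.0) and selectors.DefaultSelector mentioned above is a standard way to implement a timed synchronous read on top of a non-blocking descriptor. The following sketch shows the general technique with a plain socket; recv_with_timeout and demo_selector are hypothetical helpers, not socketwrapper internals.

```python
import selectors
import socket


def recv_with_timeout(sock: socket.socket, bufsize: int, timeout: float) -> bytes:
    """Wait for readability with a selector, then do a non-blocking recv."""
    sock.settimeout(0.0)  # non-blocking: recv never waits by itself
    with selectors.DefaultSelector() as selector:
        selector.register(sock, selectors.EVENT_READ)
        if not selector.select(timeout):  # empty list: nothing became ready
            raise TimeoutError(f'no data within {timeout} seconds')
    return sock.recv(bufsize)


def demo_selector() -> tuple[bytes, bool]:
    """Read pending data, then show the timeout path on an idle socket."""
    left, right = socket.socketpair()
    with left, right:
        left.sendall(b'ready')
        data = recv_with_timeout(right, 1024, timeout=1.0)
        try:
            recv_with_timeout(right, 1024, timeout=0.05)
        except TimeoutError:
            timed_out = True
        else:
            timed_out = False
    return data, timed_out
```

This is why fileno has to be a real OS descriptor: the selector needs something the operating system can wait on.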
Usage
Simple IPC with pipe
import os
import socketwrapper
with socketwrapper.pipe(framing=True) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!
    if child_pid:
        print(f'Message {parent_reader.recv()!r} received')
    else:
        child_writer.send(b'Hello world!')
Message b'Hello world!' received
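The inheritable toggling in the example above mirrors what the standard library exposes through os.get_inheritable and os.set_inheritable. As a sketch of the underlying mechanism on a raw os.pipe descriptor (assuming the wrapper's inheritable property maps to the same descriptor flag, which this README does not state explicitly; inheritable_window is a hypothetical helper):

```python
import os


def inheritable_window() -> tuple[bool, bool, bool]:
    """Toggle the inheritable flag of a pipe descriptor around a spawn point."""
    read_fd, write_fd = os.pipe()
    try:
        before = os.get_inheritable(write_fd)  # False by default (PEP 446)
        os.set_inheritable(write_fd, True)  # children created now may inherit it
        during = os.get_inheritable(write_fd)
        # ... create the child process here ...
        os.set_inheritable(write_fd, False)  # later children will not inherit it
        after = os.get_inheritable(write_fd)
    finally:
        os.close(read_fd)
        os.close(write_fd)
    return before, during, after
```

Keeping the inheritable window as short as possible limits which child processes end up holding a copy of the descriptor.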
Simple IPC with pipe using msgpack
import os
import socketwrapper
import socketwrapper.framing as framing
with socketwrapper.pipe(framing=framing.MsgPack()) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!
    if child_pid:
        print(f'Message {parent_reader.recv()!r} received')
    else:
        child_writer.send({'data': b'Hello world!'})
Message {'data': b'Hello world!'} received
Simple asyncio IPC with pipe
import asyncio
import os
import socketwrapper
async def parent(readable: socketwrapper.MessageReader) -> None:
    print(f'Message {await readable.recv_async()!r} received')

async def child(writable: socketwrapper.MessageWriter) -> None:
    await writable.send_async(b'Hello world!')

with socketwrapper.pipe(framing=True) as (parent_reader, child_writer):
    child_writer.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_writer.inheritable = False  # important, prevent socket leaks!
    asyncio.run(parent(parent_reader) if child_pid else child(child_writer))
Message b'Hello world!' received
Simple bidirectional IPC with socketpair
import os
import socketwrapper
with socketwrapper.socketpair(framing=True) as (parent_duplex, child_duplex):
    child_duplex.inheritable = True
    child_pid = os.fork()  # replace with your own process fork/spawn logic
    child_duplex.inheritable = False  # important, prevent socket leaks!
    if child_pid:
        parent_duplex.send(b'Hello child!')
        print(f'Message {parent_duplex.recv()!r} received in parent')
    else:
        print(f'Message {child_duplex.recv()!r} received in child')
        child_duplex.send(b'Hello parent!')
Message b'Hello child!' received in child
Message b'Hello parent!' received in parent
Socketwrapper with multiprocessing.Pipe and asyncio
import asyncio
import multiprocessing
import multiprocessing.connection
import socketwrapper
import socketwrapper.framing
def child(conn: multiprocessing.connection.Connection) -> None:
    async def main() -> None:
        with socketwrapper.MessageDuplex(conn, framing=socketwrapper.framing.MultiprocessingBytes) as child_duplex:
            print(f'Message {await child_duplex.recv_async()!r} received in child')
            await child_duplex.send_async(b'Hello parent!')
    asyncio.run(main())

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    with parent_conn, child_conn:
        child_process = multiprocessing.Process(target=child, args=(child_conn,))
        child_process.start()
        parent_conn.send_bytes(b'Hello child!')
        print(f'Message {parent_conn.recv_bytes()!r} received in parent')
        child_process.join(1)
Message b'Hello child!' received in child
Message b'Hello parent!' received in parent
Socketwrapper for cross-interpreter communication
import concurrent.futures
import socketwrapper
def child(child_fileno: int) -> None:
    child_writer = socketwrapper.MessageWriter(child_fileno)
    child_writer.send(b'Hello World')

if __name__ == '__main__':
    with (socketwrapper.pipe(framing=True) as (parent_reader, child_writer),
          concurrent.futures.InterpreterPoolExecutor() as pool):
        pool.submit(child, child_writer.fileno())
        print(f'Message {parent_reader.recv()!r} received')
Message b'Hello World' received
Custom socketwrapper framing with progress
import collections.abc
import io
import itertools
import os
import socketwrapper
import socketwrapper.framing
def progress(arrow: str, size: int, min_chunk: int = 1024) -> collections.abc.Generator[int, None, None]:
    part_size = max(min_chunk, size // 100)
    full_parts, last_size = divmod(size, part_size)
    percent = 100 / (full_parts + 1 if last_size else full_parts)
    for i in range(full_parts):
        print(f'{arrow} {i * percent:6.2f}%')
        yield part_size
    if last_size:
        print(f'{arrow} {full_parts * percent:6.2f}%')
        yield last_size
    print(f'{arrow} 100%')

class ProgressFraming(socketwrapper.framing.VarIntBytes):
    @classmethod
    def frames(cls, buffer: io.BytesIO) -> collections.abc.Generator[int, None, None]:
        frames = super().frames(buffer)
        yield from itertools.islice(frames, 2)
        yield from progress('>', next(frames))

    @classmethod
    def dumps(cls, data: bytes) -> collections.abc.Generator[memoryview, None, None]:
        buffer = memoryview(b''.join(super().dumps(data)))
        for size in progress('<', len(buffer)):
            chunk, buffer = buffer[:size], buffer[size:]
            yield chunk

with socketwrapper.socketpair(framing=ProgressFraming) as (parent_duplex, child_duplex):
    child_duplex.inheritable = True
    child_pid = os.fork()  # replace with your own multiprocessing fork logic
    child_duplex.inheritable = False  # important, prevent socket leaks!
    if child_pid:
        payload = os.urandom(1024) * 1024
        print(f'Sending {len(payload)} bytes!')
        parent_duplex.send(payload)
    else:
        print(f'Received {len(child_duplex.recv())} bytes!')
Sending 1048576 bytes!
< 0.00%
< 0.99%
< 1.98%
< 2.97%
...
> 0.99%
< 13.86%
> 1.98%
< 14.85%
...
> 91.09%
< 99.01%
> 92.08%
< 100%
...
> 98.02%
> 99.01%
> 100%
Received 1048576 bytes!
Connection handling (socket.accept)
The connection concept is purposely left out of this library (may change in the future), reasoning:
- Already existing methods for accepting connections: socket.accept and loop.sock_accept, both simple and standard.
- socketwrapper can already be used to wrap the connected socket (thread safe and way simpler than asyncio Streaming Protocols).
- Server sockets aren't usually shared outside the connection loop between sync and async contexts (unnecessary CrossLock).
So just wrap accepted connection sockets with socketwrapper.SocketDuplex.
import asyncio
import socket
import socketwrapper
async def listen(server: socket.socket) -> None:
    loop = asyncio.get_running_loop()
    while True:
        sock, addr = await loop.sock_accept(server)
        with socketwrapper.SocketDuplex(sock) as sock:
            await sock.send_async(b'message')

async def main() -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 8080))
    server.listen(2)
    server.setblocking(False)  # asyncio requires a non-blocking server
    listener = asyncio.create_task(listen(server))
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('localhost', 8080))
    with socketwrapper.SocketDuplex(client) as client:
        print(await client.recv_async())
    listener.cancel()  # stop connection handler
    await asyncio.gather(listener, return_exceptions=True)
    server.close()

if __name__ == '__main__':
    asyncio.run(main())
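The example above covers loop.sock_accept. For the synchronous socket.accept path mentioned earlier, a standard-library-only sketch could look like the following. serve_once and demo_accept are hypothetical helpers, and the point where a wrapper such as socketwrapper.SocketDuplex would be applied is only marked with a comment so that the sketch runs without the library installed.

```python
import socket
import threading


def serve_once(server: socket.socket) -> None:
    """Accept a single connection and send one message."""
    conn, _addr = server.accept()  # blocks until a client connects
    with conn:
        # conn is the connected socket that a wrapper would receive here
        conn.sendall(b'message')


def demo_accept() -> bytes:
    """Run a one-shot server in a thread and read its message as a client."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
        server.listen(1)
        thread = threading.Thread(target=serve_once, args=(server,))
        thread.start()
        with socket.create_connection(server.getsockname(), timeout=5) as client:
            data = client.recv(1024)
        thread.join()
    return data
```

Binding to port 0 avoids clashing with whatever already listens on a fixed port such as 8080.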