high level socket and pipe wrapper
Project description
socketwrapper
This package provides high level wrappers for sockets and pipes:
- Thread-safe within both and between threads and asyncio realms.
- Managed sync recv and send operations with timeouts.
- Native asyncio recv_async and send_async operations.
- Pluggable message protocol (headered variable length data) supporting header parsing, serialization and deserialization.
- Pluggable I/O protocol (file descriptor still required due asyncio).
Motivation
There aren't a ton of high level asyncio socket wrappers out there providing all we need for IPC: header/payload logic and support for both sockets and pipes.
Most implementations either don't have pluggable messaging protocols or it doesn't support variable-size headers.
No other implementation is thread-safe across both asyncio and threading realms.
No other implementation directly supports wrapping multiprocessing Connections.
Installation
uv pip install socketwrapper
Documentation
None other than this README, life's too short and I'm too busy with real life, if you need better documentation consider donating to my ko-fi stating that as the tip message, check out how my docs look like at mstache docs and uactor docs.
Puggable I/O: SocketLike protocol
The socketwrapper.SocketLike protocol, an small subset the socket interface,
is all what's required for any object to be wrap-able by socketwrapper.
@typing.runtime_checkable
class SocketLike(typing.Protocol):
"""Protocol for socket-like objects accepted by socketwrapper socket classes."""
def fileno(self) -> int: ...
def send(self, data: collections.abc.Buffer, /) -> int: ...
def recv(self, bufsize: int, /) -> bytes: ...
def settimeout(self, timeout: float | None, /) -> None: ...
def close(self) -> None: ...
Special attention to:
- fileno has to be a valid OS file descriptor.
- settimeout
only requires support for
settimeout(.0)( non-blocking semantics ), raising ValueError for any other value is fully supported in which case selectors.DefaultSelector will be used for synchronous operations.
Usage
Simple IPC with pipe
import os
import socketwrapper
with socketwrapper.pipe(framing=True) as (parent_reader, child_writer):
child_writer.inheritable = True
child_pid = os.fork() # replace with your own process fork/spawn logic
child_writer.inheritable = False # important, prevent socket leaks!
if child_pid:
print(f'Message {parent_reader.recv()!r} received')
else:
child_writer.send(b'Hello world!')
Message b'Hello world!' received
Simple asyncio IPC with pipe
import asyncio
import os
import socketwrapper
async def parent(readable: socketwrapper.MessageReader) -> None:
print(f'Message {await readable.recv_async()!r} received')
async def child(writable: socketwrapper.MessageWriter) -> None:
await writable.send_async(b'Hello world!')
with socketwrapper.pipe(framing=True) as (parent_reader, child_writer):
child_writer.inheritable = True
child_pid = os.fork() # replace with your own process fork/spawn logic
child_writer.inheritable = False # important, prevent socket leaks!
asyncio.run(parent(parent_reader) if child_pid else child(child_writer))
Message b'Hello world!' received
Simple bidirectional IPC with socketpair
import os
import socketwrapper
with socketwrapper.socketpair(framing=True) as (parent_duplex, child_duplex):
child_duplex.inheritable = True
child_pid = os.fork() # replace with your own process fork/spawn logic
child_duplex.inheritable = False # important, prevent socket leaks!
if child_pid:
parent_duplex.send(b'Hello child!')
print(f'Message {parent_duplex.recv()!r} received in parent')
else:
print(f'Message {child_duplex.recv()!r} received in child')
child_duplex.send(b'Hello parent!')
Message b'Hello child!' received in child
Message b'Hello parent!' received in parent
Socketwrapper with multiprocessing.Pipe and asyncio
import asyncio
import multiprocessing
import multiprocessing.connection
import socketwrapper
import socketwrapper.framing
def child(conn: multiprocessing.connection.Connection) -> None:
async def main() -> None:
with socketwrapper.MessageDuplex(conn, framing=socketwrapper.framing.MultiprocessingBytes) as child_duplex:
print(f'Message {await child_duplex.recv_async()!r} received in child')
await child_duplex.send_async(b'Hello parent!')
asyncio.run(main())
parent_conn, child_conn = multiprocessing.Pipe()
with parent_conn, child_conn:
child_process = multiprocessing.Process(target=child, args=(child_conn,))
child_process.start()
parent_conn.send_bytes(b'Hello child!')
print(f'Message {parent_conn.recv_bytes()!r} received in parent')
child_process.join(1)
Message b'Hello child!' received in child
Message b'Hello parent!' received in parent
Custom socketwrapper framing with progress
import collections.abc
import itertools
import os
import socketwrapper
import socketwrapper.framing
def progress(arrow: str, size: int, min_chunk: int = 1024) -> collections.abc.Generator[int, None, None]:
part_size = max(min_chunk, size // 100)
full_parts, last_size = divmod(size, part_size)
percent = 100 / (full_parts + 1 if last_size else full_parts)
for i in range(full_parts):
print(f'{arrow} {i * percent:6.2f}%')
yield part_size
if last_size:
print(f'{arrow} {full_parts * percent:6.2f}%')
yield last_size
print(f'{arrow} 100%')
class ProgressFraming(socketwrapper.framing.VarIntBytes):
@classmethod
def frames(cls, buffer: bytearray) -> collections.abc.Generator[int, None, None]:
frames = super().frames(buffer)
yield from itertools.islice(frames, 2)
yield from progress('>', next(frames))
@classmethod
def dumps(cls, data: bytes) -> collections.abc.Generator[memoryview, None, None]:
buffer = memoryview(b''.join(super().dumps(data)))
for size in progress('<', len(buffer)):
chunk, buffer = buffer[:size], buffer[size:]
yield chunk
with socketwrapper.socketpair(framing=ProgressFraming) as (parent_duplex, child_duplex):
child_duplex.inheritable = True
child_pid = os.fork() # replace with your own multiprocessing fork logic
child_duplex.inheritable = False # important, prevent socket leaks!
if child_pid:
payload = os.urandom(1024) * 1024
print(f'Sending {len(payload)} bytes!')
parent_duplex.send(payload)
else:
print(f'Received {len(child_duplex.recv())} bytes!')
Sending 1048576 bytes!
< 0.00%
< 0.99%
< 1.98%
< 2.97%
...
> 0.99%
< 13.86%
> 1.98%
< 14.85%
...
> 91.09%
< 99.01%
> 92.08%
< 100%
...
> 98.02%
> 99.01%
> 100%
Received 1048576 bytes!
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file socketwrapper-0.0.1.tar.gz.
File metadata
- Download URL: socketwrapper-0.0.1.tar.gz
- Upload date:
- Size: 17.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a5c7691d8c168420bf1a43dd8ffb83623e8afcf5493bb3c5b0709459955e7314
|
|
| MD5 |
820a71e08817adf718c733604e42970b
|
|
| BLAKE2b-256 |
03f669b5d5426f1428e59c3cb03a7eb083650f6f6cc33aee09ca0b6f4cfdb73f
|
File details
Details for the file socketwrapper-0.0.1-py3-none-any.whl.
File metadata
- Download URL: socketwrapper-0.0.1-py3-none-any.whl
- Upload date:
- Size: 15.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3222cb79b41218183d1577f6be004fa3b34ff6be23babf92b96d797a1e7d2c48
|
|
| MD5 |
063514a2fd9fbd02cb5b5b9091209a35
|
|
| BLAKE2b-256 |
592f99c0c0d8f07ef061fadf3dbddbd6748a170479a0b5f78f15a6be08947e37
|