general purpose bidirectional packet stream connection
A general purpose bidirectional packet stream connection.
Latest release 20240630:
- New ERQ_Packet to indicate end of client requests.
- PacketConnection.startup_shutdown: send ERQ_Packet at stream shutdown.
- New PacketConnection.end_requests() method to queue an ERQ_Packet to the send queue.
- PacketConnection: send the EOF_Packet from the send worker instead of from startup_shutdown.
- Rename PacketConnection.do to PacketConnection.call.
- PacketStream: BREAKING: replace the recv and send parameters with a single recv_send parameter.
- PacketStream: do not close supplied connection handles - this allows reuse of an underlying binary connection.
- Many many logic fixes for clean and orderly shutdown.
- Rename PacketConnection.request to PacketConnection.submit.
- New BaseRequest and HasPacketConnection helper classes to make it easy to use a PacketConnection for a protocol.
Class BaseRequest(cs.binary.AbstractBinary)
A base class for request classes to use with HasPacketConnection.
This is a mixin aimed at *Binary classes representing the request payload. It supplies an __init__ method which saves the optional flags parameter as .flags and passes all other parameters to the superclass' __init__ (the *Binary superclass).
As such, it is important to define the subclass like this:
    class AddRequest(
        BaseRequest,
        BinaryMultiValue('AddRequest', dict(hashenum=BSUInt, data=BSData)),
    ):
with BaseRequest first if you wish to omit an __init__ method.
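The ordering requirement follows from Python's method resolution order. The following is a minimal sketch using hypothetical stand-in classes (Payload and FlagsMixin are not part of cs.packetstream); it only illustrates why a mixin that consumes flags has to come before the payload class.

```python
class Payload:
    """Hypothetical stand-in for a *Binary payload class with its own __init__."""

    def __init__(self, hashenum, data):
        self.hashenum = hashenum
        self.data = data


class FlagsMixin:
    """Hypothetical stand-in for BaseRequest: keep `flags`, pass the rest on."""

    def __init__(self, *args, flags=0, **kwargs):
        self.flags = flags
        super().__init__(*args, **kwargs)


class GoodRequest(FlagsMixin, Payload):
    """Mixin first: FlagsMixin.__init__ runs first and strips `flags`."""


class BadRequest(Payload, FlagsMixin):
    """Mixin last: Payload.__init__ runs first and rejects `flags`."""


rq = GoodRequest(1, b"chunk", flags=2)
print(rq.flags, rq.hashenum, rq.data)

try:
    BadRequest(1, b"chunk", flags=2)
except TypeError as e:
    # Payload.__init__ does not accept a `flags` keyword.
    print("rejected:", e)
```

With the mixin listed last, the payload class' __init__ is found first and never sees the code which removes flags from the arguments.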
Subclasses must implement the fulfil method to perform the request operation.
Often a subclass will also implement the decode_response_payload(flags,payload) method. This provides the result of a request, returned by HasPacketConnection.conn_do_remote or by the Result returned from HasPacketConnection.conn_submit. The default returns the response flags and payload directly.
This base class subclasses AbstractBinary to encode and decode the request payload and has an additional flags attribute for the Packet.flags. As such, subclasses have two main routes for implementation:
1: Subclass an existing AbstractBinary subclass. For example, the cs.vt.stream.ContainsRequest looks up a hash code, and subclasses the cs.vt.hash.HashField class.
2: Provide a parse(bfr) factory method and transcribe() method to parse and transcribe the request payload like any other AbstractBinary subclass.
Approach 1 does not necessarily need a distinct class; a binary class can often be constructed in the class header. For example, the cs.vt.stream.AddRequest payload is an int representing the hash class and the data to add. The class header looks like this:
    class AddRequest(
        BaseRequest,
        BinaryMultiValue('AddRequest', dict(hashenum=BSUInt, data=BSData)),
    ):
much as one might subclass a namedtuple in other circumstances.
Method BaseRequest.decode_response_payload(self, flags: int, payload: bytes)
Decode a response flags and payload. This default implementation returns the flags and payload unchanged.
Method BaseRequest.from_request_payload(flags: int, payload: bytes) -> 'BaseRequest'
Decode a request flags and payload, return a BaseRequest instance.
This is called with the correct BaseRequest subclass derived from the received Packet.rq_type.
This default implementation assumes that flags==0 and calls cls.from_bytes(payload).
Method BaseRequest.fulfil(self, context) -> Union[NoneType, int, bytes, str, Tuple[int, bytes]]
Fulfil this request at the receiving end of the connection using context, some outer object using the connection. Raise an exception if the request cannot be fulfilled.
Return values suitable for the response:
- None: equivalent to (0,b'')
- int: returned in the flags with b'' for the payload
- bytes: returned as the payload with 0 as the flags
- str: return encode(s,'ascii') as the payload with 0 as the flags
- (int,bytes): the flags and payload
A typical implementation looks like this:
    def fulfil(self, context):
        return context.some_method(params...)
where params come from the request attributes.
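The return value rules above can be sketched as a small normalising function. This is an illustration only: normalise_fulfil_result is a hypothetical name, not a function from cs.packetstream, and the library's own handling may differ in detail.

```python
from typing import Tuple, Union

FulfilResult = Union[None, int, bytes, str, Tuple[int, bytes]]


def normalise_fulfil_result(result: FulfilResult) -> Tuple[int, bytes]:
    """Turn a fulfil() return value into a (flags, payload) pair,
    following the rules listed in the text (hypothetical helper)."""
    if result is None:
        return 0, b""
    if isinstance(result, int):
        # flags only, empty payload
        return result, b""
    if isinstance(result, bytes):
        # payload only, zero flags
        return 0, result
    if isinstance(result, str):
        # text payload encoded as ASCII, zero flags
        return 0, result.encode("ascii")
    # otherwise expect an (int, bytes) pair
    flags, payload = result
    return flags, payload


for value in (None, 3, b"data", "text", (1, b"both")):
    print(repr(value), "->", normalise_fulfil_result(value))
```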
Class HasPacketConnection
This is a mixin class to aid writing classes which use a PacketConnection to communicate with some service.
The supported request/response packet types are provided as a mapping of int Packet.rq_type values to a class implementing that request type, a subclass of BaseRequest.
For example, a cs.vt.stream.StreamStore subclasses HasPacketConnection and initialises the mixin with this call:
    HasPacketConnection.__init__(
        self,
        recv_send,
        name,
        { 0: AddRequest,  # add chunk, return hashcode
          1: GetRequest,  # get chunk from hashcode
          .......
        },
    )
See the BaseRequest class for details on how to implement each request type.
Method HasPacketConnection.__init__(self, recv_send: Union[Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile]], Callable[[], Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile], Callable[[], NoneType]]]], name: str = None, *, rq_type_map: Mapping[int, cs.packetstream.BaseRequest], **packet_kw)
Initialise self.conn as a PacketConnection.
Parameters:
- recv_send: as for PacketConnection
- name: an optional name for the connection
- rq_type_map: a mapping of request types to BaseRequest subclasses
Other keyword arguments are passed to PacketConnection().
Method HasPacketConnection.conn_do_remote(self, rq: cs.packetstream.BaseRequest, **submit_kw)
Run rq remotely. Raises ValueError if the response is not ok. Otherwise returns rq.decode_response(flags, payload).
Method HasPacketConnection.conn_handle_request(self, rq_type: int, flags: int, payload: bytes)
Handle receipt of a request packet. Decode the packet into a request rq and return rq.fulfil(self).
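The decode-then-fulfil dispatch described for conn_handle_request can be sketched without the library. Everything below (EchoRequest, LengthRequest, Service, handle_request) is a hypothetical stand-in; only the shape of the flow, a mapping from rq_type to a request class followed by rq.fulfil(self), is taken from the description.

```python
class EchoRequest:
    """Hypothetical request: return the payload unchanged."""

    def __init__(self, data: bytes, flags: int = 0):
        self.data = data
        self.flags = flags

    @classmethod
    def from_request_payload(cls, flags: int, payload: bytes):
        return cls(payload, flags=flags)

    def fulfil(self, context):
        return context.echo(self.data)


class LengthRequest(EchoRequest):
    """Hypothetical request: report the payload length in the flags."""

    def fulfil(self, context):
        return context.measure(self.data)


class Service:
    """Hypothetical service playing the role of the `context` object."""

    rq_type_map = {0: EchoRequest, 1: LengthRequest}

    def echo(self, data: bytes) -> bytes:
        return data

    def measure(self, data: bytes) -> int:
        return len(data)

    def handle_request(self, rq_type: int, flags: int, payload: bytes):
        # Look up the request class, decode the packet, fulfil it.
        rq_class = self.rq_type_map[rq_type]
        rq = rq_class.from_request_payload(flags, payload)
        return rq.fulfil(self)


svc = Service()
print(svc.handle_request(0, 0, b"abc"))
print(svc.handle_request(1, 0, b"abc"))
```

An unknown rq_type raises KeyError in this sketch; how the real connection reports an unsupported type is not stated in the text.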
Method HasPacketConnection.conn_submit(self, rq: cs.packetstream.BaseRequest, *, channel=0, label=None) -> cs.result.Result
Submit this request to the connection, return a Result.
Class Packet(cs.binary.SimpleBinary)
A protocol packet.
Method Packet.__str__(self)
Method Packet.parse(bfr, log=None)
Parse a Packet from a buffer.
Method Packet.transcribe(self)
Transcribe this packet.
Method Packet.write(self, file, flush=False, log=None)
Write the Packet to file.
Class PacketConnection(cs.resources.MultiOpenMixin)
A bidirectional binary connection for exchanging requests and responses.
Method PacketConnection.__init__(self, recv_send: Union[Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile]], Callable[[], Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile], Callable[[], NoneType]]]], name=None, *, request_handler=None, packet_grace=None, trace_log: Optional[Callable] = None)
Initialise the PacketConnection.
Parameters:
- recv_send: specify the receive and send streams
- packet_grace: default pause in the packet sending worker to allow another packet to be queued before flushing the output stream. Default: DEFAULT_PACKET_GRACE s. A value of 0 will flush immediately if the queue is empty.
- request_handler: an optional callable accepting (rq_type, flags, payload). The request_handler may return one of 5 values on success:
  - None: response will be 0 flags and an empty payload.
  - int: flags only. Response will be the flags and an empty payload.
  - bytes: payload only. Response will be 0 flags and the payload.
  - str: payload only. Response will be 0 flags and the str encoded as bytes using UTF-8.
  - (int, bytes): Specify flags and payload for response.
  An unsuccessful request should raise an exception, which will cause a failure response packet.
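As an illustration of the request_handler contract, here is a sketch of a handler exercising each of the five return forms. The request type numbers and their meanings are invented for the example; only the (rq_type, flags, payload) signature and the return forms come from the description above.

```python
def example_handler(rq_type: int, flags: int, payload: bytes):
    """Hypothetical request handler; the rq_type values are made up."""
    if rq_type == 0:
        # None: 0 flags and an empty payload
        return None
    if rq_type == 1:
        # int: flags only
        return len(payload)
    if rq_type == 2:
        # bytes: payload only
        return payload[::-1]
    if rq_type == 3:
        # str: payload only, encoded by the connection
        return "hello"
    if rq_type == 4:
        # (int, bytes): flags and payload
        return flags, payload
    # An exception signals an unsuccessful request.
    raise ValueError(f"unsupported rq_type {rq_type}")


print(example_handler(2, 0, b"abc"))
print(example_handler(4, 7, b"abc"))
```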
The recv_send parameter is used to prepare the connection. It may take the following forms:
- a 2-tuple of (recv,send) specifying the receive and send streams
- an int specifying a single file descriptor used for both receive and send
- a callable returning a 3-tuple of (recv,send,close) as for PacketConnection's callable mode
The (recv,send) pair indicates the inbound and outbound binary streams.
For preexisting streams such as pipes or sockets these can be:
- recv: anything acceptable to CornuCopyBuffer.promote(), typically a file descriptor or a binary file with a .read method.
- send: a file descriptor or a binary file with .write and .flush methods.
For "on demand" use, recv may be a callable and send may be None. In this case, recv() must return a 3-tuple of (recv,send,shutdown) being values for recv and send as above, and a shutdown function to do the necessary "close" of the new recv and send. shutdown may be None if there is no meaningful close operation.
The PacketConnection's startup_shutdown method will call recv() to obtain the binary streams and call the shutdown on completion.
This supports use for on demand connections, eg:
    P = PacketConnection(connect_to_server)
    ......
    with P:
        ... use P to do work ...
where connect_to_server() might connect to some remote service.
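A callable of the on demand shape can be sketched with the standard library alone. The example below is an assumption-laden stand-in: connect_loopback is a hypothetical name and it wires an os.pipe() back to itself rather than contacting a service, purely to show the (recv, send, shutdown) 3-tuple.

```python
import os


def connect_loopback():
    """Hypothetical on demand connector returning (recv, send, shutdown).

    A real connector might open a socket to a remote service; this one
    uses a pipe so that whatever is sent can be read straight back.
    """
    rfd, wfd = os.pipe()
    recv = os.fdopen(rfd, "rb")  # binary file with a .read method
    send = os.fdopen(wfd, "wb")  # binary file with .write and .flush

    def shutdown():
        # Close both ends once the connection is finished with.
        send.close()
        recv.close()

    return recv, send, shutdown


recv, send, shutdown = connect_loopback()
send.write(b"ping")
send.flush()
print(recv.read(4))
shutdown()
```

Nothing is opened until the callable runs, which is the point of the on demand form: the streams exist only while the connection is in use.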
Method PacketConnection.__call__(self, rq_type, payload=b'', flags=0, *, decode_response=None, channel=0, label=None)
Calling the PacketConnection performs a synchronous request. Submits the request, then calls the Result returned from the request.
Method PacketConnection.join(self)
Wait for the send and receive workers to terminate.
Method PacketConnection.join_recv(self)
Wait for the end of the receive worker. Servers should call this.
Method PacketConnection.send_eof(self)
Queue the magic EOF Packet.
Method PacketConnection.send_erq(self)
Queue the magic end-of-requests Packet.
Method PacketConnection.submit(self, rq_type: int, flags: int = 0, payload: bytes = b'', *, decode_response=None, channel=0, label=None) -> cs.result.Result
Compose and dispatch a new request, returns a Result.
Allocates a new tag, a Result to deliver the response, and records the response decode function for use when the response arrives.
Parameters:
- rq_type: request type code, an int
- flags: optional flags to accompany the request, an int; default 0.
- payload: optional bytes-like object to accompany the request; default b''
- decode_response: optional callable accepting (response_flags, response_payload_bytes) and returning the decoded response payload value; if unspecified, the response payload bytes are used
- label: optional label for this request to aid debugging
The Result will yield an (ok, flags, payload) tuple, where:
- ok: whether the request was successful
- flags: the response flags
- payload: the response payload, decoded by decode_response if specified
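A decode_response callable might look like the sketch below, which assumes (purely for illustration) a service whose responses are JSON text. The (ok, flags, payload) tuple is built by hand here to show how a caller could unpack it; no connection is involved.

```python
import json


def decode_json_response(flags: int, payload: bytes):
    """Hypothetical decode_response callable: parse a UTF-8 JSON payload."""
    return json.loads(payload.decode("utf-8"))


# Simulate the tuple a Result is described as yielding.
ok, flags, payload = True, 0, decode_json_response(0, b'{"size": 3}')
if ok:
    print(payload["size"])
```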
Class ReadableFile(typing.Protocol)
The requirements for a file used to receive.
Method ReadableFile.__subclasshook__(other)
Set (or override) the protocol subclass hook.
Method ReadableFile.read(self, size: int) -> bytes
Read up to size bytes.
Class RequestState(RequestState)
A state object tracking a particular request.
Method RequestState.cancel(self)
Cancel this request.
Method RequestState.complete(self, flags, payload)
Complete the request from an "ok" flags and payload.
Method RequestState.fail(self, flags, payload)
Fail the request from a "not ok" flags and payload.
Class SendableFile(typing.Protocol)
The requirements for a file used to send.
Method SendableFile.__subclasshook__(other)
Set (or override) the protocol subclass hook.
Method SendableFile.flush(self) -> None
Flush any buffer of written bytes.
Method SendableFile.write(self, bs: bytes) -> int
Write bytes, return the number of bytes written.
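The two file protocols are structural: any object with the right methods qualifies. The sketch below defines its own stand-in protocols (ReadableSketch and SendableSketch are hypothetical names mirroring the method lists above, and marking them runtime checkable is an assumption of this example) to show which objects would satisfy them.

```python
import io
from typing import Protocol, runtime_checkable


@runtime_checkable
class ReadableSketch(Protocol):
    """Stand-in for a receive file: needs read(size)."""

    def read(self, size: int) -> bytes: ...


@runtime_checkable
class SendableSketch(Protocol):
    """Stand-in for a send file: needs write(bs) and flush()."""

    def write(self, bs: bytes) -> int: ...

    def flush(self) -> None: ...


class WriteOnly:
    """Has write() but no flush(), so it is not a complete send file."""

    def write(self, bs: bytes) -> int:
        return len(bs)


buf = io.BytesIO()
print(isinstance(buf, ReadableSketch), isinstance(buf, SendableSketch))
print(isinstance(WriteOnly(), SendableSketch))
```

Note that a runtime isinstance check against a protocol only tests that the methods exist, not their signatures.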
Release Log
Release 20240630:
- New ERQ_Packet to indicate end of client requests.
- PacketConnection.startup_shutdown: send ERQ_Packet at stream shutdown.
- New PacketConnection.end_requests() method to queue an ERQ_Packet to the send queue.
- PacketConnection: send the EOF_Packet from the send worker instead of from startup_shutdown.
- Rename PacketConnection.do to PacketConnection.call.
- PacketStream: BREAKING: replace the recv and send parameters with a single recv_send parameter.
- PacketStream: do not close supplied connection handles - this allows reuse of an underlying binary connection.
- Many many logic fixes for clean and orderly shutdown.
- Rename PacketConnection.request to PacketConnection.submit.
- New BaseRequest and HasPacketConnection helper classes to make it easy to use a PacketConnection for a protocol.
Release 20240412:
- PacketConnection: now subclasses MultiOpenMixin, big refactor.
- PacketConnection.__init__: use @promote to turn the recv parameter into a CornuCopyBuffer.
- Fix a deadlock.
Release 20211208:
- Packet.__eq__: only test .rq_type if .is_request.
- Update tests for changes.
Release 20210306:
- Port to new cs.binary.Binary* classes.
- Some refactors and small fixes.
Release 20191004:
- PacketConnection: new optional parameter packet_grace to tune the send delay for additional packets before a flush, default DEFAULT_PACKET_GRACE (0.01s), 0 for no delay.
- Add a crude packet level activity ticker.
Release 20190221: DISTINFO requirement updates.
Release 20181228: Initial PyPI release.
Hashes for cs.packetstream-20240630-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | e3ffdc59a038299379af673b66ea81a9acfc9fbc12735b8c4460afd430e41c2c
MD5 | ea1d37caf19e02491947a5f8882ea2a2
BLAKE2b-256 | 1a850a4cc48ea1b718a93c4e9360204101f4d84161f32af77a8118db07990ee3