Skip to main content

general purpose bidirectional packet stream connection

Project description

A general purpose bidirectional packet stream connection.

Latest release 20240630:

  • New ERQ_Packet to indicate end of client requests.
  • PacketConnection.startup_shutdown: send ERQ_Packet at stream shutdown.
  • New PacketConnection.end_requests() method to queue an ERQ_Packet to the send queue.
  • PacketConnection: send the EOF_Packet from the send worker instead of from startup_shutdown.
  • Rename PacketConnection.do to PacketConnection.call.
  • PacketStream: BREAKING: replace the recv and send parameters with a singe recv_send parameter.
  • PacketStream: do not close supplied connection handles - this allows reuse of an underlying binary connection.
  • Many many logic fixes for clean and orderly shutdown.
  • Rename PacketConnection.request to PacketConnection.submit.
  • New BaseRequest and HasPacketConnection helper classes to make it easy to use a PacketConnection for a protocol.

Class BaseRequest(cs.binary.AbstractBinary)

A base class for request classes to use with HasPacketConnection.

This is a mixin aimed at *Binary classes representing the request payload and supplies an __init__ method which saves the optional flags parameter as .flags and passes all other parameters to the superclass' __init__ (the *Binary superclass).

As such, it is important to define the subclass like this:

class AddRequest(
    BaseRequest,
    BinaryMultiValue('AddRequest', dict(hashenum=BSUInt, data=BSData)),
):

with BaseRequest first if you wish to omit an __init__ method.

Subclasses must implement the fulfil method to perform the request operation.

Often a subclass will also implement the decode_response_payload(flags,payload) method. This provides the result of a request, returned by HasPacketConnection.conn_do_remote or by the Result returned from HasPacketConnection.conn_submit. The default returns the response flags and payload directly.

This base class subclasses AbstractBinary to encode and decode the request payload and has an additions flags attribute for the Packet.flags. As such, subclasses have two main routes for implemetation:

1: Subclass an existing AbstractBinary subclass. For example, the cs.vt.stream.ContainsRequest looks up a hash code, and subclasses the cs.vt.hash.HashField class.

2: Provide a parse(bfr) factory method and transcribe() method to parse and transcribe the request payload like any other AbstractBinary subclass.

Approach 1 does not necessarily need a distinct class; a binary class can often be constructed in the class header. For example, the cs.vt.stream.AddRequest payload is an int representing the hash class and the data to add. The class header looks like this:

class AddRequest(
    BaseRequest,
    BinaryMultiValue('AddRequest', dict(hashenum=BSUInt, data=BSData)),
):

much as one might subclass a namedtuple in other circumstances.

Method BaseRequest.decode_response_payload(self, flags: int, payload: bytes): Decode a response flags and payload.

This default implementation returns the flags and payload unchanged.

Method BaseRequest.from_request_payload(flags: int, payload: bytes) -> 'BaseRequest': Decode a request flags and payload, return a BaseRequest instance.

This is called with the correct BaseRequest subclass derived from the received Packet.rq_type. It decodes the

This default implementation assumes that flags==0 and calls cls.from_bytes(payload).

Method BaseRequest.fulfil(self, context) -> Union[NoneType, int, bytes, str, Tuple[int, bytes]]: Fulfil this request at the receiving end of the connection using context, some outer object using the connection. Raise an exception if the request cannot be fulfilled.

Return values suitable for the response:

  • None: equivalent to (0,b'')
  • int: returned in the flags with b'' for the payload
  • bytes: returned as the payload with 0 as the flags
  • str: return encode(s,'ascii') as the payload with 0 as the flags
  • (int,bytes): the flags and payload

A typical implementation looks like this:

def fulfil(self, context):
    return context.come_method(params...)

where params come from the request attributes.

Class HasPacketConnection

This is a mixin class to aid writing classes which use a PacketConnection to communicate with some service.

The supported request/response packet types are provided as a mapping of int Packet.rq_type values to a class implementing that request type, a subclass of BaseRequest.

For example, a cs.vt.stream.StreamStore subclasses HasPacketConnection and initialises the mixin with this call:

HasPacketConnection.__init__(
    self,
    recv_send,
    name,
    { 0: AddRequest,  # add chunk, return hashcode
      1: GetRequest,  # get chunk from hashcode
      .......
    },
)

See the BaseRequest class for details on how to implement each request type.

Method HasPacketConnection.__init__(self, recv_send: Union[Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile]], Callable[[], Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile], Callable[[], NoneType]]]], name: str = None, *, rq_type_map: Mapping[int, cs.packetstream.BaseRequest], **packet_kw): Initialise self.conn as a PacketConnection.

Parameters:

  • recv_send: as for PacketConnection
  • name: an optional name for the connection
  • rq_type_map: a mapping of request types to BaseRequest subclasses

Other keyword arguments are passed to PacketConnection().

Method HasPacketConnection.conn_do_remote(self, rq: cs.packetstream.BaseRequest, **submit_kw): Run rq remotely. Raises ValueError if the response is not ok. Otherwise returns rq.decode_response(flags, payload).

Method HasPacketConnection.conn_handle_request(self, rq_type: int, flags: int, payload: bytes): Handle receipt of a request packet. Decode the packet into a request rq and return rq.fulfil(self).

Method HasPacketConnection.conn_submit(self, rq: cs.packetstream.BaseRequest, *, channel=0, label=None) -> cs.result.Result: Submit this request to the connection, return a Result.

Class Packet(cs.binary.SimpleBinary)

A protocol packet.

Method Packet.__str__(self): pylint: disable=signature-differs

Method Packet.parse(bfr, log=None): Parse a Packet from a buffer.

Method Packet.transcribe(self): Transcribe this packet.

Method Packet.write(self, file, flush=False, log=None): Write the Packet to file.

Class PacketConnection(cs.resources.MultiOpenMixin)

A bidirectional binary connection for exchanging requests and responses.

Method PacketConnection.__init__(self, recv_send: Union[Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile]], Callable[[], Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile], Callable[[], NoneType]]]], name=None, *, request_handler=None, packet_grace=None, trace_log: Optional[Callable] = None): Initialise the PacketConnection.

Parameters:

  • recv_send: specify the receive and send streams
  • packet_grace: default pause in the packet sending worker to allow another packet to be queued before flushing the output stream. Default: DEFAULT_PACKET_GRACEs. A value of 0 will flush immediately if the queue is empty.
  • request_handler: an optional callable accepting (rq_type, flags, payload). The request_handler may return one of 5 values on success:
    • None: response will be 0 flags and an empty payload.
    • int: flags only. Response will be the flags and an empty payload.
    • bytes: payload only. Response will be 0 flags and the payload.
    • str: payload only. Response will be 0 flags and the str encoded as bytes using UTF-8.
    • (int, bytes): Specify flags and payload for response. An unsuccessful request should raise an exception, which will cause a failure response packet.

The recv_send parameter is used to prepare the connection. It may take the following forms:

  • a 2-tuple of (recv,send) specifying the receive and send streams
  • an int specifying a single file descriptor used for both receive and send
  • a callable returning a 3-tuple of (recv,send,close) as for PacketConnection's callable mode The (recv,send) pair indicate the inbound and outbound binary streams.

For preexisting streams such as pipes or sockets these can be:

  • recv: anything acceptable to CornuCopyBuffer.promote(), typically a file descriptor or a binary file with .write and .flush methods.
  • send: a file descriptor or a binary file with .write and .flush methods.

For "on demand" use, recv may be a callable and send may be None. In this case, recv() must return a 3-tuple of (recv,send,shutdown) being values for recv and send as above, and a shutdown function to do the necessary "close" of the new recv and send. shutdown may be None if there is no meaningful close operation. The PacketConnection's startup_shutdown method will call recv() to obtain the binary streams and call the shutdown on completion. This supports use for on demand connections, eg:

P = PacketConnection(connect_to_server)
......
with P:
    ... use P to to work ...

where connect_to_server() might connect to some remote service.

Method PacketConnection.__call__(self, rq_type, payload=b'', flags=0, *, decode_response=None, channel=0, label=None): Calling the PacketConnection performs a synchronous request. Submits the request, then calls the Result returned from the request.

Method PacketConnection.join(self): Wait for the send and receive workers to terminate.

Method PacketConnection.join_recv(self): Wait for the end of the receive worker. Servers should call this.

Method PacketConnection.send_eof(self): Queue the magic EOF Packet.

Method PacketConnection.send_erq(self): Queue the magic end-of-requests Packet.

Method PacketConnection.submit(self, rq_type: int, flags: int = 0, payload: bytes = b'', *, decode_response=None, channel=0, label=None) -> cs.result.Result: Compose and dispatch a new request, returns a Result.

Allocates a new tag, a Result to deliver the response, and records the response decode function for use when the response arrives.

Parameters:

  • rq_type: request type code, an int
  • flags: optional flags to accompany the request, an int; default 0.
  • payload: optional bytes-like object to accompany the request; default b''
  • decode_response: optional callable accepting (response_flags, response_payload_bytes) and returning the decoded response payload value; if unspecified, the response payload bytes are used
  • label: optional label for this request to aid debugging

The Result will yield an (ok, flags, payload) tuple, where:

  • ok: whether the request was successful
  • flags: the response flags
  • payload: the response payload, decoded by decode_response if specified

Class ReadableFile(typing.Protocol)

The requirements for a file used to receive.

Method ReadableFile.__subclasshook__(other): Set (or override) the protocol subclass hook.

Method ReadableFile.read(self, size: int) -> bytes: Read up to size bytes.

Class RequestState(RequestState)

A state object tracking a particular request.

Method RequestState.cancel(self): Cancel this request.

Method RequestState.complete(self, flags, payload): Complete the request from an "ok" flags and payload.

Method RequestState.fail(self, flags, payload): Fail the request from a "not ok" flags and payload.

Class SendableFile(typing.Protocol)

The requirements for a file used to send.

Method SendableFile.__subclasshook__(other): Set (or override) the protocol subclass hook.

Method SendableFile.flush(self) -> None: Flush any buffer of written bytes.

Method SendableFile.write(self, bs: bytes) -> int: Write bytes, return the number of bytes written.

Release Log

Release 20240630:

  • New ERQ_Packet to indicate end of client requests.
  • PacketConnection.startup_shutdown: send ERQ_Packet at stream shutdown.
  • New PacketConnection.end_requests() method to queue an ERQ_Packet to the send queue.
  • PacketConnection: send the EOF_Packet from the send worker instead of from startup_shutdown.
  • Rename PacketConnection.do to PacketConnection.call.
  • PacketStream: BREAKING: replace the recv and send parameters with a singe recv_send parameter.
  • PacketStream: do not close supplied connection handles - this allows reuse of an underlying binary connection.
  • Many many logic fixes for clean and orderly shutdown.
  • Rename PacketConnection.request to PacketConnection.submit.
  • New BaseRequest and HasPacketConnection helper classes to make it easy to use a PacketConnection for a protocol.

Release 20240412:

  • PacketConnection: now subclasses MultiOpenMixin, big refactor.
  • PacketConnection.init: use @promote to turn the recv parameter into a CornuCopyBuffer.
  • Fix a deadlock.

Release 20211208:

  • Packet.eq: only test .rq_type if .is_request.
  • Update tests for changes.

Release 20210306:

  • Port to new cs.binary.Binary* classes.
  • Some refactors and small fixes.

Release 20191004:

  • PacketConnection: new optional parameter packet_grace to tune the send delay for additional packets before a flush, default DEFAULT_PACKET_GRACE (0.01s), 0 for no delay.
  • Add a crude packet level activity ticker.

Release 20190221: DISTINFO requirement updates.

Release 20181228: Initial PyPI release.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cs.packetstream-20240630.tar.gz (19.7 kB view hashes)

Uploaded Source

Built Distribution

cs.packetstream-20240630-py3-none-any.whl (17.3 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page