
net-queue

Zero-copy & lock-free network communications using queues

Example

# server.py
import net_queue as nq

with nq.new(purpose=nq.Purpose.SERVER) as queue:
    message = queue.get()
    queue.put("Hello, Client!")

# client.py
import net_queue as nq

with nq.new(purpose=nq.Purpose.CLIENT) as queue:
    queue.put("Hello, Server!")
    message = queue.get()

Benchmark

Configuration Used

|     |                                     |
|-----|-------------------------------------|
| SW  | Python 3.13.5                       |
| OS  | Debian GNU/Linux 13 (trixie)        |
| CPU | 13th Gen Intel® Core™ i5-13400 × 16 |
| RAM | 64 GB                               |

| Test  | Transfer | Operations | Executed |
|-------|----------|------------|----------|
| Sync  | 17.18 GB | 8.0 K      | `python test/iops.py protocol purpose sync --step-size 0 --max-size 21 --reps 4_000` |
| Async | 17.18 GB | 8.0 K      | `python test/iops.py protocol purpose async --step-size 0 --max-size 21 --reps 4_000` |
| Mix   | 8.59 GB  | 8.19 K     | `python test/iops.py protocol purpose async --min-size 8 --step-size 2 --step-expo 0.5 --max-size 32` |

| Time  | TCP   | MQTT   | gRPC   |
|-------|-------|--------|--------|
| Sync  | 5.3 s | 36.2 s | 23.1 s |
| Async | 8.5 s | 27.0 s | 20.9 s |
| Mix   | 9.2 s | 23.8 s | 20.2 s |

| Transfer | TCP        | MQTT      | gRPC      |
|----------|------------|-----------|-----------|
| Sync     | 25.24 Gbps | 3.71 Gbps | 5.81 Gbps |
| Async    | 15.77 Gbps | 4.98 Gbps | 6.43 Gbps |
| Mix      | 14.91 Gbps | 5.78 Gbps | 6.79 Gbps |

| Operations | TCP          | MQTT        | gRPC        |
|------------|--------------|-------------|-------------|
| Sync       | 1500.00 IOPS | 220.85 IOPS | 346.37 IOPS |
| Async      | 939.73 IOPS  | 296.78 IOPS | 383.04 IOPS |
| Mix        | 1780.00 IOPS | 689.41 IOPS | 809.01 IOPS |

| Memory | TCP        | MQTT        | gRPC       |
|--------|------------|-------------|------------|
| Sync   | 21.94 MB   | 8377.33 MB  | 30.68 MB   |
| Async  | 33.39 MB   | 7585.78 MB  | 41.46 MB   |
| Mix    | 8639.32 MB | 10481.62 MB | 8642.59 MB |

Install

Production

pip install net-queue

Development

git clone https://github.com/hpca-uji/net-queue.git
cd net-queue
pip install -e .

Documentation

Constants

  • Protocol:

    Communication protocol

    • TCP
    • MQTT (requires external broker)
    • GRPC
  • Purpose:

    Communication purpose

    • SERVER
    • CLIENT

Structures

  • CommunicatorOptions(...)

    Communicator options

    • id: uuid.UUID = uuid.uuid4() (random)

    • netloc: NetworkLocation = NetworkLocation('127.0.0.1', 51966)

    • connection: ConnectionOptions = ConnectionOptions()

    • serialization: SerializationOptions = SerializationOptions()

    • security: SecurityOptions | None = None

    • workers: int = 1

      Maximum number of threads to use for connection handling. Depending on the protocol, 1–3 more may be used; however, they will be idle most of the time. On high-throughput applications or high-latency networks this may need increasing.

      Selection: Number of expected participants (less +efficiency, more +performance).
      Default: Minimum required resources.

  • ConnectionOptions(...)

    Connection options

    • get_merge: bool = True

      Merge message chunks into a contiguous memory block during receiving; this typically improves performance when processing large messages.

      Merged chunks are at most message_size bytes; internally, a buffer of this size is dynamically allocated.

    • put_merge: bool = True

      Merge message chunks into a contiguous memory block during sending; this typically improves performance when processing small messages.

      Merged chunks are at most transport_size bytes; internally, a buffer of this size is preallocated.

    • transport_size: int = 16 * 1024 ** 2 (16 MiB)

      Maximum chunk size to send to the underlying backend before splitting.

      Selection: Maximum according to transport limits (less +streaming/management, more +bursty/merging).
      Default: Balanced CPU/RAM usage.

    • queue_size: int = 1024 ** 3 (1 Gi)

      Maximum number of queued messages before incoming messages are dropped.

      Selection: Maximum according to memory limits.
      Default: Unlimited for typical usage.

    • message_size: int = 1024 ** 4 (1 TiB)

      Maximum message size to deserialize before attempting splitting. If the deserializer does not support arbitrary sub-chunks, this setting may raise exceptions on message extraction.

      Selection: Maximum according to memory limits.
      Default: Unlimited for typical usage.

  • SerializationOptions(...)

    Serialization options

    • load: Callable[[Stream], Any] = PickleSerializer().load

      Message deserialization handler

    • dump: Callable[[Any], Stream] = PickleSerializer().dump

      Message serialization handler

  • SecurityOptions(...)

    Security options

    • key: Path | None = None

      Server's private key

      Required for servers; always None for clients.

    • certificate: Path | None = None

      Server's certificate chain or client's trust chain

      Required for servers. For clients, if not provided, it defaults to the system's trust chain.

  • NetworkLocation(...)

    Network location

    Extends: NamedTuple

    • host: str = "127.0.0.1"
    • port: int = 51966
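The transport_size splitting rule described under ConnectionOptions can be sketched in plain Python. This is only an illustration, not library code; split_chunks is a hypothetical helper, and zero-copy slicing is approximated with memoryview:

```python
def split_chunks(payload: bytes, transport_size: int = 16 * 1024 ** 2) -> list[memoryview]:
    """Split a payload into chunks of at most transport_size bytes.

    memoryview slices reference the original buffer instead of copying it.
    """
    view = memoryview(payload)
    return [view[i:i + transport_size] for i in range(0, len(view), transport_size)]

# A 40 MiB payload splits into 16 MiB + 16 MiB + 8 MiB chunks.
chunks = split_chunks(b"\x00" * (40 * 1024 ** 2))
sizes = [len(c) for c in chunks]
```

Larger transport_size values mean fewer, burstier sends; smaller values favour streaming at the cost of per-chunk management overhead.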

Functions

  • new(protocol, purpose, options)

    Create a communicator.

    • protocol: Protocol = Protocol.TCP
    • purpose: Purpose = Purpose.CLIENT
    • options: CommunicatorOptions = CommunicatorOptions()
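The load/dump signatures in SerializationOptions can be satisfied by any pair of inverse functions. Below is a hypothetical JSON-based pair, assuming Stream is a bytes-like object; this is a sketch, not code from the library:

```python
import json

def json_dump(data):
    """Serialize a message into a bytes stream (dump: Any -> Stream)."""
    return json.dumps(data).encode("utf-8")

def json_load(stream):
    """Deserialize a bytes stream back into a message (load: Stream -> Any)."""
    return json.loads(bytes(stream).decode("utf-8"))

# Round-trip check.
message = {"op": "put", "payload": [1, 2, 3]}
assert json_load(json_dump(message)) == message
```

Such a pair could then be passed as SerializationOptions(load=json_load, dump=json_dump). Unlike pickle, JSON is safe to load from untrusted peers, at the cost of supporting fewer Python types.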

Classes

  • core.Communicator(options)

    Communicator implementation

    Operations are thread-safe.

    Communicator supports the with statement (context manager).

    • options: CommunicatorOptions = CommunicatorOptions()

    • id: uuid.UUID (helper for options.id)

    • options: CommunicatorOptions

    • put(data: Any, *peers: uuid.UUID) -> Future[None]

      Publish data to peers

      For clients, if no peers are defined, data is sent to the server. For servers, if no peers are defined, data is sent to all clients.

      It is preferred to specify multiple peers instead of issuing multiple puts, as data will only be serialized once and protocols may use optimized routes.

      Note: Only servers can send to a particular client.

      The future is resolved when the data is safe to mutate again. The future may raise ConnectionError(uuid.UUID) if the peer or the communicator itself is closed, and may raise protocol-specific exceptions.

    • get(*peers: uuid.UUID) -> Any

      Get data from peers

      If no peers are defined, data is returned from the first available peer.

      Note: Currently, peers cannot be specified.

    • close() -> None

      Close the communicator

  • {protocol}.{purpose}.Communicator(options)

    Concrete communicator implementation for the given protocol and purpose

  • utils.stream.PickleSerializer(...)

    Pickle-stream serializer

    Warning: The pickle module is not secure. Only unpickle data you trust.

    • allow: Callable[[str], bool] | None = None

      Returns whether a global name is trusted and can be loaded.

      Default: allow everything

      Note: Some builtins may be implicitly allowed due to optimizations.

    • dump: Callable[[Any], Any] | None = None

      Returns an identifier if the data is persistent, otherwise None

      Default: everything transient

    • load: Callable[[Any], Any] | None = None

      Load data from a persistent identifier

      Default: raise exception


    • load(data: Stream) -> Any

      Transform a stream into data

    • dump(data: Any) -> Stream

      Transform data into a stream


    • allow_by_name(*names: str) -> Callable[[str], bool]

      Generate an allow function filtering by global names

      Example: ["builtins"] for a whole module
      Example: ["uuid.UUID"] for a single class

  • utils.stream.BytesSerializer(...)

    Bytes-stream serializer

    • load(data: Stream) -> bytes

      Transform a stream into bytes

    • dump(data: bytes) -> Stream

      Transform bytes into a stream
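The allow filter of PickleSerializer can be understood through the standard restricted-unpickler pattern from the pickle documentation. The sketch below is not net-queue's implementation; it only illustrates how an allow_by_name-style predicate gates global lookups during unpickling:

```python
import io
import pickle
import uuid

def allow_by_name(*names):
    """Allow a whole module ("builtins") or a single global ("uuid.UUID")."""
    def allow(qualified: str) -> bool:
        module = qualified.rsplit(".", 1)[0]
        return qualified in names or module in names
    return allow

class RestrictedUnpickler(pickle.Unpickler):
    """Resolve only globals whose "module.name" passes the allow predicate."""

    def __init__(self, stream, allow):
        super().__init__(stream)
        self._allow = allow

    def find_class(self, module, name):
        if self._allow(f"{module}.{name}"):
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"global '{module}.{name}' is not allowed")

payload = pickle.dumps(uuid.uuid4())
# Allowed: "uuid.UUID" passes the filter.
RestrictedUnpickler(io.BytesIO(payload), allow_by_name("uuid.UUID")).load()
# Rejected: the same payload fails a filter that only trusts builtins.
try:
    RestrictedUnpickler(io.BytesIO(payload), allow_by_name("builtins")).load()
except pickle.UnpicklingError:
    pass  # untrusted global refused
```

As the note above says, a real implementation may implicitly allow some builtins for performance; this sketch refuses everything outside the list.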

Notes

Communication conventions

  • ini: connection start (identify)
  • fin: connection stop (flush)
  • com: message exchange (generic)
  • c2s: message exchange (client -> server)
  • s2c: message exchange (server -> client)

Communication handshakes

Ini:

  • Server & client send ID
  • Server & client wait for ID
  • Server creates a session or continues an existing one

Fin:

  • Server & client flush message queues
  • Server & client send ID
  • Server & client wait for ID

Communication persistency

Ini:

  • Must be done on the first connection or when the connection changes

Fin:

  • Must be done on session end (not connection)

Communication contract

Constructor

  • Never blocks
  • Only one communicator per ID
  • Reusing an ID retains server queues

Put

  • Never blocks
  • Communication will not modify the object
  • Consumer must not modify the object until the future resolves
  • Resolved futures acknowledge peer reception
  • Futures failing with ConnectionError indicate the peer disconnected

Get

  • Always blocks
  • Returns a message or raises ConnectionError
  • Once closed, it continues working until exhausted, then raises ConnectionError
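The Get contract can be modelled with a small stand-in class (pure Python, not the library's code): after close(), buffered messages keep draining until exhaustion, and only then is ConnectionError raised.

```python
import collections

class ClosableQueue:
    """Minimal model of the documented get()/close() behaviour."""

    def __init__(self):
        self._messages = collections.deque()
        self._closed = False

    def put(self, data):
        if self._closed:
            raise ConnectionError("queue is closed")
        self._messages.append(data)

    def close(self):
        self._closed = True

    def get(self):
        if self._messages:
            return self._messages.popleft()  # still drains after close
        if self._closed:
            raise ConnectionError("closed and exhausted")
        raise NotImplementedError("a real communicator would block here")

q = ClosableQueue()
q.put("a")
q.put("b")
q.close()
assert q.get() == "a" and q.get() == "b"  # buffered messages survive close()
```

In the real communicator, get() blocks while waiting for data instead of raising NotImplementedError; the stand-in only shows the close/exhaustion ordering.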

Close

  • Always blocks
  • Server waits for peers to disconnect

TCP

Library: socket
Parallelism: Thread pool (n+1+1 threads)

MQTT

Library: paho-mqtt
Options: tcp transport, 0 QOS, 3.1.1 protocol
Parallelism: Single threaded (1+1+1 threads)

MQTT broker implementations are not common, so the server provided here is actually another client. Therefore, the address and port provided to both the client and the server should be those of the actual broker, not of the machine where the server is running.

The MQTT library handles communications single-threaded; therefore, operations in the related callbacks are limited to pushing or pulling data from queues without blocking, so all operations are minimal and fast.
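In practice this means a message callback should be limited to a non-blocking queue push, roughly like the sketch below (generic Python, not paho-mqtt specific; inbox and its size are hypothetical):

```python
import queue

inbox = queue.Queue(maxsize=1024)  # bounded, mirroring queue_size's drop behaviour

def on_message(payload):
    """Runs on the single MQTT network thread: must never block."""
    try:
        inbox.put_nowait(payload)
    except queue.Full:
        pass  # drop the message instead of stalling the network loop

on_message(b"chunk-1")
assert inbox.get_nowait() == b"chunk-1"
```

Consumers then drain inbox from their own threads, keeping the network callback minimal.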

Peer-groups and global communications are not optimized.

First, chunked message ordering must be resolved: single-chunk ordering is guaranteed by the protocol, even across different topics. Second, peer-groups could be implemented using grouping requests that generate a new UUID per group. This would also reduce load on the broker.

gRPC

Library: grpcio
Options: compression disabled, protobuf disabled
Parallelism: Thread pool (n+1+? threads)

gRPC does not conform well to an async-send & async-receive model; it expects remote procedure calls to be issued, processed, and responded to. To simulate this model we created a bidirectional streaming procedure. Sent data is queued at the server; received data is polled until available.

This also means there is no eager client reception or eager server send, so polling is required.

Polling is implemented with an exponential backoff time and a retry limit. The gRPC library queues requests, so requests would always be replied to in a timely manner, but we do not want to hog the CPU or network with useless requests.
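The polling strategy can be sketched as exponential backoff with a cap and a retry limit (illustrative values; the library's actual delays and limit are internal):

```python
def backoff_delays(base=0.001, factor=2.0, cap=0.25, limit=10):
    """Yield at most `limit` sleep intervals: base, base*factor, ... capped at `cap`."""
    delay = base
    for _ in range(limit):
        yield delay
        delay = min(delay * factor, cap)

def poll(receive, sleep):
    """Poll `receive` until it returns data or the retry limit is exhausted."""
    for delay in backoff_delays():
        data = receive()
        if data is not None:
            return data
        sleep(delay)  # back off instead of hogging CPU or network
    raise TimeoutError("no data after retry limit")

# Stand-in channel that produces data on the fourth poll.
attempts = iter([None, None, None, b"reply"])
result = poll(lambda: next(attempts), sleep=lambda d: None)
assert result == b"reply"
```

The cap bounds the worst-case latency added by backoff, while the limit prevents a closed stream from being polled forever.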

It is important to not hold the procedures indefinitely, since this could starve the server of threads. Additionally, if a streaming direction was already closed, messages could end up queued forever if not restarted.

To alleviate network latency, queues are flushed unidirectionally in turns instead of interleaving directions. However, on high-throughput applications this could lead to a very bursty receive pattern.

Planned

Implement reconnection support. The protocol already supports it, server support is done, and clients can reconnect but cannot yet disconnect without flushing.

Implement two-way connection expiration and keep-alives. There is no reliable way to track connection drops across communication implementations; most of them end up with memory leaks. If desired, expiration periods could be long and automatic client reconnections could be allowed, enabling MQTT-like reliability without the cost.

Implement message cancelling support. It is already plausible to cancel a message if it is queued but not yet buffered. However, changing the future from pending to running would require acquiring a lock.

Acknowledgments

The library has been partially supported by:

  • Project PID2023-146569NB-C22 "Inteligencia sostenible en el Borde-UJI" funded by the Spanish Ministry of Science, Innovation and Universities.
  • Project C121/23 Convenio "CIBERseguridad post-Cuántica para el Aprendizaje FEderado en procesadores de bajo consumo y aceleradores (CIBER-CAFE)" funded by the Spanish National Cybersecurity Institute (INCIBE).


Download files

Download the file for your platform.

Source Distribution

net_queue-2.4.2.tar.gz (38.1 kB)

Uploaded Source

Built Distribution


net_queue-2.4.2-py3-none-any.whl (43.0 kB)

Uploaded Python 3

File details

Details for the file net_queue-2.4.2.tar.gz.

File metadata

  • Download URL: net_queue-2.4.2.tar.gz
  • Upload date:
  • Size: 38.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for net_queue-2.4.2.tar.gz

| Algorithm   | Hash digest |
|-------------|-------------|
| SHA256      | 83004daf74ba5cd2b4d99dda28cffe5632a0e7f849a7a6d9d84eb1edb864088a |
| MD5         | b57df8ab4a9d65528960dfc4e1e0db6e |
| BLAKE2b-256 | 7b76041890e6aa793f35f170405dc6b8d41c326a525e1497089f4c5e239e1b40 |


File details

Details for the file net_queue-2.4.2-py3-none-any.whl.

File metadata

  • Download URL: net_queue-2.4.2-py3-none-any.whl
  • Upload date:
  • Size: 43.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for net_queue-2.4.2-py3-none-any.whl

| Algorithm   | Hash digest |
|-------------|-------------|
| SHA256      | df3ef3f8c9c733c095861be3a3867ef0afca54d165c4479c52b875cc4de0f8a7 |
| MD5         | 8ad6c6aefeaa506bca2cc655dd21f248 |
| BLAKE2b-256 | 3a094cb7a4861ad8fd64b87ff8e10f6dd9f881ab24a351227de2ed4cd0ec1be8 |

