Skip to main content

This module provides an inter-process lock implementation, eliminating the need to pass around objects for synchronization. Under the hood, the module leverages the shared_memory module.

Project description

shmqueue

Table of Contents


About

This module is currently under development and may undergo frequent changes on the master branch. It might not be free of bugs.

Note: The core architecture and initial implementation were written by hand. Due to time constraints, parts of the core code and the unit tests were completed with AI assistance (GitHub Copilot / Claude).

shmqueue is an inter-process FIFO queue backed by a shared-memory ring buffer. Multiple processes can put and get arbitrary Python objects without passing queue objects between processes — only the queue name is required. Under the hood it uses shmlock for synchronization and msgpack for serialization (with an optional pickle fallback).


When to Use

Good fit:

  • IPC between unrelated processes on the same machine without a broker (no Redis, ZMQ, pipes).
  • Low-to-moderate throughput where a 10 ms poll interval is acceptable.
  • Heterogeneous payloads — msgpack handles most primitives; pickle covers arbitrary Python objects.

Not a good fit:

  • Very high throughput / low latency (the lock uses a polling interval).
  • Cross-machine communication.
  • Situations where pickle deserialization from untrusted data is a security concern — use allow_pickle=False and msgpack-compatible types only.

Notable mention: If performance is the primary concern, shaneyuee/shmqueue is worth a look — it is a C-based, lock-free shared-memory queue. I did not test it myself though.


Installation

git clone <repo-url>
cd shmqueue
pip install -r requirements.txt
pip install .

On Windows, optionally install pywin32 for console signal handling (set_console_handlers()).


Quick Dive

import shmqueue

# ── Producer process ──────────────────────────────────────────────────────────
q = shmqueue.ShmQueue("my_queue", buffer_size=shmqueue.SYSTEM_PAGESIZE)
q.put({"hello": "world"})
q.put(b"raw bytes")
q.put(42)

# ── Consumer process (same name, any process on the same machine) ─────────────
q = shmqueue.ShmQueue("my_queue")
item = q.get()           # blocks until data is available
item = q.get(block=False)  # raises ShmQueueEmpty if nothing there

# ── Non-blocking put ──────────────────────────────────────────────────────────
try:
    q.put("data", block=False)
except shmqueue.exceptions.ShmQueueFull:
    pass

# ── Timeout ───────────────────────────────────────────────────────────────────
item = q.get(block=True, timeout=2.0)   # raises ShmQueueEmpty after 2 s

# ── Status ────────────────────────────────────────────────────────────────────
q.qsize()            # number of items currently in queue
q.empty()            # True / False
q.full()             # True / False
q.buffer_occupancy() # bytes used in ring buffer
snap = q.snapshot()  # all fields in one lock acquisition (ShmRefDataSnapshot)

# ── Cleanup ───────────────────────────────────────────────────────────────────
q.shutdown()         # decrement ref count; last closer frees shared memory

Multi-process example

import multiprocessing, shmqueue, time

QUEUE_NAME = "demo_queue"

def producer():
    q = shmqueue.ShmQueue(QUEUE_NAME)
    for i in range(10):
        q.put(i)
    q.shutdown()

def consumer():
    q = shmqueue.ShmQueue(QUEUE_NAME)
    received = []
    while len(received) < 10:
        try:
            received.append(q.get(block=True, timeout=5.0))
        except shmqueue.exceptions.ShmQueueEmpty:
            break
    print(received)
    q.shutdown()

if __name__ == "__main__":
    q_main = shmqueue.ShmQueue(QUEUE_NAME)   # keep shm alive for duration
    p = multiprocessing.Process(target=producer)
    c = multiprocessing.Process(target=consumer)
    p.start(); c.start()
    p.join();  c.join()
    q_main.shutdown()

Optional: register exit handlers

q = shmqueue.ShmQueue("my_queue")
# Registers signal, atexit, and weakref handlers so shm is released on
# unexpected termination (SIGINT / SIGTERM / console close on Windows).
q.set_console_handlers(register_atexit=True)

Serialization

Method ID Notes
msgpack 0 (default) Fast; supports most primitives and bytes.
pickle 1 Fallback for arbitrary Python objects. Disable with allow_pickle=False.
Custom ≥ 1001 Override custom_serialize / custom_deserialize on ShmCollection.
# Disable pickle:
q = shmqueue.ShmQueue("my_queue", allow_pickle=False)

# Explicit serialization method per put. If picke is disabled, this will raise ShmQueueSerializationError:
q.put("test", serialization_method=shmqueue.SerializationMethods.PICKLE)

Custom serialization

Any method ID ≥ SerializationMethods.FIRST_PUBLIC_NUMBER + 1 is user-defined. Provide a serializer and deserializer:

import struct, shmqueue
from shmqueue import SerializationMethods

MY_METHOD = SerializationMethods.FIRST_PUBLIC_NUMBER + 1

q = shmqueue.ShmQueue("my_queue", buffer_size=1000)

def my_serializer(method: int, data) -> bytes:
    if method == MY_METHOD:
        return struct.pack("I", data)
    raise NotImplementedError

def my_deserializer(method: int, chunks: list) -> int:
    if method == MY_METHOD:
        return struct.unpack("I", b"".join(chunks))[0]
    raise NotImplementedError

q.custom_serialize(my_serializer)
q.custom_deserialize(my_deserializer)

q.put(42, serialization_method=MY_METHOD)
assert q.get() == 42

Troubleshooting and Known Issues

Resource Tracking (POSIX, Python < 3.13)

Python's resource_tracker may emit false-positive warnings about leaked shared-memory segments when multiple processes share the same block. shmqueue suppresses these warnings by default (track_resources=False). To opt back in:

q = shmqueue.ShmQueue("my_queue", track_resources=True)

Buffer size

buffer_size is rounded up to the nearest multiple of SYSTEM_PAGESIZE (typically 4096 bytes). A single item occupies SIZE_HEADER (8 B) + len(msgpack.packb(payload)) bytes. An item larger than buffer_size raises ShmQueueValueError immediately.

Abrupt process termination

If a process is killed during a put/get, the internal shmlock mutex may remain locked. Call set_console_handlers() and/or register_atexit=True to mitigate this. See the shmlock README for details on the underlying lock behavior.


Release History

0.0.2

  • Fix README anchors for correct PyPI rendering

0.0.1

  • Initial release

ToDos

TODO

  • improve debug file json handling (see inline comments in shmqueue_main.py)
  • add put_nowait/get_nowait methods

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

shmqueue-0.0.2.tar.gz (32.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

shmqueue-0.0.2-py3-none-any.whl (23.8 kB view details)

Uploaded Python 3

File details

Details for the file shmqueue-0.0.2.tar.gz.

File metadata

  • Download URL: shmqueue-0.0.2.tar.gz
  • Upload date:
  • Size: 32.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for shmqueue-0.0.2.tar.gz
Algorithm Hash digest
SHA256 3988ced5d6f789be2e246dbbf1978d71268c2c095aa203c92aa8446ba445b905
MD5 3cbcfe63b96dd10bd13b3b8a9b65c720
BLAKE2b-256 f15cbf64f81744d0fc534a74ef20d56dc78c354e31d3ada073c84980685e3c2c

See more details on using hashes here.

Provenance

The following attestation bundles were made for shmqueue-0.0.2.tar.gz:

Publisher: pypi_publish.yml on fwkrumm/shmqueue

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file shmqueue-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: shmqueue-0.0.2-py3-none-any.whl
  • Upload date:
  • Size: 23.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for shmqueue-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 e16b52af40b928207a6af16fd966632b767637418b7796283909ef720656f39e
MD5 e332df77e66e98d0298584c3c0d6bac8
BLAKE2b-256 ccd1624e13dc7afa7954f3dec746a4984e37720e8a9f74586e53a5f5463325e4

See more details on using hashes here.

Provenance

The following attestation bundles were made for shmqueue-0.0.2-py3-none-any.whl:

Publisher: pypi_publish.yml on fwkrumm/shmqueue

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page