This module provides an inter-process lock implementation, eliminating the need to pass around objects for synchronization. Under the hood, the module leverages the shared_memory module.
Project description
shmqueue
Table of Contents
- About
- When to Use
- Installation
- Quick Dive
- Serialization
- Troubleshooting and Known Issues
- ToDos
- Release History
About
This module is currently under development and may undergo frequent changes on the master branch. It might not be free of bugs.
Note: The core architecture and initial implementation were written by hand. Due to time constraints, parts of the core code and the unit tests were completed with AI assistance (GitHub Copilot / Claude).
shmqueue is an inter-process FIFO queue backed by a shared-memory ring buffer. Multiple processes can put and get arbitrary Python objects without passing queue objects between processes — only the queue name is required. Under the hood it uses shmlock for synchronization and msgpack for serialization (with an optional pickle fallback).
When to Use
Good fit:
- IPC between unrelated processes on the same machine without a broker (no Redis, ZMQ, pipes).
- Low-to-moderate throughput where a 10 ms poll interval is acceptable.
- Heterogeneous payloads —
msgpackhandles most primitives;picklecovers arbitrary Python objects.
Not a good fit:
- Very high throughput / low latency (the lock uses a polling interval).
- Cross-machine communication.
- Situations where
pickledeserialization from untrusted data is a security concern — useallow_pickle=Falseand msgpack-compatible types only.
Notable mention: If performance is the primary concern, shaneyuee/shmqueue is worth a look — it is a C-based, lock-free shared-memory queue. I did not test it myself though.
Installation
git clone <repo-url>
cd shmqueue
pip install -r requirements.txt
pip install .
On Windows, optionally install pywin32 for console signal handling (set_console_handlers()).
Quick Dive
import shmqueue
# ── Producer process ──────────────────────────────────────────────────────────
q = shmqueue.ShmQueue("my_queue", buffer_size=shmqueue.SYSTEM_PAGESIZE)
q.put({"hello": "world"})
q.put(b"raw bytes")
q.put(42)
# ── Consumer process (same name, any process on the same machine) ─────────────
q = shmqueue.ShmQueue("my_queue")
item = q.get() # blocks until data is available
item = q.get(block=False) # raises ShmQueueEmpty if nothing there
# ── Non-blocking put ──────────────────────────────────────────────────────────
try:
q.put("data", block=False)
except shmqueue.exceptions.ShmQueueFull:
pass
# ── Timeout ───────────────────────────────────────────────────────────────────
item = q.get(block=True, timeout=2.0) # raises ShmQueueEmpty after 2 s
# ── Status ────────────────────────────────────────────────────────────────────
q.qsize() # number of items currently in queue
q.empty() # True / False
q.full() # True / False
q.buffer_occupancy() # bytes used in ring buffer
snap = q.snapshot() # all fields in one lock acquisition (ShmRefDataSnapshot)
# ── Cleanup ───────────────────────────────────────────────────────────────────
q.shutdown() # decrement ref count; last closer frees shared memory
Multi-process example
import multiprocessing, shmqueue, time
QUEUE_NAME = "demo_queue"
def producer():
q = shmqueue.ShmQueue(QUEUE_NAME)
for i in range(10):
q.put(i)
q.shutdown()
def consumer():
q = shmqueue.ShmQueue(QUEUE_NAME)
received = []
while len(received) < 10:
try:
received.append(q.get(block=True, timeout=5.0))
except shmqueue.exceptions.ShmQueueEmpty:
break
print(received)
q.shutdown()
if __name__ == "__main__":
q_main = shmqueue.ShmQueue(QUEUE_NAME) # keep shm alive for duration
p = multiprocessing.Process(target=producer)
c = multiprocessing.Process(target=consumer)
p.start(); c.start()
p.join(); c.join()
q_main.shutdown()
Optional: register exit handlers
q = shmqueue.ShmQueue("my_queue")
# Registers signal, atexit, and weakref handlers so shm is released on
# unexpected termination (SIGINT / SIGTERM / console close on Windows).
q.set_console_handlers(register_atexit=True)
Serialization
| Method | ID | Notes |
|---|---|---|
msgpack |
0 (default) | Fast; supports most primitives and bytes. |
pickle |
1 | Fallback for arbitrary Python objects. Disable with allow_pickle=False. |
| Custom | ≥ 1001 | Override custom_serialize / custom_deserialize on ShmCollection. |
# Disable pickle:
q = shmqueue.ShmQueue("my_queue", allow_pickle=False)
# Explicit serialization method per put. If picke is disabled, this will raise ShmQueueSerializationError:
q.put("test", serialization_method=shmqueue.SerializationMethods.PICKLE)
Custom serialization
Any method ID ≥ SerializationMethods.FIRST_PUBLIC_NUMBER + 1 is user-defined. Provide a serializer and deserializer:
import struct, shmqueue
from shmqueue import SerializationMethods
MY_METHOD = SerializationMethods.FIRST_PUBLIC_NUMBER + 1
q = shmqueue.ShmQueue("my_queue", buffer_size=1000)
def my_serializer(method: int, data) -> bytes:
if method == MY_METHOD:
return struct.pack("I", data)
raise NotImplementedError
def my_deserializer(method: int, chunks: list) -> int:
if method == MY_METHOD:
return struct.unpack("I", b"".join(chunks))[0]
raise NotImplementedError
q.custom_serialize(my_serializer)
q.custom_deserialize(my_deserializer)
q.put(42, serialization_method=MY_METHOD)
assert q.get() == 42
Troubleshooting and Known Issues
Resource Tracking (POSIX, Python < 3.13)
Python's resource_tracker may emit false-positive warnings about leaked shared-memory segments when multiple processes share the same block. shmqueue suppresses these warnings by default (track_resources=False). To opt back in:
q = shmqueue.ShmQueue("my_queue", track_resources=True)
Buffer size
buffer_size is rounded up to the nearest multiple of SYSTEM_PAGESIZE (typically 4096 bytes). A single item occupies SIZE_HEADER (8 B) + len(msgpack.packb(payload)) bytes. An item larger than buffer_size raises ShmQueueValueError immediately.
Abrupt process termination
If a process is killed during a put/get, the internal shmlock mutex may remain locked. Call set_console_handlers() and/or register_atexit=True to mitigate this. See the shmlock README for details on the underlying lock behavior.
Release History
0.0.2
- Fix README anchors for correct PyPI rendering
0.0.1
- Initial release
ToDos
TODO
- improve debug file json handling (see inline comments in
shmqueue_main.py) - add put_nowait/get_nowait methods
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file shmqueue-0.0.2.tar.gz.
File metadata
- Download URL: shmqueue-0.0.2.tar.gz
- Upload date:
- Size: 32.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3988ced5d6f789be2e246dbbf1978d71268c2c095aa203c92aa8446ba445b905
|
|
| MD5 |
3cbcfe63b96dd10bd13b3b8a9b65c720
|
|
| BLAKE2b-256 |
f15cbf64f81744d0fc534a74ef20d56dc78c354e31d3ada073c84980685e3c2c
|
Provenance
The following attestation bundles were made for shmqueue-0.0.2.tar.gz:
Publisher:
pypi_publish.yml on fwkrumm/shmqueue
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
shmqueue-0.0.2.tar.gz -
Subject digest:
3988ced5d6f789be2e246dbbf1978d71268c2c095aa203c92aa8446ba445b905 - Sigstore transparency entry: 1648945115
- Sigstore integration time:
-
Permalink:
fwkrumm/shmqueue@2b546ede4536d3d23473cf62da2e30bc401a1856 -
Branch / Tag:
refs/heads/master - Owner: https://github.com/fwkrumm
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
pypi_publish.yml@2b546ede4536d3d23473cf62da2e30bc401a1856 -
Trigger Event:
push
-
Statement type:
File details
Details for the file shmqueue-0.0.2-py3-none-any.whl.
File metadata
- Download URL: shmqueue-0.0.2-py3-none-any.whl
- Upload date:
- Size: 23.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e16b52af40b928207a6af16fd966632b767637418b7796283909ef720656f39e
|
|
| MD5 |
e332df77e66e98d0298584c3c0d6bac8
|
|
| BLAKE2b-256 |
ccd1624e13dc7afa7954f3dec746a4984e37720e8a9f74586e53a5f5463325e4
|
Provenance
The following attestation bundles were made for shmqueue-0.0.2-py3-none-any.whl:
Publisher:
pypi_publish.yml on fwkrumm/shmqueue
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
shmqueue-0.0.2-py3-none-any.whl -
Subject digest:
e16b52af40b928207a6af16fd966632b767637418b7796283909ef720656f39e - Sigstore transparency entry: 1648945308
- Sigstore integration time:
-
Permalink:
fwkrumm/shmqueue@2b546ede4536d3d23473cf62da2e30bc401a1856 -
Branch / Tag:
refs/heads/master - Owner: https://github.com/fwkrumm
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
pypi_publish.yml@2b546ede4536d3d23473cf62da2e30bc401a1856 -
Trigger Event:
push
-
Statement type: