Connect and link one or more Queues together or to files
Route messages between any combination of Python queues — fan-out, fan-in, or many-to-many — without the boilerplate.
Connecting queue.Queue, multiprocessing.Queue, and multiprocessing.Manager().Queue by hand means writing your own publisher loops, handling thread-vs-process selection, and dealing with edge cases like pipe size limits and clean shutdown. QueueLink handles all of that:
Automatic thread/process selection — detects whether your queues are thread-based or process-based and creates the right kind of link.
Fan-out and fan-in — one source to many destinations, many sources to one destination, or any combination.
Handle adapters — bridge subprocess pipes, file handles, and multiprocessing connections directly into your queue graph.
Large-message spill-to-disk — transparently buffers oversized objects to disk to avoid pipe size limits.
Tested across fork, forkserver, and spawn — CI runs a 25-job matrix across Linux, macOS, and Python 3.9–3.13.
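To illustrate the spill-to-disk idea from the list above, here is a minimal standard-library sketch. It is not QueueLink's implementation: the helper names pack and unpack and the SPILL_THRESHOLD value are hypothetical, and the size limit and on-disk format the library actually uses are not described here.

```python
import os
import pickle
import queue
import tempfile

# Hypothetical threshold for this sketch; real pipe limits vary by platform.
SPILL_THRESHOLD = 64 * 1024  # bytes


def pack(obj, threshold=SPILL_THRESHOLD):
    """Return a small envelope: the inline payload, or a path to a spilled file."""
    data = pickle.dumps(obj)
    if len(data) <= threshold:
        return ("inline", data)
    # Oversized payload: write it to a temp file and pass only the path along.
    fd, path = tempfile.mkstemp(suffix=".spill")
    with os.fdopen(fd, "wb") as fh:
        fh.write(data)
    return ("spilled", path)


def unpack(envelope):
    """Reverse pack(): load the payload and delete any spill file."""
    kind, value = envelope
    if kind == "inline":
        return pickle.loads(value)
    with open(value, "rb") as fh:
        obj = pickle.loads(fh.read())
    os.remove(value)
    return obj


q = queue.Queue()
small = "hello"
large = "x" * (SPILL_THRESHOLD * 2)
q.put(pack(small))
q.put(pack(large))
small_out = unpack(q.get())
large_out = unpack(q.get())
```

Only the short envelope travels through the queue, so the consumer pays the disk read only for oversized items.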
pip install queuelink
The link() factory function inspects your source and destination and wires everything up automatically — no need to choose between QueueLink, QueueHandleAdapterReader, or QueueHandleAdapterWriter by hand:
import queue
from queuelink import link
src = queue.Queue()
dst = queue.Queue()
# link() returns a result with stop(), close(), and is_alive()
result = link(src, dst)
src.put("hello")
print(dst.get()) # "hello"
result.stop()
link() accepts queues, file handles, file paths, and multiprocessing.connection.Connection as source or destination. Pass a list for fan-out:
result = link(src, [dst1, dst2]) # fan-out to two queues
Reading from a subprocess pipe into a queue:
import queue
from subprocess import Popen, PIPE
from queuelink import link
dest_q = queue.Queue()
proc = Popen(['myprogram'], stdout=PIPE, universal_newlines=True)
result = link(proc.stdout, dest_q)
line = dest_q.get()
result.stop()
Writing from a queue to a file:
import queue
from queuelink import link
src_q = queue.Queue()
with open("output.txt", "w") as f:
    result = link(src_q, f)
    src_q.put("hello\n")
    result.stop()
Use QueueLink directly when you need fine-grained control (registering/unregistering queues at runtime, accessing metrics via get_metrics()).
A QueueLink is a one-way connection between queues. When two or more queues are linked, a thread or sub-process is started to read from the “source” queue and write into the “destination” queue.
Circular references are not allowed.
Users create each queue with the queue or multiprocessing libraries. Those queues can then be added to a QueueLink instance as either the source or the destination.
from queue import Queue
from queuelink import QueueLink
# Source and destination queues
source_q = Queue()
dest_q = Queue()
# Create the QueueLink
queue_link = QueueLink(name="my link")
# Connect queues to the QueueLink
source_id = queue_link.read(q=source_q)
dest_id = queue_link.write(q=dest_q)
# Text to send
text_in = "a😂" * 10
# Add text to the source queue
source_q.put(text_in)
# Retrieve the text from the destination queue!
text_out = dest_q.get()
print(text_out)
The same pattern works with process-based queues from multiprocessing.Manager:
from multiprocessing import Manager
from queuelink import QueueLink
# Create the multiprocessing.Manager
manager = Manager()
# Source and destination queues
source_q = manager.JoinableQueue()
dest_q = manager.JoinableQueue()
# Create the QueueLink
queue_link = QueueLink(name="my link")
# Connect queues to the QueueLink
source_id = queue_link.read(q=source_q)
dest_id = queue_link.write(q=dest_q)
# Text to send
text_in = "a😂" * 10
# Add text to the source queue
source_q.put(text_in)
# Retrieve the text from the destination queue!
text_out = dest_q.get()
print(text_out)
These methods cover the most common use cases.
register_queue(q: UNION_SUPPORTED_QUEUES, direction: DIRECTION, start_method: str=None) -> client id: str
stop
These methods are less common.
destructive_audit(direction: str)
get_metrics() -> dict — returns latency and message-count data; see Metrics for details.
get_queue(queue_id: [str, int])
is_alive
is_drained
is_empty(queue_id: str=None)
unregister_queue(queue_id: str, direction: str, start_method: str=None)
QueueLink is tested against multiple native Queue implementations. When any source or destination queue is thread-based, the link is created as a Thread instance. When all involved queues are process-based, the link is created as a Process instance.
Note that in thread-based situations throughput might be limited by the Python GIL.
Two thread-based queues in different processes cannot be bridged directly. They would require an intermediate multiprocessing queue that can be accessed across processes.
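The thread-versus-process rule described above can be sketched with the standard library alone. This is an assumption about how such a check might look, not QueueLink's actual detection code; is_thread_based and choose_link_kind are hypothetical names.

```python
import queue

# queue.LifoQueue and queue.PriorityQueue subclass queue.Queue, so these two
# base classes cover the thread-based queues in the standard library.
THREAD_QUEUE_TYPES = (queue.Queue, queue.SimpleQueue)


def is_thread_based(q):
    """True if q is one of the in-process queue types from the queue module."""
    return isinstance(q, THREAD_QUEUE_TYPES)


def choose_link_kind(source, destinations):
    """Return "thread" if any queue is thread-based, otherwise "process"."""
    if any(is_thread_based(q) for q in [source, *destinations]):
        return "thread"
    return "process"
```

Queues created by multiprocessing or by a multiprocessing.Manager are not instances of the queue module classes, so a link made up only of those would be classified as "process" by this sketch.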
Tested against the following queue implementations:
SyncManager.Queue (multiprocessing.Manager)
SyncManager.JoinableQueue (multiprocessing.Manager)
multiprocessing.Queue
multiprocessing.JoinableQueue
multiprocessing.SimpleQueue
queue.Queue
queue.LifoQueue
queue.PriorityQueue
queue.SimpleQueue
QueueLink creates a new thread or process for each source queue, regardless of the number of downstream queues. The linking thread/process gets each element from the source queue, then iterates over the set of destination queues and puts the element into each one.
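The one-worker-per-source pattern can be illustrated with a hand-written loop. This is a simplified sketch using only the standard library, not QueueLink's publisher; the publish function and the _STOP sentinel are hypothetical.

```python
import queue
import threading

_STOP = object()  # sentinel used by this sketch to end the loop


def publish(source, destinations):
    """Forward every item from source to each destination until _STOP arrives."""
    while True:
        item = source.get()
        if item is _STOP:
            break
        for dest in destinations:
            dest.put(item)


source_q = queue.Queue()
dest_a, dest_b = queue.Queue(), queue.Queue()

# One worker per source queue, however many destinations it feeds.
worker = threading.Thread(target=publish, args=(source_q, [dest_a, dest_b]))
worker.start()

source_q.put("hello")
source_q.put(_STOP)
worker.join()

received = (dest_a.get(), dest_b.get())
```

Adding a third destination only lengthens the inner loop; the number of workers stays tied to the number of sources.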
Start Method: QueueLink is tested against the fork, forkserver, and spawn start methods. It defaults to the system preference, which can be overridden by passing the preferred start method name to the class's start_method parameter.
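For readers unfamiliar with start methods, the following sketch shows how a start method name maps to a multiprocessing context in the standard library. It does not show QueueLink internals; resolve_context is a hypothetical helper.

```python
import multiprocessing


def resolve_context(start_method=None):
    """Return a multiprocessing context for start_method, or the default one."""
    if start_method is not None:
        available = multiprocessing.get_all_start_methods()
        if start_method not in available:
            raise ValueError(
                f"{start_method!r} is not available here; choose from {available}"
            )
    # get_context(None) returns the platform's default context.
    return multiprocessing.get_context(start_method)


ctx = resolve_context("spawn")  # "spawn" is available on every platform
```

Processes and queues created from ctx (for example ctx.Process or ctx.Queue) use that start method without changing the global default.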
QueueLink includes two “adapters” to link queues with inbound and outbound connections.
To quickly link a pipe or handle with a queue, use QueueHandleAdapterReader. The Reader Adapter is tested against Multiprocessing Connections and Subprocess pipes. It calls flush and readline to consume from handles, so it should work against any object implementing those methods, with readline returning a string or byte array. For Multiprocessing Connections, the adapter injects a no-op flush method and a custom readline method.
import multiprocessing
from subprocess import Popen, PIPE
from queuelink import QueueHandleAdapterReader
# Text to send
text_in = "a😂" * 10
# Destination queue
dest_q = multiprocessing.Queue()  # Process-based
# Subprocess, simple example sending some text to stdout
proc = Popen(['echo', '-n', text_in],  # -n prevents echo from adding a newline character
             stdout=PIPE,
             universal_newlines=True,
             close_fds=True)
# Connect the reader to the subprocess's stdout
read_adapter = QueueHandleAdapterReader(queue=dest_q,
                                        handle=proc.stdout)
# Get the text from the queue
text_out = dest_q.get()
print(text_out)
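The readline contract described above means any object with a compatible readline method can feed a queue. The sketch below shows that idea with the standard library only; it is not the adapter's code, pump_lines is a hypothetical name, and the flush call the adapter makes is omitted for brevity.

```python
import io
import queue
import threading


def pump_lines(handle, dest):
    """Read handle line by line and put each line on dest until EOF."""
    while True:
        line = handle.readline()
        if not line:  # readline returns "" (or b"") at end of file
            break
        dest.put(line)


handle = io.StringIO("first\nsecond\n")  # stands in for a pipe such as proc.stdout
dest_q = queue.Queue()

reader = threading.Thread(target=pump_lines, args=(handle, dest_q))
reader.start()
reader.join()

lines = [dest_q.get(), dest_q.get()]
```

Because the loop depends only on readline, the same function would accept a text or binary file object without changes.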
Tuning link_timeout
Under heavy load, the “publisher” process/thread can thrash when trying to retrieve records from the source queue. Raising link_timeout (default 0.1 seconds) can improve responsiveness, but higher values might respond more slowly to stop requests and produce warnings during shutdown.
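The trade-off can be seen in a generic polling loop. This sketch assumes link_timeout acts as the timeout on each read from the source queue, which the text suggests but does not state outright; the publish function and stop_event are hypothetical and do not come from QueueLink.

```python
import queue
import threading


def publish(source, dest, stop_event, link_timeout=0.1):
    """Forward items until stop_event is set, polling with link_timeout."""
    while not stop_event.is_set():
        try:
            # Block for at most link_timeout seconds, then re-check stop_event.
            item = source.get(timeout=link_timeout)
        except queue.Empty:
            continue
        dest.put(item)


source_q, dest_q = queue.Queue(), queue.Queue()
stop_event = threading.Event()
worker = threading.Thread(target=publish, args=(source_q, dest_q, stop_event))
worker.start()

source_q.put("hello")
forwarded = dest_q.get(timeout=5)

stop_event.set()  # the worker notices within roughly link_timeout seconds
worker.join(timeout=5)
```

A short timeout makes the worker wake and re-check more often while the source is empty; a long one reduces that churn but delays the reaction to a stop request by up to the timeout.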