Thread safe message queue
Project description
threadmsg
A lightweight Python library that runs an asyncio event loop in a background thread and lets you send messages to it safely from any other thread.
It is particularly useful for serializing work onto a single background thread — for example, managing a network connection, a database handle, or any resource that must be touched from one thread at a time.
pip install threadmsg
Table of contents
- How it works
- Quick start
- Examples
- Thread function return values
- API reference
- Running the tests
- Links
- Alternatives
How it works
ThreadMsg spins up a background thread that runs its own asyncio event loop. You write an async function — the thread function — that is called repeatedly by the loop. It receives a ctx handle back to the ThreadMsg object so it can pull messages off the queue, call helpers, and check whether the thread should keep running.
Main thread(s) Background thread
────────────── ─────────────────
t.addMsg(data) ──[queue]──► async def myThread(ctx):
t.call(...) ──[queue]──► msg = ctx.getMsg()
t.stop() ──[event]──► ...
Everything in the queue is processed sequentially on the background thread, so you never need locks around the resource you are protecting.
Quick start
import asyncio
import threadmsg as tm
async def myThread(ctx):
msg = ctx.getMsgData()
if msg:
print('received:', msg)
return -1 # stop after the first message
t = tm.ThreadMsg(myThread)
t.addMsg('hello')
t.join()
Examples
1 · Simple message passing
The thread function is called whenever a message arrives (or the wait period expires). Pulling a message off the queue with getMsgData() returns the raw data, or None if the queue is empty.
import threadmsg as tm
async def msgThread(ctx):
msg = ctx.getMsgData()
if msg:
print('got:', msg)
return -1 # exit after processing one message
t = tm.ThreadMsg(msgThread)
t.addMsg('Hello, thread!')
t.join()
2 · Passing parameters to the thread
Extra positional parameters passed to ThreadMsg are forwarded to the thread function on every call.
import threadmsg as tm
async def counterThread(ctx, limit):
if not ctx.run:
print('exiting')
return
print('loop', ctx.loops)
if ctx.loops >= limit - 1:
return -1 # done
return 0.1 # call again after 100 ms
t = tm.ThreadMsg(counterThread, (5,))
t.join()
# prints: loop 0 … loop 4, then 'exiting'
3 · Function mapping with call()
The most common pattern is to subclass ThreadMsg, define methods on the class, and use mapMsgAsync + call() to dispatch work. This keeps all resource access inside the background thread while the public API stays simple.
import asyncio
import threadmsg as tm
class MyWorker(tm.ThreadMsg):
def __init__(self):
self.callMap = {
'add': self.add,
'greet': self.greet,
}
super().__init__(self.msgThread, deffk='_fn')
@staticmethod
async def msgThread(ctx):
while msg := ctx.getMsg():
await ctx.mapMsgAsync(None, ctx.callMap, msg)
# returning None → wait until the next message arrives
def add(self, a, b):
return a + b
async def greet(self, name):
return f'hello, {name}!'
async def main():
worker = MyWorker()
# Fire-and-forget with a callback
def on_result(ctx, params, result, err):
print('add result:', result)
worker.call(on_result, 'add', a=3, b=4)
# Await the reply directly
reply = worker.call('greet', name='world')
if await reply.wait(5):
print(reply.getData()) # hello, world!
worker.join(True)
asyncio.run(main())
4 · Waiting for a reply (async)
call() returns a ThreadMsgReply when no callback is provided. Inside an async context the running event loop is captured automatically, and reply.wait(timeout) suspends the caller until the result is ready.
async def main():
worker = MyWorker()
reply = worker.call('add', a=10, b=20)
if await reply.wait(timeout=5):
print(reply.getData()) # 30
else:
print('timed out')
worker.join(True)
5 · Waiting for a reply (sync / no event loop)
Outside an async context there is no event loop to attach to, so use a callback or poll isData() / isError() directly.
import time
import threadmsg as tm
worker = MyWorker()
# Option A: callback
result = []
done = threading.Event()
def cb(ctx, params, ret, err):
result.append(ret)
done.set()
worker.call(cb, 'add', a=7, b=8)
done.wait(timeout=5)
print(result[0]) # 15
# Option B: pass a no-loop ThreadMsgReply and poll isData()
tmr = tm.ThreadMsg.ThreadMsgReply(None)
def deliver(ctx, params, ret, err):
tmr.setData(ret)
worker.call(deliver, 'add', a=1, b=2)
while not tmr.isData():
time.sleep(0.01)
print(tmr.data) # 3
worker.join(True)
Thread function return values
The value returned by the thread function controls when it is called again.
| Return value | Behaviour |
|---|---|
None (or no return) |
Wait indefinitely; wake up when a message arrives or notify() is called |
0 |
Call again immediately |
0.5 (any positive number) |
Wait that many seconds, then call again |
-1 (any negative number) |
Stop the thread |
After a stop is requested (via a negative return value, or stop() / join(True) from outside), the thread function is called one final time with ctx.run = False. Use this to release resources cleanly.
async def managed(ctx):
if not ctx.run:
print('cleaning up')
db.close()
return
# normal work …
API reference
ThreadMsg(f, p=(), start=True, deffk=None)
Creates a background thread running f.
| Parameter | Description |
|---|---|
f |
async def f(ctx, *p) — the thread function |
p |
Tuple of extra arguments forwarded to f on every call |
start |
Start the thread immediately (default True) |
deffk |
Default function-key name used by call() and the mapXxx helpers |
Thread control
| Method | Description |
|---|---|
start() |
Start the thread (only needed when start=False) |
stop() |
Signal the thread to stop; does not block |
join(stop=False) |
Wait for the thread to finish; pass True to stop it first |
notify() |
Wake the thread out of a timed wait without stopping it |
Sending messages
| Method | Description |
|---|---|
addMsg(data, cb=None) |
Push data onto the queue; cb(ctx, data, result, err) is called with the result if provided |
call(*args, **kwargs) |
High-level dispatch — extracts a callback, function name, and params dict from args in any order; returns a ThreadMsgReply when no callback is given |
call() accepts its arguments in any order:
# All of these are equivalent
worker.call(my_cb, 'add', a=1, b=2)
worker.call(my_cb, {'_fn': 'add', 'a': 1}, b=2)
worker.call(my_cb, _fn='add', a=1, b=2)
Reading messages (inside the thread function)
| Method | Description |
|---|---|
getMsg() |
Pop the next message as {'data': ..., 'cb': ...}, or None |
getMsgData() |
Pop the next message and return just the data, or None |
Function mapping (inside the thread function)
These helpers inspect the function signature and match parameters by name, so you do not have to unpack dicts manually. They also handle invoking the message callback.
| Method | Description |
|---|---|
mapCall(f, fm, params, **kw) |
Call f (or look it up in fm) with matching params |
mapCallAsync(f, fm, params, **kw) |
Same as above, but awaits the result if the function is async |
mapMsg(f, fm, msg) |
mapCall + invokes msg['cb'] with the result or error |
mapMsgAsync(f, fm, msg) |
mapCallAsync + invokes msg['cb'], awaiting it if async |
f can be:
- A callable — called directly.
- A string — used as the key to look up the function name inside
params. None— falls back to thedeffkdefault key.
ThreadMsgReply
Returned by call() when no callback is provided.
| Method | Description |
|---|---|
await wait(timeout) |
Wait up to timeout seconds; returns True if data arrived, False on timeout |
getData() |
Return the result, or None if not yet signalled |
getError() |
Return the exception, or None if not yet signalled |
isData() |
True if setData() has been called (works without an event loop) |
isError() |
True if setError() has been called (works without an event loop) |
getParams() |
Return the params dict that was associated with this reply |
Note:
getData()andgetError()require an async context (event loop) to confirm completion. In a sync context useisData()/isError()and access.data/.errdirectly.
Thread function context (ctx)
Inside the thread function ctx is the ThreadMsg instance itself, so all of the methods above are available. Two read-only attributes are also useful:
| Attribute | Description |
|---|---|
ctx.loops |
Number of times the thread function has completed a normal (non-exit) iteration |
ctx.run |
True while running; False during the final exit call |
Error handling
By default errors raised inside the thread function are printed to stdout. Replace on_threadmsg_error to handle them yourself:
t = tm.ThreadMsg(myThread)
t.on_threadmsg_error = lambda e: logging.error('thread error: %s', e)
Running the tests
pip install pytest pytest-asyncio
pytest
Links
- PyPI: https://pypi.org/project/threadmsg/
- Repository: https://github.com/wheresjames/threadmsg
- Issues: https://github.com/wheresjames/threadmsg/issues
- License: MIT
Alternatives
pykka
pykka implements the actor model. Each actor is a class whose instances run in their own thread. Callers interact with a transparent proxy object — proxy.my_method(a, b) queues the call behind the scenes and returns a Future for the result.
Key differences:
- Calls are made through a proxy (
actor_ref.proxy().method(...)) rather than by explicitly pushing messages onto a queue — there is nogetMsg()equivalent ThreadingActor(the default) uses a plain thread with no asyncio;AsyncioActor(v3+) is async-native but runs inside the caller's existing event loop, not a dedicated background thread- The actor has no mechanism to control its own call cadence — it cannot sleep, retry immediately, or schedule its next run via a return value
- pykka is more mature with broader adoption, more documentation, and a larger community
Choose pykka if: your design maps naturally to objects with named methods and you want to call them as if they were ordinary Python calls, just serialized onto a background thread.
Choose threadmsg if: you need asyncio available inside the background thread, want direct access to the message queue, or need the worker to control its own polling and scheduling via return values.
janus
janus provides a single queue object with two faces: a queue.Queue-compatible synchronous interface and an asyncio.Queue-compatible async interface. Any thread can put to one side while a coroutine gets from the other, safely.
Key differences:
- janus is a queue primitive only — it provides no thread lifecycle management, no dispatch, no reply mechanism, and no function routing
- You create and manage your own threads, event loops, and any reply/callback logic
- Very small, focused API; the entire library is essentially one class
Choose janus if: you are already managing your own threads and asyncio loop and need only a well-tested, minimal queue to pass data between them.
Choose threadmsg if: you want a complete solution — thread lifecycle, message dispatch, function routing, and reply objects — rather than a primitive you wire together yourself.
concurrent.futures (stdlib)
concurrent.futures submits callables to a managed pool of worker threads or processes and returns Future objects. The pool controls scheduling; you do not own any individual thread.
Key differences:
- Work is distributed across a pool of workers, not serialized onto a single dedicated thread — unsuitable for protecting a resource that must be touched from only one thread
- Workers run synchronous code; there is no asyncio event loop inside worker threads
- No persistent message queue — work is submitted as
executor.submit(fn, *args)rather than pushed onto a queue - Ships with Python's standard library; no extra dependencies required
Choose concurrent.futures if: you want to run many independent tasks in parallel, do not need a single dedicated thread, and want zero extra dependencies.
Choose threadmsg if: you need one background thread that exclusively owns a resource, need asyncio available inside that thread, or need the worker to receive structured messages with associated callbacks.
asyncio + threading (stdlib)
Python's stdlib includes the raw primitives to build any thread/async bridge yourself: asyncio.Queue, loop.call_soon_threadsafe(), loop.run_coroutine_threadsafe(), and asyncio.to_thread().
Key differences:
- No abstraction — all thread lifecycle management, queue wiring, reply handling, error propagation, and function dispatch must be implemented manually
asyncio.Queueis not safe to put to from a synchronous thread directly; doing so correctly requirescall_soon_threadsafe()- Maximum flexibility and zero extra dependencies, at the cost of significant boilerplate
Choose asyncio + threading if: you cannot add dependencies, are already deep inside the asyncio ecosystem and want direct control, or have requirements that an abstraction layer would prevent you from meeting.
Choose threadmsg if: you want the thread lifecycle, queue, reply objects, and function routing handled for you so you can focus on the work the thread actually performs.
Summary table
| threadmsg | pykka | janus | concurrent.futures | asyncio + threading | |
|---|---|---|---|---|---|
| Dedicated single background thread | Yes | Yes | No | No | No |
| asyncio event loop in that thread | Yes | No ¹ | No | No | Yes (manual) |
| Cross-thread message queue | Yes | Yes | Yes | No | No |
| Return value / reply to caller | Yes | Yes | No | Yes | Yes |
| Named function routing | Yes | Yes (proxy) | No | No | No |
| Worker controls its own schedule | Yes | No | No | No | No |
| No extra dependencies | No | No | No | Yes | Yes |
| Relative complexity | Low | Low | Low | Low | Medium |
¹ pykka's
ThreadingActoruses a plain thread with no asyncio.AsyncioActor(v3+) is async-native but shares the caller's event loop rather than running in a dedicated background thread.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file threadmsg-0.2.6.tar.gz.
File metadata
- Download URL: threadmsg-0.2.6.tar.gz
- Upload date:
- Size: 23.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d078aebf0e7ba04939e4db0e2b07b02f4ea57f32c2a7543c6f8bf70cac39d41b
|
|
| MD5 |
70f29ce1a83fa3bcd3e3077723f6a457
|
|
| BLAKE2b-256 |
84c74d57bd82ef95d6e329480f23ee8f9eb41f0c7eb7c15db9c705891050c2a3
|
File details
Details for the file threadmsg-0.2.6-py3-none-any.whl.
File metadata
- Download URL: threadmsg-0.2.6-py3-none-any.whl
- Upload date:
- Size: 12.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
50163a583819b773b43dd66c9d9967b45d676dc93de14c747ae8216f62b4237b
|
|
| MD5 |
eab51550903773826847370c12be62fb
|
|
| BLAKE2b-256 |
1da6e6198b8f54d04e62ab75576c9705c091d9d6d56efa6493f7f5071a2f49b1
|