Skip to main content

Thread safe message queue

Project description

threadmsg

A lightweight Python library that runs an asyncio event loop in a background thread and lets you send messages to it safely from any other thread.

It is particularly useful for serializing work onto a single background thread — for example, managing a network connection, a database handle, or any resource that must be touched from one thread at a time.

pip install threadmsg

 

Table of contents

 


How it works

ThreadMsg spins up a background thread that runs its own asyncio event loop. You write an async function — the thread function — that is called repeatedly by the loop. It receives a ctx handle back to the ThreadMsg object so it can pull messages off the queue, call helpers, and check whether the thread should keep running.

  Main thread(s)                 Background thread
  ──────────────                 ─────────────────
  t.addMsg(data)   ──[queue]──►  async def myThread(ctx):
  t.call(...)      ──[queue]──►      msg = ctx.getMsg()
  t.stop()         ──[event]──►      ...

Everything in the queue is processed sequentially on the background thread, so you never need locks around the resource you are protecting.

 


Quick start

import asyncio
import threadmsg as tm

async def myThread(ctx):
    msg = ctx.getMsgData()
    if msg:
        print('received:', msg)
        return -1   # stop after the first message

t = tm.ThreadMsg(myThread)
t.addMsg('hello')
t.join()

 


Examples

1 · Simple message passing

The thread function is called whenever a message arrives (or the wait period expires). Pulling a message off the queue with getMsgData() returns the raw data, or None if the queue is empty.

import threadmsg as tm

async def msgThread(ctx):
    msg = ctx.getMsgData()
    if msg:
        print('got:', msg)
        return -1   # exit after processing one message

t = tm.ThreadMsg(msgThread)
t.addMsg('Hello, thread!')
t.join()

 

2 · Passing parameters to the thread

Extra positional parameters passed to ThreadMsg are forwarded to the thread function on every call.

import threadmsg as tm

async def counterThread(ctx, limit):
    if not ctx.run:
        print('exiting')
        return

    print('loop', ctx.loops)

    if ctx.loops >= limit - 1:
        return -1   # done

    return 0.1      # call again after 100 ms

t = tm.ThreadMsg(counterThread, (5,))
t.join()
# prints: loop 0 … loop 4, then 'exiting'

 

3 · Function mapping with call()

The most common pattern is to subclass ThreadMsg, define methods on the class, and use mapMsgAsync + call() to dispatch work. This keeps all resource access inside the background thread while the public API stays simple.

import asyncio
import threadmsg as tm

class MyWorker(tm.ThreadMsg):

    def __init__(self):
        self.callMap = {
            'add': self.add,
            'greet': self.greet,
        }
        super().__init__(self.msgThread, deffk='_fn')

    @staticmethod
    async def msgThread(ctx):
        while msg := ctx.getMsg():
            await ctx.mapMsgAsync(None, ctx.callMap, msg)
        # returning None → wait until the next message arrives

    def add(self, a, b):
        return a + b

    async def greet(self, name):
        return f'hello, {name}!'


async def main():
    worker = MyWorker()

    # Fire-and-forget with a callback
    def on_result(ctx, params, result, err):
        print('add result:', result)

    worker.call(on_result, 'add', a=3, b=4)

    # Await the reply directly
    reply = worker.call('greet', name='world')
    if await reply.wait(5):
        print(reply.getData())   # hello, world!

    worker.join(True)

asyncio.run(main())

 

4 · Waiting for a reply (async)

call() returns a ThreadMsgReply when no callback is provided. Inside an async context the running event loop is captured automatically, and reply.wait(timeout) suspends the caller until the result is ready.

async def main():
    worker = MyWorker()

    reply = worker.call('add', a=10, b=20)

    if await reply.wait(timeout=5):
        print(reply.getData())    # 30
    else:
        print('timed out')

    worker.join(True)

 

5 · Waiting for a reply (sync / no event loop)

Outside an async context there is no event loop to attach to, so use a callback or poll isData() / isError() directly.

import time
import threadmsg as tm

worker = MyWorker()

# Option A: callback
result = []
done = threading.Event()

def cb(ctx, params, ret, err):
    result.append(ret)
    done.set()

worker.call(cb, 'add', a=7, b=8)
done.wait(timeout=5)
print(result[0])   # 15

# Option B: pass a no-loop ThreadMsgReply and poll isData()
tmr = tm.ThreadMsg.ThreadMsgReply(None)

def deliver(ctx, params, ret, err):
    tmr.setData(ret)

worker.call(deliver, 'add', a=1, b=2)

while not tmr.isData():
    time.sleep(0.01)

print(tmr.data)   # 3

worker.join(True)

 


Thread function return values

The value returned by the thread function controls when it is called again.

Return value Behaviour
None (or no return) Wait indefinitely; wake up when a message arrives or notify() is called
0 Call again immediately
0.5 (any positive number) Wait that many seconds, then call again
-1 (any negative number) Stop the thread

After a stop is requested (via a negative return value, or stop() / join(True) from outside), the thread function is called one final time with ctx.run = False. Use this to release resources cleanly.

async def managed(ctx):
    if not ctx.run:
        print('cleaning up')
        db.close()
        return

    # normal work …

 


API reference

ThreadMsg(f, p=(), start=True, deffk=None)

Creates a background thread running f.

Parameter Description
f async def f(ctx, *p) — the thread function
p Tuple of extra arguments forwarded to f on every call
start Start the thread immediately (default True)
deffk Default function-key name used by call() and the mapXxx helpers

Thread control

Method Description
start() Start the thread (only needed when start=False)
stop() Signal the thread to stop; does not block
join(stop=False) Wait for the thread to finish; pass True to stop it first
notify() Wake the thread out of a timed wait without stopping it

Sending messages

Method Description
addMsg(data, cb=None) Push data onto the queue; cb(ctx, data, result, err) is called with the result if provided
call(*args, **kwargs) High-level dispatch — extracts a callback, function name, and params dict from args in any order; returns a ThreadMsgReply when no callback is given

call() accepts its arguments in any order:

# All of these are equivalent
worker.call(my_cb, 'add', a=1, b=2)
worker.call(my_cb, {'_fn': 'add', 'a': 1}, b=2)
worker.call(my_cb, _fn='add', a=1, b=2)

Reading messages (inside the thread function)

Method Description
getMsg() Pop the next message as {'data': ..., 'cb': ...}, or None
getMsgData() Pop the next message and return just the data, or None

Function mapping (inside the thread function)

These helpers inspect the function signature and match parameters by name, so you do not have to unpack dicts manually. They also handle invoking the message callback.

Method Description
mapCall(f, fm, params, **kw) Call f (or look it up in fm) with matching params
mapCallAsync(f, fm, params, **kw) Same as above, but awaits the result if the function is async
mapMsg(f, fm, msg) mapCall + invokes msg['cb'] with the result or error
mapMsgAsync(f, fm, msg) mapCallAsync + invokes msg['cb'], awaiting it if async

f can be:

  • A callable — called directly.
  • A string — used as the key to look up the function name inside params.
  • None — falls back to the deffk default key.

ThreadMsgReply

Returned by call() when no callback is provided.

Method Description
await wait(timeout) Wait up to timeout seconds; returns True if data arrived, False on timeout
getData() Return the result, or None if not yet signalled
getError() Return the exception, or None if not yet signalled
isData() True if setData() has been called (works without an event loop)
isError() True if setError() has been called (works without an event loop)
getParams() Return the params dict that was associated with this reply

Note: getData() and getError() require an async context (event loop) to confirm completion. In a sync context use isData() / isError() and access .data / .err directly.


Thread function context (ctx)

Inside the thread function ctx is the ThreadMsg instance itself, so all of the methods above are available. Two read-only attributes are also useful:

Attribute Description
ctx.loops Number of times the thread function has completed a normal (non-exit) iteration
ctx.run True while running; False during the final exit call

Error handling

By default errors raised inside the thread function are printed to stdout. Replace on_threadmsg_error to handle them yourself:

t = tm.ThreadMsg(myThread)
t.on_threadmsg_error = lambda e: logging.error('thread error: %s', e)

 


Running the tests

pip install pytest pytest-asyncio
pytest

 


Links

 


Alternatives

pykka

pykka implements the actor model. Each actor is a class whose instances run in their own thread. Callers interact with a transparent proxy object — proxy.my_method(a, b) queues the call behind the scenes and returns a Future for the result.

Key differences:

  • Calls are made through a proxy (actor_ref.proxy().method(...)) rather than by explicitly pushing messages onto a queue — there is no getMsg() equivalent
  • ThreadingActor (the default) uses a plain thread with no asyncio; AsyncioActor (v3+) is async-native but runs inside the caller's existing event loop, not a dedicated background thread
  • The actor has no mechanism to control its own call cadence — it cannot sleep, retry immediately, or schedule its next run via a return value
  • pykka is more mature with broader adoption, more documentation, and a larger community

Choose pykka if: your design maps naturally to objects with named methods and you want to call them as if they were ordinary Python calls, just serialized onto a background thread.

Choose threadmsg if: you need asyncio available inside the background thread, want direct access to the message queue, or need the worker to control its own polling and scheduling via return values.


janus

janus provides a single queue object with two faces: a queue.Queue-compatible synchronous interface and an asyncio.Queue-compatible async interface. Any thread can put to one side while a coroutine gets from the other, safely.

Key differences:

  • janus is a queue primitive only — it provides no thread lifecycle management, no dispatch, no reply mechanism, and no function routing
  • You create and manage your own threads, event loops, and any reply/callback logic
  • Very small, focused API; the entire library is essentially one class

Choose janus if: you are already managing your own threads and asyncio loop and need only a well-tested, minimal queue to pass data between them.

Choose threadmsg if: you want a complete solution — thread lifecycle, message dispatch, function routing, and reply objects — rather than a primitive you wire together yourself.


concurrent.futures (stdlib)

concurrent.futures submits callables to a managed pool of worker threads or processes and returns Future objects. The pool controls scheduling; you do not own any individual thread.

Key differences:

  • Work is distributed across a pool of workers, not serialized onto a single dedicated thread — unsuitable for protecting a resource that must be touched from only one thread
  • Workers run synchronous code; there is no asyncio event loop inside worker threads
  • No persistent message queue — work is submitted as executor.submit(fn, *args) rather than pushed onto a queue
  • Ships with Python's standard library; no extra dependencies required

Choose concurrent.futures if: you want to run many independent tasks in parallel, do not need a single dedicated thread, and want zero extra dependencies.

Choose threadmsg if: you need one background thread that exclusively owns a resource, need asyncio available inside that thread, or need the worker to receive structured messages with associated callbacks.


asyncio + threading (stdlib)

Python's stdlib includes the raw primitives to build any thread/async bridge yourself: asyncio.Queue, loop.call_soon_threadsafe(), loop.run_coroutine_threadsafe(), and asyncio.to_thread().

Key differences:

  • No abstraction — all thread lifecycle management, queue wiring, reply handling, error propagation, and function dispatch must be implemented manually
  • asyncio.Queue is not safe to put to from a synchronous thread directly; doing so correctly requires call_soon_threadsafe()
  • Maximum flexibility and zero extra dependencies, at the cost of significant boilerplate

Choose asyncio + threading if: you cannot add dependencies, are already deep inside the asyncio ecosystem and want direct control, or have requirements that an abstraction layer would prevent you from meeting.

Choose threadmsg if: you want the thread lifecycle, queue, reply objects, and function routing handled for you so you can focus on the work the thread actually performs.

 

Summary table

threadmsg pykka janus concurrent.futures asyncio + threading
Dedicated single background thread Yes Yes No No No
asyncio event loop in that thread Yes No ¹ No No Yes (manual)
Cross-thread message queue Yes Yes Yes No No
Return value / reply to caller Yes Yes No Yes Yes
Named function routing Yes Yes (proxy) No No No
Worker controls its own schedule Yes No No No No
No extra dependencies No No No Yes Yes
Relative complexity Low Low Low Low Medium

¹ pykka's ThreadingActor uses a plain thread with no asyncio. AsyncioActor (v3+) is async-native but shares the caller's event loop rather than running in a dedicated background thread.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

threadmsg-0.2.5.tar.gz (22.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

threadmsg-0.2.5-py3-none-any.whl (12.2 kB view details)

Uploaded Python 3

File details

Details for the file threadmsg-0.2.5.tar.gz.

File metadata

  • Download URL: threadmsg-0.2.5.tar.gz
  • Upload date:
  • Size: 22.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for threadmsg-0.2.5.tar.gz
Algorithm Hash digest
SHA256 99c4fa504884fc224248c0e5435a132a14b5dd38349e0759c8eb787c11f1a855
MD5 9322806a34447625b015a6d06a624547
BLAKE2b-256 93eb96263cc88c1db7e215b41309755513dfbf03f21a484ee61f55ca1e29fbd3

See more details on using hashes here.

File details

Details for the file threadmsg-0.2.5-py3-none-any.whl.

File metadata

  • Download URL: threadmsg-0.2.5-py3-none-any.whl
  • Upload date:
  • Size: 12.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for threadmsg-0.2.5-py3-none-any.whl
Algorithm Hash digest
SHA256 c875aef8a9497556364928983ee5b68d572e84734e3af2d0208f4ab8d09af011
MD5 a63137613cbfbcaf1bd3919d21ea83e5
BLAKE2b-256 168fe3a09746dd34282694dfaf994a55aeb9617e91b94b132d11c7525c36fdb6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page