Tiny asyncio-first runtime helpers for service threads, loop ownership, channels, and request waiters
microio
microio is a tiny asyncio-first runtime helper library for services that own
event loops, sockets, background threads, and request/reply waiters.
It is inspired by AnyIO's practical concurrency ideas, especially the problems called out in "Why you should be using AnyIO APIs instead of asyncio APIs":
- task readiness: a child service should be able to report "ready" or "failed" before its parent continues;
- cancel scopes: stopping is a durable state with a reason, not a one-shot flag that individual operations may miss;
- memory object streams: producers and consumers should be split into explicit sender/receiver endpoints with clear close semantics;
- thread bridges: code outside an event-loop thread needs a safe way to submit work into that loop and observe failures.
microio is not a compatibility layer over asyncio, Trio, and Curio. It is also
not a reimplementation of AnyIO. It intentionally stays smaller:
- asyncio only;
- stdlib only;
- no generic networking/file APIs;
- cooperative level cancellation only where code uses microio scopes and checkpoints;
- no pytest plugin or framework-level dependency injection.
The goal is to make the common "small service runtime" patterns reliable and testable without pulling a full concurrency abstraction into projects that already use asyncio directly.
What It Provides
TaskGroup / CancelScope
create_task_group() wraps asyncio.TaskGroup. It keeps the stdlib failure
rules, and adds the missing cancellation/readiness pieces:
- tg.start_soon(fn, *args) starts a child task;
- await tg.start(fn, *args) starts a child and waits until it calls task_status.started(value);
- tg.cancel_scope.cancel() or tg.cancel() cancels owned tasks and treats that as normal shutdown;
- checkpoint(), checkpoint_if_cancelled(), and sleep() provide cooperative level cancellation for code that uses microio primitives;
- move_on_after(seconds) suppresses deadline cancellation;
- fail_after(seconds) turns deadline cancellation into TimeoutError.
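The readiness handshake behind tg.start() can be illustrated with plain asyncio. This is a minimal sketch, not microio's implementation: TaskStatus and start are hypothetical stand-ins, and the sketch uses asyncio.create_task rather than a task group to keep the failure path short.

```python
import asyncio


class TaskStatus:
    """Hypothetical readiness handle (illustration only, not microio's class)."""

    def __init__(self):
        self.ready = asyncio.get_running_loop().create_future()

    def started(self, value=None):
        if not self.ready.done():
            self.ready.set_result(value)


async def start(fn, *args):
    """Start fn as a task; return (task, value) once it reports ready."""
    status = TaskStatus()
    task = asyncio.create_task(fn(*args, task_status=status))
    await asyncio.wait({task, status.ready}, return_when=asyncio.FIRST_COMPLETED)
    if status.ready.done():
        return task, status.ready.result()
    # The child finished before reporting readiness: surface its failure.
    task.result()
    raise RuntimeError("child exited without calling started()")


async def server(port, *, task_status):
    await asyncio.sleep(0.01)  # pretend to bind a socket
    task_status.started(port)
    await asyncio.sleep(3600)  # keep serving until cancelled


async def broken(*, task_status):
    raise OSError("address in use")


async def main():
    task, value = await start(server, 8080)
    try:
        await start(broken)
    except OSError as exc:
        error = str(exc)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return value, error


result = asyncio.run(main())
```

The parent either receives the value passed to started() or sees the child's startup exception; it never continues past start() without knowing which one happened.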
The group-cancel path borrows the small asyncio_cancel_scope trick: when a
child task or another thread asks a group to stop, microio injects a private
task exception into the underlying asyncio.TaskGroup and suppresses just that
private exception on exit.
This is still asyncio cancellation. Raw await something() follows asyncio's
edge-cancellation rules. Once code returns to a microio checkpoint, cancelled
scopes keep raising CancelledError, even if earlier cancellation was caught.
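The difference between edge and level cancellation can be shown with a small stand-alone sketch. LevelScope and this checkpoint function are hypothetical simplifications, not microio's classes: the scope stores a durable flag, and every checkpoint re-checks it.

```python
import asyncio


class LevelScope:
    """Hypothetical level-triggered cancel state (illustration only)."""

    def __init__(self):
        self.cancelled = False
        self.reason = None

    def cancel(self, reason=None):
        self.cancelled = True
        self.reason = reason


async def checkpoint(scope):
    # Level semantics: raise on every call while the scope stays cancelled.
    if scope.cancelled:
        raise asyncio.CancelledError(scope.reason)
    await asyncio.sleep(0)  # otherwise just yield to the event loop


async def stubborn(scope, log):
    for _ in range(3):
        try:
            await checkpoint(scope)
        except asyncio.CancelledError:
            log.append("caught")  # swallowing does not clear the scope


async def main():
    scope = LevelScope()
    scope.cancel("shutdown")
    log = []
    await stubborn(scope, log)
    return log


result = asyncio.run(main())
```

With a raw Task.cancel(), the first except block would have consumed the only cancellation; here each later checkpoint raises again.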
Shielding is not exposed. A partial shield around raw Task.cancel() would look
stronger than it is.
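import asyncio

from microio import create_task_group, sleep

async def worker():
    while True:
        await sleep(1)

async def main():
    async with create_task_group() as tg:
        tg.start_soon(worker)
        await sleep(0.1)
        tg.cancel()  # normal shutdown: cancels the owned worker task

asyncio.run(main())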
CloseScope
CloseScope is a small, thread-safe stop/failure state object. It records whether
a service is closing, why it is closing, and whether there is an exception that
should be propagated to waiters.
This is separate from CancelScope. CloseScope is for thread-safe service
lifecycle state. It does not cancel asyncio tasks for you.
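A thread-safe stop/failure state of this kind can be sketched with a lock and an event. StopState and its method names are hypothetical, and the "first close wins" rule is an assumption of this sketch rather than documented CloseScope behavior.

```python
import threading


class StopState:
    """Hypothetical stop/failure state object (illustration only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._event = threading.Event()
        self.reason = None
        self.exception = None

    def close(self, reason="closed", exception=None):
        """Record the first close request; later calls are ignored."""
        with self._lock:
            if self._event.is_set():
                return False
            self.reason = reason
            self.exception = exception
            self._event.set()
            return True

    @property
    def closing(self):
        return self._event.is_set()

    def wait(self, timeout=None):
        """Block until closed; returns False if the timeout expired."""
        return self._event.wait(timeout)

    def raise_if_failed(self):
        """Propagate the recorded exception to a waiter, if there is one."""
        if self.exception is not None:
            raise self.exception


state = StopState()
first = state.close("reader crashed", ValueError("bad frame"))
second = state.close("late call")  # ignored: the state is already closed
```

Note that nothing here touches asyncio tasks; the object only records and publishes state.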
ServiceThread / ServiceGroup
ServiceThread is a supervised threading.Thread:
- child code calls started() after resources are ready;
- parents call wait_started() and get either readiness or the startup exception;
- stop() marks the thread's CloseScope;
- join_or_log() checks timeout results instead of ignoring them.
Use it for socket threads, protocol readers, and other owned background services.
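The supervision contract described above can be approximated with the stdlib. SupervisedThread is a hypothetical stand-in that borrows the method names from this section; the real ServiceThread may differ in signatures and details.

```python
import logging
import threading


class SupervisedThread(threading.Thread):
    """Hypothetical supervised thread (illustration only)."""

    def __init__(self, body, name=None):
        super().__init__(name=name, daemon=True)
        self._body = body
        self._ready_event = threading.Event()
        self._startup_error = None
        self.stop_requested = threading.Event()

    def started(self):
        self._ready_event.set()

    def run(self):
        try:
            self._body(self)
        except Exception as exc:
            if self._ready_event.is_set():
                logging.exception("service %s crashed", self.name)
            else:
                # Failed before readiness: hand the error to the parent.
                self._startup_error = exc
                self._ready_event.set()

    def wait_started(self, timeout=None):
        if not self._ready_event.wait(timeout):
            raise TimeoutError(f"{self.name} did not start in time")
        if self._startup_error is not None:
            raise self._startup_error

    def stop(self):
        self.stop_requested.set()

    def join_or_log(self, timeout=None):
        self.join(timeout)
        if self.is_alive():
            logging.warning("%s did not stop within %s s", self.name, timeout)
            return False
        return True


def good(thread):
    thread.started()
    thread.stop_requested.wait()


def bad(thread):
    raise OSError("port busy")


ok = SupervisedThread(good, name="ok")
ok.start()
ok.wait_started(timeout=1)
ok.stop()
joined = ok.join_or_log(timeout=1)

failing = SupervisedThread(bad, name="failing")
failing.start()
try:
    failing.wait_started(timeout=1)
    startup_error = None
except OSError as exc:
    startup_error = str(exc)
```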
ServiceGroup owns the repeated lifecycle boilerplate for a small set of service
threads:
services = ServiceGroup(iopub, stdin, heartbeat).start().wait_started()
...
services.stop_join(timeout=1)
LoopServiceThread
LoopServiceThread owns an asyncio.Runner inside a thread and exposes:
- call_soon() for thread-safe callbacks;
- call_sync() for thread-safe callbacks with a return value;
- submit() for coroutine submission from other threads;
- task_group for async work owned by the service;
- the same ready/failed/stop/join behavior as ServiceThread.
This is the small subset of AnyIO's thread-bridge idea that asyncio services often
need: create one loop in one thread, keep ownership clear, submit coroutine work
safely, and synchronously run small functions on the loop thread when needed.
stop() cancels the service task group, so owned child tasks shut down with the
service.
ObjectChannel
create_channel() returns (send, receive) endpoints. A sender can be used from
other threads before or after the receiver has bound to an event loop. The receiver
is async and supports async for.
This is inspired by AnyIO memory object streams, but adjusted for service threads:
- the default buffer is unbounded because cross-thread producers often cannot await backpressure;
- close is explicit and wakes async receivers;
- receivers raise EndOfStream on direct receive after close;
- fail(exc) is explicit and wakes async receivers with the exception;
- late sends raise ClosedResourceError unless late_send="drop" is selected;
- the implementation is intentionally single-receiver and simple.
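A single-receiver channel with these semantics can be sketched with a deque, a lock, and an asyncio.Event that senders set through call_soon_threadsafe. Channel and the two exception classes below are local, hypothetical definitions; the sketch merges both endpoints into one object and leaves out fail(exc) and late_send.

```python
import asyncio
import collections
import threading


class EndOfStream(Exception):
    pass


class ClosedResourceError(Exception):
    pass


class Channel:
    """Hypothetical unbounded, single-receiver channel (illustration only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = collections.deque()
        self._closed = False
        self._loop = None
        self._wakeup = None

    def bind(self, loop):
        with self._lock:
            self._loop = loop
            self._wakeup = asyncio.Event()

    def _notify(self):
        # Called with the lock held; before bind() items simply stay buffered.
        if self._loop is not None:
            self._loop.call_soon_threadsafe(self._wakeup.set)

    def send_nowait(self, item):
        with self._lock:
            if self._closed:
                raise ClosedResourceError("channel is closed")
            self._items.append(item)
            self._notify()

    def close(self):
        with self._lock:
            self._closed = True
            self._notify()

    async def receive(self):
        while True:
            with self._lock:
                if self._items:
                    return self._items.popleft()
                if self._closed:
                    raise EndOfStream
                self._wakeup.clear()
            await self._wakeup.wait()  # spurious wakeups just re-check


async def consume(chan):
    chan.bind(asyncio.get_running_loop())
    got = []
    try:
        while True:
            got.append(await chan.receive())
    except EndOfStream:
        return got


def producer(chan):
    chan.send_nowait("a")
    chan.send_nowait("b")
    chan.close()


chan = Channel()
chan.send_nowait("early")  # sent before the receiver is bound
thread = threading.Thread(target=producer, args=(chan,))
thread.start()
received = asyncio.run(consume(chan))
thread.join()
```

Buffered items are drained before EndOfStream is raised, so closing never discards data that was already accepted.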
Mailbox / ActorCore
Mailbox wraps an ObjectChannel for the common actor shape: thread-safe
submit(), async receive, close(), fail(), and drain_nowait().
ActorCore is the tiny serialized consumer loop:
actor = ActorCore(handle)
actor.submit(item)
await actor.run()
It is deliberately not tied to a thread. A service thread, a main-thread runner, or a test can all run the same actor core.
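The serialized consumer loop can be sketched on top of asyncio.Queue. Actor is a hypothetical simplification: it assumes the handler is a coroutine function and that submit() is called from the loop's own thread, whereas the Mailbox described above accepts submissions from other threads.

```python
import asyncio

_CLOSE = object()  # sentinel that ends the consumer loop


class Actor:
    """Hypothetical serialized consumer (illustration only)."""

    def __init__(self, handle):
        self._handle = handle
        self._queue = asyncio.Queue()

    def submit(self, item):
        self._queue.put_nowait(item)

    def close(self):
        self._queue.put_nowait(_CLOSE)

    async def run(self):
        # One item at a time: the next handler call starts only after
        # the previous one has finished.
        while (item := await self._queue.get()) is not _CLOSE:
            await self._handle(item)


async def main():
    seen = []

    async def handle(item):
        await asyncio.sleep(0)
        seen.append(item)

    actor = Actor(handle)
    for item in (1, 2, 3):
        actor.submit(item)
    actor.close()
    await actor.run()
    return seen


seen = asyncio.run(main())
```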
RequestRegistry
RequestRegistry tracks request IDs and waiters:
- register a request;
- resolve it from another thread through a ReplyHandle;
- wait with timeout;
- wrap the common register-send-wait pattern with request(key, send);
- fail one or all pending requests on service crash/close.
This is useful for debug adapters, stdin routers, RPC clients, and any protocol where a reader thread must wake request waiters reliably.
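The register-send-wait pattern can be sketched with concurrent.futures.Future as the waiter. Registry and its signatures are hypothetical; in particular, this sketch generates request IDs itself, while the request(key, send) call listed above takes the key from the caller.

```python
import concurrent.futures
import itertools
import threading


class Registry:
    """Hypothetical request/reply registry (illustration only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = {}
        self._ids = itertools.count(1)

    def register(self):
        waiter = concurrent.futures.Future()
        with self._lock:
            key = next(self._ids)
            self._pending[key] = waiter
        return key, waiter

    def resolve(self, key, value):
        """Called by the reader thread when a reply arrives."""
        with self._lock:
            waiter = self._pending.pop(key, None)
        if waiter is None:
            return False  # unknown or already timed-out request
        waiter.set_result(value)
        return True

    def fail_all(self, exc):
        """Wake every pending waiter with exc, e.g. on service crash."""
        with self._lock:
            pending, self._pending = self._pending, {}
        for waiter in pending.values():
            waiter.set_exception(exc)

    def request(self, send, timeout=None):
        key, waiter = self.register()
        try:
            send(key)
            return waiter.result(timeout)
        finally:
            with self._lock:
                self._pending.pop(key, None)


registry = Registry()


def fake_transport(key):
    # Stand-in for a reader thread delivering the reply a moment later.
    threading.Timer(0.01, registry.resolve, args=(key, f"reply-{key}")).start()


reply = registry.request(fake_transport, timeout=1)

key, waiter = registry.register()
registry.fail_all(ConnectionError("service closed"))
failure = waiter.exception(timeout=1)
```

Registering before sending matters: a reply that arrives immediately still finds its waiter in the table.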
Example
import asyncio

from microio import LoopServiceThread, create_channel

class Worker(LoopServiceThread):
    def __init__(self):
        super().__init__(name="worker")
        self.send, self.receive = create_channel()

    async def run_async(self):
        self.receive.bind(asyncio.get_running_loop())
        self.started()
        async for item in self.receive:
            if item == "stop":
                self.stop()
                break
            print(item)

worker = Worker()
worker.start()
worker.wait_started()
worker.send.send_nowait("hello")
worker.send.send_nowait("stop")
worker.join_or_log(timeout=1)
Design Rules
- Prefer explicit state over hidden magic.
- Make startup failure visible to the parent.
- Never ignore a join timeout.
- Waking pending waiters on close/crash is part of the service contract.
- Keep asyncio ownership clear: a socket or loop belongs to one service thread.
Development
pip install -e .[dev]
pytest -q
Examples
Run the counter service example:
python examples/counter_server.py
It shows LoopServiceThread, ObjectChannel, RequestRegistry, and
CloseScope working together in one small service.
Version lives in microio/__init__.py as __version__.