Tiny asyncio-first runtime helpers for service threads, loop ownership, channels, and request waiters

microio

microio is a tiny asyncio-first runtime helper library for services that own event loops, sockets, background threads, and request/reply waiters.

It is inspired by AnyIO's practical concurrency ideas, especially the problems called out in the AnyIO documentation page "Why you should be using AnyIO APIs instead of asyncio APIs":

  • task readiness: a child service should be able to report "ready" or "failed" before its parent continues;
  • cancel scopes: stopping is a durable state with a reason, not a one-shot flag that individual operations may miss;
  • memory object streams: producers and consumers should be split into explicit sender/receiver endpoints with clear close semantics;
  • thread bridges: code outside an event-loop thread needs a safe way to submit work into that loop and observe failures.

microio is not a compatibility layer over asyncio, Trio, and Curio. It is also not a reimplementation of AnyIO. It intentionally stays smaller:

  • asyncio only;
  • stdlib only;
  • no generic networking/file APIs;
  • cooperative, level-triggered cancellation only where code uses microio scopes and checkpoints;
  • no pytest plugin or framework-level dependency injection.

The goal is to make the common "small service runtime" patterns reliable and testable without pulling a full concurrency abstraction into projects that already use asyncio directly.

What It Provides

TaskGroup / CancelScope

create_task_group() wraps asyncio.TaskGroup. It keeps the stdlib failure rules, and adds the missing cancellation/readiness pieces:

  • tg.start_soon(fn, *args) starts a child task;
  • await tg.start(fn, *args) starts a child and waits until it calls task_status.started(value);
  • tg.cancel_scope.cancel() or tg.cancel() cancels owned tasks and treats that as normal shutdown;
  • checkpoint(), checkpoint_if_cancelled(), and sleep() provide cooperative, level-triggered cancellation for code that uses microio primitives;
  • move_on_after(seconds) suppresses deadline cancellation;
  • fail_after(seconds) turns deadline cancellation into TimeoutError.

The group-cancel path borrows the small asyncio_cancel_scope trick: when a child task or another thread asks a group to stop, microio injects a private task exception into the underlying asyncio.TaskGroup and suppresses just that private exception on exit.

Group cancellation is still asyncio cancellation. A raw await something() follows asyncio's edge-triggered cancellation rules: the cancellation is delivered once and can be swallowed. Once code returns to a microio checkpoint, a cancelled scope keeps raising CancelledError, even if an earlier cancellation was caught.
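The difference between edge and level cancellation can be shown with a small stdlib-only sketch. LevelScope here is a hypothetical stand-in for a level-triggered scope, written only for this illustration; it is not microio's CancelScope.

```python
import asyncio


class LevelScope:
    """Hypothetical level-triggered scope: cancellation is a durable state."""

    def __init__(self) -> None:
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True

    async def checkpoint(self) -> None:
        # Raises on every call once the scope is cancelled.
        if self.cancelled:
            raise asyncio.CancelledError
        await asyncio.sleep(0)


async def demo() -> list[str]:
    events: list[str] = []
    scope = LevelScope()

    async def stubborn() -> None:
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            events.append("edge cancel swallowed")
        # Edge cancellation was consumed above, so this raw await succeeds.
        await asyncio.sleep(0)
        events.append("raw await completed")
        for _ in range(2):
            try:
                await scope.checkpoint()
            except asyncio.CancelledError:
                events.append("checkpoint raised")

    task = asyncio.create_task(stubborn())
    await asyncio.sleep(0)      # let the task reach its first await
    scope.cancel()              # durable state
    task.cancel()               # one-shot asyncio cancellation
    await task
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```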

Shielding is not exposed. A partial shield around raw Task.cancel() would look stronger than it is.

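import asyncio

from microio import create_task_group, sleep


async def worker():
    # sleep() is a microio checkpoint, so a cancelled scope stops the loop here.
    while True:
        await sleep(1)


async def main():
    async with create_task_group() as tg:
        tg.start_soon(worker)
        await sleep(0.1)
        tg.cancel()  # treated as normal shutdown


asyncio.run(main())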

CloseScope

CloseScope is a small, thread-safe stop/failure state object. It records whether a service is closing, why it is closing, and whether there is an exception that should be propagated to waiters.

This is separate from CancelScope. CloseScope is for thread-safe service lifecycle state. It does not cancel asyncio tasks for you.

ServiceThread / ServiceGroup

ServiceThread is a supervised threading.Thread:

  • child code calls started() after resources are ready;
  • parents call wait_started() and get either readiness or the startup exception;
  • stop() marks the thread's CloseScope;
  • join_or_log() checks timeout results instead of ignoring them.

Use it for socket threads, protocol readers, and other owned background services.
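The readiness and join checks listed above can be approximated with threading alone. In this sketch ReadyThread, join_checked, and the two service functions are hypothetical; it shows the shape of the contract rather than ServiceThread's real code.

```python
import threading


class ReadyThread(threading.Thread):
    """Hypothetical supervised thread: reports readiness or startup failure."""

    def __init__(self, service_fn, name=None):
        super().__init__(name=name, daemon=True)
        self._service_fn = service_fn
        self._ready = threading.Event()
        self._startup_error = None
        self.stop_requested = threading.Event()

    def started(self):
        # Called by the child once its resources are ready.
        self._ready.set()

    def run(self):
        try:
            self._service_fn(self)
        except Exception as exc:
            if not self._ready.is_set():
                # Failure before readiness: hand it to the waiting parent.
                self._startup_error = exc
                self._ready.set()

    def wait_started(self, timeout=None):
        if not self._ready.wait(timeout):
            raise TimeoutError("service did not report readiness in time")
        if self._startup_error is not None:
            raise self._startup_error
        return self

    def join_checked(self, timeout):
        # Thread.join() returns None even on timeout, so check is_alive().
        self.join(timeout)
        return not self.is_alive()


def good_service(thread):
    thread.started()
    thread.stop_requested.wait()


def bad_service(thread):
    raise OSError("port already in use")


if __name__ == "__main__":
    svc = ReadyThread(good_service).wait_started() if False else ReadyThread(good_service)
    svc.start()
    svc.wait_started(timeout=1)
    svc.stop_requested.set()
    print("joined:", svc.join_checked(timeout=1))
```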

ServiceGroup owns the repeated lifecycle boilerplate for a small set of service threads:

services = ServiceGroup(iopub, stdin, heartbeat).start().wait_started()
...
services.stop_join(timeout=1)

LoopServiceThread

LoopServiceThread owns an asyncio.Runner inside a thread and exposes:

  • call_soon() for thread-safe callbacks;
  • call_sync() for thread-safe callbacks with a return value;
  • submit() for coroutine submission from other threads;
  • task_group for async work owned by the service;
  • the same ready/failed/stop/join behavior as ServiceThread.

This is the small subset of AnyIO's thread-bridge idea that asyncio services often need: create one loop in one thread, keep ownership clear, submit coroutine work safely, and synchronously run small functions on the loop thread when needed. stop() cancels the service task group, so owned child tasks shut down with the service.
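For readers who want to see the underlying stdlib mechanics, the sketch below runs one loop in one thread and bridges into it with asyncio.run_coroutine_threadsafe() and loop.call_soon_threadsafe(). LoopThread and its method bodies are assumptions made for illustration (it uses asyncio.run() rather than asyncio.Runner and has no task group); it is not LoopServiceThread itself.

```python
import asyncio
import concurrent.futures
import threading


class LoopThread(threading.Thread):
    """Hypothetical thread that owns exactly one event loop."""

    def __init__(self):
        super().__init__(daemon=True)
        self._ready = threading.Event()
        self.loop = None
        self._quit_event = None

    def run(self):
        asyncio.run(self._main())

    async def _main(self):
        self.loop = asyncio.get_running_loop()
        self._quit_event = asyncio.Event()
        self._ready.set()
        await self._quit_event.wait()

    def wait_ready(self, timeout=None):
        if not self._ready.wait(timeout):
            raise TimeoutError("loop thread did not start in time")

    def submit(self, coro):
        # Returns a concurrent.futures.Future that carries result or failure.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def call_sync(self, fn, *args, timeout=None):
        # Run a plain function on the loop thread and wait for its result.
        result = concurrent.futures.Future()

        def runner():
            try:
                result.set_result(fn(*args))
            except Exception as exc:
                result.set_exception(exc)

        self.loop.call_soon_threadsafe(runner)
        return result.result(timeout)

    def stop(self):
        self.loop.call_soon_threadsafe(self._quit_event.set)


async def double(x):
    await asyncio.sleep(0)
    return x * 2


def demo():
    lt = LoopThread()
    lt.start()
    lt.wait_ready(timeout=1)
    doubled = lt.submit(double(21)).result(timeout=1)
    ran_on_loop_thread = lt.call_sync(threading.get_ident, timeout=1) == lt.ident
    lt.stop()
    lt.join(timeout=1)
    return doubled, ran_on_loop_thread, lt.is_alive()


if __name__ == "__main__":
    print(demo())
```

Failures travel the same way as results: an exception raised inside the coroutine or callback is re-raised by result() in the calling thread.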

ObjectChannel

create_channel() returns (send, receive) endpoints. A sender can be used from other threads before or after the receiver has bound to an event loop. The receiver is async and supports async for.

This is inspired by AnyIO memory object streams, but adjusted for service threads:

  • the default buffer is unbounded because cross-thread producers often cannot await backpressure;
  • close is explicit and wakes async receivers;
  • receivers raise EndOfStream on direct receive after close;
  • fail(exc) is explicit and wakes async receivers with the exception;
  • late sends raise ClosedResourceError unless late_send="drop" is selected;
  • the implementation is intentionally single-receiver and simple.
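The semantics in this list can be illustrated with a compact stdlib-only channel. TinyChannel is a hypothetical single-object sketch (no separate sender/receiver endpoints, no fail() or late_send option), and the two exception classes are defined locally for the example; microio's real types may behave differently in detail.

```python
import asyncio
import threading
from collections import deque


class EndOfStream(Exception):
    """Raised by receive() once the channel is closed and drained."""


class ClosedResourceError(Exception):
    """Raised by send_nowait() after close()."""


class TinyChannel:
    """Hypothetical unbounded, single-receiver, thread-safe channel."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = deque()
        self._closed = False
        self._loop = None
        self._wakeup = None

    def bind(self, loop):
        # Called on the receiver's loop thread before receiving.
        with self._lock:
            self._loop = loop
            self._wakeup = asyncio.Event()

    def _notify(self):
        # Caller holds the lock. Before bind() there is nobody to wake.
        if self._loop is not None:
            self._loop.call_soon_threadsafe(self._wakeup.set)

    def send_nowait(self, item):
        with self._lock:
            if self._closed:
                raise ClosedResourceError("channel is closed")
            self._items.append(item)
            self._notify()

    def close(self):
        with self._lock:
            self._closed = True
            self._notify()

    async def receive(self):
        while True:
            with self._lock:
                if self._items:
                    return self._items.popleft()
                if self._closed:
                    raise EndOfStream
                self._wakeup.clear()
            await self._wakeup.wait()


async def main():
    chan = TinyChannel()
    chan.send_nowait("early")               # sent before the receiver binds
    chan.bind(asyncio.get_running_loop())

    def producer():
        chan.send_nowait("late")            # sent from another thread
        chan.close()

    thread = threading.Thread(target=producer)
    thread.start()
    received = []
    try:
        while True:
            received.append(await chan.receive())
    except EndOfStream:
        pass
    thread.join()
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Buffered items are delivered before EndOfStream, so closing never discards data that was already sent.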

Mailbox / ActorCore

Mailbox wraps an ObjectChannel for the common actor shape: thread-safe submit(), async receive, close(), fail(), and drain_nowait().

ActorCore is the tiny serialized consumer loop:

actor = ActorCore(handle)
actor.submit(item)
await actor.run()

It is deliberately not tied to a thread. A service thread, a main-thread runner, or a test can all run the same actor core.

RequestRegistry

RequestRegistry tracks request IDs and waiters:

  • register a request;
  • resolve it from another thread through a ReplyHandle;
  • wait with timeout;
  • wrap the common register-send-wait pattern with request(key, send);
  • fail one or all pending requests on service crash/close.

This is useful for debug adapters, stdin routers, RPC clients, and any protocol where a reader thread must wake request waiters reliably.
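A minimal sketch of the register-send-wait pattern, using concurrent.futures.Future as the waiter, could look like this. PendingRequests and its method signatures are hypothetical and only mirror the bullet list above; they are not RequestRegistry's documented API.

```python
import threading
from concurrent.futures import Future


class PendingRequests:
    """Hypothetical request-ID registry with thread-safe waiters."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = {}

    def register(self, key):
        waiter = Future()
        with self._lock:
            if key in self._pending:
                raise KeyError(f"duplicate request id: {key!r}")
            self._pending[key] = waiter
        return waiter

    def resolve(self, key, value):
        # Safe to call from a reader thread; unknown keys are reported.
        with self._lock:
            waiter = self._pending.pop(key, None)
        if waiter is None:
            return False
        waiter.set_result(value)
        return True

    def fail_all(self, exc):
        # On crash or close, wake every pending waiter with the failure.
        with self._lock:
            pending, self._pending = self._pending, {}
        for waiter in pending.values():
            waiter.set_exception(exc)

    def request(self, key, send, timeout=None):
        # Register before sending so a fast reply cannot be missed.
        waiter = self.register(key)
        try:
            send(key)
            return waiter.result(timeout)
        finally:
            with self._lock:
                self._pending.pop(key, None)


def demo():
    registry = PendingRequests()

    def send(key):
        # Stand-in for a reader thread that receives the reply later.
        threading.Thread(target=registry.resolve, args=(key, "pong")).start()

    reply = registry.request(1, send, timeout=1)

    orphan = registry.register(2)
    registry.fail_all(ConnectionError("service closed"))
    return reply, type(orphan.exception(timeout=1)).__name__


if __name__ == "__main__":
    print(demo())
```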

Example

import asyncio
from microio import LoopServiceThread, create_channel


class Worker(LoopServiceThread):
    def __init__(self):
        super().__init__(name="worker")
        self.send, self.receive = create_channel()

    async def run_async(self):
        self.receive.bind(asyncio.get_running_loop())
        self.started()
        async for item in self.receive:
            if item == "stop":
                self.stop()
                break
            print(item)


worker = Worker()
worker.start()
worker.wait_started()
worker.send.send_nowait("hello")
worker.send.send_nowait("stop")
worker.join_or_log(timeout=1)

Design Rules

  • Prefer explicit state over hidden magic.
  • Make startup failure visible to the parent.
  • Never ignore a join timeout.
  • Waking pending waiters on close/crash is part of the service contract.
  • Keep asyncio ownership clear: a socket or loop belongs to one service thread.

Development

pip install -e ".[dev]"
pytest -q

Examples

Run the counter service example:

python examples/counter_server.py

It shows LoopServiceThread, ObjectChannel, RequestRegistry, and CloseScope working together in one small service.

The version lives in microio/__init__.py as __version__.
