Skip to main content

A tiny event loop for Python.

Project description

tinyio

A tiny (~400 lines) event loop for Python

Ever used asyncio and wished you hadn't?

tinyio is a dead-simple event loop for Python, born out of my frustration with trying to get robust error handling with asyncio. (I'm not the only one running into its sharp corners: link1, link2.)

This is an alternative for the simple use-cases, where you just need an event loop, and want to crash the whole thing if anything goes wrong. (Raising an exception in every coroutine so it can clean up its resources.)

import tinyio

def slow_add_one(x: int):
    yield tinyio.sleep(1)
    return x + 1

def foo():
    four, five = yield [slow_add_one(3), slow_add_one(4)]
    return four, five

loop = tinyio.Loop()
out = loop.run(foo())
assert out == (4, 5)
  • Somewhat unusually, our syntax uses yield rather than await, but the behaviour is the same. Await another coroutine with yield coro. Await on multiple with yield [coro1, coro2, ...] (a 'gather' in asyncio terminology; a 'nursery' in trio terminology).
  • An error in one coroutine will cancel all coroutines across the entire event loop.
    • If the erroring coroutine is sequentially depended on by a chain of other coroutines, then we chain their tracebacks for easier debugging.
    • Errors propagate to and from synchronous operations ran in threads.
  • Can nest tinyio loops inside each other, none of this one-per-thread business.
  • Ludicrously simple. No need for futures, tasks, etc. Here's the entirety of the day-to-day API:
    tinyio.Loop
    tinyio.run_in_thread
    tinyio.sleep
    tinyio.CancelledError
    

Installation

pip install tinyio

Documentation

Loops

Create a loop with tinyio.Loop(). It has a single method, .run(coro), which consumes a coroutine, and which returns the output of that coroutine.

Coroutines can yield four possible things:

  • yield: yield nothing, this just pauses and gives other coroutines a chance to run.
  • yield coro: wait on a single coroutine, in which case we'll resume with the output of that coroutine once it is available.
  • yield [coro1, coro2, coro3]: wait on multiple coroutines by putting them in a list, and resume with a list of outputs once all have completed. This is what asyncio calls a 'gather' or 'TaskGroup', and what trio calls a 'nursery'.
  • yield {coro1, coro2, coro3}: schedule one or more coroutines but do not wait on their result - they will run independently in the background.

If you yield on the same coroutine multiple times (e.g. in a diamond dependency pattern) then the coroutine will be scheduled once, and on completion all dependees will receive its output. (You can even do this if the coroutine has already finished: yield on it to retrieve its output.)

Threading

Blocking functions can be ran in threads using tinyio.run_in_thread(fn, *args, **kwargs), which gives a coroutine you can yield on. Example:

import time, tinyio

def slow_blocking_add_one(x: int) -> int:
    time.sleep(1)
    return x + 1

def foo(x: int):
    out = yield [tinyio.run_in_thread(slow_blocking_add_one, x) for _ in range(3)]
    return out

loop = tinyio.Loop()
out = loop.run(foo(x=1))  # runs in one second, not three
assert out == [2, 2, 2]

Sleeping

This is tinyio.sleep(delay_in_seconds), which is a coroutine you can yield on.

Error propagation

If any coroutine raises an error, then:

  1. All coroutines across the entire loop will have tinyio.CancelledError raised in them (from whatever yield point they are currently waiting at).
  2. Any functions ran in threads via tinyio.run_in_thread will also have tinyio.CancelledError raised in the thread.
  3. The original error is raised out of loop.run(...). This behaviour can be configured (e.g. to collect errors into a BaseExceptionGroup) by setting loop.run(..., exception_group=None/False/True).

This gives every coroutine a chance to shut down gracefully. Debuggers like patdb offer the ability to navigate across exceptions in an exception group, allowing you to inspect the state of all coroutines that were related to the error.

Synchronisation

We ship batteries-included with the usual collection of standard operations for synchronisation.

Click to expand
tinyio.as_completed       tinyio.Semaphore
tinyio.Barrier            tinyio.ThreadPool
tinyio.Event              tinyio.timeout
tinyio.Lock               tinyio.TimeoutError

  • tinyio.as_completed({coro1, coro2, ...})

    This schedules multiple coroutines in the background (like yield {coro1, coro2, ...}), and then offers their results in the order they complete.

    This is iterated over in the following way, using its .done() and .get() methods:

    def main():
        iterator = yield tinyio.as_completed({coro1, coro2, coro3})
        for next_coro in iterator:
            result = yield next_coro
    

  • tinyio.Barrier(value)

    This has a single method barrier.wait(), which is a coroutine you can yield on. Once value many coroutines have yielded on this method then it will unblock.


  • tinyio.Event()

    This is a wrapper around a boolean flag, initialised with False. This has the following methods:

    • .is_set(): return the value of the flag.
    • .set(): set the flag to True.
    • .clear(): set the flag to False.
    • .wait(timeout_in_seconds=None), which is a coroutine you can yield on. This will unblock if the internal flag is True or if timeout_in_seconds seconds pass. (Typically the former is accomplished by calling .set() from another coroutine or from a thread.)

  • tinyio.Lock()

    This is just a convenience for tinyio.Semaphore(value=1), see below.


  • tinyio.Semaphore(value)

    This manages an internal counter that is initialised at value, is decremented when entering a region, and incremented when exiting. This blocks if this counter is at zero. In this way, at most value coroutines may acquire the semaphore at a time.

    This is used as:

    semaphore = Semaphore(value)
    
    ...
    
    with (yield semaphore()):
        ...
    

  • tinyio.timeout(coro, timeout_in_seconds)

    This is a coroutine you can yield on, used as output, success = yield tinyio.timeout(coro, timeout_in_seconds).

    This runs coro for at most timeout_in_seconds. If it succeeds in that time then the pair (output, True) is returned . Else this will return (None, False), and coro will be halted by raising tinyio.TimeoutError inside it.


  • tinyio.ThreadPool(max_threads)

    This is equivalent to making multiple tinyio.run_in_thread calls, but will limit the number of threads to at most max_threads. Additional work after that will block until a thread becomes available.

    This has two methods:

    • .run_in_thread(fn, *args, **kwargs), which is a coroutine you can yield on. This is equivalent to yield tinyio.run_in_thread(fn, *args, **kwargs).
    • .map(fn, xs), which is a coroutine you can yield on. This is equivalent to yield [tinyio.run_in_thread(fn, x) for x in xs].

Asynchronous context managers

Click to expand

You can use the following pattern to implement context managers with asynchronous creation:

import contextlib
import tinyio

def my_context_manager(x):
    print("Initialising...")
    yield tinyio.sleep(1)
    print("Initialised")
    return make_context_manager(x)

@contextlib.contextmanager
def make_context_manager(x):
    try:
        yield x
    finally:
        print("Cleaning up")

def my_coro():
    with (yield my_context_manager(x=5)) as val:
        print(f"Got val {val}")

tinyio.Loop().run(my_coro())

This isn't anything fancier than just using a coroutine that returns a regular with-compatible context manager. See tinyio.Semaphore for an example of this pattern.

Asynchronous iterators

Click to expand

You can use the following pattern to implement asychronous iterators:

import tinyio

def slow_range(x):  # this function is an iterator-of-coroutines
for i in range(x):
    yield slow_range_i(i)  # this `yield` statement is seen by the `for` loop

def slow_range_i(i):  # this function is a coroutine
yield tinyio.sleep(1)  # this `yield` statement is seen by the `tinyio.Loop()`
return i

def my_coro():
for x in slow_range(5):
    x = yield x
    print(f"Got {x}")

tinyio.Loop().run(my_coro())

Here we just have yield being used in a couple of different ways that you're already used to:

  • as a regular Python generator/iterator;
  • as a tinyio coroutine.

For an example of this, see tinyio.as_completed.

Integration with asyncio and trio

We have support for putting trio event loops within asyncio/trio event loops, or vice-versa.

Click to expand
tinyio.to_asyncio         tinyio.to_trio
tinyio.from_asyncio       tinyio.from_trio

  • tinyio.to_asyncio(coro, exception_group=None)

    This converts a tinyio coroutine into an asyncio coroutine.

    For example:

    def add_one(x):
        yield tinyio.sleep(1)
        return x + 1
    
    async def foo(x):
        y = await tinyio.to_asyncio(add_one(x))
        return y
    
    asyncio.run(foo(3))
    

  • tinyio.from_asyncio(coro)

    This converts an asyncio coroutine into a tinyio coroutine.

    WARNING!
    This works by running the entire asyncio portion in a separate thread. This may lead to surprises if the asyncio and non-asyncio portions interact in non-threadsafe ways.

    For example:

    async def add_one(x):
        await asyncio.sleep(1)
        return x + 1
    
    def foo(x):
        y = yield tinyio.from_asyncio(add_one(x))
        return y
    
    tinyio.Loop().run(foo(3))
    

  • tinyio.to_trio(coro, exception_group=None)

    This converts a tinyio coroutine into an trio coroutine.

    For example:

    def add_one(x):
        yield tinyio.sleep(1)
        return x + 1
    
    async def foo(x):
        y = await tinyio.to_trio(add_one(x))
        return y
    
    trio.run(foo, 3)
    

  • tinyio.from_trio(async_fn, *args)

    This converts an trio coroutine into a tinyio coroutine.

    For example:

    async def add_one(x):
        await trio.sleep(1)
        return x + 1
    
    def foo(x):
        y = yield tinyio.from_trio(add_one, x)
        return y
    
    tinyio.Loop().run(foo(3))
    

Advanced topic: isolating coroutines without crashing the others

Click to expand

If you would like to run a coroutine (and transitively, all the coroutines it yields) without crashing the entire event loop, then you can use tinyio.isolate. This will run the coroutine in a nested tinyio.Loop – so that it crashing will only affect everything in that nested loop – and then return the result or the error.

def some_coroutine(x):
    if x == 3:
        raise ValueError("Oh no, a bug!")
    yield

def someone_elses_buggy_code():
    yield [some_coroutine(x=3), another_coroutine()]

def main():
    app = someone_elses_buggy_code()
    result_or_error, success = yield tinyio.isolate(app)
    # at this point then `someone_elses_buggy_code`, `some_coroutine` and
    #`another_coroutine` have been cancelled, but `main` is still running.

FAQ

Why yield - why not await like is normally seen for coroutines?

The reason is that await does not offer a suspension point to an event loop (it just calls __await__ and maybe that offers a suspension point), so if we wanted to use that syntax then we'd need to replace yield coro with something like await tinyio.Task(coro). The traditional syntax is not worth the extra class.

I have a function I want to be a coroutine, but it has zero yield statements, so it is just a normal function?

You can distinguish it from a normal Python function by putting if False: yield somewhere inside its body. Another common trick is to put a yield statement after the final return statement. Bit ugly but oh well.

vs asyncio or trio?

I wasted a lot of time trying to get correct error propagation with asyncio, trying to reason whether my tasks would be cleaned up correctly or not (edge-triggered vs level-triggered etc etc). trio is excellent but still has a one-loop-per-thread rule, and doesn't propagate cancellations to/from threads. These points inspired me to try writing my own.

tinyio has the following unique features, and as such may be the right choice if any of the following are must-haves for you:

  • the propagation of errors to/from threads;
  • no one-loop-per-thread rule;
  • simple+robust error semantics (crash the whole loop if anything goes wrong);
  • tiny, hackable, codebase.

Conversely, at least right now we don't ship batteries-included with a few things:

  • asynchronously launching subprocesses / making network requests / accessing the file system (in all cases use run_in_thread instead);
  • scheduling work on the event loop whilst cleaning up from errors.

If none of the bullet points are must-haves for you, or if any of its limitations are dealbreakers, then either trio or asyncio are likely to be better choices. :)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tinyio-0.4.0.tar.gz (25.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

tinyio-0.4.0-py3-none-any.whl (32.4 kB view details)

Uploaded Python 3

File details

Details for the file tinyio-0.4.0.tar.gz.

File metadata

  • Download URL: tinyio-0.4.0.tar.gz
  • Upload date:
  • Size: 25.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for tinyio-0.4.0.tar.gz
Algorithm Hash digest
SHA256 cda66598bc3210a126b1f1dfc59bd5eb686c908aac46f7bdd0fe2cb027775c87
MD5 c81c6db808e75d0a162efa9be66222cc
BLAKE2b-256 378d38a381e1cad4c104c3a9668bff8bad1a347d1d39b93ac3c70cf4eacfc8ce

See more details on using hashes here.

File details

Details for the file tinyio-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: tinyio-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 32.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for tinyio-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b73959c271860bc2fe48d02bd3f6048c948544f361083798c85b698133a0579e
MD5 27dd1d04f3ddc4491a9c72a093f81288
BLAKE2b-256 1940b246179829cebd11150e545bf28e5a6fa57a5f1597c25690840e738c0ca9

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page