Python library for writing asynchronous programs.
Project description
Async-Tamer
You shouldn't have to split your codebase or jump through hoops just to use
async def. After all async functions are just functions ... with superpowers.
Async-Tamer helps you harness these superpowers by seeing async def for what
it is: A means to flag and reduce periods of busy waiting. (Details at the
end of this readme.)
Features
- ✅ 100% python
- ✅ 100% free (BSD 3-clause)
- ✅ 100% lean (no dependencies)
Installation
pip install async-tamer
Usage
In a nutshell, you add @tamed to an asynchronous function to enable calling it
directly from either sync and async contexts. You may also assign a @tamed
function to an AsyncScope to get structured lifecycle management.
import asyncio
from tamer import tamed, AsyncScope
@tamed # <-- Notice the decorator
async def slow_echo(msg:str, delay:int) -> None:
await asyncio.sleep(delay)
print(msg)
slow_echo("sync > DELAY(.5)", 0.5)
with AsyncScope() as scope:
slow_echo("scope > DELAY(.2)", .2, _async_scope=scope)
slow_echo("scope > DELAY(.1)", .1, _async_scope=scope)
# implicit await :)
# Output
# ------
#
# sync > DELAY(.5)
# scope > DELAY(.1)
# scope > DELAY(.2)
The @tamed decorator
A @tamed asynchronous function changes its execution policy (how it behaves)
depending on the context it is called from. In synchronous contexts, it behaves
like an ordinary function (blocking). In async contexts, it behaves like an
ordinary coroutine (non-blocking), and when assigned to an AsyncScope it
follows the scopes context manager (non-blocking).
import asyncio
from tamer import tamed, AsyncScope
@tamed # <-- notice the decorator
async def slow_echo(msg:str, delay:int) -> None:
await asyncio.sleep(delay)
print(msg)
# ============================
# Asynchronous Execution
# ============================
async def main():
first = slow_echo("async > DELAY(1)", 1)
second = slow_echo("async > DELAY(.1)", 0.1)
third = slow_echo("async > DELAY(.5)", 0.5)
# Don't forget the await!
await second
await asyncio.gather(first, third)
asyncio.run(main())
# Output
# ------
#
# async > DELAY(.1)
# async > DELAY(.5)
# async > DELAY(1)
# ============================
# Synchronous Execution
# ============================
slow_echo("sync > DELAY(1)", 1)
slow_echo("sync > DELAY(.1)", 0.1)
slow_echo("sync > DELAY(.5)", 0.5)
# Output
# ------
#
# sync > DELAY(1)
# sync > DELAY(.1)
# sync > DELAY(.5)
# ============================
# AsyncScope Execution
# ============================
with AsyncScope() as scope:
slow_echo("scope > DELAY(1)", 1, _async_scope=scope)
slow_echo("scope > DELAY(.1)", 0.1, _async_scope=scope)
slow_echo("scope > DELAY(.5)", 0.5, _async_scope=scope)
# Output
# ------
#
# scope > DELAY(.1)
# scope > DELAY(.5)
# scope > DELAY(1)
Note: The
_async_scopekwarg is injected by the@tameddecorator and is used to add a@tamedfunction to anAsyncScope. The reason you may want to do this is documented with examples in theAsyncScopesection.
Returning Results
@tamed functions know when (and how) you expect results to be returned.
import asyncio
from tamer import tamed, AsyncScope
@tamed
async def slow_io():
await asyncio.sleep(0.1)
return 200, "Time to be awesome!"
# ============================
# Asynchronous Execution
# ============================
async def main():
coro = slow_io() # <-- normal coroutine
return_code, msg = await coro # <-- await the result
print(f"Status {return_code}: `{msg}`")
asyncio.run(main())
# Output
# ------
#
# Status 200: `Time to be awesome!`
# ============================
# Synchronous Execution
# ============================
return_code, msg = slow_io() # <-- immediate result
print(f"Status {return_code}: `{msg}`")
# Output
# ------
#
# Status 200: `Time to be awesome!`
# ============================
# AsyncScope Execution
# ============================
with AsyncScope() as scope:
delayed_result = slow_io(_async_scope=scope)
# <-- implicit await on exit
return_code, msg = delayed_result.value
print(f"Status {return_code}: `{msg}`")
# Output
# ------
#
# Status 200: `Time to be awesome!`
When called with an AsyncScope a @tamed function will return an instance of
DelayedResult. This object represents the result of the @tamed function
and should not be confused with similar concepts like a Future,
asyncio.Task, or Coroutine which represent concurrently executing code.
While those are related objects, a DelayedResult is simpler. For example,
unlike code, results don't execute. As such you can't cancel them nor can you
chain callbacks. They (results) are simply values that a function outputs and in
the case of a DelayedResult it is a value that arrives late to the party.
What you can do with a DelayedResult is await it in an async context or use
it to .block() a synchronous context until it becomes available. Further, you
can inspect it's .value which will either return the result or raise an
AttributeError if the result is unavailable.
import asyncio
from tamer import tamed, AsyncScope
@tamed
async def request(delay:int):
await asyncio.sleep(delay)
return 200, "You are awesome!"
@tamed
async def post_process(raw_result):
ret_code, msg = await raw_result # <-- awaitable in async context
a, b = msg.rsplit(" ", 1)
return ret_code, " ".join((a, "very", b))
with AsyncScope() as scope:
raw_result = request(0.1, _async_scope=scope)
result = post_process(raw_result, _async_scope=scope) # <-- pass it around
try:
return_code, msg = result.value
except AttributeError: # <-- AttributeError, not attived yet
print(f"scope > result: Not yet available.")
result.block() # <-- block in sync context
return_code, msg = result.value
print(f"scope > result: Status {return_code}: `{msg}`")
# Output
# ------
#
# scope > result: Not yet available.
# scope > result: Status 200: `You are very awesome!`
The AsyncScope
An AsyncScope manages a set of @tamed functions and controls their
lifecycle. It's a structured way to add async sections to your program. Long
story short, you need to be aware of 3 keywords:
- Scheduling: Line of code that starts a
@tamedfunction. - Switching: Line of code that switches to another execution context.
- Cleaning: Line of code that deals with error handling.
You schedule a @tamed function by calling it with _async_scope= set to a
meaningful value and the AsyncScope helps with switching and cleaning. To
that end it guarantees that all functions within the scope have finished when
a scope exits. To achieve this, it switches between async contexts at the
end of the scope until all its functions have finished. Note here that finished
does not mean succeeded; functions may raise exceptions or get cancled. This is
where the cleaning part comes in which we cover later during "Exception
Management".
Additionally, you can nest scopes. @tamed functions assigned to an
outer_scope execute independelty and alongside @tamed functions from an
inner_scope and there can be arbitrary switching between them. However,
since the inner_scope waits for all its functions to complete before
switching back to the synchronous context, the scheduling of new functions
below after the inner_scope will wait for inner_scope's completion.
import asyncio
from tamer import tamed, AsyncScope
@tamed
async def slow_echo(msg:str, delay:int) -> None:
await asyncio.sleep(delay)
print(msg)
with AsyncScope() as outer_scope:
slow_echo("Outer Scope > DELAY(1.5)", 1.5, _async_scope=outer_scope)
slow_echo("Outer Scope > DELAY(1)", 1, _async_scope=outer_scope)
with AsyncScope() as inner_scope:
slow_echo("Outer Scope > Inner Scope > DELAY(2)", 2, _async_scope=inner_scope)
slow_echo("Outer Scope > Inner Scope > DELAY(1)", 1, _async_scope=inner_scope)
# await inner_scope functions
# Note: scheduled after inner scope has finished
slow_echo("Outer Scope > DELAY(.5)", 0.5, _async_scope=outer_scope)
# Output
# ------
#
# Outer Scope > DELAY(1)
# Outer Scope > Inner Scope > DELAY(1)
# Outer Scope > DELAY(1.5)
# Outer Scope > Inner Scope > DELAY(2)
# Outer Scope > DELAY(.5)
Just like @tamed and DelayedResult, this works not just in synchronous
contexts (with) but also in asynchronous ones (async with).
import asyncio
from tamer import tamed, AsyncScope
@tamed
async def slow_echo(msg:str, delay:int) -> None:
await asyncio.sleep(delay)
print(msg)
@tamed
async def slow_bulk_echo() -> None:
async with AsyncScope() as outer_scope: # <-- `async with` in async contexts
slow_echo("Outer Scope > DELAY(1.5)", 1.5, _async_scope=outer_scope)
slow_echo("Outer Scope > DELAY(1)", 1, _async_scope=outer_scope)
async with AsyncScope() as inner_scope:
slow_echo("Outer Scope > Inner Scope > DELAY(2)", 2, _async_scope=inner_scope)
slow_echo("Outer Scope > Inner Scope > DELAY(1)", 1, _async_scope=inner_scope)
# await inner_scope functions
# Note: scheduled after inner scope has finished
slow_echo("Outer Scope > DELAY(.5)", 0.5, _async_scope=outer_scope)
slow_bulk_echo()
# Output
# ------
#
# Outer Scope > DELAY(1)
# Outer Scope > Inner Scope > DELAY(1)
# Outer Scope > DELAY(1.5)
# Outer Scope > Inner Scope > DELAY(2)
# Outer Scope > DELAY(.5)
The ability to nest AsyncScopes is especially useful when you combine it with
its kwargs: exit_mode and error_mode. As the names suggest, the
exit_mode controls what happens when the scope exits and the error_mode
controls what happens when an assigned function produces an exception.
By default these are set to exit_mode="wait" and error_mode="cancel". The
former will "wait" for unfinished functions at the end of the scope. The
latter will "cancel" other unfinished functions if one of them fails. This
behavior matches a asyncio.TaskGroup or trio.Nursery. It is useful when you
call functions in batches, e.g., when making several web API calls or reading a
batch of images from disk.
Alternatively, you can use exit_mode="cancel" which will "cancel" unfinished
functions at the end of the scope. This is useful to shut down "infinity loops"
or to cancel ongoing requests for data that you thought you'd need, but didn't.
import asyncio
from datetime import datetime
from tamer import tamed, AsyncScope
class RateLimiter:
def __init__(self):
# allow an initial burst
self.max_tokens = 3
self.tokens = self.max_tokens
@tamed
async def generate_tokens(self, delay:int):
while True: # <-- generate new tokens forever
await asyncio.sleep(delay)
self.tokens = min(self.tokens + 1, self.max_tokens)
@tamed
async def get_token(self):
# Note: This would not work with threads, but is perfectly
# fine in asyncio
while self.tokens == 0:
await asyncio.sleep(0)
self.tokens -= 1
return True
@tamed
async def fake_request(rate_limiter):
await rate_limiter.get_token()
print(datetime.now().strftime("%H:%M:%S.%f"), "Requesting...")
throttle = RateLimiter()
with AsyncScope(exit_mode="cancel") as service_layer:
throttle.generate_tokens(1, _async_scope=service_layer)
with AsyncScope() as batch:
for _ in range(6):
fake_request(throttle, _async_scope=batch)
# <-- wait for all requests to finish
# <-- cancel the rate limiter
# Output
# ------
# 00:22:28.348290 Requesting...
# 00:22:28.348436 Requesting...
# 00:22:28.348564 Requesting...
# 00:22:29.347495 Requesting...
# 00:22:30.347555 Requesting...
# 00:22:31.347597 Requesting...
Exception Management
Unfortunately, shit happens. If it does, Python raises an exception and you, the
author of the program, have to decide how to respond. @tamed async functions
follow suite and there is no difference between them and ordinary functions.
from tamer import tamed, AsyncScope
@tamed
async def faulty_function()
raise RuntimeError("Oh no!")
# ============================
# Asynchronous Execution
# ============================
async def main():
coro = faulty_function()
try:
await coro
except RuntimeError:
print("Actually, I'm good.")
asyncio.run(main())
# Output
# ------
#
# Actually, I'm good.
# ============================
# Synchronous Execution
# ============================
try:
faulty_function()
except RuntimeError:
print("Actually, I'm good.")
# Output
# ------
#
# Actually, I'm good.
# ============================
# AsyncScope Execution
# ============================
with AsyncScope() as scope:
delayed_result = faulty_function(_async_scope=scope)
try:
delayed_result.block()
except RuntimeError:
print("Actually, I'm good.")
# Output
# ------
#
# Actually, I'm good.
The one special case are functions in an AsyncScope. Here, you consume results
using DelayedResult.value but handle exceptions when waiting for a result via
DelayedResult.block() or via await delayed_result. This is deliberate since
any code following the above statements can now assert that the result has
successfully arrived.
The implicit await at the end of an AsyncScope acts as a catch-all that raises
any exceptions that you don't wait for explicitly. This ensures that no exception
is left behind and that your program doesn't produce unintended side-effects.
from tamer import tamed, AsyncScope
@tamed
async def faulty_function()
raise RuntimeError("Oh no!")
with AsyncScope() as scope:
result = faulty_function(_async_scope=scope)
# Output (excerpt)
# ----------------
#
# Traceback (most recent call last):
# [...]
# RuntimeError: Oh no!
Author's Note
Note: This section is quite philosophical and more about why this package exists and less about how you use it. It's the "ramblings of an old man" so I won't tell anyone if you choose to skip it :)
If you read about python's asyncio library online you will find that people generally approach the library with a performance mindset. However, only some actually see its benefits materialize. This results in mixed, but often negative sentiment about the library ranging between asyncio being "fake parallelism", "too complicated to use", or "useful but very niche".
In my mind, this sentiment is typically caused by a mix of three reasons that lead users to believe that asyncio should be used like threads:
- Users think that asyncio uses thread-level parallelism under the hood and/or is used to implement green threads.
- Asyncio uses thread-like semantics (see the table at the end).
- A lot of online documentation and tutorials point out that
async/awaitgives massive performance boosts over thread-based webserver implementations.
Writing asynchronous code while thinking about writing threads is not wrong ...
but it's incomplete and - in my opinion - missdirected. Why? Because they are
two different kinds of parallelism. Threadding takes a big block of work, slices
it into smaller chunks, and works on multiple of them in parallel across
multiple cores. Async/await takes a big block of work, flags periods of busy
waiting and reorders instructions to minimize idle time. In other words,
threadding uses thread-level parallelism (duh!) and async/await uses
instruction-level parallelism. Both are forms of parallelism but they are
NOT the same thing.
| Threads | Async/Await | |
|---|---|---|
| Orchestration | Main thread + inf-loop | Event Loop |
| Creation | tid = Thread(fn, ...) |
coro = fn(...) |
| Synchronization | tid.join() |
await coro |
| Data exchange | shared memory | shared memory |
| Overhead | ~50 µs (OS level thread) | None (function call) |
| Concurrency | preemtive | cooperative |
Implicit Parallelism
Instead of thinking about async/await as multi-threadded code, we should think
about it as code that splits loading and consuming of data. It is aware that
input data comes from external systems (DB, socket, filesystem, blob storage,
...), that these external systems are slow, and that programs spend most of
their time waiting for data.
Asyncios (clever) trick is to realize that we can only execute instructions
one by one (thank you GIL), but we can wait for any number of external systems
in parallel. Traditional (sync) code asks for a single piece of data, does
nothing while it is being prepared (busy waiting), consumes it, and then moves
on to ask for the next piece of data. Async code requests all the data first,
busy waits for everything in parallel to arrive, and then consumes the data
one by one without waiting ever again. This reduces time spent idle from
sum(load_times) (sync) to max(load_times) (async); a trick known as implicit
parallelism or - if you are into compilers and how CPUs work instruction-level
parallelism.
Moreover, if our workload allows processing data out of order, we request all
data at the start, busy wait until the first piece arrives, process it, and then
either move to the next piece or resume busy waiting until data arrives again.
This way we spend between min(load_times) and max(load_times) busy waiting,
which can be a huge speedup compared to sum(load_times) in the sync case.
Why Create Async-Tamer
Realizing how the parallelism behind async/await works and moving away from a
thread-like design simplifies the code and its design. Functions that load data
become async def and functions that consume the data remain as they were.
Unfortunately, it is not simple to express this with asyncio today. We can't use
await outside an async function and executing an async function requires
spinning up an event loop and orchestrating it.
This is why I wrote async-tamer. It limits the extent to which async
proliferates through the codebase, i.e., it @tamed the async keyword ^.^. We
need an event loop and its implicit parallelism while we are waiting for data.
We don't need an event loop when locally processing data and we get into a world
of pain if we attempt to do so. Both parts of a program should be clearly
separated and async def (waiting for external stuff) vs def (processing
local stuff) nicely fits that bill.
The rest of the library really is syntax sugar and an attempt to remove as much
boilerplate as possible. I really like the idea of structured concurrency in
trio and it is exactly what we want when loading an "initial batch" of data.
Thus, AsyncScopes work in a similar fashion. However, I also think that
Nurseries and (asyncio) TaskGroups lack fine-grained control and adding the
lifecycle kwargs felt like a natural extension of the idea.
Thanks for reading all the way to the end and happy coding!
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file async_tamer-1.0.0.tar.gz.
File metadata
- Download URL: async_tamer-1.0.0.tar.gz
- Upload date:
- Size: 17.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.7.1 CPython/3.10.11 Darwin/22.6.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dfa1dead8d3f95126172d5013697771e579e4b52a21fa92460e06883d949dc46
|
|
| MD5 |
7732de31150e5c8da8ab6fd76550313e
|
|
| BLAKE2b-256 |
582da70f527c7d9e90b7a2d5af18e7b8fe7782e0d70a926ef06c359ac52816dd
|
File details
Details for the file async_tamer-1.0.0-py3-none-any.whl.
File metadata
- Download URL: async_tamer-1.0.0-py3-none-any.whl
- Upload date:
- Size: 13.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.7.1 CPython/3.10.11 Darwin/22.6.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a6e5856ffa78a59d6b3cc627ddb61f900276615c745205c2bc86d93e273e48b1
|
|
| MD5 |
71b6ce8b3f77e516d64c8922885b863e
|
|
| BLAKE2b-256 |
c456bd3cce51c8202e56c43b3f98aaff45fd66b1e3b339a70a748583ab18d5ef
|