Skip to main content

A python library for running async code in background threads.

Project description

threaded_async

A self-contained Python library for running async code in background threads.

Python offers two primary forms of concurrency: threads (threading) and asynchronous coroutines (asyncio). This library provides support for running asynchronous coroutines in background threads and includes synchronization primitives that simplify coordinating between threaded and async code.

Note that in most cases, threading and coroutines are used independently, as they serve different purposes and solve different problems. Before using this library, it is worth considering whether combining the two is actually necessary in your scenario.

Quick start

Install the threaded_async library using pip install threaded-async.

Event loops can be created on a background thread by instantiating an AsyncRunner and entering its context. Coroutines can be scheduled on these background threads by creating a BackgroundTask:

import asyncio
import threaded_async

async def foo() -> int:
  """Sleep and return a number."""
  await asyncio.sleep(0.1)
  return 10

with threaded_async.AsyncRunner() as runner:
  # Deploy the coroutine as a task on the background thread's event loop.
  background_task = runner.create_task(foo())
  # Block the current thread waiting for the task to complete.
  print(background_task.wait())

A number of synchronization primitives are also provided for bidirectional communication between threads and async code. These include Event, Queue and Future. Below is an example of running an async worker that processes a queue filled by the main thread:

import threaded_async

async def increment_worker(
    in_queue: threaded_async.Queue[int],
    out_queue: threaded_async.Queue[int]):
  """Increment integers from in_queue and put them in out_queue."""
  while True:
    number = await in_queue.get()
    await out_queue.put(number + 1)

with threaded_async.AsyncRunner() as runner:
  in_queue = threaded_async.Queue[int](runner)
  out_queue = threaded_async.Queue[int](runner)
  background_task = runner.create_task(increment_worker(in_queue, out_queue))
  for i in range(10):
    in_queue.put_wait(i)
    print(out_queue.get_wait())
  background_task.cancel()

Control inversion

In some scenarios (e.g., AI scripts controlling video games), it is useful to be able to deploy async code that interacts with an API, such that the timing of when API calls are fulfilled is under the precise control of another thread.

Consider the following example script:

async def client_code(client: Stub):
  for i in range(10):
    print(f'Client got: {await client.increment(i)}')

Typically, calling the increment function would trigger server code that computes the appropriate result, but we would like the server to decide when to process client requests. We refer to this pattern as control inversion, since instead of the client request triggering work on the server, the server triggers the client by providing results to past requests.

This can be accomplished by using queues as described above, but threaded_async provides a convenience Server and Client class to support this use case.

from threaded_async.control_inversion import ExecutionRequest
from threaded_async.threaded_async import Future

class Stub(threaded_async.Client):
  """The interface between async coroutine and main thread."""
  async def increment(self, number: int) -> int:
    return await self.execute(Stub.increment, number)

class MyServer(threaded_async.Server):
  """A server that processes increment requests."""

  def _handle_request(
      self, request: ExecutionRequest[int], future: Future[int]):
    if request.fun == Stub.increment:
      # Handle increment request.
      (number,) = request.args
      future.set_result(number + 1)
    else:
      assert False, f"Unknown function {request.fun}"

server = MyServer()
with server:
  client = Stub(server)
  server.create_background_task(client_code(client))
  for i in range(3):
    print('Processing new client requests')
    server.process()

This will output the following (assuming synchronized printing):

Processing new client requests
Client got: 1
Processing new client requests
Client got: 2
Processing new client requests
Client got: 3

The client code waits on the request to the server until the server.process function is called, which provides results and allows the client code to resume execution.

Development

To work in the development environment, you will need python 3.8+ and uv installed on your system. The following commands can be used to download the code, set up the environment and run tests.

git clone https://github.com/agentic-ai/threaded_async.git
cd threaded_async
uv sync --all-groups
./presubmit.sh  # Run tests / lint / typecheck

You can find the github repository here.

Before submitting a pull request, please ensure ./presubmit.sh completes without errors.

More information

Additional information, e.g., about error handling and shutdown behavior can be found in the cookbook.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

threaded_async-1.4.tar.gz (23.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

threaded_async-1.4-py3-none-any.whl (20.0 kB view details)

Uploaded Python 3

File details

Details for the file threaded_async-1.4.tar.gz.

File metadata

  • Download URL: threaded_async-1.4.tar.gz
  • Upload date:
  • Size: 23.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.1

File hashes

Hashes for threaded_async-1.4.tar.gz
Algorithm Hash digest
SHA256 f7d4aebd1b32263b1dbbc63a842da3b1a6a6751535889868194b07d3b5cd09d6
MD5 c2e9b444f767506388b579aa0e23d8c1
BLAKE2b-256 2fd2512f474f1d5fc2458bceada0bfaab7a171c61c8d547e5356f28e4d94e5e4

See more details on using hashes here.

File details

Details for the file threaded_async-1.4-py3-none-any.whl.

File metadata

  • Download URL: threaded_async-1.4-py3-none-any.whl
  • Upload date:
  • Size: 20.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.1

File hashes

Hashes for threaded_async-1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 f69e32602dc7e97a4697dffd0f658fff5351c5cdd9321cd4af8a71011461d2dd
MD5 1cb95f91b46308cccfa5638ac125ec29
BLAKE2b-256 d73cd5cc6a4124f29048294ed1bd2b613bf7dd351cd6f0cf4fae95cea3807554

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page