Asynchronous threading package for Python

athreading

athreading is a Python library that allows you to run synchronous I/O functions asynchronously using asyncio via background threads. It provides decorators to adapt synchronous functions and generators, enabling them to operate without blocking the event loop.

Features

  • @athreading.call: Adapts a synchronous function into an asynchronous function.
  • @athreading.iterate_callback: Adapts a synchronous function using a callback (push-based stream) into an asynchronous iterator.
  • @athreading.iterate: Adapts a synchronous iterator (pull-based stream) into an asynchronous iterator.
  • @athreading.generate: Adapts a synchronous generator (push-and-pull-based stream) into an asynchronous generator.

[!NOTE] Because of the Global Interpreter Lock (GIL) in Python <=3.13, this library does not provide multi-threaded CPU parallelism unless you use:

  • Python 3.9 with nogil
  • or Python >=3.13 with free threading enabled
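
To see which case applies to your interpreter, a small check like the one below may help. It is only a sketch: gil_status is a hypothetical helper and not part of athreading, it relies on sys._is_gil_enabled() (available from Python 3.13), and it assumes the GIL is active whenever that function is missing, so it will not recognise the Python 3.9 nogil fork.

```python
import sys
import sysconfig


def gil_status():
    """Report whether this interpreter is a free-threaded build and whether the GIL is active.

    Hypothetical helper for illustration; not part of athreading.
    """
    # Py_GIL_DISABLED is set to 1 for free-threaded CPython builds (3.13+).
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    # sys._is_gil_enabled() only exists on Python 3.13+.
    check = getattr(sys, "_is_gil_enabled", None)
    # Assumption: without the check function, treat the GIL as enabled.
    gil_enabled = check() if check is not None else True
    return {"free_threaded_build": free_threaded_build, "gil_enabled": gil_enabled}


if __name__ == "__main__":
    print(gil_status())
```

If gil_enabled is True, background threads still help with blocking I/O (the GIL is released while waiting), but CPU-bound work will not run in parallel.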

Installation

athreading can be installed from PyPI using pip:

pip install athreading

Usage

athreading enables running synchronous functions and iterators asynchronously using asyncio.

1. Adapt a synchronous function

The @athreading.call decorator transforms a synchronous function into an asynchronous function.

>>> import athreading
>>> import time
>>> import math
>>> import asyncio
>>>
>>> @athreading.call
... def compute_sqrt(x):
...     time.sleep(0.05)  # Simulate a blocking I/O operation
...     return math.sqrt(x)
...
>>> async def amain():
...     results = await asyncio.gather(
...         compute_sqrt(2),
...         compute_sqrt(3),
...         compute_sqrt(4)
...     )
...     print(results)
...
>>> asyncio.run(amain())
[1.4142135623730951, 1.7320508075688772, 2.0]

In this example, compute_sqrt is a synchronous function that sleeps for 0.05 seconds to simulate a blocking I/O operation. Decorating it with @athreading.call makes it awaitable, so multiple calls can run concurrently without blocking the event loop.
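
For comparison, the same idea can be sketched with the standard library alone. The snippet below uses asyncio.to_thread as a stand-in for the decorator; it is not how athreading is implemented, and compute_sqrt_sync is a name introduced here for illustration.

```python
import asyncio
import math
import time


def compute_sqrt_sync(x):
    time.sleep(0.05)  # Simulate a blocking I/O operation
    return math.sqrt(x)


async def compute_sqrt(x):
    # Stand-in for the decorated function: run the blocking call in a worker thread
    # so the event loop stays free while it sleeps.
    return await asyncio.to_thread(compute_sqrt_sync, x)


async def amain():
    start = time.perf_counter()
    results = await asyncio.gather(
        compute_sqrt(2),
        compute_sqrt(3),
        compute_sqrt(4),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(amain())
    print(results)
    print(f"elapsed: {elapsed:.2f}s")
```

Because the three sleeps overlap in separate threads, the elapsed time should typically be closer to one sleep than to the sum of all three.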

2. Adapt a synchronous function with callback (push-based stream)

The @athreading.iterate_callback decorator transforms a synchronous function using a callback into an asynchronous iterator.

>>> import athreading
>>> import time
>>> import datetime
>>> import asyncio
>>>
>>> @athreading.iterate_callback
... def time_generator(callback, n):
...     for _ in range(n):
...         time.sleep(0.05)  # Simulate a blocking I/O operation
...         callback(datetime.datetime.now())
...
>>> async def aprint_stream(label):
...     async with time_generator(n=10) as stream:
...         async for current_time in stream:
...             print(f"{label}: {current_time}")
...
>>> async def amain():
...     await asyncio.gather(
...         aprint_stream("Stream 1"),
...         aprint_stream("Stream 2"),
...         aprint_stream("Stream 3"),
...     )
...
>>> asyncio.run(amain())  # doctest: +ELLIPSIS
Stream ...
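
To illustrate what bridging a push-based callback into an asynchronous iterator can involve, here is a minimal standard-library sketch. It is an assumption-laden illustration, not athreading's implementation: produce and iterate_callback_sketch are hypothetical names, the worker is a plain thread, and exceptions raised by the producer are not forwarded to the consumer.

```python
import asyncio
import threading
import time

_DONE = object()  # Sentinel marking the end of the stream


def produce(callback, n):
    """Hypothetical blocking producer that pushes values through a callback."""
    for value in range(n):
        time.sleep(0.01)  # Simulate a blocking I/O operation
        callback(value)


async def iterate_callback_sketch(producer, *args):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def push(item):
        # Called from the worker thread; hand the item to the event loop safely.
        loop.call_soon_threadsafe(queue.put_nowait, item)

    def run():
        try:
            producer(push, *args)
        finally:
            push(_DONE)  # Always signal completion, even if the producer fails

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    while True:
        item = await queue.get()
        if item is _DONE:
            break
        yield item
    worker.join()  # The worker has already finished its loop at this point


async def collect(n):
    return [value async for value in iterate_callback_sketch(produce, n)]


if __name__ == "__main__":
    print(asyncio.run(collect(5)))
```

The key step is loop.call_soon_threadsafe: asyncio.Queue is not thread-safe, so the worker thread schedules the put on the event loop instead of calling it directly.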

3. Adapt a synchronous iterator (pull-based stream)

The @athreading.iterate decorator transforms a synchronous iterator into an asynchronous iterator.

>>> import athreading
>>> import time
>>> import datetime
>>> import asyncio
>>>
>>> @athreading.iterate
... def time_generator(n):
...     for _ in range(n):
...         time.sleep(0.05)  # Simulate a blocking I/O operation
...         yield datetime.datetime.now()
...
>>> async def print_stream(label):
...     async with time_generator(10) as stream:
...         async for current_time in stream:
...             print(f"{label}: {current_time}")
...
>>> async def amain():
...     await asyncio.gather(
...         print_stream("Stream 1"),
...         print_stream("Stream 2"),
...         print_stream("Stream 3"),
...     )
...
>>> asyncio.run(amain())  # doctest: +ELLIPSIS
Stream ...

This example runs three asynchronous streams concurrently. Each stream iterates its own time_generator independently, and the decorator ensures that iteration does not block the event loop.
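
A pull-based bridge can be sketched with the standard library by fetching each item in a worker thread. This is an illustration only and not athreading's implementation; slow_numbers, iterate_sketch, and label_stream are hypothetical names.

```python
import asyncio
import time

_END = object()  # Sentinel returned when the iterator is exhausted


def slow_numbers(n):
    for i in range(n):
        time.sleep(0.01)  # Simulate a blocking I/O operation
        yield i


async def iterate_sketch(iterable):
    iterator = iter(iterable)
    while True:
        # next(iterator, _END) returns the sentinel instead of raising StopIteration,
        # which keeps the exception from crossing the thread boundary.
        item = await asyncio.to_thread(next, iterator, _END)
        if item is _END:
            break
        yield item


async def label_stream(label, n):
    return [(label, i) async for i in iterate_sketch(slow_numbers(n))]


async def amain():
    # Both streams advance concurrently; each blocking next() runs in a worker thread.
    return await asyncio.gather(label_stream("a", 3), label_stream("b", 3))


if __name__ == "__main__":
    print(asyncio.run(amain()))
```

Each item costs one thread hand-off in this sketch, which is simple but adds overhead per item.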

4. Adapt a synchronous generator (push-and-pull-based stream)

The @athreading.generate decorator converts a synchronous generator function into an asynchronous generator function that supports asend.

>>> import athreading
>>> import time
>>> import asyncio
>>>
>>> @athreading.generate
... def controlled_counter(start, step):
...     current = start
...     while True:
...         time.sleep(0.5)  # Simulate a blocking I/O operation
...         received = yield current
...         current = received if received is not None else current + step
...
>>> async def amain():
...     async with controlled_counter(0, 1) as async_gen:
...         print(await async_gen.asend(None))  # Start the generator
...         print(await async_gen.asend(None))  # Advance with default step
...         print(await async_gen.asend(10))   # Send a new value to control the counter
...         print(await async_gen.asend(None))  # Continue from the new value
...
>>> asyncio.run(amain())
0
1
10
11

This example demonstrates how @athreading.generate transforms a synchronous generator into an asynchronous generator. The asend method sends values to control the generator's state dynamically, enabling interactive workflows while avoiding blocking the event loop.
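
The send step can be sketched with the standard library by running the generator's send method in a worker thread. This is a simplified illustration, not athreading's implementation; asend_sketch is a hypothetical helper, and the sleep is shortened to keep the sketch quick.

```python
import asyncio
import time


def controlled_counter(start, step):
    current = start
    while True:
        time.sleep(0.01)  # Simulate a blocking I/O operation
        received = yield current
        current = received if received is not None else current + step


async def asend_sketch(gen, value):
    # Resume the synchronous generator in a worker thread so the blocking
    # section between yields does not stall the event loop.
    return await asyncio.to_thread(gen.send, value)


async def amain():
    gen = controlled_counter(0, 1)
    try:
        # None starts or advances the generator; 10 overrides the counter.
        return [await asend_sketch(gen, v) for v in (None, None, 10, None)]
    finally:
        gen.close()  # Release the suspended generator


if __name__ == "__main__":
    print(asyncio.run(amain()))
```

The sketch produces the same sequence as the example above: 0, 1, 10, 11.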

License

This project is licensed under the BSD-3-Clause License.

For more information and examples, please visit the athreading GitHub repository.
