
Tools to run asyncio tasks concurrently.

Project description

asyncio-concurrent-tasks


Tooling to run asyncio tasks.

Background task

Task that runs in the background until cancelled. Can be used as a context manager.

Example usage:

import asyncio
from typing import Callable, Awaitable
from concurrent_tasks import BackgroundTask


class HeartBeat(BackgroundTask):
    def __init__(self, interval: float, func: Callable[[], Awaitable]):
        super().__init__(self._run, interval, func)

    async def _run(self, interval: float, func: Callable[[], Awaitable]) -> None:
        while True:
            await func()
            await asyncio.sleep(interval)

Periodic task

Task that runs periodically in the background until cancelled. Can be used as a context manager. There is no guarantee that the time between calls is strictly the interval if the function takes longer than the interval to execute.

Example usage:

from typing import Callable, Awaitable
from concurrent_tasks import PeriodicTask


class HeartBeat(PeriodicTask):
    def __init__(self, interval: float, func: Callable[[], Awaitable]):
        super().__init__(interval, func)

On time periodic task

Compared to PeriodicTask, this runs consistently at a specific time. When a DST switch occurs, intervals greater than or equal to 1 day will run at the same time in the timezone, while intervals of less than a day will run at a consistent interval. It can be used either without an interval, to run once at a specific time, or regularly.

[!TIP] Use functools.partial to pass arguments if necessary.

[!NOTE] If ignore_overdue is set to True and the function's execution time causes the next scheduled run to be missed, that run will be skipped.
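The alignment idea can be sketched with plain arithmetic (an illustration only, not the library's implementation; next_aligned is a hypothetical helper):

```python
def next_aligned(now: float, interval: float) -> float:
    # Next multiple of `interval` strictly after `now`: runs stay anchored
    # to fixed points in time, regardless of how long each call took.
    return (now // interval + 1) * interval


# With a 60s interval, a call happening at t=125.0 is next due at t=180.0.
print(next_aligned(125.0, 60.0))  # 180.0
# If execution overran past t=180.0, skipping the missed slot (the
# ignore_overdue behaviour) just means computing from the finish time:
print(next_aligned(185.0, 60.0))  # 240.0
```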

Thread safe task pool

The goal is to be able to safely run tasks from other threads.

Parameters:

  • size can be a positive integer to limit the number of tasks concurrently running.
  • timeout can be set to define a maximum running time for each task, after which it will be cancelled. Note: this excludes time spent waiting to be started (time spent in the buffer).

Example usage:

from concurrent_tasks import ThreadSafeTaskPool


async def func():
    ...


async with ThreadSafeTaskPool() as pool:
    # Create and run the task.
    result = await pool.run(func())
    # Create a task, the `concurrent.futures.Future` will hold information about completion.
    future = pool.create_task(func())
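The size and timeout semantics can be sketched with stdlib primitives (an illustration of the behaviour described above, not the pool's actual implementation):

```python
import asyncio


async def run_bounded(coros, size: int, timeout: float):
    # At most `size` coroutines run at once; `timeout` covers only
    # running time, not time spent waiting for a free slot.
    sem = asyncio.Semaphore(size)

    async def runner(coro):
        async with sem:  # wait for a slot (not counted in the timeout)
            return await asyncio.wait_for(coro, timeout)

    return await asyncio.gather(*(runner(c) for c in coros))


async def work(n):
    await asyncio.sleep(0.01)
    return n


print(asyncio.run(run_bounded([work(n) for n in range(4)], size=2, timeout=1.0)))
# [0, 1, 2, 3]
```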

Task pool

See ThreadSafeTaskPool, the interface is the same except:

  • it is not thread safe
  • asyncio.Future is used instead of concurrent.futures.Future

Threaded task pool

Run async tasks in a dedicated thread. It will have its own event loop. Under the hood, ThreadSafeTaskPool is used.

Parameters:

  • name will be used as the thread's name.
  • size and timeout see ThreadSafeTaskPool.
  • context_manager can be one or more optional context managers that will be entered once the loop has started and exited before the loop is stopped.

[!NOTE] All tasks will be completed when the pool is stopped.

[!TIP] The blocking and async versions expose the same interface; prefer the async version if client code is async.

Loop initialization

[!WARNING] Asyncio primitives need to be instantiated with the proper event loop.

To achieve that, use a context manager wrapping instantiation of objects:

from functools import partial

from concurrent_tasks import ThreadedPoolContextManagerWrapper, AsyncThreadedTaskPool

pool = AsyncThreadedTaskPool(context_manager=ThreadedPoolContextManagerWrapper(partial(MyObj, ...)))

Blocking

This can be used to run async functions in a dedicated event loop, while keeping that loop running to handle background tasks.

Example usage:

from concurrent_tasks import BlockingThreadedTaskPool


async def func():
    ...


with BlockingThreadedTaskPool() as pool:
    # Create and run the task.
    result = pool.run(func())
    # Create a task, the `concurrent.futures.Future` will hold information about completion.
    future = pool.create_task(func())

Async

Threads can be useful in cooperation with asyncio to let the OS guarantee fair resource distribution between threads. This is especially useful when you cannot know whether called code will properly cooperate with the event loop.
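The same idea is available in miniature in the stdlib via asyncio.to_thread: offloading uncooperative code to a thread keeps the event loop responsive while it runs.

```python
import asyncio
import time

ticks = []


async def ticker():
    # Keeps running while the blocking call is off in its thread.
    for _ in range(5):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


def uncooperative():
    time.sleep(0.06)  # blocks whichever thread runs it, never yields
    return "ok"


async def main():
    t = asyncio.create_task(ticker())
    result = await asyncio.to_thread(uncooperative)  # loop stays free
    await t
    return result


print(asyncio.run(main()))  # ok
```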

Example usage:

from concurrent_tasks import AsyncThreadedTaskPool


async def func():
    ...


async with AsyncThreadedTaskPool() as pool:
    # Create and run the task.
    result = await pool.run(func())
    # Create a task, the asyncio.Future will hold information about completion.
    future = pool.create_task(func())

Restartable task

Task that can be started and cancelled multiple times until it can finally be completed. This is useful to handle pauses and retries when working with a connection.

[!TIP] Use functools.partial to pass parameters to the function.

Example usage:

from functools import partial
from concurrent_tasks import RestartableTask

async def send(data): ...

task: RestartableTask[int] = RestartableTask(partial(send, b"\x00"), timeout=1)
task.start()
assert await task == 1

# Running in other tasks:

# On connection lost:
task.cancel()
# On connection resumed:
task.start()
# On response received:
task.set_result(1)
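The pattern can be sketched with stdlib futures (an illustration of the idea, not the library's implementation): keep restarting the call until an external party delivers the result.

```python
import asyncio


async def restartable(func, done: asyncio.Future, timeout: float):
    # Keep (re)starting `func` until `done` is resolved externally.
    while not done.done():
        await func()
        try:
            # Shield so the timeout cancels the wait, not `done` itself.
            return await asyncio.wait_for(asyncio.shield(done), timeout)
        except asyncio.TimeoutError:
            continue  # no response in time: start the call again
    return done.result()


sent = []


async def send():
    sent.append(1)


async def main():
    done = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(restartable(send, done, 0.02))
    await asyncio.sleep(0.05)  # let a couple of retries happen
    done.set_result(42)        # "response received"
    return await task


print(asyncio.run(main()))  # 42
```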

Loop exception handler

Shut down the process when an unhandled exception is caught or a signal is received. To make this a graceful stop, pass a stop_func.

When creating multiple background tasks, exceptions raised within those will be forwarded directly to the event loop. In order to act on those exceptions, we need to use loop.set_exception_handler.

[!IMPORTANT] When a signal is received and the process is already shutting down, it will be force killed.

Example minimalistic implementation:

import asyncio
from concurrent_tasks import LoopExceptionHandler

async def run():
    event = asyncio.Event()
    tasks = []
    async def _stop():
        await asyncio.gather(*tasks)
        event.set()
    async with LoopExceptionHandler(_stop):
        # Adding a bunch of tasks here...
        await event.wait()
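The underlying hook, loop.set_exception_handler, can be seen in isolation with the stdlib alone:

```python
import asyncio

caught = []


def handler(loop, context):
    # Record unhandled exceptions instead of the default traceback dump.
    caught.append(type(context["exception"]).__name__)


async def main():
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(handler)
    # An exception raised in a plain callback never reaches any `await`,
    # so it is reported to the loop's exception handler instead.
    loop.call_soon(lambda: 1 / 0)
    await asyncio.sleep(0)


asyncio.run(main())
print(caught)  # ['ZeroDivisionError']
```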

Debouncer

Can be used either through the class Debouncer (which can be gracefully ended through its __aexit__ method) or with the decorator debounce.

  • eager calls the function immediately unless debounced. Debounced calls will return the last result.
  • lazy will call the function at the end of the debounce duration, with the last parameters. All calls will block until the period is over.
  • when both options are used, the first call will return immediately, and subsequent debounced calls will block until the end.

If the output of the debounced function isn't important, AsyncDebouncer can be used.
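An eager debounce can be sketched in a few lines (eager_debounce here is a hypothetical decorator illustrating the semantics, not the library's API):

```python
import asyncio
import time


def eager_debounce(duration: float):
    # Call through immediately, then return the cached result for any
    # call arriving within `duration` seconds of the last real call.
    def decorator(func):
        last = {"time": float("-inf"), "result": None}

        async def wrapper(*args, **kwargs):
            now = time.monotonic()
            if now - last["time"] >= duration:
                last["result"] = await func(*args, **kwargs)
                last["time"] = now
            return last["result"]

        return wrapper

    return decorator


calls = []


@eager_debounce(0.05)
async def ping(n):
    calls.append(n)
    return n


async def main():
    r1 = await ping(1)          # executes
    r2 = await ping(2)          # debounced: returns the last result
    await asyncio.sleep(0.06)   # let the debounce period expire
    r3 = await ping(3)          # executes again
    return r1, r2, r3


print(asyncio.run(main()))  # (1, 1, 3)
```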

Robust protocol

Automatically reconnects unless closed purposefully. To be used with asyncio connections and transports.

[!WARNING] When reading data, reconnections will be transparent, so it's up to the consumer to handle incomplete data.

import asyncio
from functools import partial
from concurrent_tasks import RobustStream


async def run():
    async with RobustStream(partial(
        asyncio.get_running_loop().create_connection,
        host="127.0.0.1",
        port=8000,
    )) as protocol:
        await protocol.write(b"...")
        reader = protocol.reader
        data = await reader.readline()

Project details


Download files


Source Distribution

concurrent_tasks-1.15.1.tar.gz (12.7 kB)

Uploaded Source

Built Distribution


concurrent_tasks-1.15.1-py3-none-any.whl (20.9 kB)

Uploaded Python 3

File details

Details for the file concurrent_tasks-1.15.1.tar.gz.

File metadata

  • Download URL: concurrent_tasks-1.15.1.tar.gz
  • Size: 12.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for concurrent_tasks-1.15.1.tar.gz
Algorithm Hash digest
SHA256 03286717d86fb256fb72fdf03a69f1e8028ccb5f4e63d211f2ad79936492494f
MD5 3707df6f470e05aa219bab93bc0b612b
BLAKE2b-256 bcda6614a2f6795d79e50ed165ceea7d9f9eec62b0084c110a66f96e57ad2740


Provenance

The following attestation bundles were made for concurrent_tasks-1.15.1.tar.gz:

Publisher: publish.yml on gpajot/asyncio-concurrent-tasks

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file concurrent_tasks-1.15.1-py3-none-any.whl.

File metadata

File hashes

Hashes for concurrent_tasks-1.15.1-py3-none-any.whl
Algorithm Hash digest
SHA256 50668c0ae4a72e4b337c671a984f1b7ac97906637bc400fd585c99c8608d53dc
MD5 3ceed2d35c4b2da8992b5e100679575a
BLAKE2b-256 2be095d015179cef15c3668b7ad414a37b18593d81a7f26a49bb71c87e3c9e3a


Provenance

The following attestation bundles were made for concurrent_tasks-1.15.1-py3-none-any.whl:

Publisher: publish.yml on gpajot/asyncio-concurrent-tasks

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
