Skip to main content

async + threads + decorators = ?

Project description

logo

Downloads Downloads Coverage Status Hits-of-Code Tests Python versions PyPI version Checked with mypy Ruff

Данная библиотека решает 3 проблемы:

  • Асинхронное программирование с использованием синтаксиса async / await теряет смысл, если в коде часто встречаются куски с "тяжелыми" вычислениями или иными задачами, которые блокируют event-loop. Зато теперь вы можете навесить на такую "тяжелую" функцию декоратор @awaitable и она станет корутиной, которая будет исполняться в отдельном потоке, не блокируя event-loop. Во всем остальном это будет совершенно обычная корутина.
  • Многопоточное программирование многословно. Чтобы заставить ваш код исполняться в многопоточном режиме, вам нужно создавать объекты потоков, передавать туда нужные функции и запускать потоки. Теперь же вам достаточно навесить на обычную функцию декоратор и она автоматически будет исполняться в многопоточном режиме.
  • Частое создание потоков в программе требует постоянно отслеживать создание потоков и управление ими. Здесь же минимальным уровнем абстракции для вас становится группа потоков (pool of threads), а не какой-то отдельный поток. Ими становится удобно управлять в рамках т. н. "комнат" (rooms) с такими группами, где каждой группе присваивается имя.

Прочитайте документацию ниже, чтобы увидеть, как все это работает.

Оглавление

Быстрый старт

Установите awaits через pip:

$ pip install awaits

Теперь просто импортируйте декоратор @awaitable и примените его к вашей функции. Никаких настроек, ничего лишнего - все уже работает:

import asyncio
from awaits import awaitable

@awaitable
def sum(a, b):
  # Какой-то сложный датасаенз. Что-то, что вычисляется долго и мешает вашему event-loop'у жить.
  return a + b

# Теперь sum - это корутина! Пока она выполняется в отдельном потоке, управление передается в event-loop.
print(asyncio.run(sum(2, 2)))

Готово! Мы сделали из обычной функции неблокирующую ваш event-loop корутину, к которой теперь применим синтаксис await.

Если ваша функция ничего не возвращает, к ней можно применить другой декоратор, @shoot:

from awaits import shoot

@shoot
def hello():
  # Тоже что-то тяжелое, но результат чего вам по какой-то причине не нужен.
  print('Hello world!')

# Функция будет "отстрелена" исполняться в отдельный поток, не блокируя основной.
hello()

Ваша функция будет исполняться в другом потоке, в то время как основной может уже заняться чем-то еще.

Более подробно о возможностях библиотеки awaits читайте ниже.

Как это все работает?

Базовым "примитивом" библиотеки является группа потоков (threads pool). "Сердцем" группы является очередь (queue) с задачами (объектами класса Task). Когда вы создаете новую группу потоков, внутри себя она порождает сколько-то потоков с "воркерами", которые постоянно ждут новых задач из очереди. Как только в очереди появляется новая задача, первый же освободившийся воркер выполняет ее.

Чтобы выполнить в группе произвольную функцию, вам достаточно передать ее туда вместе с необходимыми аргументами. При этом группа вернет вам объект класса Task, в котором по значению атрибута done вы можете отслеживать, выполнена ваша задача или нет. Если она выполнена - можете забрать результат из атрибута result. Более подробно о работе с группами потоков читайте в соответствующем разделе.

Для удобства управления несколькими группами, библиотека содержит абстракцию "комната". По своей сути это обертка вокруг словаря с группами потоков. Обращаясь к "комнате" по ключу, вы либо получаете новую группу потоков, если ранее этой группы не существовало, либо уже имеющуюся группу, если ранее она создавалась. Так вам становится не нужно вручную создавать группы потоков.

Для работы декораторов используется "комната", хранящаяся в синглтоне. Обернутые в декораторы @awaitable и @shoot функции будут выполняться в группах потоков из одной и той же комнаты (по умолчанию - в одной группе потоков под названием "base").

За счет такой компоновки, весь менеджмент потоков происходит "под капотом" и вам больше не нужно задумываться над тем, в каком именно потоке выполнится ваша функция. Она выполнится в том, который раньше всех освободится.

Как работает группа потоков?

Группа потоков - это экземпляр класса ThreadsPool. Импортируем его:

from awaits.pools.threads_pool import ThreadsPool

При инициализации экземпляра будут созданы потоки. Число потоков в группе вы указываете в конструкторе класса:

threads = ThreadsPool(5)

Теперь, когда группа создана, ей можно давать задания, используя метод do():

def function(a, b, c, d=5, e=5):
  return a + b + c + d + e

task = threads.do(function, 1, 2, 3, d=10, e=20)

Первым параметром туда передается функция, которую требуется выполнить, а далее все те же параметры и в том же порядке, как при оригинальном вызове этой функции.

Что тут произошло под капотом? Метод do() создал объект класса Task, передав туда функцию для выполнения и все ее параметры, и положил его в очередь. Объект задачи он вам вернул, чтобы вы могли отслеживать прогресс выполнения и результат. Воркеры из других потоков постоянно ждут появления новых элементов в очереди. Если хоть один из них свободен - он сразу получит вашу задачу и выполнит ее. Если нет, задача будет ждать в очереди освобождения первого воркера.

Как только задача будет выполнена, вы можете получить результат:

# Флаг task.done в положении True свидетельствует о том, что задача выполнена и вы можете получить результат.
while not task.done:
    pass

print(task.result)

Если при выполнении функции случилась ошибка, в объекте задачи атрибут error будет установлен в положение True, а получить экземпляр исключения вы можете из атрибута exception:

def error_function(a, b):
  return a / b

task = threads.do(error_function, 2, 0)

while not task.done:
    pass

if task.error:
  raise task.exception

Что такое "комната"?

Комната (room) - это абстракция над группами потоков, позволяющая отдавать задания разным группам, называя их по именам. По сути это обертка над словарем.

Создадим объект комнаты:

from awaits.threads_pools_room import ThreadsPoolsRoom


room = ThreadsPoolsRoom(5)

Число, передаваемое в конструктор - количество потоков в каждой из групп данной комнаты.

Конкретную группу потоков можно получить, используя синтаксис словаря:

pool = room['some_key']

Поскольку мы впервые обращаемся к комнате по этому ключу, она создаст новый объект класса ThreadsPool и вернет его. При последующих обращениях по этому ключу, она будет возвращать тот же самый объект.

Об объекте задачи

Задача - это объект класса Task. В конструктор объекта первым аргументом передается функция для выполнения, а последующими - ее аргументы:

from awaits.task import Task

def hello_something(something, sign='!'):
  hello_string = f'Hello {something}{sign}'
  print(hello_string)
  return hello_string

task = Task(hello_something, 'world')

В неактивированном состоянии задача просто хранит в себе функцию и ее аргументы. Чтобы выполнить функцию с заданными аргументами, необходимо "вызвать" объект задачи:

task()

Флаг task.done будет установлен в положение True, когда задача будет выполнена. После этого вы можете получить результат выполнения из атрибута result:

while not task.done:
    pass

print(task.result)

Если при выполнении функции случилась ошибка, в объекте задачи атрибут error будет установлен в положение True, а получить экземпляр исключения вы можете из атрибута exception:

def error_function(a, b):
  return a / b

task = threads.do(error_function, 2, 0)

while not task.done:
    pass

if task.error:
  raise task.exception

Декоратор @awaitable

Прочитав документацию выше, вы уже научились создавать группы потоков и комнаты с ними, а также давать потокам на исполнение различные задания. Однако делать даже это вручную не обязательно.

Декоратор @awaitable превращает обычную функцию в корутину, т. е. в функцию, с которой можно работать через await-синтаксис Python. Давайте попробуем создать такую функцию:

from awaits import awaitable

@awaitable
def heavy_math_function(x, y):
  return x * y

При попытке выполнения функции, она будет вести себя как обычная корутина. Однако фактически ее код будет выполняться в группе потоков. Пока код выполняется, управление будет передано в event-loop.

# Проверяем, что это действительно корутина.
print(asyncio.run(heavy_math_function(5, 5)))

"Под капотом" при этом происходит периодический опрос состояния задачи с последующим "засыпанием" (вызовом asyncio.sleep()) на некий промежуток времени. Как только задача выполнена, ее результат возвращается. Если выполнение прервано исключением - оно извлекается из объекта задачи и снова поднимается.

Промежуток, на который функция "засыпает" между опросами о готовности, по умолчанию берется из глобальных настроек библиотеки. При необходимости, вы можете указать его в фабрике декоратора (в секундах):

@awaitable(delay=0.5)
def heavy_math_function(x, y):
  return x * y

Ручное управление вам может быть полезно, к примеру, в случае особо "тяжелых" функций, которые нет смысла опрашивать слишком часто.

Кроме того, отдельным параметром вы можете указать имя группы потоков, в которой вы хотите чтобы выполнялся код. По умолчанию используется группа "base".

@awaitable(pool='gravities')
def heavy_math_function(x, y):
  return x * y

Декоратор @shoot

Этот декоратор проще, чем @awaitable. Обернутая им функция будет просто "отстрелена" в группу потоков, без ожидания результата. Возвращен при этом будет объект класса Task, что позволяет вручную отслеживать статус выполнения.

from awaits import shoot

@shoot
def other_heavy_math_function(x, y):
  return x * y

task = other_heavy_math_function(10, 10)

while not task.done:
    pass

print(task.result)

При необходимости, вы можете указать название группы потоков, в котором хотите, чтобы выполнялась ваша функция:

@shoot(pool='gravities')
def other_heavy_math_function(x, y):
  return x * y

По умолчанию также используется группа "base".

Если основной поток исполнения программы подойдет к концу, "отстреленные" функции могут не успеть выполниться, что может создать у вас ложное впечатление сломанной программы. Не стоит использовать данный декоратор, если для вас это критично.

Настройки

У данной библиотеки есть 2 настраиваемых глобальных параметра:

  • pool_size (int > 0) - количество потоков в группе по умолчанию. Важно, чтобы данная настройка была выставлена до выполнения первой задачи. По умолчанию 10.

  • delay (int или float >= 0) - значение задержки (в секундах) между итерациями опроса завершенности задачи. Используется по умолчанию в декораторе @awaitable. По умолчанию 0.001.

Рекомендованный способ изменения значений по умолчанию - добавить раздел [tool.awaits] в ваш файл pyproject.toml:

[tool.awaits]
pool_size = 5
delay = 0.3

Также вы можете создать специальный файл настроек awaits.toml или .awaits.toml и разместить эти поля там. Еще можно вы можете создать файл awaits.json или awaits.yaml и сделать там, в синтаксисе, соответствующем расширению файла.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

awaits-0.0.14.tar.gz (20.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

awaits-0.0.14-py3-none-any.whl (16.5 kB view details)

Uploaded Python 3

File details

Details for the file awaits-0.0.14.tar.gz.

File metadata

  • Download URL: awaits-0.0.14.tar.gz
  • Upload date:
  • Size: 20.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for awaits-0.0.14.tar.gz
Algorithm Hash digest
SHA256 ddbd9562db9bd0294f2f2eec0cd42f0ac58afa83dbeb0120e5430425f7efaa59
MD5 e616a4fcb0ae48c6cfc5f244132b13e8
BLAKE2b-256 e8f2de19cad2f81ecff66dadecad0d56feac8fbe124ac33a7be7ccf59442e4c6

See more details on using hashes here.

Provenance

The following attestation bundles were made for awaits-0.0.14.tar.gz:

Publisher: release.yml on pomponchik/awaits

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file awaits-0.0.14-py3-none-any.whl.

File metadata

  • Download URL: awaits-0.0.14-py3-none-any.whl
  • Upload date:
  • Size: 16.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for awaits-0.0.14-py3-none-any.whl
Algorithm Hash digest
SHA256 f82e8215821353e87ac28f52db59581ba33db8769dccb55be295dc28e0f46561
MD5 7d79e963c24949f38eb11def7e4e11d2
BLAKE2b-256 a7c9a9ee9c2f5b2d706d62c2b6c0b3dc09a6de2fe3fd439c28457da5f06a3345

See more details on using hashes here.

Provenance

The following attestation bundles were made for awaits-0.0.14-py3-none-any.whl:

Publisher: release.yml on pomponchik/awaits

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page