Skip to main content

async + threads + decorators = ?

Project description

logo

Downloads Downloads Coverage Status Hits-of-Code Tests Python versions PyPI version Checked with mypy Ruff

Данная библиотека решает 3 проблемы:

  • Асинхронное программирование с использованием синтаксиса async / await теряет смысл, если в коде часто встречаются куски с "тяжелыми" вычислениями или иными задачами, которые блокируют event-loop. Зато теперь вы можете навесить на такую "тяжелую" функцию декоратор @awaitable и она станет корутиной, которая будет исполняться в отдельном потоке, не блокируя event-loop. Во всем остальном это будет совершенно обычная корутина.
  • Многопоточное программирование многословно. Чтобы заставить ваш код исполняться в многопоточном режиме, вам нужно создавать объекты потоков, передавать туда нужные функции и запускать потоки. Теперь же вам достаточно навесить на обычную функцию декоратор и она автоматически будет исполняться в многопоточном режиме.
  • Частое создание потоков в программе требует постоянно отслеживать создание потоков и управление ими. Здесь же минимальным уровнем абстракции для вас становится группа потоков (pool of threads), а не какой-то отдельный поток. Ими становится удобно управлять в рамках т. н. "комнат" (rooms) с такими группами, где каждой группе присваивается имя.

Прочитайте документацию ниже, чтобы увидеть, как все это работает.

Оглавление

Быстрый старт

Установите awaits через pip:

$ pip install awaits

Теперь просто импортируйте декоратор @awaitable и примените его к вашей функции. Никаких настроек, ничего лишнего - все уже работает:

import asyncio
from awaits import awaitable

@awaitable
def sum(a, b):
  # Какой-то сложный датасаенз. Что-то, что вычисляется долго и мешает вашему event-loop'у жить.
  return a + b

# Теперь sum - это корутина! Пока она выполняется в отдельном потоке, управление передается в event-loop.
print(asyncio.run(sum(2, 2)))

Готово! Мы сделали из обычной функции неблокирующую ваш event-loop корутину, к которой теперь применим синтаксис await.

Если ваша функция ничего не возвращает, к ней можно применить другой декоратор, @shoot:

from awaits import shoot

@shoot
def hello():
  # Тоже что-то тяжелое, но результат чего вам по какой-то причине не нужен.
  print('Hello world!')

# Функция будет "отстрелена" исполняться в отдельный поток, не блокируя основной.
hello()

Ваша функция будет исполняться в другом потоке, в то время как основной может уже заняться чем-то еще.

Более подробно о возможностях библиотеки awaits читайте ниже.

Как это все работает?

Базовым "примитивом" библиотеки является группа потоков (threads pool). "Сердцем" группы является очередь (queue) с задачами (объектами класса Task). Когда вы создаете новую группу потоков, внутри себя она порождает сколько-то потоков с "воркерами", которые постоянно ждут новых задач из очереди. Как только в очереди появляется новая задача, первый же освободившийся воркер выполняет ее.

Чтобы выполнить в группе произвольную функцию, вам достаточно передать ее туда вместе с необходимыми аргументами. При этом группа вернет вам объект класса Task, в котором по значению атрибута done вы можете отслеживать, выполнена ваша задача или нет. Если она выполнена - можете забрать результат из атрибута result. Более подробно о работе с группами потоков читайте в соответствующем разделе.

Для удобства управления несколькими группами, библиотека содержит абстракцию "комната". По своей сути это обертка вокруг словаря с группами потоков. Обращаясь к "комнате" по ключу, вы либо получаете новую группу потоков, если ранее этой группы не существовало, либо уже имеющуюся группу, если ранее она создавалась. Так вам становится не нужно вручную создавать группы потоков.

Для работы декораторов используется "комната", хранящаяся в синглтоне. Обернутые в декораторы @awaitable и @shoot функции будут выполняться в группах потоков из одной и той же комнаты (по умолчанию - в одной группе потоков под названием "base").

За счет такой компоновки, весь менеджмент потоков происходит "под капотом" и вам больше не нужно задумываться над тем, в каком именно потоке выполнится ваша функция. Она выполнится в том, который раньше всех освободится.

Как работает группа потоков?

Группа потоков - это экземпляр класса ThreadsPool. Импортируем его:

from awaits.pools.threads_pool import ThreadsPool

При инициализации экземпляра будут созданы потоки. Число потоков в группе вы указываете в конструкторе класса:

threads = ThreadsPool(5)

Теперь, когда группа создана, ей можно давать задания, используя метод do():

def function(a, b, c, d=5, e=5):
  return a + b + c + d + e

task = threads.do(function, 1, 2, 3, d=10, e=20)

Первым параметром туда передается функция, которую требуется выполнить, а далее все те же параметры и в том же порядке, как при оригинальном вызове этой функции.

Что тут произошло под капотом? Метод do() создал объект класса Task, передав туда функцию для выполнения и все ее параметры, и положил его в очередь. Объект задачи он вам вернул, чтобы вы могли отслеживать прогресс выполнения и результат. Воркеры из других потоков постоянно ждут появления новых элементов в очереди. Если хоть один из них свободен - он сразу получит вашу задачу и выполнит ее. Если нет, задача будет ждать в очереди освобождения первого воркера.

Как только задача будет выполнена, вы можете получить результат:

# Флаг task.done в положении True свидетельствует о том, что задача выполнена и вы можете получить результат.
while not task.done:
    pass

print(task.result)

Если при выполнении функции случилась ошибка, в объекте задачи атрибут error будет установлен в положение True, а получить экземпляр исключения вы можете из атрибута exception:

def error_function(a, b):
  return a / b

task = threads.do(error_function, 2, 0)

while not task.done:
    pass

if task.error:
  raise task.exception

Что такое "комната"?

Комната (room) - это абстракция над группами потоков, позволяющая отдавать задания разным группам, называя их по именам. По сути это обертка над словарем.

Создадим объект комнаты:

from awaits.threads_pools_room import ThreadsPoolsRoom


room = ThreadsPoolsRoom(5)

Число, передаваемое в конструктор - количество потоков в каждой из групп данной комнаты.

Конкретную группу потоков можно получить, используя синтаксис словаря:

pool = room['some_key']

Поскольку мы впервые обращаемся к комнате по этому ключу, она создаст новый объект класса ThreadsPool и вернет его. При последующих обращениях по этому ключу, она будет возвращать тот же самый объект.

Об объекте задачи

Задача - это объект класса Task. В конструктор объекта первым аргументом передается функция для выполнения, а последующими - ее аргументы:

from awaits.task import Task

def hello_something(something, sign='!'):
  hello_string = f'Hello {something}{sign}'
  print(hello_string)
  return hello_string

task = Task(hello_something, 'world')

В неактивированном состоянии задача просто хранит в себе функцию и ее аргументы. Чтобы выполнить функцию с заданными аргументами, необходимо "вызвать" объект задачи:

task()

Флаг task.done будет установлен в положение True, когда задача будет выполнена. После этого вы можете получить результат выполнения из атрибута result:

while not task.done:
    pass

print(task.result)

Если при выполнении функции случилась ошибка, в объекте задачи атрибут error будет установлен в положение True, а получить экземпляр исключения вы можете из атрибута exception:

def error_function(a, b):
  return a / b

task = threads.do(error_function, 2, 0)

while not task.done:
    pass

if task.error:
  raise task.exception

Декоратор @awaitable

Прочитав документацию выше, вы уже научились создавать группы потоков и комнаты с ними, а также давать потокам на исполнение различные задания. Однако делать даже это вручную не обязательно.

Декоратор @awaitable превращает обычную функцию в корутину, т. е. в функцию, с которой можно работать через await-синтаксис Python. Давайте попробуем создать такую функцию:

from awaits import awaitable

@awaitable
def heavy_math_function(x, y):
  return x * y

При попытке выполнения функции, она будет вести себя как обычная корутина. Однако фактически ее код будет выполняться в группе потоков. Пока код выполняется, управление будет передано в event-loop.

# Проверяем, что это действительно корутина.
print(asyncio.run(heavy_math_function(5, 5)))

"Под капотом" при этом происходит периодический опрос состояния задачи с последующим "засыпанием" (вызовом asyncio.sleep()) на некий промежуток времени. Как только задача выполнена, ее результат возвращается. Если выполнение прервано исключением - оно извлекается из объекта задачи и снова поднимается.

Промежуток, на который функция "засыпает" между опросами о готовности, по умолчанию берется из глобальных настроек библиотеки. При необходимости, вы можете указать его в фабрике декоратора (в секундах):

@awaitable(delay=0.5)
def heavy_math_function(x, y):
  return x * y

Ручное управление вам может быть полезно, к примеру, в случае особо "тяжелых" функций, которые нет смысла опрашивать слишком часто.

Кроме того, отдельным параметром вы можете указать имя группы потоков, в которой вы хотите чтобы выполнялся код. По умолчанию используется группа "base".

@awaitable(pool='gravities')
def heavy_math_function(x, y):
  return x * y

Декоратор @shoot

Этот декоратор проще, чем @awaitable. Обернутая им функция будет просто "отстрелена" в группу потоков, без ожидания результата. Возвращен при этом будет объект класса Task, что позволяет вручную отслеживать статус выполнения.

from awaits import shoot

@shoot
def other_heavy_math_function(x, y):
  return x * y

task = other_heavy_math_function(10, 10)

while not task.done:
    pass

print(task.result)

При необходимости, вы можете указать название группы потоков, в котором хотите, чтобы выполнялась ваша функция:

@shoot(pool='gravities')
def other_heavy_math_function(x, y):
  return x * y

По умолчанию также используется группа "base".

Если основной поток исполнения программы подойдет к концу, "отстреленные" функции могут не успеть выполниться, что может создать у вас ложное впечатление сломанной программы. Не стоит использовать данный декоратор, если для вас это критично.

Настройки

Вы можете настроить параметры по умолчанию самостоятельно. Для этого необходимо вызвать метод set у класса config:

from awaits import config

# Для примера устанавливаем частоту опроса задачи в декораторе @awaitable на значение 0.5 сек.
config.set(delay=10.5)

Данный метод принимает следующие именованные параметры:

pool_size (int) - количество потоков в группе по умолчанию. Важно, чтобы данная настройка была выставлена до выполнения первой задачи. Если не установить этот параметр, он будет равен 10.

delay (int или float) - значение задержки (в секундах) между итерациями опроса завершенности задачи. Используется по умолчанию в декораторе @awaitable. Если не установить это значение вручную, будет использовано число 0.001.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

awaits-0.0.13.tar.gz (19.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

awaits-0.0.13-py3-none-any.whl (16.5 kB view details)

Uploaded Python 3

File details

Details for the file awaits-0.0.13.tar.gz.

File metadata

  • Download URL: awaits-0.0.13.tar.gz
  • Upload date:
  • Size: 19.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for awaits-0.0.13.tar.gz
Algorithm Hash digest
SHA256 0d0d9822cf711ea3545eb9bac413ece2241ebca8a14be086b4871333dda5f058
MD5 9a4713c3f4527040e9ec1cedd0fff9f2
BLAKE2b-256 e9234e139634a2133bf368b5657efe428378cad4571f89c64096ae3f77b6f9b8

See more details on using hashes here.

Provenance

The following attestation bundles were made for awaits-0.0.13.tar.gz:

Publisher: release.yml on pomponchik/awaits

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file awaits-0.0.13-py3-none-any.whl.

File metadata

  • Download URL: awaits-0.0.13-py3-none-any.whl
  • Upload date:
  • Size: 16.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for awaits-0.0.13-py3-none-any.whl
Algorithm Hash digest
SHA256 65ef8c731b781bf368487cb7af7ffa83251e3e12ed8d282da086f08764139627
MD5 1c821975c38e00c381f7c7cb76cf6e2d
BLAKE2b-256 7a82f904c7b82b62ca3adcea3d2af7aedfbf231add3a02212baeaccacc338be4

See more details on using hashes here.

Provenance

The following attestation bundles were made for awaits-0.0.13-py3-none-any.whl:

Publisher: release.yml on pomponchik/awaits

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page