Skip to main content

async + threads + decorators = ?

Project description

logo

Downloads Downloads codecov Hits-of-Code Tests Python versions PyPI version Checked with mypy Ruff

Данная библиотека решает 3 проблемы:

  • Асинхронное программирование с использованием синтаксиса async / await теряет смысл, если в коде часто встречаются куски с "тяжелыми" вычислениями или иными задачами, которые блокируют event-loop. Зато теперь вы можете навесить на такую "тяжелую" функцию декоратор @awaitable и она станет корутиной, которая будет исполняться в отдельном потоке, не блокируя event-loop. Во всем остальном это будет совершенно обычная корутина.
  • Многопоточное программирование многословно. Чтобы заставить ваш код исполняться в многопоточном режиме, вам нужно создавать объекты потоков, передавать туда нужные функции и запускать потоки. Теперь же вам достаточно навесить на обычную функцию декоратор и она автоматически будет исполняться в многопоточном режиме.
  • Частое создание потоков в программе требует постоянно отслеживать создание потоков и управление ими. Здесь же минимальным уровнем абстракции для вас становится группа потоков (pool of threads), а не какой-то отдельный поток. Ими становится удобно управлять в рамках т. н. "комнат" (rooms) с такими группами, где каждой группе присваивается имя.

Прочитайте документацию ниже, чтобы увидеть, как все это работает.

Оглавление

Быстрый старт

Установите awaits через pip:

$ pip install awaits

Теперь просто импортируйте декоратор @awaitable и примените его к вашей функции. Никаких настроек, ничего лишнего - все уже работает:

import asyncio
from awaits import awaitable


@awaitable
def sum(a, b):
  # Какой-то сложный датасаенз. Что-то, что вычисляется долго и мешает вашему event-loop'у жить.
  return a + b

# Теперь sum - это корутина! Пока она выполняется в отдельном потоке, управление передается в event-loop.
print(asyncio.run(sum(2, 2)))

Готово! Мы сделали из обычной функции неблокирующую ваш event-loop корутину, к которой теперь применим синтаксис await.

Если ваша функция ничего не возвращает, к ней можно применить другой декоратор, @shoot:

from awaits import shoot


@shoot
def hello():
  # Тоже что-то тяжелое, но результат чего вам по какой-то причине не нужен.
  print('Hello world!')

# Функция будет "отстрелена" исполняться в отдельный поток, не блокируя основной.
hello()

Ваша функция будет исполняться в другом потоке, в то время как основной может уже заняться чем-то еще.

Более подробно о возможностях библиотеки awaits читайте ниже.

Как это все работает?

Базовым "примитивом" библиотеки является группа потоков (threads pool). "Сердцем" группы является очередь (queue) с задачами (объектами класса Task). Когда вы создаете новую группу потоков, внутри себя она порождает сколько-то потоков с "воркерами", которые постоянно ждут новых задач из очереди. Как только в очереди появляется новая задача, первый же освободившийся воркер выполняет ее.

Чтобы выполнить в группе произвольную функцию, вам достаточно передать ее туда вместе с необходимыми аргументами. При этом группа вернет вам объект класса Task, в котором по значению атрибута done вы можете отслеживать, выполнена ваша задача или нет. Если она выполнена - можете забрать результат из атрибута result. Более подробно о работе с группами потоков читайте в соответствующем разделе.

Для удобства управления несколькими группами, библиотека содержит абстракцию "комната". По своей сути это обертка вокруг словаря с группами потоков. Обращаясь к "комнате" по ключу, вы либо получаете новую группу потоков, если ранее этой группы не существовало, либо уже имеющуюся группу, если ранее она создавалась. Так вам становится не нужно вручную создавать группы потоков.

Для работы декораторов используется "комната", хранящаяся в синглтоне. Обернутые в декораторы @awaitable и @shoot функции будут выполняться в группах потоков из одной и той же комнаты (по умолчанию - в одной группе потоков под названием "base").

За счет такой компоновки, весь менеджмент потоков происходит "под капотом" и вам больше не нужно задумываться над тем, в каком именно потоке выполнится ваша функция. Она выполнится в том, который раньше всех освободится.

Как работает группа потоков?

Группа потоков - это экземпляр класса ThreadsPool. Импортируем его:

from awaits.pools.threads_pool import ThreadsPool

При инициализации экземпляра будут созданы потоки. Число потоков в группе вы указываете в конструкторе класса:

threads = ThreadsPool(5)

Теперь, когда группа создана, ей можно давать задания, используя метод do():

def function(a, b, c, d=5, e=5):
  return a + b + c + d + e

task = threads.do(function, 1, 2, 3, d=10, e=20)

Первым параметром туда передается функция, которую требуется выполнить, а далее все те же параметры и в том же порядке, как при оригинальном вызове этой функции.

Что тут произошло под капотом? Метод do() создал объект класса Task, передав туда функцию для выполнения и все ее параметры, и положил его в очередь. Объект задачи он вам вернул, чтобы вы могли отслеживать прогресс выполнения и результат. Воркеры из других потоков постоянно ждут появления новых элементов в очереди. Если хоть один из них свободен - он сразу получит вашу задачу и выполнит ее. Если нет, задача будет ждать в очереди освобождения первого воркера.

Как только задача будет выполнена, вы можете получить результат:

# Флаг task.done в положении True свидетельствует о том, что задача выполнена и вы можете получить результат.
while not task.done:
    pass

print(task.result)

Если при выполнении функции случилась ошибка, в объекте задачи атрибут error будет установлен в положение True, а получить экземпляр исключения вы можете из атрибута exception:

def error_function(a, b):
  return a / b

task = threads.do(error_function, 2, 0)

while not task.done:
    pass

if task.error:
  raise task.exception

Что такое "комната"?

Комната (room) - это абстракция над группами потоков, позволяющая отдавать задания разным группам, называя их по именам. По сути это обертка над словарем.

Создадим объект комнаты:

from awaits.threads_pools_room import ThreadsPoolsRoom


room = ThreadsPoolsRoom(5)

Число, передаваемое в конструктор - количество потоков в каждой из групп данной комнаты.

Конкретную группу потоков можно получить, используя синтаксис словаря:

pool = room['some_key']

Поскольку мы впервые обращаемся к комнате по этому ключу, она создаст новый объект класса ThreadsPool и вернет его. При последующих обращениях по этому ключу, она будет возвращать тот же самый объект.

Об объекте задачи

Задача - это объект класса Task. В конструктор объекта первым аргументом передается функция для выполнения, а последующими - ее аргументы:

from awaits.task import Task


def hello_something(something, sign='!'):
  hello_string = f'Hello {something}{sign}'
  print(hello_string)
  return hello_string

task = Task(hello_something, 'world')

В неактивированном состоянии задача просто хранит в себе функцию и ее аргументы. Чтобы выполнить функцию с заданными аргументами, необходимо "вызвать" объект задачи:

task()

Флаг task.done будет установлен в положение True, когда задача будет выполнена. После этого вы можете получить результат выполнения из атрибута result:

while not task.done:
    pass

print(task.result)

Если при выполнении функции случилась ошибка, в объекте задачи атрибут error будет установлен в положение True, а получить экземпляр исключения вы можете из атрибута exception:

def error_function(a, b):
  return a / b

task = threads.do(error_function, 2, 0)

while not task.done:
    pass

if task.error:
  raise task.exception

Декоратор @awaitable

Прочитав документацию выше, вы уже научились создавать группы потоков и комнаты с ними, а также давать потокам на исполнение различные задания. Однако делать даже это вручную не обязательно.

Декоратор @awaitable превращает обычную функцию в корутину, т. е. в функцию, с которой можно работать через await-синтаксис Python. Давайте попробуем создать такую функцию:

from awaits import awaitable


@awaitable
def heavy_math_function(x, y):
  return x * y

При попытке выполнения функции, она будет вести себя как обычная корутина. Однако фактически ее код будет выполняться в группе потоков. Пока код выполняется, управление будет передано в event-loop.

# Проверяем, что это действительно корутина.
print(asyncio.run(heavy_math_function(5, 5)))

"Под капотом" при этом происходит периодический опрос состояния задачи с последующим "засыпанием" (вызовом asyncio.sleep()) на некий промежуток времени. Как только задача выполнена, ее результат возвращается. Если выполнение прервано исключением - оно извлекается из объекта задачи и снова поднимается.

Промежуток, на который функция "засыпает" между опросами о готовности, по умолчанию берется из глобальных настроек библиотеки. При необходимости, вы можете указать его в фабрике декоратора (в секундах):

@awaitable(delay=0.5)
def heavy_math_function(x, y):
  return x * y

Ручное управление вам может быть полезно, к примеру, в случае особо "тяжелых" функций, которые нет смысла опрашивать слишком часто.

Кроме того, отдельным параметром вы можете указать имя группы потоков, в которой вы хотите чтобы выполнялся код. По умолчанию используется группа "base".

@awaitable(pool='gravities')
def heavy_math_function(x, y):
  return x * y

Декоратор @shoot

Этот декоратор проще, чем @awaitable. Обернутая им функция будет просто "отстрелена" в группу потоков, без ожидания результата. Возвращен при этом будет объект класса Task, что позволяет вручную отслеживать статус выполнения.

from awaits import shoot


@shoot
def other_heavy_math_function(x, y):
  return x * y

task = other_heavy_math_function(10, 10)

while not task.done:
    pass

print(task.result)

При необходимости, вы можете указать название группы потоков, в котором хотите, чтобы выполнялась ваша функция:

@shoot(pool='gravities')
def other_heavy_math_function(x, y):
  return x * y

По умолчанию также используется группа "base".

Если основной поток исполнения программы подойдет к концу, "отстреленные" функции могут не успеть выполниться, что может создать у вас ложное впечатление сломанной программы. Не стоит использовать данный декоратор, если для вас это критично.

Настройки

Вы можете настроить параметры по умолчанию самостоятельно. Для этого необходимо вызвать метод set у класса config:

from awaits import config


# Для примера устанавливаем частоту опроса задачи в декораторе @awaitable на значение 0.5 сек.
config.set(delay=10.5)

Данный метод принимает следующие именованные параметры:

pool_size (int) - количество потоков в группе по умолчанию. Важно, чтобы данная настройка была выставлена до выполнения первой задачи. Если не установить этот параметр, он будет равен 10.

delay (int или float) - значение задержки (в секундах) между итерациями опроса завершенности задачи. Используется по умолчанию в декораторе @awaitable. Если не установить это значение вручную, будет использовано число 0.001.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

awaits-0.0.11.tar.gz (19.7 kB view hashes)

Uploaded Source

Built Distribution

awaits-0.0.11-py3-none-any.whl (16.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page