Python client for pg_tasks
Project description
Python client for pg_tasks
Клиент для pg_tasks.
Предоставляет классы Tasks и Worker для создания рабочих процессов.
Установка
pip install pg_tasks_python
Quickstart
Запустим расширение и создадим таблицу для задачи:
CRAETE EXTENSION pg_tasks;
CREATE TABLE tasks.do_something(
LIKE tasks.template
INCLUDING DEFAULTS
INCLUDING IDENTITY
INCLUDING INDEXES,
payload text
);
SELECT tasks.register('tasks', 'do_something');
Опишем модуль на python:
from pg_tasks import Task, Worker
import psycopg
class DoSomething(Task):
def __init__(self, conn_factory):
super().__init__()
self.conn_factory = conn_factory
def run(self, id, **kwargs):
# Представим себе, что здесь полезный код))
print('DO HARD WORK')
# Подтверждение выполнения задачи
conn = None
try:
conn = self.conn_factory()
self.finish(conn, task_id=id)
finally:
if conn is not None:
conn.close()
# Композит
do_something = DoSomething(
lambda: psycopg.connect('dbname=test'),
)
worker = Worker('dbname=test')
worker.add(do_something)
worker.run()
Task
Базовый класс для задач. Метод run должен быть переопределен, и содержать
в себе саму задачу. Этот метод будет вызван при получении задач из БД.
Метод run принимает строку из БД в виде **kwargs. Все столбцы, описанные
в соответствующей таблице, будут переданы в задачу.
После успешного завершения бизнес-логики задачу необходимо пометить
как завершенную методом .finish(), передав в него id задачи
и время начала задачи. В большинстве случаев достаточно передать created_at
из kwargs.
В случае, когда задачу выполнить невозможно, и нужно повторить ее позже
(например, задача должна обратиться к внешнему сервису,
который сейчас недоступен), нужно вызвать метод retry, передав в него
id задачи и, опционально, время, после которого задача должна быть выполнена.
В случае, когда задачу выполнить невозможно, нужно выполнить метод cancel,
передав в него id задачи. Такая задача не будет доступна для повторения
другим воркерам, пока не будет сброшена.
Метод reset используется для сброса задач, принимает id задачи. Нужен,
в основном, для ручного перезапуска задач при авариях.
Метод clean принимает дату и время, и вычищает все задачи, успешно
выполненные до указанной даты.
Все методы принимают первым обязательным аргументом подключение к БД, через которое задача будет работать с БД.
В случае, когда задача работает в транзакции, крайне рекомендуется после завершения бизнес-логики оперировать задачей (подтверждать, отменять и т.д.) в той же транзакции.
Также в методы finish, cancel, и retry можно передать комментарий
к задаче в поле comment в виде строки, комментарий будет сохранен в БД.
Метод acquire нужен для вызова из Worker, напрямую он не используется.
Worker
Worker имеет 3 метода - add, remove и run.
Метод add используется для добавления новых типов задач.
Вторым аргументом можно задать количество рабочих потоков, по умолчанию - 1.
Если воркер уже запущен, то потоки будут запущены сразу, если нет - запуск
будет отложен до запуска воркера.
Метод remove используется для остановки определенного типа задач.
Метод run запускает worker. Метод блокирующий,
содержит в себе бесконечный цикл.
Внутри run делает несколько вещей.
Для каждого типа задачи запускается один или несколько рабочих потоков. Каждый рабочий поток циклически запрашивает пачку задач и последовательно выполняет их. Если получена пустая пачка, значит, задач к выполнению сейчас нет, и рабочий поток встает на паузу в ожидании уведомления о новой задаче. Если уведомление не приходит в интервал, заданный в задаче, то рабочий поток начинает цикл заново.
Также Worker создает поток-слушатель. Слушатель получает уведомления в канале pg_tasks, и уведомляет рабочие потоки о них. Также слушатель регулярно пингует DB, когда уведомлений нет больше определенного времени.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pg_tasks-0.0.1.tar.gz.
File metadata
- Download URL: pg_tasks-0.0.1.tar.gz
- Upload date:
- Size: 10.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ad5d206057e5ed93cf23ba9a127773982d7e04414b11c16afa5f6dd888257e7b
|
|
| MD5 |
d9dda915e793c3825ac4fdf80b12b2d6
|
|
| BLAKE2b-256 |
4ea751ae4b9a5901b8450a9459fa9bcecb59b5a53dd60985c1f6e5dd4f3beab8
|
File details
Details for the file pg_tasks-0.0.1-py3-none-any.whl.
File metadata
- Download URL: pg_tasks-0.0.1-py3-none-any.whl
- Upload date:
- Size: 8.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f1b1187d559643a84a01edaf9b8ff2a37a7e443a337385003e1ba2fdd8a23a98
|
|
| MD5 |
2de4c2c438c04a62ae590361c6908c77
|
|
| BLAKE2b-256 |
c73da36f62b435beeb5d7f3b4e6b2ad971cabffaefa7df930ad9324b2ec2fb97
|