Skip to main content

Python client for pg_tasks

Project description

Python client for pg_tasks

Клиент для pg_tasks.

Предоставляет классы Tasks и Worker для создания рабочих процессов.

Установка

pip install pg_tasks_python

Quickstart

Запустим расширение и создадим таблицу для задачи:

CRAETE EXTENSION pg_tasks;

CREATE TABLE tasks.do_something(
    LIKE tasks.template
        INCLUDING DEFAULTS
        INCLUDING IDENTITY
        INCLUDING INDEXES,
    payload text
);
SELECT tasks.register('tasks', 'do_something');

Опишем модуль на python:

from pg_tasks import Task, Worker
import psycopg


class DoSomething(Task):
    
    def __init__(self, conn_factory):
        super().__init__()
        self.conn_factory = conn_factory
    
    def run(self, id, **kwargs):
        # Представим себе, что здесь полезный код))
        print('DO HARD WORK')
        
        # Подтверждение выполнения задачи
        conn = None
        try:
            conn = self.conn_factory()
            self.finish(conn, task_id=id)
        finally:
            if conn is not None:
                conn.close()


# Композит
do_something = DoSomething(
    lambda: psycopg.connect('dbname=test'),
)

worker = Worker('dbname=test')
worker.add(do_something)

worker.run()

Task

Базовый класс для задач. Метод run должен быть переопределен, и содержать в себе саму задачу. Этот метод будет вызван при получении задач из БД.

Метод run принимает строку из БД в виде **kwargs. Все столбцы, описанные в соответствующей таблице, будут переданы в задачу.

После успешного завершения бизнес-логики задачу необходимо пометить как завершенную методом .finish(), передав в него id задачи и время начала задачи. В большинстве случаев достаточно передать created_at из kwargs.

В случае, когда задачу выполнить невозможно, и нужно повторить ее позже (например, задача должна обратиться к внешнему сервису, который сейчас недоступен), нужно вызвать метод retry, передав в него id задачи и, опционально, время, после которого задача должна быть выполнена.

В случае, когда задачу выполнить невозможно, нужно выполнить метод cancel, передав в него id задачи. Такая задача не будет доступна для повторения другим воркерам, пока не будет сброшена.

Метод reset используется для сброса задач, принимает id задачи. Нужен, в основном, для ручного перезапуска задач при авариях.

Метод clean принимает дату и время, и вычищает все задачи, успешно выполненные до указанной даты.

Все методы принимают первым обязательным аргументом подключение к БД, через которое задача будет работать с БД.

В случае, когда задача работает в транзакции, крайне рекомендуется после завершения бизнес-логики оперировать задачей (подтверждать, отменять и т.д.) в той же транзакции.

Также в методы finish, cancel, и retry можно передать комментарий к задаче в поле comment в виде строки, комментарий будет сохранен в БД.

Метод acquire нужен для вызова из Worker, напрямую он не используется.

Worker

Worker имеет 3 метода - add, remove и run.

Метод add используется для добавления новых типов задач. Вторым аргументом можно задать количество рабочих потоков, по умолчанию - 1. Если воркер уже запущен, то потоки будут запущены сразу, если нет - запуск будет отложен до запуска воркера.

Метод remove используется для остановки определенного типа задач.

Метод run запускает worker. Метод блокирующий, содержит в себе бесконечный цикл.

Внутри run делает несколько вещей.

Для каждого типа задачи запускается один или несколько рабочих потоков. Каждый рабочий поток циклически запрашивает пачку задач и последовательно выполняет их. Если получена пустая пачка, значит, задач к выполнению сейчас нет, и рабочий поток встает на паузу в ожидании уведомления о новой задаче. Если уведомление не приходит в интервал, заданный в задаче, то рабочий поток начинает цикл заново.

Также Worker создает поток-слушатель. Слушатель получает уведомления в канале pg_tasks, и уведомляет рабочие потоки о них. Также слушатель регулярно пингует DB, когда уведомлений нет больше определенного времени.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pg_tasks-0.0.1.tar.gz (10.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pg_tasks-0.0.1-py3-none-any.whl (8.0 kB view details)

Uploaded Python 3

File details

Details for the file pg_tasks-0.0.1.tar.gz.

File metadata

  • Download URL: pg_tasks-0.0.1.tar.gz
  • Upload date:
  • Size: 10.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.13

File hashes

Hashes for pg_tasks-0.0.1.tar.gz
Algorithm Hash digest
SHA256 ad5d206057e5ed93cf23ba9a127773982d7e04414b11c16afa5f6dd888257e7b
MD5 d9dda915e793c3825ac4fdf80b12b2d6
BLAKE2b-256 4ea751ae4b9a5901b8450a9459fa9bcecb59b5a53dd60985c1f6e5dd4f3beab8

See more details on using hashes here.

File details

Details for the file pg_tasks-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: pg_tasks-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 8.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.13

File hashes

Hashes for pg_tasks-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 f1b1187d559643a84a01edaf9b8ff2a37a7e443a337385003e1ba2fdd8a23a98
MD5 2de4c2c438c04a62ae590361c6908c77
BLAKE2b-256 c73da36f62b435beeb5d7f3b4e6b2ad971cabffaefa7df930ad9324b2ec2fb97

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page