Skip to main content

Task/workflow toolkit for Temporal + Redis

Project description

r7kit

r7kit — лёгкая обёртка над Temporal + Redis для организации workflow-задач с хранением состояния (payload) в Redis и автоматической сериализацией.


Содержание

  1. Установка
  2. Конфигурация
  3. Базовые понятия
  4. Определение Workflow
  5. Запуск worker-процесса
  6. Запуск workflow из клиента
  7. CRUD операций над задачей
  8. Вложенный payload и child-workflow
  9. Логирование и сериализация
  10. TTL-удаление задач

Установка

pip install r7kit

Библиотека не поднимает ни Temporal-сервис, ни Redis — предполагается, что они уже есть.


Конфигурация

Можно настроить r7kit через переменные окружения или программно до первого импорта:

from r7kit.config import configure

configure(
    redis_url="redis://localhost:6379",
    temporal_address="localhost:7233",
    stream_default="task_events",
    deleted_ttl=60,  # TTL после delete (в секундах)
)

Переменные окружения:

  • REDIS_URL
  • TEMPORAL_ADDRESS
  • R7KIT_STREAM
  • R7KIT_DELETED_TTL

Базовые понятия

Каждому workflow соответствует Redis-задача, которая хранится в HSET task:{uuid} и имеет:

  • статус (status)
  • временные метки (created_at, updated_at)
  • payload (вложенный словарь)
  • поток событий (Redis Stream)

Библиотека сериализует payload в orjson + префикс, автоматически декодируя при чтении.


Определение Workflow

from r7kit.decorators import taskflow
from r7kit.base_task_workflow import BaseTaskWorkflow

@taskflow(queue="producer-queue")
class MyFlow(BaseTaskWorkflow):
    async def handle(self) -> int:
        await self.load_task()
        x = self.payload["input"]
        await self.patch_task({"status": "processing"})
        await self.patch_task({"status": "done", "payload": {"result": x * 2}})
        await self.delete_task(ttl=60)
        return x * 2

Наследуй BaseTaskWorkflow, используй load_task, patch_task, delete_task и self.payload.


Запуск worker-процесса

from r7kit.worker import R7Worker

if __name__ == "__main__":
    R7Worker("myapp", queue="producer-queue").start()

Указывается имя пакета, где лежат ваши воркфлоу, и очередь.


Запуск workflow из клиента

from myapp.flows import MyFlow

handle, uuid = await MyFlow.start(payload={"input": 42})
await handle.result()

Или:

from r7kit.workflow_utils import submit_workflow

await submit_workflow(MyFlow, payload={"input": 42})

CRUD операций над задачей

После await self.load_task() доступны:

  • self.task: объект TaskModel
  • self.payload: словарь

Поддерживаются:

await self.patch_task({"status": "in_progress"})
await self.delete_task(ttl=60)
await self.get_task()
await self.exists()  # логическое удаление

Асинхронный контекст для сохранения payload:

async with self.state():
    self.payload["x"] = 1

Вложенный payload и child-workflow

Можно запускать дочерние воркфлоу:

result = await self.child("OtherFlow", queue="q").run()

Или:

handle, child_task_id = await self.child("OtherFlow", queue="q").start(payload={...})

Логирование и сериализация

Для логирования:

from r7kit.logging import setup
setup("DEBUG")

Для ручной сериализации (например, вне воркфлоу):

from r7kit.serializer import dumps, loads

TTL-удаление задач

Удаление:

await self.delete_task(ttl=10)

Это:

  • Ставит deleted_at = now
  • Устанавливает PEXPIRE на Redis-ключ задачи
  • Логируется в Redis Stream

TTL можно задать через R7KIT_DELETED_TTL или явно в методе.


Пример

@taskflow(queue="producer-queue")
class ProducerWorkflow(BaseTaskWorkflow):
    async def handle(self) -> int:
        await self.load_task()
        x = int(self.payload["mathematic"]["input"])
        await self.patch_task({"status": "sent"})
        res = await self.child("ProcessorWorkflow", queue="processor-queue").run()
        await self.patch_task({"status": "done", "payload": {"mathematic": {"input": x, "result": res}}})
        await self.delete_task(ttl=10)
        return res

Зависимости


Лицензия

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

r7kit-0.2.0.tar.gz (17.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

r7kit-0.2.0-py3-none-any.whl (22.0 kB view details)

Uploaded Python 3

File details

Details for the file r7kit-0.2.0.tar.gz.

File metadata

  • Download URL: r7kit-0.2.0.tar.gz
  • Upload date:
  • Size: 17.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for r7kit-0.2.0.tar.gz
Algorithm Hash digest
SHA256 6c6bc12eee1f7d81d551c88409f7213b1b19466479501b8ac6d4862f52385606
MD5 34816142fbec08baffa516d1bc6ef06a
BLAKE2b-256 e2914bd9c1e8ae05ff8e634284a16328a8c4d5e1bf4ac5df020dea351d4109e8

See more details on using hashes here.

File details

Details for the file r7kit-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: r7kit-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 22.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for r7kit-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8c94f66bb1540487ddc31a56cc89ff630011aa53e4186a3f4e8c31ae39da2f7d
MD5 4fab34a05e61dc67731f00c6c7e7f78b
BLAKE2b-256 74b354e6ce802058d6d4504dd9541e33be1d8730bb60932a4ee3100177e71128

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page