Skip to main content

Task/workflow toolkit for Temporal + Redis

Project description

r7kit — лёгкая обёртка над Temporal + Redis для организации workflow-задач с хранением состояния (payload) в Redis и автоматической сериализацией.


  1. Установка
  2. Конфигурация
  3. Базовые понятия
  4. Определение Workflow
  5. Запуск worker-процесса
  6. Запуск workflow из клиента
  7. CRUD операций над задачей
  8. Вложенный payload и child-workflow
  9. Логирование и сериализация
  10. TTL-удаление задач

pip install r7kit

Библиотека не поднимает ни Temporal-сервис, ни Redis — предполагается, что они уже есть.


Можно настроить r7kit через переменные окружения или программно до первого импорта:

from r7kit.config import configure

configure(
    redis_url="redis://localhost:6379",
    temporal_address="localhost:7233",
    stream_default="task_events",
    deleted_ttl=60,  # TTL после delete (в секундах)
)

Переменные окружения:

  • REDIS_URL
  • TEMPORAL_ADDRESS
  • R7KIT_STREAM
  • R7KIT_DELETED_TTL

Каждому workflow соответствует Redis-задача, которая хранится в HSET task:{uuid} и имеет:

  • статус (status)
  • временные метки (created_at, updated_at)
  • payload (вложенный словарь)
  • поток событий (Redis Stream)

Библиотека сериализует payload в orjson + префикс, автоматически декодируя при чтении.


from r7kit.decorators import taskflow
from r7kit.base_task_workflow import BaseTaskWorkflow

@taskflow(queue="producer-queue")
class MyFlow(BaseTaskWorkflow):
    async def handle(self) -> int:
        await self.load_task()
        x = self.payload["input"]
        await self.patch_task({"status": "processing"})
        await self.patch_task({"status": "done", "payload": {"result": x * 2}})
        await self.delete_task(ttl=60)
        return x * 2

Наследуй BaseTaskWorkflow, используй load_task, patch_task, delete_task и self.payload.


from r7kit.worker import R7Worker

if __name__ == "__main__":
    R7Worker("myapp", queue="producer-queue").start()

Указывается имя пакета, где лежат ваши воркфлоу, и очередь.


from myapp.flows import MyFlow

handle, uuid = await MyFlow.start(payload={"input": 42})
await handle.result()

Или:

from r7kit.workflow_utils import submit_workflow

await submit_workflow(MyFlow, payload={"input": 42})

После await self.load_task() доступны:

  • self.task: объект TaskModel
  • self.payload: словарь

Поддерживаются:

await self.patch_task({"status": "in_progress"})
await self.delete_task(ttl=60)
await self.get_task()
await self.exists()  # логическое удаление

Асинхронный контекст для сохранения payload:

async with self.state():
    self.payload["x"] = 1

Можно запускать дочерние воркфлоу:

result = await self.child("OtherFlow", queue="q").run()

Или:

handle, child_task_id = await self.child("OtherFlow", queue="q").start(payload={...})

Для логирования:

from r7kit.logging import setup
setup("DEBUG")

Для ручной сериализации (например, вне воркфлоу):

from r7kit.serializer import dumps, loads

Удаление:

await self.delete_task(ttl=10)

Это:

  • Ставит deleted_at = now
  • Устанавливает PEXPIRE на Redis-ключ задачи
  • Логируется в Redis Stream

TTL можно задать через R7KIT_DELETED_TTL или явно в методе.


@taskflow(queue="producer-queue")
class ProducerWorkflow(BaseTaskWorkflow):
    async def handle(self) -> int:
        await self.load_task()
        x = int(self.payload["mathematic"]["input"])
        await self.patch_task({"status": "sent"})
        res = await self.child("ProcessorWorkflow", queue="processor-queue").run()
        await self.patch_task({"status": "done", "payload": {"mathematic": {"input": x, "result": res}}})
        await self.delete_task(ttl=10)
        return res


MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

r7kit-0.2.3.tar.gz (18.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

r7kit-0.2.3-py3-none-any.whl (23.0 kB view details)

Uploaded Python 3

File details

Details for the file r7kit-0.2.3.tar.gz.

File metadata

  • Download URL: r7kit-0.2.3.tar.gz
  • Upload date:
  • Size: 18.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for r7kit-0.2.3.tar.gz
Algorithm Hash digest
SHA256 8180e621c012326b7ad773c9a0f1d6d2be6b7fd185e6436e09a55ee3f8520bb6
MD5 b03f1f592e90b8a64695fe58ce073a5d
BLAKE2b-256 7ca7aeea033e3cb16f22592b8bf71f647d22e44b57a7324267aad4e7579073e8

See more details on using hashes here.

File details

Details for the file r7kit-0.2.3-py3-none-any.whl.

File metadata

  • Download URL: r7kit-0.2.3-py3-none-any.whl
  • Upload date:
  • Size: 23.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for r7kit-0.2.3-py3-none-any.whl
Algorithm Hash digest
SHA256 de8f3c0011aebe60cd393071cdb3e7717c84931d33c45c1d3904cb1ea8c83934
MD5 f7b1fea803cd5aa954dcb7bc9e3c0711
BLAKE2b-256 4490d1277aba60d98ed31d3ccb62de3fa505b95eb9c9e7cd1157fa83a734ecaa

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page