Skip to main content

Task/workflow toolkit for Temporal + Redis

Project description

r7kit

R7kit — это лёгкий, модульный фреймворк для управления задачами (tasks) и воркфлоу (workflows) с использованием Temporal и Redis.

Он упрощает реализацию pipeline-процессов и сохранение состояний, с полной поддержкой асинхронных activity и потоковых логов.


🚀 Быстрый старт

Создание задачи и запуск workflow:

from r7kit.workflow_utils import submit_workflow

handle = await submit_workflow(
    "my_pipeline.HelloPipeline",
    payload={"user": "Ann"},
)
print("Workflow started:", handle.id)

Получение и обновление задачи:

from r7kit.tasks import get_task, patch_task

task = await get_task(task_id)
await patch_task(task_id, {"status": "running"})

📦 Основные возможности

  • ✅ Создание, обновление, удаление задач в Redis
  • 🔁 Версионирование и TTL ключей
  • 🧠 Поддержка BaseWorkflow и StatefulWorkflow для управления логикой
  • 🔌 Простая интеграция с Temporal SDK (Python)
  • 📜 Сериализация payload с orjson, совместимость и безопасность
  • ⚙️ Конфигурация через переменные окружения или configure()

📁 Структура модулей

Модуль Назначение
activities.py Temporal Activity (создание, патчинг, удаление задач)
tasks.py API для запуска и получения задач
workflow_utils.py Утилиты для запуска и управления воркфлоу
base_workflow.py Базовый класс с task API
stateful_workflow.py Автоматическое сохранение state
redis_client.py Singleton Redis-клиент с reconnect
temporal_client.py Singleton Temporal-клиент
serializer.py Безопасная сериализация payload-ов
config.py Конфигурация (Redis, Temporal адреса)
exceptions.py Исключения: NotFound, Conflict, AlreadyExists

🧪 Пример pipeline

from temporalio import workflow
from r7kit.base_task_workflow import BaseTaskWorkflow

@workflow.defn
class HelloPipeline(BaseTaskWorkflow):
    @workflow.run
    async def run(self, task_id: str):
        await self._run_impl(task_id)

    async def handle(self) -> None:
        await self.patch_task({"status": "started"})
        await workflow.sleep(3)
        await self.patch_task({"status": "done"})

📝 Лицензия

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

r7kit-0.1.9.tar.gz (15.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

r7kit-0.1.9-py3-none-any.whl (20.9 kB view details)

Uploaded Python 3

File details

Details for the file r7kit-0.1.9.tar.gz.

File metadata

  • Download URL: r7kit-0.1.9.tar.gz
  • Upload date:
  • Size: 15.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for r7kit-0.1.9.tar.gz
Algorithm Hash digest
SHA256 4e6138ba79c1a67fd7cac7b589aaf9e4ac6edbdf0c150d85cc90e42be5bce86c
MD5 4181a84c434bc538fcf0e6a3e3196dd7
BLAKE2b-256 c0a7b67aeb7c68b68553c6b2934f5faaa3e21e7d60704eef6424abf1a41a5239

See more details on using hashes here.

File details

Details for the file r7kit-0.1.9-py3-none-any.whl.

File metadata

  • Download URL: r7kit-0.1.9-py3-none-any.whl
  • Upload date:
  • Size: 20.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for r7kit-0.1.9-py3-none-any.whl
Algorithm Hash digest
SHA256 26fe3de3026ec36c52aafce5ed84156a4ed2359ef26cb4b613149cadb2731032
MD5 c1fbf107ec2880d345079f6581b057d5
BLAKE2b-256 642158c1bd0996cce26d4c9068f3fc4c6fb28a5449e70f5b2c47c9729ae04366

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page