Task/workflow toolkit for Temporal + Redis
Project description
r7kit
r7kit — лёгкая обёртка над Temporal + Redis для организации workflow-задач с хранением состояния (payload) в Redis и автоматической сериализацией.
Содержание
- Установка
- Конфигурация
- Базовые понятия
- Определение Workflow
- Запуск worker-процесса
- Запуск workflow из клиента
- CRUD операций над задачей
- Вложенный payload и child-workflow
- Логирование и сериализация
- TTL-удаление задач
Установка
pip install r7kit
Библиотека не поднимает ни Temporal-сервис, ни Redis — предполагается, что они уже есть.
Конфигурация
Можно настроить r7kit через переменные окружения или программно до первого импорта:
from r7kit.config import configure
configure(
redis_url="redis://localhost:6379",
temporal_address="localhost:7233",
stream_default="task_events",
deleted_ttl=60, # TTL после delete (в секундах)
)
Переменные окружения:
REDIS_URLTEMPORAL_ADDRESSR7KIT_STREAMR7KIT_DELETED_TTL
Базовые понятия
Каждому workflow соответствует Redis-задача, которая хранится в HSET task:{uuid} и имеет:
- статус (status)
- временные метки (created_at, updated_at)
- payload (вложенный словарь)
- поток событий (Redis Stream)
Библиотека сериализует payload в orjson + префикс, автоматически декодируя при чтении.
Определение Workflow
from r7kit.decorators import taskflow
from r7kit.base_task_workflow import BaseTaskWorkflow
@taskflow(queue="producer-queue")
class MyFlow(BaseTaskWorkflow):
async def handle(self) -> int:
await self.load_task()
x = self.payload["input"]
await self.patch_task({"status": "processing"})
await self.patch_task({"status": "done", "payload": {"result": x * 2}})
await self.delete_task(ttl=60)
return x * 2
Наследуй
BaseTaskWorkflow, используйload_task,patch_task,delete_taskиself.payload.
Запуск worker-процесса
from r7kit.worker import R7Worker
if __name__ == "__main__":
R7Worker("myapp", queue="producer-queue").start()
Указывается имя пакета, где лежат ваши воркфлоу, и очередь.
Запуск workflow из клиента
from myapp.flows import MyFlow
handle, uuid = await MyFlow.start(payload={"input": 42})
await handle.result()
Или:
from r7kit.workflow_utils import submit_workflow
await submit_workflow(MyFlow, payload={"input": 42})
CRUD операций над задачей
После await self.load_task() доступны:
self.task: объектTaskModelself.payload: словарь
Поддерживаются:
await self.patch_task({"status": "in_progress"})
await self.delete_task(ttl=60)
await self.get_task()
await self.exists() # логическое удаление
Асинхронный контекст для сохранения payload:
async with self.state():
self.payload["x"] = 1
Вложенный payload и child-workflow
Можно запускать дочерние воркфлоу:
result = await self.child("OtherFlow", queue="q").run()
Или:
handle, child_task_id = await self.child("OtherFlow", queue="q").start(payload={...})
Логирование и сериализация
Для логирования:
from r7kit.logging import setup
setup("DEBUG")
Для ручной сериализации (например, вне воркфлоу):
from r7kit.serializer import dumps, loads
TTL-удаление задач
Удаление:
await self.delete_task(ttl=10)
Это:
- Ставит
deleted_at = now - Устанавливает
PEXPIREна Redis-ключ задачи - Логируется в Redis Stream
TTL можно задать через R7KIT_DELETED_TTL или явно в методе.
Пример
@taskflow(queue="producer-queue")
class ProducerWorkflow(BaseTaskWorkflow):
async def handle(self) -> int:
await self.load_task()
x = int(self.payload["mathematic"]["input"])
await self.patch_task({"status": "sent"})
res = await self.child("ProcessorWorkflow", queue="processor-queue").run()
await self.patch_task({"status": "done", "payload": {"mathematic": {"input": x, "result": res}}})
await self.delete_task(ttl=10)
return res
Зависимости
- temporalio
- redis.asyncio
- orjson
- pydantic
Лицензия
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file r7kit-0.2.0.tar.gz.
File metadata
- Download URL: r7kit-0.2.0.tar.gz
- Upload date:
- Size: 17.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6c6bc12eee1f7d81d551c88409f7213b1b19466479501b8ac6d4862f52385606
|
|
| MD5 |
34816142fbec08baffa516d1bc6ef06a
|
|
| BLAKE2b-256 |
e2914bd9c1e8ae05ff8e634284a16328a8c4d5e1bf4ac5df020dea351d4109e8
|
File details
Details for the file r7kit-0.2.0-py3-none-any.whl.
File metadata
- Download URL: r7kit-0.2.0-py3-none-any.whl
- Upload date:
- Size: 22.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8c94f66bb1540487ddc31a56cc89ff630011aa53e4186a3f4e8c31ae39da2f7d
|
|
| MD5 |
4fab34a05e61dc67731f00c6c7e7f78b
|
|
| BLAKE2b-256 |
74b354e6ce802058d6d4504dd9541e33be1d8730bb60932a4ee3100177e71128
|