Task/workflow toolkit for Temporal + Redis
Project description
r7kit — лёгкая обёртка над Temporal + Redis для организации workflow-задач с хранением состояния (payload) в Redis и автоматической сериализацией.
- Установка
- Конфигурация
- Базовые понятия
- Определение Workflow
- Запуск worker-процесса
- Запуск workflow из клиента
- CRUD операций над задачей
- Вложенный payload и child-workflow
- Логирование и сериализация
- TTL-удаление задач
pip install r7kit
Библиотека не поднимает ни Temporal-сервис, ни Redis — предполагается, что они уже есть.
Можно настроить r7kit через переменные окружения или программно до первого импорта:
from r7kit.config import configure
configure(
redis_url="redis://localhost:6379",
temporal_address="localhost:7233",
stream_default="task_events",
deleted_ttl=60, # TTL после delete (в секундах)
)
Переменные окружения:
REDIS_URLTEMPORAL_ADDRESSR7KIT_STREAMR7KIT_DELETED_TTL
Каждому workflow соответствует Redis-задача, которая хранится в HSET task:{uuid} и имеет:
- статус (status)
- временные метки (created_at, updated_at)
- payload (вложенный словарь)
- поток событий (Redis Stream)
Библиотека сериализует payload в orjson + префикс, автоматически декодируя при чтении.
from r7kit.decorators import taskflow
from r7kit.base_task_workflow import BaseTaskWorkflow
@taskflow(queue="producer-queue")
class MyFlow(BaseTaskWorkflow):
async def handle(self) -> int:
await self.load_task()
x = self.payload["input"]
await self.patch_task({"status": "processing"})
await self.patch_task({"status": "done", "payload": {"result": x * 2}})
await self.delete_task(ttl=60)
return x * 2
Наследуй
BaseTaskWorkflow, используйload_task,patch_task,delete_taskиself.payload.
from r7kit.worker import R7Worker
if __name__ == "__main__":
R7Worker("myapp", queue="producer-queue").start()
Указывается имя пакета, где лежат ваши воркфлоу, и очередь.
from myapp.flows import MyFlow
handle, uuid = await MyFlow.start(payload={"input": 42})
await handle.result()
Или:
from r7kit.workflow_utils import submit_workflow
await submit_workflow(MyFlow, payload={"input": 42})
После await self.load_task() доступны:
self.task: объектTaskModelself.payload: словарь
Поддерживаются:
await self.patch_task({"status": "in_progress"})
await self.delete_task(ttl=60)
await self.get_task()
await self.exists() # логическое удаление
Асинхронный контекст для сохранения payload:
async with self.state():
self.payload["x"] = 1
Можно запускать дочерние воркфлоу:
result = await self.child("OtherFlow", queue="q").run()
Или:
handle, child_task_id = await self.child("OtherFlow", queue="q").start(payload={...})
Для логирования:
from r7kit.logging import setup
setup("DEBUG")
Для ручной сериализации (например, вне воркфлоу):
from r7kit.serializer import dumps, loads
Удаление:
await self.delete_task(ttl=10)
Это:
- Ставит
deleted_at = now - Устанавливает
PEXPIREна Redis-ключ задачи - Логируется в Redis Stream
TTL можно задать через R7KIT_DELETED_TTL или явно в методе.
@taskflow(queue="producer-queue")
class ProducerWorkflow(BaseTaskWorkflow):
async def handle(self) -> int:
await self.load_task()
x = int(self.payload["mathematic"]["input"])
await self.patch_task({"status": "sent"})
res = await self.child("ProcessorWorkflow", queue="processor-queue").run()
await self.patch_task({"status": "done", "payload": {"mathematic": {"input": x, "result": res}}})
await self.delete_task(ttl=10)
return res
- temporalio
- redis.asyncio
- orjson
- pydantic
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file r7kit-0.3.tar.gz.
File metadata
- Download URL: r7kit-0.3.tar.gz
- Upload date:
- Size: 20.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f51f054339c2536ec326d9573f02c138a67da2d1c4726733665c323c21569bc9
|
|
| MD5 |
d6042272a03c1b49e4d9de1dc81e185f
|
|
| BLAKE2b-256 |
24e0278c686b5d9358bf4296bcb85235d11918315d669c86fba181c5f0959c0b
|
File details
Details for the file r7kit-0.3-py3-none-any.whl.
File metadata
- Download URL: r7kit-0.3-py3-none-any.whl
- Upload date:
- Size: 24.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b3487101c5a7c9a894112a60e79be7b3831aee2960c390b70e5c7ea46d21f1e2
|
|
| MD5 |
c17eddc8414859767025edd83f5697bb
|
|
| BLAKE2b-256 |
e4e7e442dae8b0113821ae52f059916e7c7274cf0033004226edb6ddc1d4e948
|