SQLite-backed workflow state persistence and undo history for multi-step human-in-the-loop workflows.
Project description
cjm-workflow-state
Install
pip install cjm_workflow_state
Project Structure
nbs/
├── history.ipynb # Generic undo history stack for workflow operations.
└── state_store.ipynb # SQLite-backed workflow state storage for persistence across restarts.
Total: 2 notebooks
Module Dependencies
graph LR
history[history<br/>History]
state_store[state_store<br/>State Store]
No cross-module dependencies detected.
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
History (history.ipynb)
Generic undo history stack for workflow operations.
Import
from cjm_workflow_state.history import (
DEFAULT_MAX_HISTORY_DEPTH,
push_history,
pop_history
)
Functions
def push_history(
history: List[Dict[str, Any]], # Current history stack
snapshot: Dict[str, Any], # State snapshot to save
max_depth: int = DEFAULT_MAX_HISTORY_DEPTH, # Maximum history depth
) -> List[Dict[str, Any]]: # Updated history stack
"Push a state snapshot onto the history stack, enforcing max depth."
def pop_history(
history: List[Dict[str, Any]], # Current history stack
) -> Optional[Tuple[Dict[str, Any], List[Dict[str, Any]]]]: # (snapshot, updated_history) or None
"Pop the most recent snapshot from the history stack."
Variables
DEFAULT_MAX_HISTORY_DEPTH = 50
State Store (state_store.ipynb)
SQLite-backed workflow state storage for persistence across restarts.
Import
from cjm_workflow_state.state_store import (
SessionSummary,
SQLiteWorkflowStateStore
)
Classes
@dataclass
class SessionSummary:
"Lightweight metadata for a single workflow session."
flow_id: str # Workflow identifier
session_id: str # Session identifier
label: Optional[str] # Human-readable label, or None to defer to a generator
current_step: Optional[str] # Current StepFlow step ID, or None
created_at: str # ISO timestamp string from SQLite CURRENT_TIMESTAMP
updated_at: str # ISO timestamp string from SQLite CURRENT_TIMESTAMP
state_size_bytes: int # Length of state_json column — cheap size hint
class SQLiteWorkflowStateStore:
def __init__(
self,
db_path:Path # Path to SQLite database file
)
"SQLite-backed workflow state storage for persistence across restarts."
def __init__(
self,
db_path:Path # Path to SQLite database file
)
"Initialize the state store and create or migrate the schema."
def get_current_step(
self,
flow_id:str, # Workflow identifier
session_id:str # Session identifier string
) -> Optional[str]: # Current step ID, or None if no row exists
"Get the current step ID for a session."
def set_current_step(
self,
flow_id:str, # Workflow identifier
session_id:str, # Session identifier string
step_id:str # Step ID to set as current
) -> None
"Set the current step ID for a session, upserting the row if needed."
def get_state(
self,
flow_id:str, # Workflow identifier
session_id:str # Session identifier string
) -> Dict[str, Any]: # Full session state dictionary
"Get the full state dictionary for a session."
def update_state(
self,
flow_id:str, # Workflow identifier
session_id:str, # Session identifier string
updates:Dict[str, Any] # Top-level state keys to merge in
) -> None
"Merge updates into the session state, upserting the row if needed."
def clear_state(
self,
flow_id:str, # Workflow identifier
session_id:str # Session identifier string
) -> None
"Delete the session row entirely. Idempotent."
def get_step_state(
self,
flow_id:str, # Workflow identifier
session_id:str, # Session identifier string
step_id:str # Step identifier
) -> Dict[str, Any]: # State dictionary for that step
"Get the state dictionary for a specific step."
def update_step_state(
self,
flow_id:str, # Workflow identifier
session_id:str, # Session identifier string
step_id:str, # Step identifier
updates:Dict[str, Any] # Step state keys to merge in
) -> None
"Merge updates into a specific step's state."
def clear_step_state(
self,
flow_id:str, # Workflow identifier
session_id:str, # Session identifier string
step_id:str # Step identifier
) -> None
"Remove a specific step's state while preserving the rest of the session."
def create_session(
self,
flow_id:str, # Workflow identifier
session_id:Optional[str]=None, # Pre-chosen session ID (auto-generated UUID4 if None)
label:Optional[str]=None # Optional human-readable label
) -> str: # The created session_id
"Insert a new empty session row and return its session_id."
def session_exists(
self,
flow_id:str, # Workflow identifier
session_id:str # Session identifier string
) -> bool: # True if a row exists for the given session
"Check whether a session row exists."
def get_session_summary(
self,
flow_id:str, # Workflow identifier
session_id:str # Session identifier string
) -> Optional[SessionSummary]: # Session metadata, or None if not found
"Return lightweight metadata for a single session."
def list_sessions(
self,
flow_id:str, # Workflow identifier to list sessions for
order_by:str="updated_at", # Sort column: "updated_at", "created_at", or "label"
descending:bool=True # Sort direction
) -> List[SessionSummary]: # All sessions for this flow, ordered
"List all sessions for a flow as lightweight SessionSummary records."
def set_session_label(
self,
flow_id:str, # Workflow identifier
session_id:str, # Session identifier string
label:Optional[str] # New label, or None to clear
) -> None
"Update the session label. No-op if the session does not exist."
def delete_session(
self,
flow_id:str, # Workflow identifier
session_id:str # Session identifier string
) -> None
"Delete a session row. Alias of `clear_state` for symmetry with the session API."
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cjm_workflow_state-0.0.3.tar.gz.
File metadata
- Download URL: cjm_workflow_state-0.0.3.tar.gz
- Upload date:
- Size: 12.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1212fb82a6914d2d251ea8a030c267697c00efd69f4f4f9fc02c034158b9f908
|
|
| MD5 |
d750bb0fd3632c0b3be44204aada205d
|
|
| BLAKE2b-256 |
18fa187e8e067201fe440fd8b37c359cd1429ade09d40660021ddb979e6bd340
|
File details
Details for the file cjm_workflow_state-0.0.3-py3-none-any.whl.
File metadata
- Download URL: cjm_workflow_state-0.0.3-py3-none-any.whl
- Upload date:
- Size: 12.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ac1635e3d6e486ae62c193bda2b032b91b8b817341feeb9f144fc4fb89326150
|
|
| MD5 |
554774cee71d83edb9323e21d06f79e0
|
|
| BLAKE2b-256 |
5b919ffd70649237d0f40192f722673f5e92d267e743f3c58293f21b8cdcc397
|