
SQLite-backed workflow state persistence and undo history for multi-step human-in-the-loop workflows.


cjm-workflow-state

Install

pip install cjm_workflow_state

Project Structure

nbs/
├── history.ipynb     # Generic undo history stack for workflow operations.
└── state_store.ipynb # SQLite-backed workflow state storage for persistence across restarts.

Total: 2 notebooks

Module Dependencies

graph LR
    history[history<br/>History]
    state_store[state_store<br/>State Store]

No cross-module dependencies detected.

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

History (history.ipynb)

Generic undo history stack for workflow operations.

Import

from cjm_workflow_state.history import (
    DEFAULT_MAX_HISTORY_DEPTH,
    push_history,
    pop_history
)

Functions

def push_history(
    history: List[Dict[str, Any]],  # Current history stack
    snapshot: Dict[str, Any],  # State snapshot to save
    max_depth: int = DEFAULT_MAX_HISTORY_DEPTH,  # Maximum history depth
) -> List[Dict[str, Any]]:  # Updated history stack
    "Push a state snapshot onto the history stack, enforcing max depth."
def pop_history(
    history: List[Dict[str, Any]],  # Current history stack
) -> Optional[Tuple[Dict[str, Any], List[Dict[str, Any]]]]:  # (snapshot, updated_history) or None
    "Pop the most recent snapshot from the history stack."

Variables

DEFAULT_MAX_HISTORY_DEPTH = 50
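The signatures above describe a bounded undo stack: `push_history` returns an updated stack capped at `max_depth`, and `pop_history` returns a `(snapshot, updated_history)` pair or `None`. The standalone re-implementation below is only a sketch of those semantics and is not the package's code. It assumes that the newest snapshot sits at the end of the list, that the oldest entries are discarded first when the cap is exceeded, that snapshots are deep-copied, and that the input list is never mutated. In real use, import the functions from `cjm_workflow_state.history` instead.

```python
import copy
from typing import Any, Dict, List, Optional, Tuple

# Stand-in value mirroring DEFAULT_MAX_HISTORY_DEPTH; NOT imported from cjm_workflow_state.
DEFAULT_MAX_HISTORY_DEPTH = 50

History = List[Dict[str, Any]]


def push_history(
    history: History,
    snapshot: Dict[str, Any],
    max_depth: int = DEFAULT_MAX_HISTORY_DEPTH,
) -> History:
    """Return a new stack with a copy of `snapshot` on top (assumed semantics)."""
    # Assumption: newest snapshot goes at the end; the caller's list is left untouched.
    updated = history + [copy.deepcopy(snapshot)]
    # Assumption: when over capacity, the oldest snapshots (front of the list) are dropped.
    overflow = len(updated) - max_depth
    if overflow > 0:
        del updated[:overflow]
    return updated


def pop_history(history: History) -> Optional[Tuple[Dict[str, Any], History]]:
    """Return (most recent snapshot, remaining stack), or None if the stack is empty."""
    if not history:
        return None
    return history[-1], history[:-1]


# Undo round trip: snapshot the state, edit it, then restore the snapshot.
state = {"step": "review", "items": [1, 2]}
history: History = []
history = push_history(history, state)
state["items"].append(3)  # an edit made after the snapshot was taken
result = pop_history(history)
if result is not None:
    state, history = result
print(state)  # {'step': 'review', 'items': [1, 2]}

# Depth cap: only the 5 most recent snapshots survive.
capped: History = []
for i in range(60):
    capped = push_history(capped, {"i": i}, max_depth=5)
print([s["i"] for s in capped])  # [55, 56, 57, 58, 59]
```

Returning a fresh list instead of mutating in place lets the stack be treated as a plain value, which can then be stored and written back like any other piece of workflow state.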

State Store (state_store.ipynb)

SQLite-backed workflow state storage for persistence across restarts.

Import

from cjm_workflow_state.state_store import (
    SessionSummary,
    SQLiteWorkflowStateStore
)

Classes

@dataclass
class SessionSummary:
    "Lightweight metadata for a single workflow session."
    
    flow_id: str  # Workflow identifier
    session_id: str  # Session identifier
    label: Optional[str]  # Human-readable label, or None to defer to a generator
    current_step: Optional[str]  # Current StepFlow step ID, or None
    created_at: str  # ISO timestamp string from SQLite CURRENT_TIMESTAMP
    updated_at: str  # ISO timestamp string from SQLite CURRENT_TIMESTAMP
    state_size_bytes: int  # Length of state_json column, a cheap size hint

class SQLiteWorkflowStateStore:
    "SQLite-backed workflow state storage for persistence across restarts."
    
    def __init__(
            self,
            db_path:Path # Path to SQLite database file
        ):
        "Initialize the state store and create or migrate the schema."
    
    def get_current_step(
            self,
            flow_id:str, # Workflow identifier
            session_id:str # Session identifier string
        ) -> Optional[str]: # Current step ID, or None if no row exists
        "Get the current step ID for a session."
    
    def set_current_step(
            self,
            flow_id:str, # Workflow identifier
            session_id:str, # Session identifier string
            step_id:str # Step ID to set as current
        ) -> None:
        "Set the current step ID for a session, upserting the row if needed."
    
    def get_state(
            self,
            flow_id:str, # Workflow identifier
            session_id:str # Session identifier string
        ) -> Dict[str, Any]: # Full session state dictionary
        "Get the full state dictionary for a session."
    
    def update_state(
            self,
            flow_id:str, # Workflow identifier
            session_id:str, # Session identifier string
            updates:Dict[str, Any] # Top-level state keys to merge in
        ) -> None:
        "Merge updates into the session state, upserting the row if needed."
    
    def clear_state(
            self,
            flow_id:str, # Workflow identifier
            session_id:str # Session identifier string
        ) -> None:
        "Delete the session row entirely. Idempotent."
    
    def get_step_state(
            self,
            flow_id:str, # Workflow identifier
            session_id:str, # Session identifier string
            step_id:str # Step identifier
        ) -> Dict[str, Any]: # State dictionary for that step
        "Get the state dictionary for a specific step."
    
    def update_step_state(
            self,
            flow_id:str, # Workflow identifier
            session_id:str, # Session identifier string
            step_id:str, # Step identifier
            updates:Dict[str, Any] # Step state keys to merge in
        ) -> None:
        "Merge updates into a specific step's state."
    
    def clear_step_state(
            self,
            flow_id:str, # Workflow identifier
            session_id:str, # Session identifier string
            step_id:str # Step identifier
        ) -> None:
        "Remove a specific step's state while preserving the rest of the session."
    
    def create_session(
            self,
            flow_id:str, # Workflow identifier
            session_id:Optional[str]=None, # Pre-chosen session ID (auto-generated UUID4 if None)
            label:Optional[str]=None # Optional human-readable label
        ) -> str: # The created session_id
        "Insert a new empty session row and return its session_id."
    
    def session_exists(
            self,
            flow_id:str, # Workflow identifier
            session_id:str # Session identifier string
        ) -> bool: # True if a row exists for the given session
        "Check whether a session row exists."
    
    def get_session_summary(
            self,
            flow_id:str, # Workflow identifier
            session_id:str # Session identifier string
        ) -> Optional[SessionSummary]: # Session metadata, or None if not found
        "Return lightweight metadata for a single session."
    
    def list_sessions(
            self,
            flow_id:str, # Workflow identifier to list sessions for
            order_by:str="updated_at", # Sort column: "updated_at", "created_at", or "label"
            descending:bool=True # Sort direction
        ) -> List[SessionSummary]: # All sessions for this flow, ordered
        "List all sessions for a flow as lightweight SessionSummary records."
    
    def set_session_label(
            self,
            flow_id:str, # Workflow identifier
            session_id:str, # Session identifier string
            label:Optional[str] # New label, or None to clear
        ) -> None:
        "Update the session label. No-op if the session does not exist."
    
    def delete_session(
            self,
            flow_id:str, # Workflow identifier
            session_id:str # Session identifier string
        ) -> None:
        "Delete a session row. Alias of `clear_state` for symmetry with the session API."
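The listing gives signatures only. To make the documented behaviour concrete (upserting rows when needed, shallow-merging top-level state keys, nesting per-step state, and listing sessions as `SessionSummary` records), here is a minimal sketch built on Python's standard `sqlite3` module. Everything about the storage layout is an assumption: the table name `workflow_state`, one row per `(flow_id, session_id)` holding the whole state as JSON text, and per-step state kept under a hypothetical `"step_states"` key. `MiniStateStore` is a hypothetical, reduced stand-in for `SQLiteWorkflowStateStore`, not its source. The upserts use `INSERT ... ON CONFLICT DO UPDATE`, which requires SQLite 3.24 or newer.

```python
import json
import sqlite3
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

@dataclass
class SessionSummary:
    flow_id: str
    session_id: str
    label: Optional[str]
    current_step: Optional[str]
    created_at: str
    updated_at: str
    state_size_bytes: int

# Hypothetical schema: one row per (flow_id, session_id), whole state stored as JSON text.
SCHEMA = """CREATE TABLE IF NOT EXISTS workflow_state (
    flow_id TEXT NOT NULL, session_id TEXT NOT NULL,
    label TEXT, current_step TEXT,
    state_json TEXT NOT NULL DEFAULT '{}',
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (flow_id, session_id))"""

# Upsert template (SQLite >= 3.24): insert the row, or update one column if it exists.
UPSERT = """INSERT INTO workflow_state (flow_id, session_id, {col}) VALUES (?, ?, ?)
ON CONFLICT(flow_id, session_id) DO UPDATE SET
    {col} = excluded.{col}, updated_at = CURRENT_TIMESTAMP"""

# Column names cannot be bound as ? parameters, so order_by is whitelisted.
ORDER_COLUMNS = {"updated_at", "created_at", "label"}

class MiniStateStore:
    """Hypothetical, reduced stand-in for SQLiteWorkflowStateStore (not its source)."""

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(SCHEMA)

    def create_session(self, flow_id: str, session_id: Optional[str] = None,
                       label: Optional[str] = None) -> str:
        if session_id is None:
            session_id = str(uuid.uuid4())
        with self.conn:  # commits on success, rolls back on exception
            self.conn.execute(
                "INSERT INTO workflow_state (flow_id, session_id, label) VALUES (?, ?, ?)",
                (flow_id, session_id, label))
        return session_id

    def set_current_step(self, flow_id: str, session_id: str, step_id: str) -> None:
        with self.conn:
            self.conn.execute(UPSERT.format(col="current_step"), (flow_id, session_id, step_id))

    def get_state(self, flow_id: str, session_id: str) -> Dict[str, Any]:
        row = self.conn.execute(
            "SELECT state_json FROM workflow_state WHERE flow_id = ? AND session_id = ?",
            (flow_id, session_id)).fetchone()
        return json.loads(row[0]) if row else {}

    def update_state(self, flow_id: str, session_id: str, updates: Dict[str, Any]) -> None:
        # Shallow merge: top-level keys in `updates` replace existing values.
        state = {**self.get_state(flow_id, session_id), **updates}
        with self.conn:
            self.conn.execute(UPSERT.format(col="state_json"),
                              (flow_id, session_id, json.dumps(state)))

    def get_step_state(self, flow_id: str, session_id: str, step_id: str) -> Dict[str, Any]:
        # Hypothetical layout: per-step dicts nested under a "step_states" key.
        return self.get_state(flow_id, session_id).get("step_states", {}).get(step_id, {})

    def update_step_state(self, flow_id: str, session_id: str, step_id: str,
                          updates: Dict[str, Any]) -> None:
        steps = self.get_state(flow_id, session_id).get("step_states", {})
        steps[step_id] = {**steps.get(step_id, {}), **updates}
        self.update_state(flow_id, session_id, {"step_states": steps})

    def list_sessions(self, flow_id: str, order_by: str = "updated_at",
                      descending: bool = True) -> List[SessionSummary]:
        if order_by not in ORDER_COLUMNS:
            raise ValueError(f"unsupported order_by: {order_by!r}")
        direction = "DESC" if descending else "ASC"
        # CAST to BLOB so length() counts bytes rather than characters.
        sql = ("SELECT flow_id, session_id, label, current_step, created_at, updated_at, "
               "length(CAST(state_json AS BLOB)) FROM workflow_state WHERE flow_id = ? "
               f"ORDER BY {order_by} {direction}")
        return [SessionSummary(*row) for row in self.conn.execute(sql, (flow_id,))]

store = MiniStateStore(":memory:")
sid = store.create_session("transcribe", label="Demo")
store.set_current_step("transcribe", sid, "select_audio")
store.update_state("transcribe", sid, {"source": "a.wav"})
store.update_step_state("transcribe", sid, "select_audio", {"confirmed": True})
print(store.get_state("transcribe", sid))
# {'source': 'a.wav', 'step_states': {'select_audio': {'confirmed': True}}}
```

Two design points in this sketch are worth calling out. `update_state` reads, merges, and writes in separate statements, so concurrent writers could lose updates; a store shared between processes would want the read and the write inside one transaction (for example one opened with `BEGIN IMMEDIATE`). And because SQL identifiers cannot be passed as `?` parameters, `order_by` is checked against a fixed set before it is interpolated into the query.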



Download files


Source Distribution

cjm_workflow_state-0.0.3.tar.gz (12.8 kB)

Built Distribution


cjm_workflow_state-0.0.3-py3-none-any.whl (12.3 kB)

File details

Details for the file cjm_workflow_state-0.0.3.tar.gz.

File metadata

  • Download URL: cjm_workflow_state-0.0.3.tar.gz
  • Size: 12.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for cjm_workflow_state-0.0.3.tar.gz
Algorithm    Hash digest
SHA256       1212fb82a6914d2d251ea8a030c267697c00efd69f4f4f9fc02c034158b9f908
MD5          d750bb0fd3632c0b3be44204aada205d
BLAKE2b-256  18fa187e8e067201fe440fd8b37c359cd1429ade09d40660021ddb979e6bd340


File details

Details for the file cjm_workflow_state-0.0.3-py3-none-any.whl.

File hashes

Hashes for cjm_workflow_state-0.0.3-py3-none-any.whl
Algorithm    Hash digest
SHA256       ac1635e3d6e486ae62c193bda2b032b91b8b817341feeb9f144fc4fb89326150
MD5          554774cee71d83edb9323e21d06f79e0
BLAKE2b-256  5b919ffd70649237d0f40192f722673f5e92d267e743f3c58293f21b8cdcc397

