Skip to main content

SQLite-backed workflow state persistence and undo history for multi-step human-in-the-loop workflows.

Project description

cjm-workflow-state

Install

pip install cjm_workflow_state

Project Structure

nbs/
├── history.ipynb     # Generic undo history stack for workflow operations.
└── state_store.ipynb # SQLite-backed workflow state storage for persistence across restarts.

Total: 2 notebooks

Module Dependencies

graph LR
    history[history<br/>History]
    state_store[state_store<br/>State Store]

No cross-module dependencies detected.

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

History (history.ipynb)

Generic undo history stack for workflow operations.

Import

from cjm_workflow_state.history import (
    DEFAULT_MAX_HISTORY_DEPTH,
    push_history,
    pop_history
)

Functions

def push_history(
    history: List[Dict[str, Any]],  # Current history stack
    snapshot: Dict[str, Any],  # State snapshot to save
    max_depth: int = DEFAULT_MAX_HISTORY_DEPTH,  # Maximum history depth
) -> List[Dict[str, Any]]:  # Updated history stack
    "Push a state snapshot onto the history stack, enforcing max depth."
def pop_history(
    history: List[Dict[str, Any]],  # Current history stack
) -> Optional[Tuple[Dict[str, Any], List[Dict[str, Any]]]]:  # (snapshot, updated_history) or None
    "Pop the most recent snapshot from the history stack."

Variables

DEFAULT_MAX_HISTORY_DEPTH = 50

State Store (state_store.ipynb)

SQLite-backed workflow state storage for persistence across restarts.

Import

from cjm_workflow_state.state_store import (
    SQLiteWorkflowStateStore
)

Classes

class SQLiteWorkflowStateStore:
    def __init__(
        self,
        db_path: Path  # Path to SQLite database file
    )
    "SQLite-backed workflow state storage for persistence across restarts."
    
    def __init__(
            self,
            db_path: Path  # Path to SQLite database file
        )
        "Initialize the state store and create tables if needed."
    
    def get_current_step(
            self,
            flow_id: str,  # Workflow identifier
            session_id: str  # Session identifier string
        ) -> Optional[str]:  # Current step ID or None
        "Get current step ID for a workflow."
    
    def set_current_step(
            self,
            flow_id: str,  # Workflow identifier
            session_id: str,  # Session identifier string
            step_id: str  # Step ID to set as current
        ) -> None
        "Set current step ID for a workflow."
    
    def get_state(
            self,
            flow_id: str,  # Workflow identifier
            session_id: str  # Session identifier string
        ) -> Dict[str, Any]:  # Workflow state dictionary
        "Get all workflow state."
    
    def update_state(
            self,
            flow_id: str,  # Workflow identifier
            session_id: str,  # Session identifier string
            updates: Dict[str, Any]  # State updates to merge
        ) -> None
        "Update workflow state by merging new values."
    
    def clear_state(
            self,
            flow_id: str,  # Workflow identifier
            session_id: str  # Session identifier string
        ) -> None
        "Clear all workflow state."
    
    def get_step_state(
            self,
            flow_id: str,  # Workflow identifier
            session_id: str,  # Session identifier string
            step_id: str  # Step identifier
        ) -> Dict[str, Any]:  # Step-specific state dictionary
        "Get state for a specific step."
    
    def update_step_state(
            self,
            flow_id: str,  # Workflow identifier
            session_id: str,  # Session identifier string
            step_id: str,  # Step identifier
            updates: Dict[str, Any]  # Step-specific state updates
        ) -> None
        "Update state for a specific step."
    
    def clear_step_state(
            self,
            flow_id: str,  # Workflow identifier
            session_id: str,  # Session identifier string
            step_id: str  # Step identifier
        ) -> None
        "Clear state for a specific step."

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cjm_workflow_state-0.0.2.tar.gz (10.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cjm_workflow_state-0.0.2-py3-none-any.whl (10.3 kB view details)

Uploaded Python 3

File details

Details for the file cjm_workflow_state-0.0.2.tar.gz.

File metadata

  • Download URL: cjm_workflow_state-0.0.2.tar.gz
  • Upload date:
  • Size: 10.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for cjm_workflow_state-0.0.2.tar.gz
Algorithm Hash digest
SHA256 f5bb897af3731c73190eb2e4ba4d2f99f4baf9c7b74b20bec5af343502533b23
MD5 3811f8d7f28894a683134ef6ff7e3613
BLAKE2b-256 9b6372569f435b0cd691408d7339be0eb749ece35393d70d0004448b4a2cace8

See more details on using hashes here.

File details

Details for the file cjm_workflow_state-0.0.2-py3-none-any.whl.

File metadata

File hashes

Hashes for cjm_workflow_state-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 112e2206325c47972f7fb6fca337b059a66473b554a4c7870760d54e842012a9
MD5 94b825e793c3c9923039ddfec9cfc860
BLAKE2b-256 d5c96d9b04c34142ee483b171138d32056e73d316b8f82519531473a77281334

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page