
DataKnobs FSM

Finite State Machine framework with data modes, resource management, and streaming support.

Features

  • Multiple APIs: SimpleFSM, AsyncSimpleFSM, and AdvancedFSM for different use cases
  • Data Handling Modes: COPY, REFERENCE, and DIRECT modes for flexible data management
  • Resource Management: Built-in support for databases, files, HTTP services, LLMs, and vector stores
  • Streaming Support: Process large datasets with chunking and backpressure handling
  • Advanced Debugging: Step-by-step execution, breakpoints, and execution hooks
  • Flexible Configuration: YAML/JSON configuration with schema validation
  • Built-in Functions: Library of common validation and transformation functions

Installation

pip install dataknobs-fsm

Quick Start

Simple FSM

from dataknobs_fsm import SimpleFSM
from dataknobs_fsm.core.data_modes import DataHandlingMode

# Define configuration
config = {
    "name": "data_pipeline",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "process"},
        {"name": "end", "is_end": True}
    ],
    "arcs": [
        {
            "from": "start",
            "to": "process",
            "transform": {
                "type": "inline",
                "code": "lambda data, ctx: {**data, 'processed': True}"
            }
        },
        {"from": "process", "to": "end"}
    ]
}

# Create and run FSM
fsm = SimpleFSM(config, data_mode=DataHandlingMode.COPY)
result = fsm.process({"input": "data"})
print(f"Result: {result['data']}")
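To make the configuration's meaning concrete, here is a minimal, library-independent sketch of how a states/arcs config like the one above can be interpreted: walk the arcs from the start state, applying each arc's inline transform to the data. The `run_config` helper is illustrative only, not a dataknobs-fsm internal.

```python
# Minimal interpreter sketch for a states/arcs config (illustrative only,
# not dataknobs-fsm internals): walk arcs from the start state, applying
# each arc's inline transform to the data until an end state is reached.

def run_config(config, data):
    states = {s["name"]: s for s in config["states"]}
    arcs = {a["from"]: a for a in config["arcs"]}  # one outgoing arc per state
    current = next(s["name"] for s in config["states"] if s.get("is_start"))
    while not states[current].get("is_end"):
        arc = arcs[current]
        t = arc.get("transform")
        if t and t["type"] == "inline":
            fn = eval(t["code"])  # inline transforms are code strings
            data = fn(data, None)
        current = arc["to"]
    return current, data

config = {
    "name": "data_pipeline",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "process"},
        {"name": "end", "is_end": True},
    ],
    "arcs": [
        {"from": "start", "to": "process",
         "transform": {"type": "inline",
                       "code": "lambda data, ctx: {**data, 'processed': True}"}},
        {"from": "process", "to": "end"},
    ],
}

final_state, result = run_config(config, {"input": "data"})
# final_state == "end"; result == {"input": "data", "processed": True}
```

The real framework adds data modes, resource management, and validation on top of this basic traversal; the sketch only shows the shape of the state graph.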

Advanced FSM with Debugging

from dataknobs_fsm import AdvancedFSM, ExecutionMode
import asyncio

async def debug_example():
    # Create FSM with debug mode
    fsm = AdvancedFSM(
        "config.yaml",
        execution_mode=ExecutionMode.DEBUG
    )

    # Add breakpoint
    fsm.add_breakpoint("process")

    # Create context and run
    context = fsm.create_context({"input": "data"})
    await fsm.run_until_breakpoint(context)
    print(f"Stopped at: {context.current_state}")

    # Continue execution
    await fsm.step(context)

asyncio.run(debug_example())
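The AdvancedFSM example above loads its definition from `config.yaml`. That file is not shown here, but assuming the YAML schema mirrors the dict-based configuration from the Simple FSM example, it might look like the following sketch (consult the package's schema validation for the authoritative format):

```yaml
# Hypothetical config.yaml mirroring the dict-based configuration above.
name: data_pipeline
states:
  - name: start
    is_start: true
  - name: process
  - name: end
    is_end: true
arcs:
  - from: start
    to: process
    transform:
      type: inline
      code: "lambda data, ctx: {**data, 'processed': True}"
  - from: process
    to: end
```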

Examples

The examples/ directory contains comprehensive examples:

Data Processing Examples

  • data_pipeline_example.py - Data validation and transformation pipeline
  • data_validation_pipeline.py - Data quality validation workflow
  • database_etl.py - Complete ETL pipeline with transaction management
  • large_file_processor.py - Memory-efficient large file processing
  • end_to_end_streaming.py - Streaming pipeline demonstration

Advanced Features

  • advanced_debugging.py - Full debugging features demonstration
  • advanced_debugging_simple.py - Simplified debugging example
  • llm_conversation.py - Conversational AI system using FSM

Text Processing

  • normalize_file_example.py - Text file normalization with streaming
  • normalize_file_with_regex.py - Advanced regex transformations
  • test_regex_yaml.py - Testing script for YAML regex configurations

Configuration Examples

  • regex_transforms.yaml - Field transformation workflows
  • regex_workflow.yaml - Pattern extraction and masking configurations

Running Examples

# Navigate to the FSM package
cd packages/fsm

# Run the database ETL example
uv run python examples/database_etl.py

# Run the data processing pipeline
uv run python examples/data_pipeline_example.py

# Run streaming example
uv run python examples/end_to_end_streaming.py

# Run with custom parameters
uv run python examples/database_etl.py --batch-size 500

Data Handling Modes

The FSM framework provides three data handling modes:

  • COPY Mode: Creates deep copies of data for each state, ensuring isolation
  • REFERENCE Mode: Uses lazy loading with optimistic locking for memory efficiency
  • DIRECT Mode: In-place modifications for maximum performance (single-threaded only)

from dataknobs_fsm import SimpleFSM
from dataknobs_fsm.core.data_modes import DataHandlingMode

# Use COPY mode for safety
fsm = SimpleFSM(config, data_mode=DataHandlingMode.COPY)

# Use REFERENCE mode for large datasets
fsm = SimpleFSM(config, data_mode=DataHandlingMode.REFERENCE)

# Use DIRECT mode for performance
fsm = SimpleFSM(config, data_mode=DataHandlingMode.DIRECT)
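The practical difference between COPY and DIRECT can be illustrated in plain Python. This sketches the semantics only, not dataknobs-fsm's implementation: a COPY-style step deep-copies its input so the caller's data is never mutated, while a DIRECT-style step mutates it in place for speed.

```python
import copy

# Semantics sketch only (not dataknobs-fsm internals): COPY-style processing
# deep-copies the input so the caller's dict is untouched, while DIRECT-style
# processing mutates it in place.

def process_copy_style(data):
    working = copy.deepcopy(data)   # isolation: caller's dict is untouched
    working["processed"] = True
    return working

def process_direct_style(data):
    data["processed"] = True        # in-place: caller sees the change
    return data

original = {"input": "data"}
copied_result = process_copy_style(original)
assert "processed" not in original           # COPY left the input pristine

direct_result = process_direct_style(original)
assert original["processed"] is True         # DIRECT mutated the input
```

REFERENCE mode sits between the two, trading deep copies for lazy loading with optimistic locking, which is why it suits large datasets.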

Documentation

For detailed documentation, see:

Testing

Run the tests with:

cd packages/fsm
uv run pytest tests/ -v

Development

This package is part of the DataKnobs ecosystem. For development setup and guidelines, see the main repository README.

License

Licensed under the same terms as the DataKnobs project.
