
DataKnobs FSM

Finite State Machine framework with data modes, resource management, and streaming support.

Features

  • Multiple APIs: SimpleFSM, AsyncSimpleFSM, and AdvancedFSM for different use cases
  • Data Handling Modes: COPY, REFERENCE, and DIRECT modes for flexible data management
  • Resource Management: Built-in support for databases, files, HTTP services, and vector stores
  • Streaming Support: Process large datasets with chunking and backpressure handling
  • Advanced Debugging: Step-by-step execution, breakpoints, and execution hooks
  • Flexible Configuration: YAML/JSON configuration with schema validation
  • Built-in Functions: Library of common validation and transformation functions

Installation

pip install dataknobs-fsm

Quick Start

Simple FSM

from dataknobs_fsm import SimpleFSM
from dataknobs_fsm.core.data_modes import DataHandlingMode

# Define configuration
config = {
    "name": "data_pipeline",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "process"},
        {"name": "end", "is_end": True}
    ],
    "arcs": [
        {
            "from": "start",
            "to": "process",
            "transform": {
                "type": "inline",
                "code": "lambda data, ctx: {**data, 'processed': True}"
            }
        },
        {"from": "process", "to": "end"}
    ]
}

# Create and run FSM
fsm = SimpleFSM(config, data_mode=DataHandlingMode.COPY)
result = fsm.process({"input": "data"})
print(f"Result: {result['data']}")
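The same pipeline can be expressed as a YAML file (loaded by `AdvancedFSM` in the next example). This is a sketch that assumes the YAML schema mirrors the dict keys shown above; the structure is taken directly from the dict config:

```yaml
# config.yaml -- assumed to mirror the dict configuration above
name: data_pipeline
states:
  - name: start
    is_start: true
  - name: process
  - name: end
    is_end: true
arcs:
  - from: start
    to: process
    transform:
      type: inline
      code: "lambda data, ctx: {**data, 'processed': True}"
  - from: process
    to: end
```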

Advanced FSM with Debugging

from dataknobs_fsm import AdvancedFSM, ExecutionMode
import asyncio

async def debug_example():
    # Create FSM with debug mode
    fsm = AdvancedFSM(
        "config.yaml",
        execution_mode=ExecutionMode.DEBUG
    )

    # Add breakpoint
    fsm.add_breakpoint("process")

    # Create context and run
    context = fsm.create_context({"input": "data"})
    await fsm.run_until_breakpoint(context)
    print(f"Stopped at: {context.current_state}")

    # Continue execution
    await fsm.step(context)

asyncio.run(debug_example())
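To make the debug flow concrete, here is a minimal, purely illustrative stepper in plain Python. It is not the dataknobs-fsm implementation; the class, its methods, and the state list are hypothetical, chosen only to show what "run until breakpoint, then step" means:

```python
# Conceptual sketch of breakpoint-driven stepping (NOT library internals).
class MiniStepper:
    def __init__(self, states, transforms):
        self.states = states            # ordered state names
        self.transforms = transforms    # state name -> callable(data), or absent
        self.breakpoints = set()
        self.pos = 0                    # index of the current state

    def add_breakpoint(self, state):
        self.breakpoints.add(state)

    def step(self, data):
        # Execute one transition: advance and apply the target state's transform.
        self.pos += 1
        fn = self.transforms.get(self.states[self.pos])
        return fn(data) if fn else data

    def run_until_breakpoint(self, data):
        # Keep stepping until the current state is a breakpoint or the end.
        while self.pos < len(self.states) - 1:
            data = self.step(data)
            if self.states[self.pos] in self.breakpoints:
                break
        return data

stepper = MiniStepper(
    ["start", "process", "end"],
    {"process": lambda d: {**d, "processed": True}},
)
stepper.add_breakpoint("process")
data = stepper.run_until_breakpoint({"input": "data"})
print(stepper.states[stepper.pos])  # "process" -- stopped at the breakpoint
data = stepper.step(data)           # continue to "end"
```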

Examples

The examples/ directory contains comprehensive examples:

Data Processing Examples

  • data_pipeline_example.py - Data validation and transformation pipeline
  • data_validation_pipeline.py - Data quality validation workflow
  • database_etl.py - Complete ETL pipeline with transaction management
  • large_file_processor.py - Memory-efficient large file processing
  • end_to_end_streaming.py - Streaming pipeline demonstration

Advanced Features

  • advanced_debugging.py - Full debugging features demonstration
  • advanced_debugging_simple.py - Simplified debugging example

Text Processing

  • normalize_file_example.py - Text file normalization with streaming
  • normalize_file_with_regex.py - Advanced regex transformations
  • test_regex_yaml.py - Testing script for YAML regex configurations

Configuration Examples

  • regex_transforms.yaml - Field transformation workflows
  • regex_workflow.yaml - Pattern extraction and masking configurations

Running Examples

# Navigate to the FSM package
cd packages/fsm

# Run the database ETL example
uv run python examples/database_etl.py

# Run the data processing pipeline
uv run python examples/data_pipeline_example.py

# Run streaming example
uv run python examples/end_to_end_streaming.py

# Run with custom parameters
uv run python examples/database_etl.py --batch-size 500

Data Handling Modes

The FSM framework provides three data handling modes:

  • COPY Mode: Creates deep copies of data for each state, ensuring isolation
  • REFERENCE Mode: Uses lazy loading with optimistic locking for memory efficiency
  • DIRECT Mode: In-place modifications for maximum performance (single-threaded only)

from dataknobs_fsm import SimpleFSM
from dataknobs_fsm.core.data_modes import DataHandlingMode

# Use COPY mode for safety
fsm = SimpleFSM(config, data_mode=DataHandlingMode.COPY)

# Use REFERENCE mode for large datasets
fsm = SimpleFSM(config, data_mode=DataHandlingMode.REFERENCE)

# Use DIRECT mode for performance
fsm = SimpleFSM(config, data_mode=DataHandlingMode.DIRECT)
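The practical difference between COPY and DIRECT comes down to ordinary Python copy semantics. The sketch below uses only the standard library (REFERENCE mode's lazy loading and optimistic locking are omitted, as they cannot be shown this simply):

```python
import copy

original = {"items": [1, 2, 3]}

# COPY-style: work on a deep copy, so the source data is never mutated.
snapshot = copy.deepcopy(original)
snapshot["items"].append(4)
assert original["items"] == [1, 2, 3]

# DIRECT-style: mutate the same object in place -- fastest, but unsafe to
# share across threads or to reuse after processing.
direct = original
direct["items"].append(4)
assert original["items"] == [1, 2, 3, 4]
```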

LLM Integration

For LLM-specific integrations, workflows, and examples, please see the dataknobs-llm package:

  • FSM Integration Module: dataknobs_llm.fsm_integration
  • LLM Workflow Patterns: RAG pipelines, chain-of-thought, multi-agent systems
  • Conversation Examples: FSM-based conversational AI systems
  • Documentation: See packages/llm/README.md for FSM integration guide

The LLM package provides comprehensive LLM abstractions, providers, and FSM integration capabilities.

Documentation

For detailed documentation, see the main DataKnobs repository.

Testing

Run the tests with:

cd packages/fsm
uv run pytest tests/ -v

Development

This package is part of the DataKnobs ecosystem. For development setup and guidelines, see the main repository README.

License

Licensed under the same terms as the DataKnobs project.
