Skip to main content

A small async component engine with events, state machines, and a minimal CLI.

Project description

日本語 | 中文 | Español | Français | हिन्दी | Italiano | Português (BR)

FlexiFlow logo

A lightweight async component engine with events, state machines, and a minimal CLI.

CI PyPI version License: MIT Landing Page

A small async component engine with events, state machines, and a minimal CLI.


Why FlexiFlow?

Most workflow engines are heavyweight, opinionated, and assume you want a DAG runner. FlexiFlow is none of those things. It gives you:

  • Components with declarative rules and pluggable state machines
  • An async event bus with priority, filters, and sequential or concurrent delivery
  • Structured logging with correlation IDs baked in
  • Persistence (JSON for dev, SQLite for production) with snapshot history and pruning
  • A minimal CLI so you can demo and debug without writing a harness
  • Config introspection (explain()) to validate before you run

All in under 2,000 lines of pure Python. No heavy dependencies. No magic.


Install

pip install flexiflow

With optional extras:

pip install flexiflow[reload]   # hot-reload with watchfiles
pip install flexiflow[api]      # FastAPI integration
pip install flexiflow[dev]      # pytest + coverage

Quick Start

CLI

# Register a component and start it
flexiflow register --config examples/config.yaml --start

# Send messages through the state machine
flexiflow handle --config examples/config.yaml confirm --content confirmed
flexiflow handle --config examples/config.yaml complete

# Hot-swap rules at runtime
flexiflow update_rules --config examples/config.yaml examples/new_rules.yaml

Embedded (Python)

from flexiflow.engine import FlexiFlowEngine
from flexiflow.config_loader import ConfigLoader

config = ConfigLoader.load_component_config("config.yaml")
engine = FlexiFlowEngine()

# Register and interact
component = engine.create_component(config)
await engine.handle_message(component.name, "start")
await engine.handle_message(component.name, "confirm", content="confirmed")

You can also set FLEXIFLOW_CONFIG=/path/to/config.yaml and omit --config from the CLI.


API Overview

Event Bus

# Subscribe with priority (1=highest, 5=lowest)
handle = await bus.subscribe("my.event", "my_component", handler, priority=2)

# Publish with delivery mode
await bus.publish("my.event", data, delivery="sequential")   # ordered
await bus.publish("my.event", data, delivery="concurrent")    # parallel

# Cleanup
bus.unsubscribe(handle)
bus.unsubscribe_all("my_component")

Error policies: continue (log and keep going) or raise (fail fast).

State Machines

Built-in message types: start, confirm, cancel, complete, error, acknowledge.

Load custom states via dotted paths:

initial_state: "mypkg.states:MyInitialState"

Or register entire state packs:

states:
  InitialState: "mypkg.states:InitialState"
  Processing: "mypkg.states:ProcessingState"
  Complete: "mypkg.states:CompleteState"
initial_state: InitialState

Observability Events

Event When Payload
engine.component.registered Component registered {component}
component.message.received Message received {component, message}
state.changed State transition {component, from_state, to_state}
event.handler.failed Handler exception (continue mode) {event_name, component_name, exception}

Retry Decorator

from flexiflow.extras.retry import retry_async, RetryConfig

@retry_async(RetryConfig(max_attempts=3, base_delay=0.2, jitter=0.2))
async def my_handler(data):
    ...

Persistence

Feature JSON SQLite
History Overwrites Appends
Retention N/A prune_snapshots_sqlite()
Best for Dev/debugging Production
from flexiflow.extras import save_component, load_snapshot, restore_component

# JSON: save and restore
save_component(component, "state.json")
snapshot = load_snapshot("state.json")
restored = restore_component(snapshot, engine)
import sqlite3
from flexiflow.extras import save_snapshot_sqlite, load_latest_snapshot_sqlite

conn = sqlite3.connect("state.db")
save_snapshot_sqlite(conn, snapshot)
latest = load_latest_snapshot_sqlite(conn, "my_component")

Config Introspection

from flexiflow import explain

result = explain("config.yaml")
if result.is_valid:
    print(result.format())

Error Handling

All exceptions inherit from FlexiFlowError with structured messages (What / Why / Fix / Context):

FlexiFlowError (base)
├── ConfigError      # Configuration validation failures
├── StateError       # State registry/machine errors
├── PersistenceError # JSON/SQLite persistence errors
└── ImportError_     # Dotted path import failures
from flexiflow import FlexiFlowError, StateError

try:
    sm = StateMachine.from_name("BadState")
except StateError as e:
    print(e)  # includes What, Why, Fix, and Context

Examples

See examples/embedded_app/ for a complete working example with custom states, SQLite persistence, observability subscriptions, and retention pruning.


Security & Data Scope

FlexiFlow is a local-first async component engine library.

  • Data accessed: In-process state machine data, optional SQLite persistence for state history
  • Data NOT accessed: No cloud sync. No telemetry. No analytics. No network calls. No authentication
  • Permissions: File system write only for optional SQLite persistence. No elevated permissions

Full policy: SECURITY.md


Scorecard

Category Score
A. Security 10/10
B. Error Handling 10/10
C. Operator Docs 10/10
D. Shipping Hygiene 10/10
E. Identity (soft) 10/10
Overall 50/50

License

MIT -- Copyright (c) 2025-2026 mcp-tool-shop


Built by MCP Tool Shop

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flexiflow-1.0.1.tar.gz (53.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flexiflow-1.0.1-py3-none-any.whl (37.5 kB view details)

Uploaded Python 3

File details

Details for the file flexiflow-1.0.1.tar.gz.

File metadata

  • Download URL: flexiflow-1.0.1.tar.gz
  • Upload date:
  • Size: 53.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for flexiflow-1.0.1.tar.gz
Algorithm Hash digest
SHA256 0c10b58f3ce23591bfb86fc824403e85c7f5799613edf10e0169803468bfffc8
MD5 c63d10db117f5e841f78cecabf23e01a
BLAKE2b-256 e47c0dda623e79d9e8c131a8abc5edb3c0b636fc351d976f76577f58a0db47a2

See more details on using hashes here.

Provenance

The following attestation bundles were made for flexiflow-1.0.1.tar.gz:

Publisher: publish.yml on mcp-tool-shop-org/flexiflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file flexiflow-1.0.1-py3-none-any.whl.

File metadata

  • Download URL: flexiflow-1.0.1-py3-none-any.whl
  • Upload date:
  • Size: 37.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for flexiflow-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 16b1a36b76db5a0ec218d6a065d3e8de41e470917f27e509f4962957c457eb17
MD5 70ad9e33458297483a6efb5151ce3896
BLAKE2b-256 ef377af67e66548dcf487d726206f55a634c7c4ecf59d05e4f01892a4da4edff

See more details on using hashes here.

Provenance

The following attestation bundles were made for flexiflow-1.0.1-py3-none-any.whl:

Publisher: publish.yml on mcp-tool-shop-org/flexiflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page