Skip to main content

Declarative finite state machine for SQLAlchemy 2.x models — typed, tested, async-ready.

Project description

PyPI version Python versions CI License: MIT Downloads

sqlalchemy-fsm

Declarative finite state machine for SQLAlchemy models. Add an FSMField column, decorate methods with @transition, and let the library enforce which transitions are reachable from which states.

Requirements

Python 3.10+, SQLAlchemy 2.0+.

Install

pip install sqlalchemy-fsm

Quickstart

import sqlalchemy as sa
from sqlalchemy.orm import declarative_base
from sqlalchemy_fsm import FSMField, transition

Base = declarative_base()


class BlogPost(Base):
    __tablename__ = "blog_post"
    id = sa.Column(sa.Integer, primary_key=True)
    state = sa.Column(FSMField, nullable=False, default="draft")

    @transition(source="draft", target="published")
    def publish(self):
        """Side effects (notifications, cache busts, …) go here. Return value is discarded."""

    @transition(source=["draft", "published"], target="archived")
    def archive(self): ...


post = BlogPost()
post.publish.can_proceed()   # True — we're in 'draft'
post.publish.set()           # state is now 'published'
post.publish.set()           # raises InvalidSourceStateError

source accepts a single state, a list of states, "*" (any state), or None (matches a nullable column's NULL).

Transition API

For a transition decorated as BlogPost.publish:

Expression Returns
BlogPost.publish() SQLAlchemy filter for rows in the transition's target state — use in .filter(...).
BlogPost.publish.is_(True) Same as BlogPost.publish() == True.
post.publish.is_current bool — is the row in the target state?
post.publish.set(*args, **kwargs) Execute the transition. Raises InvalidSourceStateError if the current state isn't allowed, or PreconditionError if a condition returns falsy.
post.publish.can_proceed(*args, **kwargs) bool — would set() succeed right now?

set() mutates the field in memory; commit the session yourself.

Top-level @transition requires an explicit target=. Sub-handlers inside a class-grouped transition inherit it from the enclosing class.

Conditions

conditions= gates the transition. Each callable receives the instance plus any *args / **kwargs forwarded from set() / can_proceed() and must return truthy. All must pass.

def can_publish(instance) -> bool:
    return datetime.now().hour <= 17

class BlogPost(Base):
    ...
    @transition(source="draft", target="published", conditions=[can_publish])
    def publish(self): ...

post.publish.can_proceed()   # evaluates conditions without mutating
post.publish.set()

Keep conditions side-effect-free — can_proceed() runs them too.

Declared states & startup validation

FSMField["a", "b", "c"] declares the closed set of legal states. The package then validates the transition graph at SA mapper-configuration time and raises SetupError on mismatch:

class BlogPost(Base):
    __tablename__ = "blog_post"
    id = sa.Column(sa.Integer, primary_key=True)
    state = sa.Column(
        FSMField["draft", "published", "archived"],
        nullable=False,
        default="draft",
    )

    @transition(source="draft", target="published")
    def publish(self): ...

    @transition(source=["draft", "published"], target="archived")
    def archive(self): ...

Three properties are checked:

  • Correct — every state referenced by a transition is in the declared set. (Catches typos like target="publsihed".)
  • Complete — every declared state is used somewhere (the column's default= counts as a use).
  • Reachable — every declared state is reachable along forward edges from the column's default=. (source="*" wildcards count as edges from every declared state.)

length= is derived as longest_state * 3 for headroom on later renames; override with an explicit length= for a tighter bound.

A typed FSMField[...] column needs a scalar default=<state> so reachability can be evaluated. If the FSM genuinely starts from NULL, either add a sentinel state (e.g. "uninitialized") as the default=, or use plain FSMField (no subscript), which skips validation.

Call sqlalchemy_fsm.validate_fsm(MyModel) to run the check from a test.

Permissions (RBAC)

permissions= gates the transition for authorization, separately from conditions. Permissions run after the source-state check and before conditions. A failing permission raises PermissionDeniedError from set(); can_proceed() returns False. All listed permissions must pass; each receives the instance plus any args forwarded from set() / can_proceed().

from sqlalchemy_fsm.exc import PermissionDeniedError

def is_editor(instance, user=None, **_):
    return getattr(user, "role", None) == "editor"

class Doc(Base):
    ...
    @transition(source="draft", target="published", permissions=[is_editor])
    def publish(self, user=None): ...

doc.publish.can_proceed(user=current_user)
doc.publish.set(user=current_user)   # raises PermissionDeniedError if not allowed

Class-grouped transitions

To branch on source state with different handlers, decorate a class:

@transition(target="published")
class publish:
    @transition(source="draft")
    def from_draft(self, instance):
        instance.published_via = "fresh"

    @transition(source="archived")
    def from_archive(self, instance):
        instance.published_via = "republish"

Invocation is still post.publish.set(); the sub-handler is picked by the current state.

Query helpers

Use the class-bound form inside .filter():

session.query(BlogPost).filter(BlogPost.publish())          # currently 'published'
session.query(BlogPost).filter(~BlogPost.publish())         # everything else

Events

The library emits before_transition and after_transition through SQLAlchemy's event system around every transition. Listeners receive the row, the transition method name, the source and target states, and the *args / **kwargs passed to set() / aset():

from sqlalchemy.event import listens_for

@listens_for(BlogPost, "after_transition")
def audit(instance, transition_name, source, target, args, kwargs):
    log.info(
        "%s: %s -> %s via %s by %s",
        instance.id, source, target, transition_name, kwargs.get("user"),
    )

For class-grouped transitions, transition_name is the outer name ("publish"), not the sub-handler. Remove listeners with sqlalchemy.event.remove(...).

Listeners must be sync. SQLAlchemy's InstanceEvents dispatch is synchronous; an async def listener returns a coroutine nothing awaits, so its body silently doesn't run. Wrap async work in asyncio.create_task(...).

before_transition fires before the handler and before the column is mutated — raising aborts the transition cleanly, state unchanged. after_transition fires after both — raising propagates to the caller but does not roll back the in-memory state. Treat after_transition as best-effort notification, not a transactional gate.

Transition metadata

@transition(custom={...}) attaches a free-form dict that sqlalchemy-fsm ignores but admin UIs, RBAC layers, doc generators etc. can read via Model.attr.meta.custom:

class BlogPost(Base):
    ...
    @transition(
        source="draft", target="published",
        custom={"label": "Publish post", "icon": "rocket", "groups": ["editor"]},
    )
    def publish(self): ...

BlogPost.publish.meta.custom["label"]   # "Publish post"

The dict is copied and frozen at decoration time.

Available transitions

available_transitions(instance, *args, **kwargs) yields the transitions whose source matches the current state AND whose permissions and conditions accept the given args — useful for "what can this user do with this row right now?" UI lists:

from sqlalchemy_fsm import available_transitions

for name, fsm_t in available_transitions(post, user=current_user):
    print(name, fsm_t.meta.target, fsm_t.meta.custom.get("label"))

aavailable_transitions(...) is the async sibling — it awaits acan_proceed on @async_transition and stays sync for the rest. Pass column= on multi-column models to filter to one state machine.

Testing transitions

Every @transition-decorated attribute exposes the raw handler as .fn, for tests that want to call or mock it without going through the state machinery.

Call the handler directly. Bypasses source-state, permission, and condition checks — useful for testing the body's side effects in isolation:

def test_publish_sends_notification(mocker):
    post = BlogPost()
    spy = mocker.spy(notifications, "send")
    BlogPost.publish.fn(post)             # runs body, no guards, no mutation
    spy.assert_called_once_with(post)

.fn is the same callable on the class-bound (BlogPost.publish.fn) and instance-bound (post.publish.fn) wrappers.

Replace the handler with a mock. Reach the underlying descriptor via sqlalchemy_fsm.testing.get_transition(Model, name) and assign .fn:

from sqlalchemy_fsm.testing import get_transition

def test_publish_runs_through_caller(monkeypatch):
    descriptor = get_transition(BlogPost, "publish")
    monkeypatch.setattr(descriptor, "fn", lambda self: None)

    post = BlogPost()
    Service(post).do_publish()
    assert post.state == "published"   # guards + state mutation still ran

The descriptor is the stable patch target: BlogPost.publish rebuilds a thin wrapper on each attribute access (to keep SA filter expressions clean), but the wrapper reads fn from the descriptor each time, so the patch propagates.

Async (SQLAlchemy 2.x AsyncSession)

Two modes, used together or separately:

Sync @transition under AsyncSession. A sync transition only mutates an attribute — it never touches the session — so it works unchanged under an async engine. Call .set(); await the commit.

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

engine = create_async_engine("postgresql+asyncpg://…")

async with AsyncSession(engine) as session:
    doc = AsyncDoc()
    session.add(doc)
    doc.publish.set()           # synchronous mutation
    await session.commit()      # async persistence

@async_transition for awaiting inside the handler. Use it when the handler, a condition, or a permission needs to await. The descriptor exposes aset(...) and acan_proceed(...), and instance.<name>() is a coroutine — the async surface mirrors the sync one:

from sqlalchemy_fsm import async_transition

async def is_editor(instance, user=None, **_):
    return await user.has_role("editor")

class AsyncDoc(Base):
    ...
    @async_transition(source="draft", target="published",
                      permissions=[is_editor])
    async def publish(self, user=None):
        await notify_subscribers(self)

await doc.publish.acan_proceed(user=u)   # bool
await doc.publish.aset(user=u)           # executes
doc.publish.is_current                   # sync, no await

Sync callables stay valid inside @async_transition: anything awaitable (coroutine, Task, Future) is resolved, anything else is taken as a value. Mixing sync and async sub-handlers under one class-grouped transition is rejected at decoration time.

The class-bound query helper (AsyncDoc.publish()) is a plain SA expression and composes with select(...).where(...) against an AsyncSession identically to the sync case. Events fire normally; listeners must still be sync (see above).

Alembic integration

sqlalchemy_fsm.extras.alembic renders the legal state set as a CHECK constraint and registers a comparator that detects drift between the model and the database. Install: pip install sqlalchemy-fsm[alembic].

In your env.py:

from sqlalchemy_fsm.extras.alembic import (
    attach_fsm_constraints,
    register_autogenerate_comparator,
)

attach_fsm_constraints(Base)         # accepts a Base / registry / list of classes
register_autogenerate_comparator()   # hook into `alembic revision --autogenerate`

context.configure(target_metadata=Base.metadata, ...)

After this, adding or removing a @transition that changes the state set shows up in the next autogenerated migration as a paired drop_constraint + create_check_constraint on ck_<table>_<col>_fsm.

Call attach_fsm_constraints(Base) alone to get the constraint without drift detection — new tables get the CHECK, but state-set changes on existing tables won't be picked up automatically.

Diagram export

sqlalchemy_fsm.extras.graph renders a model's transition graph as Mermaid / Graphviz DOT / PlantUML source for embedding in docs or rendering to SVG.

from sqlalchemy_fsm.extras.graph import to_mermaid, to_dot, to_plantuml

print(to_mermaid(BlogPost))   # stateDiagram-v2 ... (renders on GitHub)
print(to_dot(BlogPost))       # pipe through `dot -Tsvg`
print(to_plantuml(BlogPost))

source="*" is emitted as a synthetic (any) node (or [*] in PlantUML). Class-grouped transitions are flattened so the rendered edges match runtime dispatch.

Development

pdm install                                 # project + dev deps
pdm run pytest                              # tests
pdm run ruff check ./src ./tests            # lint
pdm run ruff format --check ./src ./tests   # format check
pdm run pyright                             # type check

Releasing

Tagged commits drive releases:

git tag v2.1.0
git push --follow-tags

CI runs the matrix, pdm-backend derives the version from the tag, artifacts are Sigstore-signed and published to TestPyPI then PyPI via OIDC trusted publishing, and a GitHub Release is created with notes from CHANGELOG.md.

Comparison with django-fsm

Same shape — state column plus @transition methods — on SQLAlchemy instead of Django. (django-fsm is archived since 2024; django-fsm-2 is the maintained drop-in fork.)

sqlalchemy-fsm django-fsm
ORM SQLAlchemy 2.x Django
State types String String, int, FK
Declared state set FSMField["a","b","c"] Free-form
Startup graph validation Correctness, completeness, reachability — SetupError at import None — typo'd target= silently assigns
DB constraint CHECK (col IN (...)) via Alembic extra, autogen diff None
@transition kwargs source, target, conditions, permissions, custom + on_error, permission (singular)
Condition signature (instance, *args, **kwargs) forwarded from set() (instance)
Permissions List of callables; receive set() kwargs One: Django perm string or (instance, user) -> bool
Optimistic locking Use SA version_id_col ConcurrentTransitionMixin filters UPDATE by loaded state
Async @async_transition, aset / acan_proceed, mixed sync/async checks None
Events SA before_transition / after_transition (instance, name, source, target, args, kwargs) Django signals pre_transition / post_transition (same payload)
Available-transition helper available_transitions(instance, *args, **kwargs) and async sibling get_available_<field>_transitions(user)
Dynamic target Class-grouped transitions (dispatch by source) RETURN_VALUE(...) / GET_STATE(...)
Proxy class per state None state_choices= swaps __class__
Block direct writes No (obj.state = "x" always works) protected=True
Graph export Pure Python: to_mermaid / to_dot / to_plantuml manage.py graph_transitions (graphviz extra)
Admin n/a FSMAdminMixin, unfold contrib (django-fsm-2)
Introspection helpers iter_transitions, collect_edges get_available_*_transitions(user) on instance

Neither library wraps the handler in a transaction — the caller commits.

Notes on the bigger differences

  • FSMField["a","b","c"] declares the legal set. At mapper_configured time, every source= / target= must be in it, every declared state must be used, and every state must be reachable from default=. target="publsihed" fails at import. Plain FSMField skips validation.
  • Alembic extra emits and diffs ck_<table>_<col>_fsm. django-fsm state lives only in Python; a stray UPDATE from psql can write anything.
  • Kwargs reach every check. post.publish.set(user=u) is forwarded to every permission and condition. django-fsm conditions get only the instance, so threading context means closures.
  • No permission= string — no auth framework to defer to. Pass callables.
  • No on_error= — model failures as an explicit transition, not a side-effect of a raise.
  • Async transitions work under AsyncSession, and sync .set() composes with await session.commit() since it only mutates an attribute.

What this doesn't have

  • RETURN_VALUE / GET_STATE — use class-grouped transitions, or set the attribute in the handler.
  • state_choices= proxy classes.
  • Integer or FK state columns. An enum with __str__ works; ints need a custom SA type.
  • protected=True. Bare attribute writes aren't gated; the CHECK constraint catches bad values at commit.
  • Admin integration.

Migrating from django-fsm

django-fsm sqlalchemy-fsm
FSMField(default="draft", protected=True) sa.Column(FSMField["draft", …], default="draft", nullable=False)
@transition(field=state, source="x", target="y") @transition(source="x", target="y") (column= only if >1 FSMField)
permission="app.publish" permissions=[lambda inst, user=None, **_: user and user.has_perm("app.publish")]
condition(instance) condition(instance, *args, **kwargs)
instance.do_x(); instance.save() instance.do_x.set(); session.commit()
pre_transition / post_transition event.listen(Model, "before_transition" | "after_transition", fn) — listener gets (instance, transition_name, source, target, args, kwargs)
get_available_<field>_transitions(user) available_transitions(instance, user=user) (or aavailable_transitions for async)
custom={"label": …} custom={"label": …} — read via Model.attr.meta.custom
ConcurrentTransitionMixin version_id_col on the mapper, or SELECT … FOR UPDATE
RETURN_VALUE("a", "b") Class-grouped transition with sub-handlers
manage.py graph_transitions print(to_mermaid(Model))

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sqlalchemy_fsm-3.0.0rc1.tar.gz (77.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sqlalchemy_fsm-3.0.0rc1-py3-none-any.whl (46.8 kB view details)

Uploaded Python 3

File details

Details for the file sqlalchemy_fsm-3.0.0rc1.tar.gz.

File metadata

  • Download URL: sqlalchemy_fsm-3.0.0rc1.tar.gz
  • Upload date:
  • Size: 77.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for sqlalchemy_fsm-3.0.0rc1.tar.gz
Algorithm Hash digest
SHA256 7851206c1d08a402dc0eba9bf81135b04490cb7868467a9f7aaa0fe84211d850
MD5 37dc1cb286d52f0d88ae124c2790f170
BLAKE2b-256 619d42b4afbdee8a71556e811e328e989808bc3980ff6f431fea2b57dbd627f2

See more details on using hashes here.

Provenance

The following attestation bundles were made for sqlalchemy_fsm-3.0.0rc1.tar.gz:

Publisher: release.yml on IljaOrlovs/sqlalchemy-fsm

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file sqlalchemy_fsm-3.0.0rc1-py3-none-any.whl.

File metadata

File hashes

Hashes for sqlalchemy_fsm-3.0.0rc1-py3-none-any.whl
Algorithm Hash digest
SHA256 58c4ca29671e1fbfb45fe859dc6cd04b36002bfbe765048b640e4bfdc0edf935
MD5 0d6735b0a3e20bf72c7a7e50477fb86c
BLAKE2b-256 cae8809c44cad1d16ec7813a8cafeff141adec2bbdf5871a32b193afaf780d74

See more details on using hashes here.

Provenance

The following attestation bundles were made for sqlalchemy_fsm-3.0.0rc1-py3-none-any.whl:

Publisher: release.yml on IljaOrlovs/sqlalchemy-fsm

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page