Skip to main content

A Django-like ORM with synchronous and asynchronous support

Project description

djanorm

A Django-inspired ORM for Python with full synchronous and asynchronous support. The same API you know from Django, without depending on the full framework.

Features

  • Same API as Django ORMfilter, exclude, get, create, update, delete, Q, F, aggregations, slicing...
  • Native async — every method has an a* variant: acreate, aget, aupdate, adelete...
  • Atomic transactionsdorm.transaction.atomic() / aatomic() with automatic savepoint nesting
  • SQLite (sync via sqlite3, async via aiosqlite)
  • PostgreSQL (sync/async via psycopg, connection pool via psycopg-pool)
  • Migration systemmakemigrations / migrate / rollback, RunSQL / RunPython with reverse hooks
  • CLIdorm command to manage migrations and open a shell (IPython auto-detected)
  • Thread-safe — connections are safe to share across threads; async connections are coroutine-safe
  • Relationship loadingselect_related() with nested paths and prefetch_related() for FK, reverse FK, and M2M
  • Partial loadingonly() / defer() to fetch a subset of columns
  • Abstract model inheritance — share fields and Meta options across models with abstract = True
  • Signalspre_save, post_save, pre_delete, post_delete hooks
  • Validationfull_clean() / clean() / validate_unique() with custom rules
  • DB functionsCase/When, Coalesce, Upper, Lower, Length, Concat, Now, Cast, Abs
  • Set operationsunion(), intersection(), difference() across querysets
  • Default orderingMeta.ordering applied automatically to all queries
  • on_deleteCASCADE, PROTECT, SET_NULL, SET_DEFAULT enforced at Python level
  • Streamingiterator() / aiterator() for memory-efficient row-by-row processing
  • Convenienceget_or_none() / aget_or_none() returns None instead of raising DoesNotExist
  • Efficient bulk insertsbulk_create() uses a single multi-row INSERT per batch

Installation

# SQLite support
pip install "djanorm[sqlite]"

# PostgreSQL support
pip install "djanorm[postgresql]"

# Both
pip install "djanorm[sqlite,postgresql]"

# With uv
uv add "djanorm[sqlite]"
uv add "djanorm[postgresql]"

Setup

There are two ways to configure djanorm depending on how you use it.

Project with migrations (recommended)

Create a settings.py next to your app packages. The dorm CLI reads it automatically — you never call dorm.configure() yourself.

# settings.py
DATABASES = {
    "default": {
        "ENGINE": "sqlite",   # or "postgresql"
        "NAME": "db.sqlite3",
    }
}

That's it. dorm scans the directory of settings.py recursively and registers every Python package that contains a models.py — no INSTALLED_APPS needed.

If you prefer explicit control (e.g. to exclude certain packages, or when app packages live outside the settings directory), you can declare them manually:

# settings.py — explicit override, optional
INSTALLED_APPS = ["blog", "shop", "shop.payments"]

For PostgreSQL, pool size and driver options can be tuned:

DATABASES = {
    "default": {
        "ENGINE": "postgresql",
        "NAME": "my_database",
        "USER": "postgres",
        "PASSWORD": "secret",
        "HOST": "localhost",
        "PORT": 5432,
        "MIN_POOL_SIZE": 1,   # default
        "MAX_POOL_SIZE": 10,  # default
        "OPTIONS": {
            "sslmode": "require",    # passed directly to psycopg
            "connect_timeout": 10,
        },
    }
}

Then run migrations and open a shell:

dorm makemigrations
dorm migrate
dorm shell

See the Migrations section for the full CLI reference.

Programmatic use (scripts and libraries)

If you are using djanorm in a standalone script or embedding it inside another framework — without the dorm CLI — call dorm.configure() at startup instead of using a settings.py file:

import dorm

dorm.configure(
    DATABASES={
        "default": {
            "ENGINE": "sqlite",   # or "postgresql"
            "NAME": "db.sqlite3",
        }
    }
)

Defining models

import dorm

class Author(dorm.Model):
    name   = dorm.CharField(max_length=100)
    email  = dorm.EmailField(unique=True)
    age    = dorm.IntegerField()
    bio    = dorm.TextField(null=True, blank=True)
    active = dorm.BooleanField(default=True)
    joined = dorm.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "authors"
        ordering = ["name"]     # default sort for all queries


class Book(dorm.Model):
    title     = dorm.CharField(max_length=200)
    author    = dorm.ForeignKey(Author, on_delete=dorm.CASCADE)
    pages     = dorm.IntegerField(default=0)
    published = dorm.BooleanField(default=False)

Available fields

Field Description
AutoField / BigAutoField Auto-increment integer (default PK)
CharField(max_length=) Variable-length string
TextField Unlimited text
IntegerField / BigIntegerField / SmallIntegerField Integers
FloatField Floating point number
DecimalField(max_digits=, decimal_places=) Precise decimal
BooleanField Boolean
DateField / TimeField / DateTimeField Date and time
EmailField String with email validation
URLField / SlugField Specialised strings
UUIDField UUID
JSONField JSON (JSONB on PostgreSQL)
ForeignKey(to, on_delete=) Foreign key
OneToOneField(to, on_delete=) One-to-one relation
ManyToManyField(to) Many-to-many relation

on_delete options

When a referenced object is deleted, djanorm applies the on_delete policy at the Python level (not just via DB constraints):

Constant Behaviour
dorm.CASCADE Delete related objects automatically
dorm.PROTECT Raise dorm.ProtectedError if related objects exist
dorm.SET_NULL Set the FK column to NULL (requires null=True)
dorm.SET_DEFAULT Set the FK column to its default value
dorm.DO_NOTHING Do nothing; rely on DB-level constraints
dorm.RESTRICT Same as PROTECT but enforced only at the DB level
class Article(dorm.Model):
    author = dorm.ForeignKey(Author, on_delete=dorm.PROTECT)
    # Trying to delete an Author that has Articles raises ProtectedError

class Comment(dorm.Model):
    article = dorm.ForeignKey(Article, on_delete=dorm.CASCADE)
    # Deleting an Article also deletes all its Comments

class Profile(dorm.Model):
    author = dorm.ForeignKey(Author, on_delete=dorm.SET_NULL, null=True, blank=True)
    # Deleting an Author sets Profile.author to NULL
from dorm import ProtectedError

try:
    author.delete()
except ProtectedError as e:
    print("Blocked:", e.protected_objects)

Abstract models

Mark a model as abstract = True in its Meta to use it as a mixin. It defines no database table of its own; its fields and Meta options are inherited by concrete subclasses.

class TimestampedModel(dorm.Model):
    created_at = dorm.DateTimeField(auto_now_add=True, null=True)
    updated_at = dorm.DateTimeField(auto_now=True, null=True)

    class Meta:
        abstract = True


class Post(TimestampedModel):
    title = dorm.CharField(max_length=200)
    body  = dorm.TextField()

    class Meta:
        db_table  = "posts"
        ordering  = ["-created_at"]  # overrides abstract ordering if any


class Comment(TimestampedModel):
    text = dorm.CharField(max_length=500)
    # inherits created_at and updated_at columns automatically

Post._meta.fields will include id, created_at, updated_at, title, and body. Meta options (ordering, etc.) defined on the abstract parent are inherited unless the concrete class defines its own.

Meta.ordering

Set ordering on a model to apply a default ORDER BY clause to every query that doesn't call .order_by() explicitly:

class Product(dorm.Model):
    name  = dorm.CharField(max_length=100)
    price = dorm.IntegerField()

    class Meta:
        ordering = ["price"]       # ascending
        # ordering = ["-price"]    # descending
        # ordering = ["name", "-price"]  # multiple fields
# Automatically ordered by price ASC
products = list(Product.objects.all())

# Explicit order_by() overrides Meta.ordering
expensive_first = list(Product.objects.order_by("-price"))

# .order_by() with no arguments clears the default ordering
unordered = list(Product.objects.order_by())

Signals

Signals let you run code automatically before or after a model is saved or deleted, without modifying the model itself.

from dorm.signals import pre_save, post_save, pre_delete, post_delete

# Connect a receiver function
@post_save.connect
def on_author_saved(sender, instance, created, **kwargs):
    if created:
        print(f"New author created: {instance.name}")
    else:
        print(f"Author updated: {instance.name}")

@pre_delete.connect
def on_before_delete(sender, instance, **kwargs):
    print(f"About to delete: {instance}")

You can also connect to signals from a specific sender only:

@post_save.connect_for(Author)
def welcome_new_author(sender, instance, created, **kwargs):
    if created:
        send_welcome_email(instance.email)

Available signals

Signal When fired Extra kwargs
pre_save Before save() / asave() created (bool)
post_save After save() / asave() created (bool)
pre_delete Before delete() / adelete()
post_delete After delete() / adelete()

Validation

full_clean()

Runs all validation checks on a model instance: field-level type validation, custom business rules, and uniqueness constraints. Raises dorm.ValidationError on failure.

author = Author(name="", age=-5, email="not-an-email")
try:
    author.full_clean()
except dorm.ValidationError as e:
    print(e.message)

clean()

Override clean() on the model to add cross-field validation:

class Event(dorm.Model):
    start = dorm.DateTimeField()
    end   = dorm.DateTimeField()

    def clean(self):
        if self.end <= self.start:
            raise dorm.ValidationError("end must be after start")

full_clean() calls clean() automatically.

validate_unique()

Checks that no other row in the database has the same value for any unique=True field.

author = Author(email="alice@example.com")  # already exists
try:
    author.validate_unique()
except dorm.ValidationError as e:
    print("Duplicate:", e.message)

full_clean() calls validate_unique() automatically. It is also checked when you call save() on a new instance.


ManyToManyField

Declare a ManyToManyField to create a junction table automatically. The related manager exposes .add(), .remove(), .set(), .clear(), and .all().

class Tag(dorm.Model):
    name = dorm.CharField(max_length=50, unique=True)


class Article(dorm.Model):
    title = dorm.CharField(max_length=200)
    tags  = dorm.ManyToManyField(Tag, related_name="articles")
article = Article.objects.create(title="Hello World")
python  = Tag.objects.create(name="python")
django  = Tag.objects.create(name="django")

# Add one or more tags
article.tags.add(python)
article.tags.add(python, django)

# Check current tags
tags = list(article.tags.all())   # [Tag(python), Tag(django)]

# Replace all tags at once
article.tags.set([django])        # removes python, keeps django

# Remove a specific tag
article.tags.remove(django)

# Remove all tags
article.tags.clear()

# Reverse relation: all articles for a tag
python_articles = list(python.articles.all())

Prefetching M2M relations

# Two queries: one for articles, one for all related tags
articles = list(Article.objects.prefetch_related("tags"))
for article in articles:
    print([t.name for t in article.tags.all()])  # no extra DB hit

Synchronous operations

Create

# Create and save in one call
author = Author.objects.create(name="Alice", email="alice@example.com", age=30)

# Instantiate then save separately
author = Author(name="Bob", email="bob@example.com", age=25)
author.save()

# get_or_create — returns (instance, created)
author, created = Author.objects.get_or_create(
    email="carol@example.com",
    defaults={"name": "Carol", "age": 28},
)

# update_or_create
author, created = Author.objects.update_or_create(
    email="carol@example.com",
    defaults={"age": 29},
)

# Create many at once
authors = Author.objects.bulk_create([
    Author(name="Dave", email="dave@example.com", age=22),
    Author(name="Eve",  email="eve@example.com",  age=31),
])

Query

# All records (default ordering from Meta.ordering is applied)
authors = Author.objects.all()

# Filter
adults   = Author.objects.filter(age__gte=18)
alices   = Author.objects.filter(name="Alice")
no_email = Author.objects.filter(email__isnull=True)

# Chain filters
result = (
    Author.objects
    .filter(active=True)
    .filter(age__gte=20)
    .order_by("-age")
)

# Exclude
inactive = Author.objects.exclude(active=True)

# Get a single record
author = Author.objects.get(email="alice@example.com")  # raises DoesNotExist or MultipleObjectsReturned

# Get or None — returns None instead of raising DoesNotExist
author = Author.objects.get_or_none(email="alice@example.com")

# First / last
first = Author.objects.order_by("age").first()
last  = Author.objects.order_by("age").last()

# Slicing
top3  = Author.objects.order_by("-age")[:3]
page2 = Author.objects.order_by("name")[10:20]

Lookups

Author.objects.filter(age__exact=30)
Author.objects.filter(age__gt=30)
Author.objects.filter(age__gte=30)
Author.objects.filter(age__lt=30)
Author.objects.filter(age__lte=30)
Author.objects.filter(age__range=(20, 30))

Author.objects.filter(name__contains="li")
Author.objects.filter(name__icontains="li")
Author.objects.filter(name__startswith="Al")
Author.objects.filter(name__endswith="ce")
Author.objects.filter(name__iexact="alice")

Author.objects.filter(bio__isnull=True)
Author.objects.filter(name__in=["Alice", "Bob"])

Author.objects.filter(joined__year=2024)
Author.objects.filter(joined__month=6)

Q objects — complex queries

from dorm import Q

Author.objects.filter(Q(age__lt=18) | Q(age__gt=65))
Author.objects.filter(Q(active=True) & Q(age__gte=18))
Author.objects.filter(~Q(name="Admin"))
Author.objects.filter(Q(active=True) & (Q(age__lt=18) | Q(age__gt=65)))

Count and existence

total    = Author.objects.count()
filtered = Author.objects.filter(active=True).count()
exists   = Author.objects.filter(email="alice@example.com").exists()

Values and value lists

# Returns dicts
rows = Author.objects.values("name", "age")

# Returns tuples
pairs = Author.objects.values_list("name", "age")

# Flat list (single field only)
names = Author.objects.values_list("name", flat=True)

Partial loading — only() and defer()

# Fetch only the listed columns (pk always included)
authors = Author.objects.only("name", "email")

# Fetch all columns except the listed ones
authors = Author.objects.defer("bio")

Update and delete

# Bulk update
n = Author.objects.filter(active=False).update(active=True)

# Bulk delete
count, detail = Author.objects.filter(age__lt=18).delete()

# Instance update
author.age = 31
author.save()
author.save(update_fields=["age"])   # only update specific fields

# Instance delete
author.delete()

# Reload from DB
author.refresh_from_db()

F expressions — reference columns

from dorm import F

# Increment without reading into Python
Author.objects.filter(active=True).update(age=F("age") + 1)

Relationship loading

select_related() — SQL JOIN

Resolves FK and OneToOne relations in a single query using LEFT OUTER JOINs. Supports nested paths with __.

# Single level: Book → Author
books = list(Book.objects.select_related("author"))
for book in books:
    print(book.author.name)  # no extra DB hit

# Nested: Book → Author → Publisher
books = list(Book.objects.select_related("author__publisher"))
for book in books:
    print(book.author.publisher.name)  # no extra DB hit

# Multiple paths at once — duplicate JOINs are deduplicated automatically
books = list(Book.objects.select_related("author", "author__publisher"))

prefetch_related() — batch queries

Runs a separate bulk query per relation and stitches results in Python. Works for forward FK, reverse FK (one-to-many), and ManyToMany.

# Forward FK (same as select_related but via separate query)
books = list(Book.objects.prefetch_related("author"))

# Reverse FK: load all books for each author
authors = list(Author.objects.prefetch_related("book_set"))
for author in authors:
    print([b.title for b in author.book_set.all()])  # no extra DB hit

# ManyToMany
articles = list(Article.objects.prefetch_related("tags"))
for article in articles:
    print([t.name for t in article.tags.all()])      # no extra DB hit

Aggregations and annotations

from dorm import Count, Sum, Avg, Max, Min

# Aggregate across the full queryset
result = Author.objects.aggregate(
    total   = Count("id"),
    avg_age = Avg("age"),
    max_age = Max("age"),
)
# {"total": 42, "avg_age": 29.5, "max_age": 65}

# Annotate each row with a computed value
from dorm import Count
authors = list(
    Author.objects
    .annotate(book_count=Count("id"))   # per-row annotation
    .filter(book_count__gte=2)
)

DB Functions

Use database-level functions inside annotate(), filter(), or update() calls.

Case / When — conditional expressions

from dorm import Case, When, Value

# Classify authors by age
authors = list(
    Author.objects.annotate(
        category=Case(
            When(age__lt=18,  then=Value("minor")),
            When(age__lt=65,  then=Value("adult")),
            default=Value("senior"),
        )
    )
)
for a in authors:
    print(a.name, a.category)

Coalesce — first non-null value

from dorm import Coalesce, Value, F

# Return bio if present, otherwise a fallback string
authors = Author.objects.annotate(
    display_bio=Coalesce(F("bio"), Value("No bio provided"))
)

String functions

from dorm import Upper, Lower, Length, Concat, F, Value

authors = Author.objects.annotate(
    name_upper  = Upper(F("name")),
    name_lower  = Lower(F("name")),
    name_length = Length(F("name")),
    display     = Concat(F("name"), Value(" ("), F("email"), Value(")")),
)

Now — current timestamp

from dorm import Now

# Set updated_at to the current DB timestamp
Author.objects.filter(active=True).update(updated_at=Now())

Cast — type conversion

from dorm import Cast, F

# Cast age to text for a concatenation
authors = Author.objects.annotate(
    age_str=Cast(F("age"), output_field="text")
)

Abs — absolute value

from dorm import Abs, F

Author.objects.annotate(balance_abs=Abs(F("balance")))

Set operations

Combine two querysets from the same model using SQL set operators. The result is a CombinedQuerySet that supports order_by(), count(), slicing, and async iteration.

union()

Returns rows present in either queryset. Deduplicates by default; pass all=True to keep duplicates.

young   = Author.objects.filter(age__lt=30)
seniors = Author.objects.filter(age__gte=65)

# All young or senior authors (no duplicates)
result = list(young.union(seniors))

# Keep duplicates
result = list(young.union(seniors, all=True))

# Multiple querysets at once
result = list(
    Author.objects.filter(age=20).union(
        Author.objects.filter(age=30),
        Author.objects.filter(age=40),
    )
)

# Chain order_by and count on the combined result
ordered = young.union(seniors).order_by("name")
total   = young.union(seniors).count()

intersection()

Returns rows present in both querysets.

active  = Author.objects.filter(active=True)
seniors = Author.objects.filter(age__gte=65)

# Active seniors only
active_seniors = list(active.intersection(seniors))

difference()

Returns rows in the first queryset that are not in the second.

all_authors  = Author.objects.all()
inactive     = Author.objects.filter(active=False)

# All active authors (equivalent to .filter(active=True))
active_only = list(all_authors.difference(inactive))

Streaming with iterator()

By default list(qs) fetches all rows and stores them in an internal cache (_result_cache). Use iterator() to stream rows one by one without buffering — useful for large result sets.

# Iterate without caching
for author in Author.objects.filter(active=True).iterator():
    process(author)

# Optional chunk_size hint (accepted but currently advisory)
for author in Author.objects.all().iterator(chunk_size=500):
    process(author)

# Async variant
async for author in Author.objects.all().aiterator():
    await process_async(author)

iterator() is incompatible with prefetch_related() (which requires buffering all instances first).


Asynchronous operations

Every sync method has an async counterpart prefixed with a:

import asyncio
import dorm

async def main():
    # Create
    author = await Author.objects.acreate(name="Alice", email="alice@example.com", age=30)

    # Get
    author = await Author.objects.aget(email="alice@example.com")
    author = await Author.objects.aget_or_none(email="missing@example.com")  # None

    # get_or_create / update_or_create
    author, created = await Author.objects.aget_or_create(
        email="bob@example.com",
        defaults={"name": "Bob", "age": 25},
    )

    # Count / existence
    total  = await Author.objects.acount()
    exists = await Author.objects.filter(active=True).aexists()

    # First / last
    first = await Author.objects.order_by("age").afirst()
    last  = await Author.objects.order_by("age").alast()

    # Update / delete
    n = await Author.objects.filter(active=False).aupdate(active=True)
    count, _ = await Author.objects.filter(age__lt=18).adelete()

    # Instance save / delete / reload
    author.age = 31
    await author.asave()
    await author.adelete()
    await author.arefresh_from_db()

    # Async iteration
    async for author in Author.objects.filter(active=True).order_by("name"):
        print(author.name)

    # Streaming without cache
    async for author in Author.objects.all().aiterator():
        print(author.name)

    # Bulk async
    objs = [Author(name=f"User{i}", email=f"u{i}@x.com", age=20) for i in range(100)]
    await Author.objects.abulk_create(objs)

    # Async aggregation
    result = await Author.objects.aaggregate(total=dorm.Count("id"), avg=dorm.Avg("age"))

asyncio.run(main())

Transactions

import dorm

# Sync
with dorm.transaction.atomic():
    author = Author.objects.create(name="Alice", age=30)
    Book.objects.create(title="My Book", author_id=author.pk)

# Async
async with dorm.transaction.aatomic():
    author = await Author.objects.acreate(name="Alice", age=30)
    await Book.objects.acreate(title="My Book", author_id=author.pk)

Savepoint nesting

Nested atomic() calls automatically use savepoints. An inner failure rolls back only the inner block; the outer transaction can still commit.

with dorm.transaction.atomic():
    author = Author.objects.create(name="Alice", age=30)
    try:
        with dorm.transaction.atomic():        # creates SAVEPOINT
            Book.objects.create(title="Bad Book", author_id=author.pk)
            raise ValueError("something went wrong")
    except ValueError:
        pass   # inner block rolled back; author still in transaction

# Only Alice is committed; no Book

get_or_create / update_or_create are atomic

Both methods wrap their INSERT in an implicit transaction and handle concurrent inserts safely — if another thread creates the same row first, they return the existing object instead of raising IntegrityError.


Migrations

What is an app?

An app is a Python package (a directory with __init__.py) that groups related models. Each app has its own migrations/ folder.

myproject/
├── settings.py
├── blog/
│   ├── __init__.py
│   ├── models.py
│   └── migrations/
│       └── __init__.py
└── shop/
    ├── __init__.py
    ├── models.py
    └── migrations/
        └── __init__.py

CLI commands

# Detect model changes and generate migration files
dorm makemigrations

# Apply all pending migrations
dorm migrate

# Apply migrations for a specific app only
dorm migrate blog

# Show migration status ([ ] pending, [X] applied)
dorm showmigrations

# Interactive shell (IPython if available — top-level await works out of the box)
dorm shell

# Override settings explicitly
dorm makemigrations --settings=myproject.settings

--settings is optional. dorm resolves settings in order: --settings flag → DORM_SETTINGS env var → settings module in current directory.

Undoing migrations

# Roll back to a specific migration (undoes everything after it)
dorm migrate blog 0002

# Undo all migrations for an app
dorm migrate blog zero

Custom migrations with RunSQL / RunPython

# myapp/migrations/0003_add_score.py
from dorm.migrations.operations import RunSQL, RunPython

def seed(app_label, registry):
    Author = registry["Author"]
    Author.objects.get_or_create(
        email="admin@example.com",
        defaults={"name": "Admin", "age": 0},
    )

def unseed(app_label, registry):
    Author = registry["Author"]
    Author.objects.filter(email="admin@example.com").delete()

operations = [
    RunSQL(
        sql="ALTER TABLE authors ADD COLUMN score INTEGER DEFAULT 0",
        reverse_sql="ALTER TABLE authors DROP COLUMN score",
    ),
    RunPython(code=seed, reverse_code=unseed),
]

Full example

import asyncio
import dorm

dorm.configure(
    DATABASES={"default": {"ENGINE": "sqlite", "NAME": "blog.db"}},
)


# ── Models ─────────────────────────────────────────────────────────────────────

class TimestampedModel(dorm.Model):
    created_at = dorm.DateTimeField(auto_now_add=True, null=True)
    updated_at = dorm.DateTimeField(auto_now=True, null=True)

    class Meta:
        abstract = True


class Tag(dorm.Model):
    name = dorm.CharField(max_length=50, unique=True)

    class Meta:
        db_table = "tags"
        ordering = ["name"]


class Author(dorm.Model):
    name  = dorm.CharField(max_length=100)
    email = dorm.EmailField(unique=True)
    age   = dorm.IntegerField()

    class Meta:
        db_table = "authors"
        ordering = ["name"]


class Post(TimestampedModel):
    title     = dorm.CharField(max_length=200)
    body      = dorm.TextField()
    author    = dorm.ForeignKey(Author, on_delete=dorm.CASCADE)
    published = dorm.BooleanField(default=False)
    views     = dorm.IntegerField(default=0)
    tags      = dorm.ManyToManyField(Tag, related_name="posts")

    class Meta:
        db_table = "posts"
        ordering = ["-created_at"]


# ── Signal ─────────────────────────────────────────────────────────────────────

from dorm.signals import post_save

@post_save.connect_for(Author)
def welcome_author(sender, instance, created, **kwargs):
    if created:
        print(f"Welcome, {instance.name}!")


# ── Sync demo ──────────────────────────────────────────────────────────────────

def sync_demo():
    alice = Author.objects.create(name="Alice", email="alice@example.com", age=30)
    bob   = Author.objects.create(name="Bob",   email="bob@example.com",   age=25)

    p1 = Post.objects.create(title="Hello World", body="...", author=alice, published=True)
    Post.objects.create(title="Draft",  body="...", author=alice)
    Post.objects.create(title="Bob's Post", body="...", author=bob, published=True)

    python_tag = Tag.objects.create(name="python")
    p1.tags.add(python_tag)

    # Nested select_related
    posts = list(Post.objects.select_related("author").filter(published=True))
    print([p.author.name for p in posts])

    # Prefetch M2M
    posts = list(Post.objects.prefetch_related("tags"))
    print([[t.name for t in p.tags.all()] for p in posts])

    # DB functions
    from dorm import Upper, Case, When, Value
    annotated = list(
        Author.objects.annotate(
            name_up  = Upper(dorm.F("name")),
            category = Case(
                When(age__lt=30, then=Value("young")),
                default=Value("experienced"),
            ),
        )
    )
    print(annotated[0].category)

    # Set operations
    young_or_senior = list(
        Author.objects.filter(age__lt=30).union(
            Author.objects.filter(age__gte=65)
        )
    )

    # Stream large results without caching
    for post in Post.objects.iterator():
        _ = post.title

    # Aggregations
    stats = Post.objects.aggregate(
        total     = dorm.Count("id"),
        avg_views = dorm.Avg("views"),
    )
    print(stats)

    # F expression
    Post.objects.filter(published=True).update(views=dorm.F("views") + 1)


# ── Async demo ─────────────────────────────────────────────────────────────────

async def async_demo():
    author = await Author.objects.aget(email="alice@example.com")

    await Post.objects.acreate(
        title="Async Post", body="...", author=author, published=True
    )

    async for post in Post.objects.filter(author=author).order_by("title"):
        print(post.title)

    async for post in Post.objects.all().aiterator():
        print(post.title)

    stats = await Post.objects.aaggregate(total=dorm.Count("id"))
    print(stats)


sync_demo()
asyncio.run(async_demo())

Quick reference

Operation Sync Async
Create objects.create(**kw) await objects.acreate(**kw)
Get one objects.get(**kw) await objects.aget(**kw)
Get or None objects.get_or_none(**kw) await objects.aget_or_none(**kw)
Filter objects.filter(**kw) objects.filter(**kw) + async for
First objects.first() await objects.afirst()
Last objects.last() await objects.alast()
Count objects.count() await objects.acount()
Exists objects.exists() await objects.aexists()
Update objects.update(**kw) await objects.aupdate(**kw)
Delete objects.delete() await objects.adelete()
Save instance instance.save() await instance.asave()
Delete instance instance.delete() await instance.adelete()
Reload instance.refresh_from_db() await instance.arefresh_from_db()
Get or create objects.get_or_create(...) await objects.aget_or_create(...)
Update or create objects.update_or_create(...) await objects.aupdate_or_create(...)
Bulk create objects.bulk_create([...]) await objects.abulk_create([...])
Aggregate objects.aggregate(...) await objects.aaggregate(...)
Values (dicts) objects.values(...) + for await objects.avalues(...)
Values list objects.values_list(...) + for await objects.avalues_list(...)
Partial load objects.only("f1", "f2")
Partial load objects.defer("f1", "f2")
Eager FK load objects.select_related("fk")
Nested FK load objects.select_related("fk__nested")
Batch FK/M2M load objects.prefetch_related("fk")
Stream rows objects.iterator() objects.aiterator()
Union qs1.union(qs2)
Intersection qs1.intersection(qs2)
Difference qs1.difference(qs2)
Validate instance.full_clean()
Full clean instance.clean()
Raw SQL objects.raw(sql, params) await objects.araw(sql, params)
Bulk delete w/ on_delete objects.filter(...).delete() await objects.filter(...).adelete()
FK traversal order objects.order_by("fk__field")

Tier 4 features

Bulk delete respects on_delete

QuerySet.delete() and QuerySet.adelete() now enforce Python-level on_delete policies (CASCADE, PROTECT, SET_NULL, SET_DEFAULT) before executing the bulk SQL DELETE, identical to single-instance instance.delete():

# Deleting a queryset cascades to related objects
deleted, counts = Category.objects.filter(name="old").delete()
# counts: {"myapp.Category": 1, "myapp.Article": 5}

# PROTECT prevents deletion if related objects exist
from dorm.exceptions import ProtectedError
try:
    Publisher.objects.all().delete()
except ProtectedError:
    print("Cannot delete: protected references exist")

order_by() with FK traversal

Sort by fields on related models using double-underscore traversal — dorm automatically adds the required JOIN:

# Sort books by their author's name
books = Book.objects.order_by("author__name")

# Descending
books = Book.objects.order_by("-author__name")

# Multi-level traversal
books = Book.objects.order_by("author__publisher__name", "title")

# Combined with filters
books = Book.objects.filter(published=True).order_by("author__name", "-year")

Field validators

Attach validators to any field. They run when full_clean() is called:

from dorm.validators import MinValueValidator, MaxValueValidator, RegexValidator, validate_email

class Product(dorm.Model):
    name = dorm.CharField(
        max_length=200,
        validators=[MinLengthValidator(3), MaxLengthValidator(50)],
    )
    price = dorm.FloatField(
        validators=[MinValueValidator(0), MaxValueValidator(9999)],
    )
    sku = dorm.CharField(
        max_length=20,
        validators=[RegexValidator(r"^[A-Z]{2}-\d{4}$", "Invalid SKU format")],
    )
    email = dorm.CharField(max_length=100, validators=[validate_email])

Built-in validators:

Validator Purpose
MinValueValidator(n) Value ≥ n
MaxValueValidator(n) Value ≤ n
MinLengthValidator(n) len(value) >= n
MaxLengthValidator(n) len(value) <= n
RegexValidator(pattern, msg) Must match regex
EmailValidator() / validate_email Valid email format

Custom validators are plain callables that raise ValidationError:

def no_spaces(value):
    if " " in value:
        raise dorm.ValidationError("No spaces allowed.")

class Tag(dorm.Model):
    slug = dorm.CharField(max_length=50, validators=[no_spaces])

Raw SQL queries

Execute arbitrary SQL and get back hydrated model instances:

# Sync
results = list(Author.objects.raw('SELECT * FROM "authors" WHERE age > %s', [25]))
for author in results:
    print(author.name, author.age)  # full model instances

# Async
authors = await Author.objects.araw('SELECT * FROM "authors" ORDER BY "name"')

# Async iteration
async for author in Author.objects.raw('SELECT * FROM "authors"'):
    print(author.name)

The query can return any columns — unknown columns are stored as plain attributes. JOINs and computed columns work seamlessly:

results = Author.objects.raw(
    'SELECT a.*, COUNT(b.id) AS book_count '
    'FROM authors a LEFT JOIN books b ON b.author_id = a.id '
    'GROUP BY a.id'
)
for a in results:
    print(a.name, a.book_count)

Connection auto-reconnect

Both SQLite and PostgreSQL backends now perform a health-check before executing each query. If the connection has been dropped (e.g. server restart, idle timeout), dorm transparently reconnects:

  • SQLite sync/async: executes SELECT 1 on the cached connection; recreates it on failure.
  • PostgreSQL pool: passes check=ConnectionPool.check_connection to the psycopg pool, which verifies each connection when it is borrowed.

No application code changes are needed. Long-running processes (workers, daemons) will automatically recover from transient connection failures.

Rename detection in migrations

The MigrationAutodetector can now produce RenameModel and RenameField operations instead of delete+create pairs.

Explicit hints (recommended) — safe, deterministic:

from dorm.migrations.autodetector import MigrationAutodetector

detector = MigrationAutodetector(
    from_state,
    to_state,
    rename_hints={
        "models": {"myapp": {"OldModelName": "NewModelName"}},
        "fields": {"myapp.MyModel": {"old_field": "new_field"}},
    },
)
changes = detector.changes("myapp")

Heuristic auto-detection (opt-in via detect_renames=True):

  • Model rename: detects when a deleted model and a new model share identical field names and types.
  • Field rename: detects when exactly one field is removed and one added within the same model, and both share the same db_type.
detector = MigrationAutodetector(from_state, to_state, detect_renames=True)
changes = detector.changes("myapp")

Database indexes (Meta.indexes)

Define indexes on models using dorm.Index. They are stored in model metadata and can be applied via migration operations:

import dorm
from dorm.indexes import Index

class Article(dorm.Model):
    title = dorm.CharField(max_length=200)
    slug  = dorm.CharField(max_length=200)
    published_at = dorm.DateTimeField()

    class Meta:
        indexes = [
            Index(fields=["slug"], unique=True, name="idx_article_slug"),
            Index(fields=["published_at"], name="idx_article_date"),
            Index(fields=["title", "slug"]),  # auto-named: idx_article_title_slug
        ]

Migration operations:

from dorm.migrations.operations import AddIndex, RemoveIndex
from dorm.indexes import Index

class Migration:
    operations = [
        AddIndex(
            model_name="Article",
            index=Index(fields=["slug"], unique=True, name="idx_article_slug"),
        ),
        RemoveIndex(
            model_name="Article",
            index=Index(fields=["old_col"], name="idx_article_old"),
        ),
    ]

The MigrationAutodetector automatically detects added and removed indexes when comparing two ProjectState objects.

Subquery support (__in=queryset)

Pass a QuerySet directly as the value of an __in lookup. dorm compiles it as an IN (SELECT …) subquery — no intermediate Python list is materialised:

# All books whose author is in the "active authors" set
active_authors = Author.objects.filter(active=True)
books = Book.objects.filter(author__in=active_authors)

# Works with slicing too
top3 = Author.objects.order_by("-rating")[:3]
books = Book.objects.filter(author__in=top3)

# Same-model self-referential subquery
promoted = Author.objects.filter(rank__gte=5)
result = Author.objects.filter(pk__in=promoted)

# Async
books = await Book.objects.filter(author__in=active_authors).all()

Plain list values for __in continue to work unchanged.

squashmigrations command

Combine a range of migrations into a single squashed file to keep your migration history manageable:

# Squash migrations 0001 through 0005 for the "myapp" app
dorm squashmigrations myapp 0001 0005

# With a custom name
dorm squashmigrations myapp 0001 0005 --squashed-name initial_squashed

# Start from the very beginning (omit start_migration)
dorm squashmigrations myapp 0005

The generated file has a replaces list that the executor honours — once all replaced migrations are recorded as applied, the squashed migration is automatically marked applied too. Operations are automatically optimised (e.g. consecutive AddField/RemoveField pairs cancel out, AddField followed by AlterField is merged into a single AddField).

connection.set_autocommit() for long-running processes

Control transaction behaviour on the active connection. Useful for batch jobs and long-running workers where you want each statement to commit individually or where you manage commits manually:

from dorm.db.connection import get_connection, get_async_connection

# ── Sync ──────────────────────────────────────────────────────────────────────
conn = get_connection()

# Enable autocommit — each statement commits immediately
conn.set_autocommit(True)
for row in big_data:
    MyModel.objects.create(**row)

# Disable autocommit and manage transactions manually
conn.set_autocommit(False)
try:
    MyModel.objects.create(name="A")
    MyModel.objects.create(name="B")
    conn.commit()
except Exception:
    conn.rollback()
    raise

# ── Async ─────────────────────────────────────────────────────────────────────
conn = get_async_connection()
await conn.set_autocommit(True)
await MyModel.objects.acreate(name="immediate")
await conn.set_autocommit(False)

await conn.commit()    # manual commit
await conn.rollback()  # manual rollback

set_autocommit() is supported on all backends: SQLite sync/async and PostgreSQL sync/async. On PostgreSQL a dedicated persistent connection (separate from the pool) is used when autocommit is enabled.


Dependencies

Extra Package Purpose
sqlite aiosqlite Async SQLite
postgresql psycopg[binary,pool] Sync/Async PostgreSQL with connection pool

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

djanorm-1.0.0.tar.gz (155.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

djanorm-1.0.0-py3-none-any.whl (77.6 kB view details)

Uploaded Python 3

File details

Details for the file djanorm-1.0.0.tar.gz.

File metadata

  • Download URL: djanorm-1.0.0.tar.gz
  • Upload date:
  • Size: 155.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for djanorm-1.0.0.tar.gz
Algorithm Hash digest
SHA256 6b2f4d5988648fbf9b03f44a2aed46b6dfaab96b0e1b5697d1198e292763931c
MD5 8f696c7124c170927fdb63305297e872
BLAKE2b-256 475ef212ec30d040ff458f73ea7247fe5e6aef1900450ec1b8225ad71ee794c8

See more details on using hashes here.

Provenance

The following attestation bundles were made for djanorm-1.0.0.tar.gz:

Publisher: publish.yml on rroblf01/d-orm

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file djanorm-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: djanorm-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 77.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for djanorm-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8d73255ef3af8f7b528721863b0482483e98aacd5f534a7af62a8db115b27e00
MD5 672a9c600f520162693f2a8c1355bcf7
BLAKE2b-256 ab210c17f6ddf6b158af60de03bea9685c1a4d743beab7b8417480edf79138cc

See more details on using hashes here.

Provenance

The following attestation bundles were made for djanorm-1.0.0-py3-none-any.whl:

Publisher: publish.yml on rroblf01/d-orm

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page