Skip to main content

The concept of MongoDB, SQLAlchemy and Pydantic.

Project description

mongotic

The concept of MongoDB, SQLAlchemy, and Pydantic combined together in one simple and effective solution. It enables you to use SQLAlchemy v2 query syntax with MongoDB, and allows you to define your data models using Pydantic.

Documentation: https://allen2c.github.io/mongotic/

Project management: Issues and epics are tracked with PLANK under .plank/.

Overview

The mongotic library is designed to make working with MongoDB as seamless as possible by using familiar tools and patterns from the SQLAlchemy and Pydantic ecosystems. It provides a consistent and expressive way to interact with MongoDB collections, and utilises Pydantic for validation and data definition.

Features

  • SQLAlchemy v2 API: select(), session.scalars(), ScalarResult — familiar patterns without a SQL database.
  • Rich query operators: logical combinators (or_, and_, not_), null checks, string matching, range, and distinct.
  • Session management: refresh(), merge(), expunge(), expire(), state inspection (.new, .dirty, .deleted).
  • Declarative indexes: define __indexes__ on the model, apply with create_indexes().
  • Bulk Operations: insert(), update(), and delete() statement builders via session.execute(), returning a Result with .rowcount and .inserted_ids.
  • Column projection: select(User.name, User.email) returns lightweight Row results; single-column projection unwraps to plain values via session.scalars().
  • Full async API: mongotic.asyncio — mirrors the sync session on top of pymongo.AsyncMongoClient.
  • Data Validation: Utilise Pydantic's powerful schema definition for data validation and serialisation.
  • Type Checking: Benefit from type checking and autocomplete in IDEs due to static type definitions.
  • Works on standalone MongoDB: No replica set required — no multi-document transaction dependency.

Installation

pip install mongotic

Usage

v0.3.0 breaking change: session.query() has been removed. Use select() + session.scalars() instead.

from typing import Optional, Text

from pydantic import Field

from mongotic import MultipleResultsFound, NotFound, create_engine, delete, select, update
from mongotic.model import MongoBaseModel
from mongotic.orm import sessionmaker


class User(MongoBaseModel):
    __databasename__ = "test_database"
    __tablename__ = "user"

    name: Text = Field(..., max_length=50)
    email: Text = Field(...)
    company: Optional[Text] = Field(None, max_length=50)
    age: Optional[int] = Field(None, ge=0, le=200)


engine = create_engine("mongodb://localhost:27017")
Session = sessionmaker(bind=engine)

# ── Add ──────────────────────────────────────────────────────────────────────
session = Session()
session.add(User(name="Allen Chou", email="allen@example.com", company="Acme", age=30))
session.add_all([
    User(name="Bob", email="bob@example.com", company="Acme", age=25),
    User(name="Carol", email="carol@example.com", company="Acme", age=28),
])
session.commit()

# ── Query ────────────────────────────────────────────────────────────────────
session = Session()

# Fetch all / first
users = session.scalars(select(User)).all()
users = session.scalars(select(User).where(User.age > 18)).all()
users = session.scalars(
    select(User)
    .where(User.company == "Acme")
    .order_by(-User.age)      # descending; use User.age for ascending
    .limit(10)
    .offset(0)
).all()

user = session.scalars(select(User).where(User.email == "allen@example.com")).first()
user = session.get(User, "<object_id_string>")   # PK lookup; returns None if not found

# Strict single-result fetch
try:
    user = session.scalars(select(User).where(User.email == "allen@example.com")).one()
    # raises NotFound if 0 results; raises MultipleResultsFound if 2+ results
except NotFound:
    ...
except MultipleResultsFound:
    ...

user = session.scalars(
    select(User).where(User.email == "allen@example.com")
).one_or_none()
# returns None if 0 results; raises MultipleResultsFound if 2+ results

# Count and existence check
count = session.scalars(select(User).where(User.company == "Acme")).count()
exists = session.scalars(select(User).where(User.company == "Acme")).exists()

# ── Update ───────────────────────────────────────────────────────────────────
session = Session()
user = session.scalars(select(User).where(User.email == "allen@example.com")).first()
user.email = "new.allen@example.com"   # tracked automatically
session.commit()

# ── Delete ───────────────────────────────────────────────────────────────────
session = Session()
user = session.scalars(select(User).where(User.email == "new.allen@example.com")).first()
session.delete(user)
session.commit()

# ── Bulk Operations ──────────────────────────────────────────────────────────
session = Session()
# Bulk update: returns number of modified documents
modified = session.execute(
    update(User).where(User.company == "Acme").values(company="Acme Corp")
)
# Bulk delete: returns number of deleted documents
deleted = session.execute(
    delete(User).where(User.age < 18)
)

# ── Context manager + flush ──────────────────────────────────────────────────
with Session() as session:
    new_user = User(name="Dave", email="dave@example.com", age=35)
    session.add(new_user)
    session.flush()          # writes immediately; new_user._id is now available
    print(new_user._id)
    session.commit()         # alias for flush()

Async usage

mongotic.asyncio mirrors the sync API on pymongo.AsyncMongoClient. See the async documentation for the full reference.

import asyncio
from mongotic.asyncio import create_async_engine, async_sessionmaker
from mongotic import insert, select, update, delete

async_engine = create_async_engine("mongodb://localhost:27017")
AsyncSession  = async_sessionmaker(bind=async_engine)

async def main():
    async with AsyncSession() as session:
        # Bulk insert
        r = await session.execute(
            insert(User).values([
                {"name": "Alice", "email": "alice@example.com", "age": 30},
            ])
        )
        print(r.inserted_ids)   # ["<ObjectId>"]

        # Query
        adults = await session.scalars(select(User).where(User.age >= 18)).all()

        # Column projection — returns Row objects
        names = await session.scalars(select(User.name)).all()

        # Scalar shortcut
        age = await session.scalar(select(User.age).where(User.name == "Alice"))

        # Bulk update / delete
        await session.execute(update(User).where(User.role == "guest").values(role="member"))
        await session.execute(delete(User).where(User.active == False))

asyncio.run(main())

Contributing

TODO

License

This project is licensed under the MIT License — see the LICENSE file for details.

Support

If you encounter any problems or have suggestions, please open an issue, or feel free to reach out directly.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mongotic-0.5.0.tar.gz (17.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mongotic-0.5.0-py3-none-any.whl (20.7 kB view details)

Uploaded Python 3

File details

Details for the file mongotic-0.5.0.tar.gz.

File metadata

  • Download URL: mongotic-0.5.0.tar.gz
  • Upload date:
  • Size: 17.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.11.14 Darwin/25.3.0

File hashes

Hashes for mongotic-0.5.0.tar.gz
Algorithm Hash digest
SHA256 40d911b8ff2017869939de084dc182422abc2d5fead92c55f8a7385981216d67
MD5 63f963fae76fb949e30d23aa7835ab02
BLAKE2b-256 014d4fd0487de3ff9634120d2b5ae526d54856dde70b9fa493e68ef520e20b21

See more details on using hashes here.

File details

Details for the file mongotic-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: mongotic-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 20.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.11.14 Darwin/25.3.0

File hashes

Hashes for mongotic-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 36a8186f9b010bc1f8cb6d0e10167ceb19bbadf01d5da5a3b3b02207bdce8298
MD5 20d66f36e4639bfe1453e106d8c249fb
BLAKE2b-256 28c9a768b3f2891139f426b9432a5ba8168f9b5292ade3c984b316433abc7b64

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page