Gault

A lightweight Object Document Mapper (ODM) for MongoDB with Python type hints and state tracking.

Features

  • Type-safe MongoDB documents with Python type hints
  • Field aliasing for mapping attribute names to database field names
  • Query operators with Pythonic syntax
  • Async manager for CRUD operations
  • Aggregation pipeline support
  • Automatic state tracking and dirty field detection
  • Persistence tracking

Installation

pip install gault

Quick Start

from gault import Schema, Model, Field, configure, AsyncManager

# Schema: Persistent documents mapped to MongoDB collections
class Person(Schema, collection="people"):
    id: Field[int] = configure(pk=True)
    name: Field[str]
    age: Field[int] = configure(db_alias="person_age")

# Model: Non-persistent data classes (projections, view models, etc.)
class PersonSummary(Model):
    name: Field[str]
    total: Field[int]

# Create manager
manager = AsyncManager(database)

# Query and modify
person = await manager.get(Person, filter=Person.id == 1)
person.age = 43
await manager.save(person, atomic=True)  # Only updates dirty fields

Schema vs Model

  • Schema: A persistent document class mapped to a MongoDB collection. Requires the collection parameter and is registered globally.
  • Model: A non-persistent data structure for aggregation projections, view models, or intermediate transformations.

Field Configuration

Fields can be configured with metadata using the configure() function:

class Person(Schema, collection="people"):
    # Primary key field - used for filtering in save() operations
    id: Field[int] = configure(pk=True)

    # Regular field
    name: Field[str]

    # Field with database alias (attribute name differs from the stored field name)
    age: Field[int] = configure(db_alias="person_age")

Note: Fields marked with pk=True are used as the filter criteria in save() operations to identify the document for upsert.

Querying with Filters

Gault accepts filters in several forms: type-safe operator expressions, pipelines, and raw MongoDB queries.

Operator Expressions

Use class fields with comparison operators to build type-safe queries:

# Comparison operators
Person.age == 42          # Equal
Person.age != 30          # Not equal
Person.age < 50           # Less than
Person.age <= 50          # Less than or equal
Person.age > 18           # Greater than
Person.age >= 18          # Greater than or equal
Person.id.in_([1, 2, 3])  # In list
Person.id.nin([4, 5])     # Not in list

# Logical operators
filter = (Person.age >= 18) & (Person.age < 65)  # AND
filter = (Person.name == "Alice") | (Person.name == "Bob")  # OR
filter = ~(Person.age < 18)  # NOT

# Complex expressions
filter = (Person.age >= 18) & ((Person.name == "Alice") | (Person.name == "Bob"))
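
To make the mapping concrete, the sketch below shows one way such expressions could translate into plain MongoDB filter documents. It is not Gault's implementation: `Expr` and `F` are hypothetical stand-ins, and the exact documents Gault emits (for example, whether equality uses `$eq` or negation uses `$nor`) are assumptions.

```python
class Expr:
    """Hypothetical wrapper around a MongoDB filter document."""

    def __init__(self, query: dict):
        self.query = query

    def __and__(self, other: "Expr") -> "Expr":
        return Expr({"$and": [self.query, other.query]})

    def __or__(self, other: "Expr") -> "Expr":
        return Expr({"$or": [self.query, other.query]})

    def __invert__(self) -> "Expr":
        # $nor with a single clause negates that clause.
        return Expr({"$nor": [self.query]})


class F:
    """Hypothetical field reference, keyed by its database name."""

    def __init__(self, db_name: str):
        self.db_name = db_name

    def _op(self, operator: str, value) -> Expr:
        return Expr({self.db_name: {operator: value}})

    def __eq__(self, value):
        return self._op("$eq", value)

    def __ne__(self, value):
        return self._op("$ne", value)

    def __lt__(self, value):
        return self._op("$lt", value)

    def __le__(self, value):
        return self._op("$lte", value)

    def __gt__(self, value):
        return self._op("$gt", value)

    def __ge__(self, value):
        return self._op("$gte", value)

    def in_(self, values):
        return self._op("$in", list(values))

    def nin(self, values):
        return self._op("$nin", list(values))


age = F("person_age")  # the db_alias from the examples above
name = F("name")
combined = (age >= 18) & ((name == "Alice") | (name == "Bob"))
print(combined.query)
```

The parentheses around each comparison matter: `&`, `|`, and `~` bind more tightly than comparison operators in Python, so `Person.age >= 18 & Person.age < 65` would not parse as intended.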

Pipeline Filters

For advanced queries, use the Pipeline API with aggregation stages:

from gault import Pipeline

# Match and sort
pipeline = Pipeline().match(Person.age >= 18).sort(Person.age.asc())

# Pagination
pipeline = Pipeline().skip(10).take(20)

# Group and aggregate
from gault import Sum
pipeline = (
    Pipeline()
    .match(Person.age >= 18)
    .group(by=Person.name, accumulators={"total": Sum(Person.age)})
)

# Multiple stages
pipeline = (
    Pipeline()
    .match(Person.age >= 18)
    .sort(Person.age.desc())
    .take(10)
)

Raw MongoDB Queries

You can also use raw MongoDB query dictionaries:

# Dict filter
filter = {"age": {"$gte": 18}}

# Raw pipeline stages
pipeline = [
    {"$match": {"age": {"$gte": 18}}},
    {"$sort": {"age": -1}},
    {"$limit": 10}
]

AsyncManager Methods

find(model, filter=None)

Finds a single document matching the filter. Returns None if not found.

Filter types: Operator expression, Pipeline, dict, or list of stages.

# With operator
person = await manager.find(Person, filter=Person.age == 42)

# With pipeline
pipeline = Pipeline().match(Person.age > 30).sort(Person.name.asc())
person = await manager.find(Person, filter=pipeline)

# With dict
person = await manager.find(Person, filter={"age": 42})

get(model, filter=None)

Like find(), but raises a NotFound exception if no document is found.

Filter types: Operator expression, Pipeline, dict, or list of stages.

try:
    person = await manager.get(Person, filter=Person.id == 123)
except NotFound:
    print("Person not found")
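
The difference between the two lookups can be sketched without a database. The code below is an illustration over an in-memory list, not Gault's code: the `NotFound` class, `find`, and `get` defined here are hypothetical stand-ins that only mirror the behaviour described above.

```python
import asyncio


class NotFound(LookupError):
    """Hypothetical stand-in for the library's NotFound exception."""


async def find(documents, predicate):
    """Return the first matching document, or None when nothing matches."""
    return next((doc for doc in documents if predicate(doc)), None)


async def get(documents, predicate):
    """Like find(), but raise NotFound instead of returning None."""
    document = await find(documents, predicate)
    if document is None:
        raise NotFound("no document matches the filter")
    return document


people = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]


async def main():
    # find() reports a miss by returning None.
    found = await find(people, lambda doc: doc["id"] == 123)
    # get() reports the same miss by raising.
    try:
        await get(people, lambda doc: doc["id"] == 123)
    except NotFound:
        missing = True
    else:
        missing = False
    return found, missing


result = asyncio.run(main())
```

Use the `find` style when absence is an expected outcome, and the `get` style when a missing document indicates an error.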

select(model, filter=None, skip=None, take=None)

Returns an async iterator of documents matching the filter. Supports pagination through skip and take.

Filter types: Operator expression, Pipeline, dict, or list of stages.

# Operator with in_()
async for person in manager.select(Person, filter=Person.id.in_([1, 2, 3])):
    print(person.name)

# Pipeline
pipeline = Pipeline().match(Person.age >= 18).sort(Person.age.desc())
async for person in manager.select(Person, filter=pipeline, take=10):
    print(person.name)

# Complex filter
filter = (Person.age >= 18) & (Person.age < 65)
async for person in manager.select(Person, filter=filter):
    print(person.name)
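
As a rough model of how an async iterator with `skip` and `take` behaves, the sketch below pages through an in-memory list. The `select` function here is a hypothetical stand-in, and the assumption that skipping is applied before taking follows the usual `$skip` then `$limit` ordering rather than anything confirmed about Gault.

```python
import asyncio


async def select(documents, predicate=None, skip=None, take=None):
    """Yield matching documents lazily, applying skip before take."""
    skipped = 0
    yielded = 0
    for doc in documents:
        if predicate is not None and not predicate(doc):
            continue
        if skip is not None and skipped < skip:
            skipped += 1
            continue
        if take is not None and yielded >= take:
            break
        yielded += 1
        yield doc


async def collect_page():
    people = [{"id": i, "age": 20 + i} for i in range(10)]
    is_old_enough = lambda doc: doc["age"] >= 23
    # An async comprehension drains the iterator into a list.
    return [doc["id"] async for doc in select(people, is_old_enough, skip=2, take=3)]


page = asyncio.run(collect_page())
```

Because the result is an async iterator, it is consumed with `async for` or an async comprehension rather than awaited directly.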

insert(instance)

Inserts a new document into the database. Only works with Schema instances.

new_person = Person(id=1, name="Alice", age=30)
await manager.insert(new_person)

save(instance, refresh=False, atomic=False)

Upserts a document using find_one_and_update. Supports atomic updates with dirty field tracking.

  • refresh (default False): if True, the instance is refreshed with the document returned from the database.
  • atomic (default False): if True and the instance is already persisted, only dirty fields are updated.

# Create or update
person = Person(id=1, name="Bob", age=25)
await manager.save(person)

# Later, update only changed fields
person.age = 26
await manager.save(person, atomic=True)  # Only updates 'person_age' field

Persistence and Dirty Fields

Gault tracks the persistence state and modifications of your documents automatically.

Persistence Tracking

When documents are loaded from the database or saved, they are marked as persisted:

# Loaded from DB - automatically marked as persisted
person = await manager.find(Person, filter=Person.id == 1)
assert manager.persistence.is_persisted(person)

# Newly created - not yet persisted
new_person = Person(id=2, name="Charlie", age=35)
assert not manager.persistence.is_persisted(new_person)

# After saving - marked as persisted
await manager.save(new_person)
assert manager.persistence.is_persisted(new_person)
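
One plausible way to implement this kind of bookkeeping is a weak set of instances. The sketch below is an assumption about the general technique, not a description of Gault's internals: `PersistenceTracker`, `mark_persisted`, and the plain `Person` class are hypothetical, and it assumes instances are hashable by identity.

```python
import weakref


class PersistenceTracker:
    """Hypothetical tracker for instances that exist in the database."""

    def __init__(self):
        # Weak references let tracked instances be garbage collected normally.
        self._persisted = weakref.WeakSet()

    def mark_persisted(self, instance) -> None:
        self._persisted.add(instance)

    def is_persisted(self, instance) -> bool:
        return instance in self._persisted


class Person:
    def __init__(self, id, name):
        self.id = id
        self.name = name


tracker = PersistenceTracker()
loaded = Person(1, "Alice")
tracker.mark_persisted(loaded)  # what a load or a save would do
fresh = Person(2, "Charlie")    # created locally, never saved
```

Keeping the flag outside the instance means document classes need no extra attributes to carry persistence state.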

Dirty Field Tracking

Gault snapshots document state and tracks which fields have been modified:

person = await manager.get(Person, filter=Person.id == 1)

# Modify some fields
person.name = "New Name"
person.age = 50

# Check which fields changed
dirty_fields = manager.state_tracker.get_dirty_fields(person)
# dirty_fields == {'name', 'age'}

# Atomic save only updates changed fields
await manager.save(person, atomic=True)
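
Snapshot-based dirty tracking can be sketched in a few lines. This is a minimal illustration of the idea, assuming a deep-copied snapshot compared field by field; `StateTracker`, `snapshot`, and the plain `Person` class below are hypothetical and do not reproduce Gault's tracker.

```python
import copy


class StateTracker:
    """Hypothetical tracker that diffs an instance against a stored snapshot."""

    def __init__(self):
        # Keyed by id() for brevity; a real tracker would guard against id reuse.
        self._snapshots = {}

    def snapshot(self, instance) -> None:
        # Deep copy so later in-place edits to lists or dicts are detected.
        self._snapshots[id(instance)] = copy.deepcopy(vars(instance))

    def get_dirty_fields(self, instance) -> set:
        before = self._snapshots.get(id(instance), {})
        return {
            name
            for name, value in vars(instance).items()
            if name not in before or before[name] != value
        }


class Person:
    def __init__(self, name, age, tags):
        self.name = name
        self.age = age
        self.tags = tags


tracker = StateTracker()
person = Person("Old Name", 42, ["a"])
tracker.snapshot(person)  # what loading from the database would do
person.name = "New Name"
person.age = 50
dirty = tracker.get_dirty_fields(person)
```

Comparing against a snapshot, instead of intercepting attribute writes, also catches in-place mutations such as appending to a list field.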

Atomic Updates

With atomic=True, save() builds a MongoDB update that writes only what has changed:

  • Dirty fields: Updated with $set
  • Unchanged fields: Set with $setOnInsert (only on insert, not update)
  • Primary key fields: Used in the filter

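Because only modified fields are written, concurrent writers that touch different fields are less likely to overwrite each other, and unchanged data is not rewritten.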
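
The three rules in the list above can be sketched as a function that turns a snapshot and the current field values into a filter and an update document. This is an illustrative sketch, not Gault's code: `build_atomic_upsert` and its parameters are hypothetical names, and field values are assumed to be already keyed by their database names.

```python
def build_atomic_upsert(snapshot: dict, current: dict, pk_fields: set):
    """Return (filter, update) for an upsert that writes only changed fields."""
    # Primary key fields identify the document.
    query = {name: current[name] for name in pk_fields}
    changed, unchanged = {}, {}
    for name, value in current.items():
        if name in pk_fields:
            # On insert, MongoDB copies equality conditions from the filter.
            continue
        if name not in snapshot or snapshot[name] != value:
            changed[name] = value
        else:
            unchanged[name] = value
    update = {}
    if changed:
        update["$set"] = changed
    if unchanged:
        # Applied only when the upsert ends up inserting a new document.
        update["$setOnInsert"] = unchanged
    return query, update


snapshot = {"id": 1, "name": "Bob", "person_age": 25}
current = {"id": 1, "name": "Bob", "person_age": 26}
query, update = build_atomic_upsert(snapshot, current, pk_fields={"id"})
```

Such a pair could then be passed to PyMongo's `find_one_and_update(query, update, upsert=True)`. The `$set` and `$setOnInsert` documents never share a field, which MongoDB requires within a single update.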

Low Level API: Pipeline Composition

Besides the high-level Schema and Model API, Gault offers a low-level API for building aggregation pipelines on their own. Its fluent interface composes pipelines without requiring any Schema or Model classes.

Basic Pipeline Construction

The Pipeline class provides methods for building MongoDB aggregation pipelines programmatically:

from gault import Pipeline
from gault.predicates import Field
from gault.accumulators import Sum

# Build a pipeline
pipeline = (
    Pipeline()
    .match({"status": "active"})
    .sort({"created_at": -1})
    .take(10)
)

# Convert to MongoDB stages
stages = pipeline.build()
# [
#     {"$match": {"status": "active"}},
#     {"$sort": {"created_at": -1}},
#     {"$limit": 10}
# ]

Available Pipeline Stages

Filtering and Matching

# Match with raw dict
Pipeline().match({"age": {"$gte": 18}})

# Match with Field predicates
Pipeline().match(Field("age").gte(18) & Field("status").eq("active"))

Sorting and Pagination

# Sort by field
Pipeline().sort({"name": 1, "age": -1})
Pipeline().sort("name")  # Ascending by default

# Pagination
Pipeline().skip(20).take(10)

# Random sampling
Pipeline().sample(5)

Projection

# Dict-based projection
Pipeline().project({"name": True, "age": True})

# Field-based projection
Pipeline().project(
    Field("name").keep(),
    Field("age").keep(alias="person_age"),
    Field("internal_field").remove()
)

# Expression-based projection
Pipeline().project({"fullName": {"$concat": ["$firstName", " ", "$lastName"]}})

Grouping and Aggregation

# Group with accumulators
from gault.accumulators import Sum, Avg, Count

Pipeline().group(
    {"total": Sum("$amount"), "average": Avg("$score")},
    by="$category"
)

# Group all documents (no grouping key)
Pipeline().group(
    {"count": Count()},
    by=None
)
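
For intuition about what a grouping stage computes, here is an in-memory analogue next to the raw `$group` stage one would write by hand. `group_sum_avg` is a hypothetical helper, and whether Gault's `group()` builds exactly this stage document is an assumption. The raw stage itself is standard MongoDB syntax.

```python
from collections import defaultdict

# Raw MongoDB stage: group by category, then sum amounts and average scores.
raw_stage = {
    "$group": {
        "_id": "$category",
        "total": {"$sum": "$amount"},
        "average": {"$avg": "$score"},
    }
}


def group_sum_avg(documents, by, sum_field, avg_field):
    """In-memory analogue of $group with one $sum and one $avg accumulator."""
    groups = defaultdict(list)
    for doc in documents:
        # by=None puts every document into a single group, like "_id": None.
        key = doc.get(by) if by is not None else None
        groups[key].append(doc)
    results = []
    for key, docs in groups.items():
        scores = [d[avg_field] for d in docs]
        results.append({
            "_id": key,
            "total": sum(d[sum_field] for d in docs),
            "average": sum(scores) / len(scores),
        })
    return results


orders = [
    {"category": "a", "amount": 10, "score": 4},
    {"category": "b", "amount": 5, "score": 2},
    {"category": "a", "amount": 20, "score": 2},
]
per_category = group_sum_avg(orders, "category", "amount", "score")
overall = group_sum_avg(orders, None, "amount", "score")
```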

Field Manipulation

# Add or update fields
Pipeline().set({"computedField": {"$multiply": ["$price", "$quantity"]}})
Pipeline().set_field("status", "processed")

# Remove fields
Pipeline().unset("_id", "internal_field")

Array Operations

# Unwind array field
Pipeline().unwind("$tags")

# With options
Pipeline().unwind(
    "$items",
    include_array_index="item_index",
    preserve_null_and_empty_arrays=True
)

Bucketing

# Manual buckets
Pipeline().bucket(
    by="$age",
    boundaries=[0, 18, 65, 100],
    default="other",
    output={"count": Sum(1)}
)

# Automatic buckets
Pipeline().bucket_auto(
    by="$price",
    buckets=5,
    output={"count": Sum(1), "avgPrice": Avg("$price")}
)
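
The boundary rules of the manual bucket stage are easy to get wrong, so the sketch below reproduces them in memory. It follows MongoDB's documented `$bucket` semantics: each bucket includes its lower bound and excludes its upper bound, and values outside all boundaries go to the default bucket. `bucket_counts` is a hypothetical helper, not part of Gault.

```python
from bisect import bisect_right
from collections import Counter


def bucket_counts(values, boundaries, default):
    """Count values per bucket; buckets are [lower, upper) and keyed by lower."""
    counts = Counter()
    for value in values:
        # Index of the last boundary that is <= value, or -1 if below all.
        index = bisect_right(boundaries, value) - 1
        in_range = 0 <= index < len(boundaries) - 1
        counts[boundaries[index] if in_range else default] += 1
    return dict(counts)


ages = [5, 18, 64, 65, 100, 120]
counts = bucket_counts(ages, boundaries=[0, 18, 65, 100], default="other")
```

Note that 100 lands in the default bucket: the last boundary is an exclusive upper limit, so a value equal to it is out of range.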

Joins and Lookups

# Simple lookup
Pipeline().lookup(
    OtherModel,
    local_field="user_id",
    foreign_field="_id",
    into="user_data"
)

# Lookup with sub-pipeline
from gault.pipelines import CollectionPipeline

sub_pipeline = CollectionPipeline("orders").match({"status": "completed"})
Pipeline().lookup(sub_pipeline, into="orders")

# Graph lookup for hierarchical data
Pipeline().graph_lookup(
    OtherModel,
    start_with="$reports_to",
    local_field="reports_to",
    foreign_field="employee_id",
    into="reporting_chain",
    max_depth=5
)

Faceted Search

# Multiple aggregations in parallel
Pipeline().facet({
    "count": Pipeline().count("total"),
    "avgPrice": Pipeline().group({"value": Avg("$price")}, by=None),
    "categories": Pipeline().group({"count": Sum(1)}, by="$category")
})

Other Stages

# Count documents
Pipeline().count("total")

# Union with another collection
Pipeline().union_with(OtherModel)

# Replace document
Pipeline().replace_with({"newField": "$existingField"})

# Raw stage (for unsupported operations)
Pipeline().raw({"$customStage": {"option": "value"}})

Pipeline Composition

Pipelines are immutable: each stage method returns a new pipeline, so a base pipeline can be extended in several directions without being modified.

# Build pipelines incrementally
base = Pipeline().match({"type": "user"})
active_users = base.match({"status": "active"})
premium_users = active_users.match({"plan": "premium"})

# Use pipe() for custom transformations
def add_pagination(p: Pipeline, page: int, size: int) -> Pipeline:
    return p.skip(page * size).take(size)

pipeline = Pipeline().match({"status": "active"}).pipe(add_pagination, 2, 20)
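
The following sketch shows why immutability makes this kind of reuse safe. `MiniPipeline` is a hypothetical miniature written for illustration, and only the method names mirror the examples above. Gault's actual `Pipeline` class may be implemented differently.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class MiniPipeline:
    """Hypothetical immutable pipeline: every method returns a new object."""

    stages: tuple = ()

    def _with(self, stage: dict) -> "MiniPipeline":
        return MiniPipeline(self.stages + (stage,))

    def match(self, query: dict) -> "MiniPipeline":
        return self._with({"$match": query})

    def skip(self, count: int) -> "MiniPipeline":
        return self._with({"$skip": count})

    def take(self, count: int) -> "MiniPipeline":
        return self._with({"$limit": count})

    def pipe(self, func: Callable[..., "MiniPipeline"], *args: Any) -> "MiniPipeline":
        # Hand the pipeline to a helper so it can be applied mid-chain.
        return func(self, *args)

    def build(self) -> list:
        return list(self.stages)


def add_pagination(p: MiniPipeline, page: int, size: int) -> MiniPipeline:
    return p.skip(page * size).take(size)


base = MiniPipeline().match({"type": "user"})
active = base.match({"status": "active"}).pipe(add_pagination, 2, 20)
```

After these calls, `base` still contains only its original `$match` stage, while `active` carries four stages. Derived pipelines never affect the pipeline they were built from.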

Working with Field References

The low-level API provides Field for building queries without Schema classes:

from gault.predicates import Field

# Field predicates
query = Field("age").gte(18) & Field("country").in_(["US", "CA"])
Pipeline().match(query)

# Field references in expressions
Pipeline().project({
    "fullName": {"$concat": [Field("firstName"), " ", Field("lastName")]}
})

Using with AsyncManager

You can use low-level pipelines with AsyncManager by passing them directly:

from gault import AsyncManager

manager = AsyncManager(database)

# Pass pipeline to manager methods
pipeline = Pipeline().match({"status": "active"}).sort({"created_at": -1})
# select() returns an async iterator, so collect it with an async comprehension
results = [doc async for doc in manager.select(MyModel, filter=pipeline)]

# Or build stages manually
stages = pipeline.build()
cursor = database["collection"].aggregate(stages)

In-Memory Pipeline Testing

Use Pipeline.documents() to work with in-memory data:

# Create pipeline with test data
pipeline = Pipeline.documents(
    {"id": 1, "name": "Alice", "age": 30},
    {"id": 2, "name": "Bob", "age": 25},
    {"id": 3, "name": "Charlie", "age": 35}
).match(Field("age").gte(30))

stages = pipeline.build()
# [
#     {"$documents": [{"id": 1, ...}, {"id": 2, ...}, {"id": 3, ...}]},
#     {"$match": {"age": {"$gte": 30}}}
# ]

Accumulators

Gault provides accumulator classes for use in group() and bucket() stages:

from gault.accumulators import (
    AddToSet, Avg, Bottom, BottomN, Count, First, Last,
    Max, Min, Push, Sum, Top, TopN
)

Pipeline().group(
    {
        "total": Sum("$amount"),
        "average": Avg("$score"),
        "unique_tags": AddToSet("$tag"),
        "all_items": Push("$item"),
        "highest": Max("$value"),
        "lowest": Min("$value"),
        "first_seen": First("$timestamp"),
        "last_seen": Last("$timestamp")
    },
    by="$category"
)

Expression Operators

For complex expressions, Gault provides numerous expression operators:

from gault.expressions import Concat, Multiply, Cond, IfNull

Pipeline().project({
    "fullName": Concat(Field("firstName"), " ", Field("lastName")),
    "totalPrice": Multiply(Field("price"), Field("quantity")),
    "displayName": IfNull(Field("nickname"), Field("firstName")),
    "status": Cond(
        Field("active").eq(True),
        "Active",
        "Inactive"
    )
})
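
To show what these operators compute, the sketch below evaluates the equivalent raw MongoDB expressions against a single in-memory document. `evaluate` is a hypothetical, heavily simplified evaluator covering only five operators and top-level fields. Whether Gault's expression classes compile to exactly these documents is an assumption, although the raw forms themselves are standard MongoDB syntax.

```python
import math


def evaluate(expr, doc):
    """Evaluate a small subset of MongoDB aggregation expressions in memory."""
    if isinstance(expr, str) and expr.startswith("$"):
        return doc.get(expr[1:])  # "$field" reads a top-level field
    if not isinstance(expr, dict):
        return expr  # literals evaluate to themselves
    (operator, operands), = expr.items()
    args = [evaluate(operand, doc) for operand in operands]
    if operator == "$concat":
        # Any null operand makes the whole concatenation null.
        return None if any(a is None for a in args) else "".join(args)
    if operator == "$multiply":
        return None if any(a is None for a in args) else math.prod(args)
    if operator == "$ifNull":
        return next((a for a in args if a is not None), None)
    if operator == "$eq":
        return args[0] == args[1]
    if operator == "$cond":
        condition, when_true, when_false = args
        return when_true if condition else when_false
    raise ValueError(f"unsupported operator: {operator}")


projection = {
    "fullName": {"$concat": ["$firstName", " ", "$lastName"]},
    "totalPrice": {"$multiply": ["$price", "$quantity"]},
    "displayName": {"$ifNull": ["$nickname", "$firstName"]},
    "status": {"$cond": [{"$eq": ["$active", True]}, "Active", "Inactive"]},
}
doc = {"firstName": "Ada", "lastName": "Lovelace", "price": 3, "quantity": 4, "active": True}
projected = {name: evaluate(expr, doc) for name, expr in projection.items()}
```

Here the document has no `nickname` field, so `displayName` falls back to `firstName`.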

Requirements

  • Python >= 3.12
  • PyMongo >= 4.15.4

License

MIT
