A lightweight Object Document Mapper for MongoDB
Gault
A lightweight Object Document Mapper (ODM) for MongoDB with Python type hints and state tracking.
Features
- Type-safe MongoDB documents with Python type hints
- Field aliasing for mapping attributes to differently named database fields
- Query operators with Pythonic syntax
- Async manager for CRUD operations
- Aggregation pipeline support
- Automatic state tracking and dirty field detection
- Persistence tracking
Installation
pip install gault
Quick Start
from gault import Schema, Model, Field, configure, AsyncManager
# Schema: Persistent documents mapped to MongoDB collections
class Person(Schema, collection="people"):
    id: Field[int] = configure(pk=True)
    name: Field[str]
    age: Field[int] = configure(db_alias="person_age")
# Model: Non-persistent data classes (projections, view models, etc.)
class PersonSummary(Model):
    name: Field[str]
    total: Field[int]
# Create manager
manager = AsyncManager(database)
# Query and modify
person = await manager.get(Person, filter=Person.id == 1)
person.age = 43
await manager.save(person, atomic=True)  # Only updates dirty fields
Schema vs Model
- Schema: Persistent MongoDB collections. Requires the collection parameter and gets registered globally.
- Model: Non-persistent data structures for aggregation projections, view models, or intermediate transformations.
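One way a class keyword such as collection="people" can drive global registration is Python's `__init_subclass__` hook. The sketch below is an illustration only: the `REGISTRY` dictionary and these stand-in `Schema` and `Model` classes are assumptions made for the example, not Gault's actual implementation.

```python
from typing import Any

# Hypothetical global registry: collection name -> schema class.
REGISTRY: dict[str, type] = {}


class Model:
    """Plain, non-persistent data structure; never registered."""


class Schema(Model):
    """Persistent document type; each subclass must name its collection."""

    collection: str

    def __init_subclass__(cls, *, collection: str, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Remember the collection on the class and register the class globally.
        cls.collection = collection
        REGISTRY[collection] = cls


class Person(Schema, collection="people"):
    pass


class PersonSummary(Model):
    pass


print(Person.collection)
print(hasattr(PersonSummary, "collection"))
```

In this sketch, omitting the collection keyword on a Schema subclass raises a TypeError at class-definition time, which is one way to enforce the "requires" part of the rule.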
Field Configuration
Fields can be configured with metadata using the configure() function:
class Person(Schema, collection="people"):
    # Primary key field - used for filtering in save() operations
    id: Field[int] = configure(pk=True)
    # Regular field
    name: Field[str]
    # Field with database alias (attribute name differs from the stored field name)
    age: Field[int] = configure(db_alias="person_age")
Note: Fields marked with pk=True are used as the filter criteria in save() operations to identify the document for upsert.
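To illustrate what a database alias implies, the sketch below renames attributes to their stored names on write and back again on read. The `ALIASES` table and both helper functions are hypothetical names introduced for this example; Gault's internal mapping may work differently.

```python
from typing import Any

# Hypothetical alias table: Python attribute name -> stored field name.
ALIASES = {"age": "person_age"}


def to_document(values: dict[str, Any], aliases: dict[str, str]) -> dict[str, Any]:
    """Rename attributes to their stored names before writing."""
    return {aliases.get(name, name): value for name, value in values.items()}


def from_document(document: dict[str, Any], aliases: dict[str, str]) -> dict[str, Any]:
    """Rename stored fields back to attribute names after reading."""
    reverse = {stored: name for name, stored in aliases.items()}
    return {reverse.get(key, key): value for key, value in document.items()}


stored = to_document({"id": 1, "name": "Alice", "age": 30}, ALIASES)
restored = from_document(stored, ALIASES)
print(stored)
print(restored)
```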
Querying with Filters
Gault provides multiple ways to filter documents using type-safe field expressions.
Operator Expressions
Use class fields with comparison operators to build type-safe queries:
# Comparison operators
Person.age == 42 # Equal
Person.age != 30 # Not equal
Person.age < 50 # Less than
Person.age <= 50 # Less than or equal
Person.age > 18 # Greater than
Person.age >= 18 # Greater than or equal
Person.id.in_([1, 2, 3]) # In list
Person.id.nin([4, 5]) # Not in list
# Logical operators
filter = (Person.age >= 18) & (Person.age < 65) # AND
filter = (Person.name == "Alice") | (Person.name == "Bob") # OR
filter = ~(Person.age < 18) # NOT
# Complex expressions
filter = (Person.age >= 18) & ((Person.name == "Alice") | (Person.name == "Bob"))
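The expressions above rely on Python operator overloading. The following self-contained sketch shows how overloaded comparison and logical operators can compile to MongoDB filter dictionaries. `FieldRef` and `Expr` are hypothetical names used only for illustration, and the exact dictionaries Gault emits may differ.

```python
from typing import Any, Iterable


class Expr:
    """A compiled filter fragment (hypothetical; not a Gault class)."""

    def __init__(self, query: dict[str, Any]) -> None:
        self.query = query

    def __and__(self, other: "Expr") -> "Expr":
        return Expr({"$and": [self.query, other.query]})

    def __or__(self, other: "Expr") -> "Expr":
        return Expr({"$or": [self.query, other.query]})

    def __invert__(self) -> "Expr":
        # $nor with a single clause matches documents that fail that clause.
        return Expr({"$nor": [self.query]})


class FieldRef:
    """A named field whose comparisons build Expr objects (hypothetical)."""

    def __init__(self, name: str) -> None:
        self.name = name

    def _compare(self, operator: str, value: Any) -> Expr:
        return Expr({self.name: {operator: value}})

    def __eq__(self, value: Any) -> Expr:  # type: ignore[override]
        return self._compare("$eq", value)

    def __ne__(self, value: Any) -> Expr:  # type: ignore[override]
        return self._compare("$ne", value)

    def __lt__(self, value: Any) -> Expr:
        return self._compare("$lt", value)

    def __le__(self, value: Any) -> Expr:
        return self._compare("$lte", value)

    def __gt__(self, value: Any) -> Expr:
        return self._compare("$gt", value)

    def __ge__(self, value: Any) -> Expr:
        return self._compare("$gte", value)

    def in_(self, values: Iterable[Any]) -> Expr:
        return self._compare("$in", list(values))


age = FieldRef("age")
name = FieldRef("name")
working_age = (age >= 18) & (age < 65)
print(working_age.query)
```

The parentheses around each comparison are required in any design of this kind, because `&` and `|` bind more tightly than comparison operators in Python.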
Pipeline Filters
For advanced queries, use the Pipeline API with aggregation stages:
from gault import Pipeline
# Match and sort
pipeline = Pipeline().match(Person.age >= 18).sort(Person.age.asc())
# Pagination
pipeline = Pipeline().skip(10).take(20)
# Group and aggregate
from gault import Sum
pipeline = (
    Pipeline()
    .match(Person.age >= 18)
    .group(by=Person.name, accumulators={"total": Sum(Person.age)})
)
# Multiple stages
pipeline = (
    Pipeline()
    .match(Person.age >= 18)
    .sort(Person.age.desc())
    .take(10)
)
Raw MongoDB Queries
You can also use raw MongoDB query dictionaries:
# Dict filter
filter = {"age": {"$gte": 18}}
# Raw pipeline stages
pipeline = [
    {"$match": {"age": {"$gte": 18}}},
    {"$sort": {"age": -1}},
    {"$limit": 10}
]
AsyncManager Methods
find(model, filter=None)
Finds a single document matching the filter. Returns None if not found.
Filter types: Operator expression, Pipeline, dict, or list of stages.
# With operator
person = await manager.find(Person, filter=Person.age == 42)
# With pipeline
pipeline = Pipeline().match(Person.age > 30).sort(Person.name.asc())
person = await manager.find(Person, filter=pipeline)
# With dict
person = await manager.find(Person, filter={"age": 42})
get(model, filter=None)
Like find(), but raises NotFound exception if no document is found.
Filter types: Operator expression, Pipeline, dict, or list of stages.
try:
    person = await manager.get(Person, filter=Person.id == 123)
except NotFound:
    print("Person not found")
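The difference between find() and get() is only in how a missing document is reported. The sketch below reproduces that contract over an in-memory list so it can run without a database. `InMemoryManager` and the `NotFound` class defined here are stand-ins written for this example; they are not Gault's classes, and Gault's methods take a model and a filter rather than a predicate.

```python
import asyncio
from typing import Any, Callable, Optional

Document = dict[str, Any]


class NotFound(Exception):
    """Stand-in defined for this sketch; Gault ships its own NotFound."""


class InMemoryManager:
    """Hypothetical manager over a list of dicts, used only to show the contract."""

    def __init__(self, documents: list[Document]) -> None:
        self._documents = documents

    async def find(self, predicate: Callable[[Document], bool]) -> Optional[Document]:
        # Return the first match, or None when nothing matches.
        return next((doc for doc in self._documents if predicate(doc)), None)

    async def get(self, predicate: Callable[[Document], bool]) -> Document:
        # Same lookup as find(), but a miss becomes an exception.
        document = await self.find(predicate)
        if document is None:
            raise NotFound("no document matched the filter")
        return document


async def main() -> tuple[Optional[Document], bool]:
    manager = InMemoryManager([{"id": 1, "name": "Alice"}])
    missing = await manager.find(lambda doc: doc["id"] == 123)
    try:
        await manager.get(lambda doc: doc["id"] == 123)
        raised = False
    except NotFound:
        raised = True
    return missing, raised


result = asyncio.run(main())
print(result)
```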
select(model, filter=None, skip=None, take=None)
Returns an async iterator of documents matching the filter. Supports pagination.
Filter types: Operator expression, Pipeline, dict, or list of stages.
# Operator with in_()
async for person in manager.select(Person, filter=Person.id.in_([1, 2, 3])):
    print(person.name)
# Pipeline
pipeline = Pipeline().match(Person.age >= 18).sort(Person.age.desc())
async for person in manager.select(Person, filter=pipeline, take=10):
    print(person.name)
# Complex filter
filter = (Person.age >= 18) & (Person.age < 65)
async for person in manager.select(Person, filter=filter):
    print(person.name)
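Pagination with skip and take corresponds to MongoDB's `$skip` and `$limit` stages, and their order matters: skipping must come before limiting. The helpers below are a hypothetical sketch of that translation, assuming the filter has already been turned into a list of raw stages; they are not part of Gault.

```python
from typing import Any, Optional

Stage = dict[str, Any]


def paginate(
    stages: list[Stage], skip: Optional[int] = None, take: Optional[int] = None
) -> list[Stage]:
    """Return a new stage list with $skip and $limit appended, in that order."""
    result = list(stages)
    if skip:
        result.append({"$skip": skip})
    if take:
        result.append({"$limit": take})
    return result


def page_window(page: int, size: int) -> tuple[int, int]:
    """Translate a zero-based page number into (skip, take)."""
    return page * size, size


skip, take = page_window(page=2, size=20)
stages = paginate([{"$match": {"age": {"$gte": 18}}}], skip=skip, take=take)
print(stages)
```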
insert(instance)
Inserts a new document into the database. Only works with Schema instances.
new_person = Person(id=1, name="Alice", age=30)
await manager.insert(new_person)
save(instance, refresh=False, atomic=False)
Upserts a document using find_one_and_update. Supports atomic updates with dirty field tracking.
- refresh=False: If True, refreshes the instance with the document returned from the database
- atomic=False: If True and the instance is already persisted, only updates dirty fields
# Create or update
person = Person(id=1, name="Bob", age=25)
await manager.save(person)
# Later, update only changed fields
person.age = 26
await manager.save(person, atomic=True) # Only updates 'person_age' field
Persistence and Dirty Fields
Gault tracks the persistence state and modifications of your documents automatically.
Persistence Tracking
When documents are loaded from the database or saved, they are marked as persisted:
# Loaded from DB - automatically marked as persisted
person = await manager.find(Person, filter=Person.id == 1)
assert manager.persistence.is_persisted(person)
# Newly created - not yet persisted
new_person = Person(id=2, name="Charlie", age=35)
assert not manager.persistence.is_persisted(new_person)
# After saving - marked as persisted
await manager.save(new_person)
assert manager.persistence.is_persisted(new_person)
Dirty Field Tracking
Gault snapshots document state and tracks which fields have been modified:
person = await manager.get(Person, filter=Person.id == 1)
# Modify some fields
person.name = "New Name"
person.age = 50
# Check which fields changed
dirty_fields = manager.state_tracker.get_dirty_fields(person)
# dirty_fields == {'name', 'age'}
# Atomic save only updates changed fields
await manager.save(person, atomic=True)
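Snapshot-based dirty tracking can be sketched in a few lines: keep a copy of the attributes when the document is loaded or saved, then compare against it later. `SnapshotTracker` and the plain `Person` class below are hypothetical and exist only for this example; Gault's own state tracker may store and compare state differently.

```python
import copy
from typing import Any

_MISSING = object()


class SnapshotTracker:
    """Hypothetical tracker: remembers a copy of each object's attributes."""

    def __init__(self) -> None:
        # Keyed by id(obj); a real tracker would also need to handle object
        # lifetime, which this sketch ignores.
        self._snapshots: dict[int, dict[str, Any]] = {}

    def snapshot(self, obj: Any) -> None:
        self._snapshots[id(obj)] = copy.deepcopy(vars(obj))

    def get_dirty_fields(self, obj: Any) -> set[str]:
        before = self._snapshots.get(id(obj), {})
        return {
            name
            for name, value in vars(obj).items()
            if before.get(name, _MISSING) != value
        }


class Person:
    def __init__(self, id: int, name: str, age: int) -> None:
        self.id, self.name, self.age = id, name, age


tracker = SnapshotTracker()
person = Person(1, "Alice", 42)
tracker.snapshot(person)  # taken when the document is loaded or saved
person.name = "New Name"
person.age = 50
dirty = tracker.get_dirty_fields(person)
print(sorted(dirty))
```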
Atomic Updates
When using atomic=True, the save() method generates optimal MongoDB updates:
- Dirty fields: Updated with $set
- Unchanged fields: Set with $setOnInsert (only on insert, not update)
- Primary key fields: Used in the filter
This narrows the window in which concurrent writers can overwrite each other's changes and avoids rewriting unchanged fields.
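The three rules above can be sketched as a function that splits a document into a filter and an update. `build_atomic_update` is a hypothetical helper written for this example, and it assumes the document keys are already the stored field names (aliases applied); the update Gault actually generates may differ in detail.

```python
from typing import Any


def build_atomic_update(
    document: dict[str, Any], dirty: set[str], pk_fields: set[str]
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Split a document into an upsert filter and an update document."""
    # Primary key fields identify the document.
    filter_ = {key: document[key] for key in pk_fields}
    # Dirty fields are always written.
    changed = {
        key: value
        for key, value in document.items()
        if key in dirty and key not in pk_fields
    }
    # Unchanged fields are written only when the upsert inserts a new document.
    unchanged = {
        key: value
        for key, value in document.items()
        if key not in dirty and key not in pk_fields
    }
    update: dict[str, Any] = {}
    if changed:
        update["$set"] = changed
    if unchanged:
        update["$setOnInsert"] = unchanged
    return filter_, update


document = {"id": 1, "name": "Bob", "person_age": 26}
filter_, update = build_atomic_update(document, dirty={"person_age"}, pk_fields={"id"})
print(filter_)
print(update)
```

The primary key is left out of both operators in this sketch because MongoDB copies equality conditions from the filter into a document created by an upsert. The resulting pair has the shape that PyMongo's `find_one_and_update(filter, update, upsert=True)` accepts.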
Low Level API: Pipeline Composition
While the high-level Schema and Model API is a convenient way to work with MongoDB, Gault also offers a low-level API for building aggregation pipelines on their own. It lets you compose pipelines through a fluent interface without defining Schema or Model classes.
Basic Pipeline Construction
The Pipeline class provides methods for building MongoDB aggregation pipelines programmatically:
from gault import Pipeline
from gault.predicates import Field
from gault.accumulators import Sum
# Build a pipeline
pipeline = (
    Pipeline()
    .match({"status": "active"})
    .sort({"created_at": -1})
    .take(10)
)
# Convert to MongoDB stages
stages = pipeline.build()
# [
#     {"$match": {"status": "active"}},
#     {"$sort": {"created_at": -1}},
#     {"$limit": 10}
# ]
Available Pipeline Stages
Filtering and Matching
# Match with raw dict
Pipeline().match({"age": {"$gte": 18}})
# Match with Field predicates
Pipeline().match(Field("age").gte(18) & Field("status").eq("active"))
Sorting and Pagination
# Sort by field
Pipeline().sort({"name": 1, "age": -1})
Pipeline().sort("name") # Ascending by default
# Pagination
Pipeline().skip(20).take(10)
# Random sampling
Pipeline().sample(5)
Projection
# Dict-based projection
Pipeline().project({"name": True, "age": True})
# Field-based projection
Pipeline().project(
    Field("name").keep(),
    Field("age").keep(alias="person_age"),
    Field("internal_field").remove()
)
# Expression-based projection
Pipeline().project({"fullName": {"$concat": ["$firstName", " ", "$lastName"]}})
Grouping and Aggregation
# Group with accumulators
from gault.accumulators import Sum, Avg, Count
Pipeline().group(
    {"total": Sum("$amount"), "average": Avg("$score")},
    by="$category"
)
# Group all documents (no grouping key)
Pipeline().group(
    {"count": Count()},
    by=None
)
Field Manipulation
# Add or update fields
Pipeline().set({"computedField": {"$multiply": ["$price", "$quantity"]}})
Pipeline().set_field("status", "processed")
# Remove fields
Pipeline().unset("_id", "internal_field")
Array Operations
# Unwind array field
Pipeline().unwind("$tags")
# With options
Pipeline().unwind(
    "$items",
    include_array_index="item_index",
    preserve_null_and_empty_arrays=True
)
Bucketing
# Manual buckets
Pipeline().bucket(
    by="$age",
    boundaries=[0, 18, 65, 100],
    default="other",
    output={"count": Sum(1)}
)
# Automatic buckets
Pipeline().bucket_auto(
    by="$price",
    buckets=5,
    output={"count": Sum(1), "avgPrice": Avg("$price")}
)
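To make the manual bucket boundaries concrete, the sketch below mimics in plain Python how MongoDB's `$bucket` stage assigns a value: each bucket is keyed by its inclusive lower boundary, the upper boundary is exclusive, and values outside every boundary fall into the default bucket. `bucket_for` is a hypothetical helper for illustration and does not call Gault or MongoDB.

```python
from bisect import bisect_right
from collections import Counter
from typing import Any


def bucket_for(value: float, boundaries: list[float], default: Any) -> Any:
    """Return the bucket key that $bucket semantics would assign to value."""
    # Outside [first boundary, last boundary): use the default bucket.
    if value < boundaries[0] or value >= boundaries[-1]:
        return default
    # Otherwise the key is the largest boundary that is <= value.
    return boundaries[bisect_right(boundaries, value) - 1]


ages = [5, 17, 18, 40, 64, 65, 99, 100, 130]
counts = Counter(bucket_for(age, [0, 18, 65, 100], "other") for age in ages)
print(dict(counts))
```

With these inputs, ages 18 and 65 land in the buckets they open, while 100 and 130 go to "other" because the last boundary is exclusive.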
Joins and Lookups
# Simple lookup
Pipeline().lookup(
    OtherModel,
    local_field="user_id",
    foreign_field="_id",
    into="user_data"
)
# Lookup with sub-pipeline
from gault.pipelines import CollectionPipeline
sub_pipeline = CollectionPipeline("orders").match({"status": "completed"})
Pipeline().lookup(sub_pipeline, into="orders")
# Graph lookup for hierarchical data
Pipeline().graph_lookup(
    OtherModel,
    start_with="$reports_to",
    local_field="reports_to",
    foreign_field="employee_id",
    into="reporting_chain",
    max_depth=5
)
Faceted Search
# Multiple aggregations in parallel
Pipeline().facet({
    "count": Pipeline().count("total"),
    "avgPrice": Pipeline().group({"value": Avg("$price")}, by=None),
    "categories": Pipeline().group({"count": Sum(1)}, by="$category")
})
Other Stages
# Count documents
Pipeline().count("total")
# Union with another collection
Pipeline().union_with(OtherModel)
# Replace document
Pipeline().replace_with({"newField": "$existingField"})
# Raw stage (for unsupported operations)
Pipeline().raw({"$customStage": {"option": "value"}})
Pipeline Composition
Pipelines are immutable and chainable, making composition elegant:
# Build pipelines incrementally
base = Pipeline().match({"type": "user"})
active_users = base.match({"status": "active"})
premium_users = active_users.match({"plan": "premium"})
# Use pipe() for custom transformations
def add_pagination(p: Pipeline, page: int, size: int) -> Pipeline:
    return p.skip(page * size).take(size)
pipeline = Pipeline().match({"status": "active"}).pipe(add_pagination, 2, 20)
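Immutability is what makes it safe to derive several pipelines from one base. The sketch below shows the idea with a frozen dataclass in which every method returns a new object. `MiniPipeline` is a hypothetical stand-in with only a few stages, written for this example; it is not Gault's Pipeline class.

```python
from dataclasses import dataclass
from typing import Any, Callable

Stage = dict[str, Any]


@dataclass(frozen=True)
class MiniPipeline:
    """Hypothetical immutable pipeline: every method returns a new object."""

    stages: tuple[Stage, ...] = ()

    def _with(self, stage: Stage) -> "MiniPipeline":
        return MiniPipeline(self.stages + (stage,))

    def match(self, query: Stage) -> "MiniPipeline":
        return self._with({"$match": query})

    def skip(self, count: int) -> "MiniPipeline":
        return self._with({"$skip": count})

    def take(self, count: int) -> "MiniPipeline":
        return self._with({"$limit": count})

    def pipe(self, fn: Callable[..., "MiniPipeline"], *args: Any, **kwargs: Any) -> "MiniPipeline":
        # Hand the pipeline to a user function so it can be chained inline.
        return fn(self, *args, **kwargs)

    def build(self) -> list[Stage]:
        return list(self.stages)


def add_pagination(p: MiniPipeline, page: int, size: int) -> MiniPipeline:
    return p.skip(page * size).take(size)


base = MiniPipeline().match({"type": "user"})
active = base.match({"status": "active"})
paged = active.pipe(add_pagination, 2, 20)
print(base.build())
print(paged.build())
```

Because `base` is never mutated, building `active` and `paged` from it leaves the original single-stage pipeline intact.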
Working with Field References
The low-level API provides Field for building queries without Schema classes:
from gault.predicates import Field
# Field predicates
query = Field("age").gte(18) & Field("country").in_(["US", "CA"])
Pipeline().match(query)
# Field references in expressions
Pipeline().project({
    "fullName": {"$concat": [Field("firstName"), " ", Field("lastName")]}
})
Using with AsyncManager
You can use low-level pipelines with AsyncManager by passing them directly:
from gault import AsyncManager
manager = AsyncManager(database)
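# Pass pipeline to manager methods
pipeline = Pipeline().match({"status": "active"}).sort({"created_at": -1})
# select() returns an async iterator, so consume it with async for
async for document in manager.select(MyModel, filter=pipeline):
    print(document)
# Or build stages manually
stages = pipeline.build()
# With PyMongo's async API, aggregate() is a coroutine and must be awaited
cursor = await database["collection"].aggregate(stages)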
In-Memory Pipeline Testing
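Use Pipeline.documents() to work with in-memory data. It emits a $documents stage, which MongoDB accepts only as the first stage of a database-level aggregation: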
# Create pipeline with test data
pipeline = Pipeline.documents(
    {"id": 1, "name": "Alice", "age": 30},
    {"id": 2, "name": "Bob", "age": 25},
    {"id": 3, "name": "Charlie", "age": 35}
).match(Field("age").gte(30))
stages = pipeline.build()
# [
#     {"$documents": [{"id": 1, ...}, {"id": 2, ...}, {"id": 3, ...}]},
#     {"$match": {"age": {"$gte": 30}}}
# ]
Accumulators
Gault provides accumulator classes for use in group() and bucket() stages:
from gault.accumulators import (
    AddToSet, Avg, Bottom, BottomN, Count, First, Last,
    Max, Min, Push, Sum, Top, TopN
)
Pipeline().group(
    {
        "total": Sum("$amount"),
        "average": Avg("$score"),
        "unique_tags": AddToSet("$tag"),
        "all_items": Push("$item"),
        "highest": Max("$value"),
        "lowest": Min("$value"),
        "first_seen": First("$timestamp"),
        "last_seen": Last("$timestamp")
    },
    by="$category"
)
Expression Operators
For complex expressions, Gault provides numerous expression operators:
from gault.expressions import Concat, Multiply, Cond, IfNull
Pipeline().project({
    "fullName": Concat(Field("firstName"), " ", Field("lastName")),
    "totalPrice": Multiply(Field("price"), Field("quantity")),
    "displayName": IfNull(Field("nickname"), Field("firstName")),
    "status": Cond(
        Field("active").eq(True),
        "Active",
        "Inactive"
    )
})
Requirements
- Python >= 3.12
- PyMongo >= 4.15.4
License
MIT