An asynchronous Object Document Mapper (ODM) for MongoDB built on top of Motor.
Project description
Description | Badge |
---|---|
PyPI - Version | |
Downloads | |
PyPI License | |
GitHub Contributors | |
Code Coverage | |
Code Style | |
Documentation |
Author: Patrick Prunty.
motormongo
is an Object Document Mapper (ODM) for MongoDB built on top
of motor
, the MongoDB
recommended asynchronous Python driver for MongoDB Python applications, designed to work with Tornado or
asyncio and enable non-blocking access to MongoDB.
Asynchronous operations in a backend system, built using FastAPI for
example, enhances performance and scalability by enabling non-blocking, concurrent handling of multiple I/O requests,
leading to more efficient use of server resources, by forcing the CPU usage on the backend server's main thread to be
maximized across concurrent requests. For more low-level details on the advantages of asynchronous motormongo
over
existing MongoDB ODMs, such as mongoengine
see here.
The interface for instantiating Document classes follows similar logic
to mongoengine
, enabling ease-of-transition and
migration from mongoengine
to the asynchronous motormongo
.
Check out the documentation site here.
- Installation
- Quickstart
- Why use motormongo?
- motormongo Fields
- CRUD classmethods
- CRUD instance methods
- Aggregation Operations
- Polymorphism and Inheritance
- Pooling Options and Configuration
- FastAPI integration
- License
Installation
To install motormongo
, you can use pip
inside your virtual environment:
python -m pip install motormongo
Alternatively, to install motormongo
into your poetry
environment:
poetry add motormongo
Quickstart
Step 1. Create a motormongo client:
import asyncio
from motormongo import DataBase
async def init_db():
# This 'connect' method needs to be called inside of an async function
await DataBase.connect(uri="<mongo_uri>", db="<mongo_database>")
if __name__ == "__main__":
asyncio.run(init_db())
or, in a FastAPI application:
from fastapi import FastAPI
from motormongo import DataBase
app = FastAPI()
@app.on_event("startup")
async def startup_db_client():
await DataBase.connect(uri="<mongo_uri>", db="<mongo_database>")
@app.on_event("shutdown")
async def shutdown_db_client():
await DataBase.close()
The mongo_uri
should look something like this:
mongodb+srv://<username>:<password>@<cluster>.mongodb.net
and database
should be the name of an existing MongoDB database in your MongoDB instance.
For details on how to set up a local or cloud MongoDB database instance, see here.
You can also specify and pass pooling_options
to the Motor on the DataBase.connect()
method, like so:
import asyncio
from motormongo import DataBase
# Example pooling options
pooling_options = {
'maxPoolSize': 50,
'minPoolSize': 10,
'maxIdleTimeMS': 30000,
'waitQueueTimeoutMS': 5000,
'connectTimeoutMS': 10000,
'socketTimeoutMS': 20000
}
async def init_db():
# This 'connect' method needs to be called inside of an async function
await DataBase.connect(uri="<mongo_uri>", db="<mongo_database>", **pooling_options)
if __name__ == "__main__":
asyncio.run(init_db())
See Pooling Options Configuration section for more details.
Step 2. Define a motormongo Document:
Define a motormongo
User
document:
import re
import bcrypt
from motormongo import Document, BinaryField, StringField
def hash_password(password: str) -> bytes:
# Example hashing function
return bcrypt.hashpw(password.encode('utf-8'), salt=bcrypt.gensalt())
class User(Document):
username = StringField(help_text="The username for the user", min_length=3, max_length=50)
email = StringField(help_text="The email for the user", regex=re.compile(r'^\S+@\S+\.\S+$')) # Simple email regex
password = BinaryField(help_text="The hashed password for the user", hash_function=hash_password)
def verify_password(self, password: str) -> bool:
""" Utility function which can be used to validate user's salted password later...
ex. user = await User.find_one({"_id": request.user_id})
is_authenticated = user.verify_password(request.password)
"""
return bcrypt.checkpw(password.encode("utf-8"), self.password)
class Meta:
collection = "users" # < If not provided, will default to class name (ex. User->user, UserDetails->user_details)
created_at_timestamp = True # < Provide a DateTimeField for document creation
updated_at_timestamp = True # < Provide a DateTimeField for document updates
Step 3: Create a MongoDB document using the User class
import bcrypt
await User.insert_one(
{
"username": "johndoe",
"email": "johndoe@portmarnock.ie",
"password": "password123"
# < hash_functon will hash the string literal password and store binary field in the database
}
)
Step 4: Validate user was created in your MongoDB collection
You can do this by using MongoDB compass GUI, or alternatively, add a query to find all documents in the user collection after doing the insert in step 3:
users = User.find_many({})
if users:
print("User collection contains the following documents:")
for user in users:
print(user.to_dict())
else:
print("User collection failed to update! Check your MongoDB connection details and try again!")
Step 5: Put all the code above into one file and run it
python main.py
or in a FastAPI application:
uvicorn main:app --reload
Please refer to FastAPI Documentation for more details on how to get setup with FastAPI.
Congratulations 🎉
You've successfully created your first motormongo
Object Document Mapper class. 🥳
The subsequent sections detail the datatype fields provided by motormongo
, as well as the CRUD
operations available on the classmethods and object instance methods of a motormongo
document.
If you wish to get straight into how to integrate motormongo
with your FastAPI
application, skip ahead to the
FastAPI Integration section.
Why use motormongo?
Integrating motormongo
, an asynchronous Object-Document Mapper (ODM) for MongoDB, into a backend built with an
asynchronous web framework like FastAPI
, enhances the system's ability to handle I/O-bound operations efficiently.
Using the await
keyword with motormongo
operations allows the event loop to manage concurrent requests effectively,
freeing up the main thread to handle other tasks while waiting for database operations to complete.
Understanding the Efficiency of Asynchronous Operations
Traditional Approach with pymongo
and mongoengine
(Synchronous)
Typically, a web server handling multiple requests that involve fetching documents from MongoDB would face bottlenecks with synchronous database operations:
from fastapi import FastAPI, HTTPException
from mongoengine import connect, Document, StringField
from fastapi.encoders import jsonable_encoder
connect(db="testdb", host="mongodb://localhost:27017/", alias="default")
class User(Document):
name = StringField(required=True)
app = FastAPI()
@app.get("/get_data/")
async def get_data(name: str):
user = User.objects(name=name).first()
if user:
return jsonable_encoder(user.to_mongo().to_dict())
else:
raise HTTPException(status_code=404, detail="User not found")
In this setup, operations such as User.objects(name=name).first()
block the thread until completion, hindering the
server's ability to process other requests concurrently, leading to inefficient resource use and potential performance
bottlenecks.
Asynchronous Approach with motor
and motormongo
Switching to an asynchronous ODM like motormongo
allows FastAPI to handle database operations without blocking:
from fastapi import FastAPI
from motormongo import DataBase, Document, StringField
class User(Document):
name = StringField(required=True)
app = FastAPI()
@app.on_event("startup")
async def startup_db_client():
await DataBase.connect(uri="mongodb://localhost:27017/", db="testdb")
@app.get("/get_data/")
async def get_data(name: str):
user = await User.find_one({'name': name})
if user:
return user.to_dict()
else:
return {"message": "User not found"}
Using await
with motormongo
operations such as User.find_one()
allows the application to perform non-blocking
database operations. This asynchronous model is particularly advantageous for I/O-bound applications, allowing the
server to handle multiple requests efficiently by utilizing Python's asyncio
.
Maximizing Performance with Multiple FastAPI Workers
To further enhance the performance of FastAPI applications utilizing motormongo
, deploying multiple worker processes
can significantly increase the application's ability to handle high volumes of concurrent requests:
- Scalability: Deploying FastAPI with multiple workers enables the application to scale across multiple CPU cores, offering better handling of concurrent requests by running multiple instances of the application, each in its own process.
- Resource Utilization: More workers mean that the application can utilize more system resources, effectively distributing the load and preventing any single worker from becoming a bottleneck.
- Deployment Strategy: Use an ASGI server like
uvicorn
with the--workers
option to specify the number of worker processes. For example,uvicorn app:app --workers 4
would run the application with four worker processes.
By leveraging motormongo
with FastAPI, developers can build backend systems capable of handling asynchronous I/O-bound
operations efficiently. This setup not only improves the application's responsiveness and throughput by utilizing the
asynchronous capabilities of Python's asyncio
but also maximizes performance through the strategic deployment of
multiple workers. Together, these strategies enable the creation of highly scalable, efficient, and modern web
applications.
motormongo Fields
motormongo
supports a variety of field types to accurately define the schema of your MongoDB documents. Each field
type is designed to handle specific data types and validations:
- BinaryField: Stores binary data, useful for storing encoded or hashed data like passwords.
- BooleanField: Stores boolean values (
True
orFalse
). - DateTimeField: Manages date and time, with options for automatically setting current date/time on creation or update.
- EmbeddedDocumentField: For fields that should contain values from a predefined enumeration.
- EnumField: For fields that should contain values from a predefined enumeration.
- FloatField: Handles floating-point numbers, with options to specify minimum and maximum values.
- GeoJSONField: Manages geographical data in GeoJSON format, with an option to return data as JSON.
- IntegerField: Manages integer data, allowing specifications for minimum and maximum values.
- ListField: Handles lists of items, which can be of any type.
- ReferenceField: Creates a reference to another document.
- StringField: Handles string data with options for minimum and maximum length, and regex validation.
All fields have the following default parameters:
default
: Specifies the default value for the field if no value is provided. This parameter can be a static value or a callable object. The callable object is useful for dynamic values like generating timestamps or unique identifiers.required
: A boolean indicating whether the field is mandatory. If set to True, the document cannot be saved without providing a value for this field.unique
: A boolean specifying if the field value should be unique across the collection. This is crucial for fields like email addresses or usernames.
BinaryField
The BinaryField
is designed for storing binary data within a database. It offers capabilities for encoding, hashing,
and decoding data, making it versatile for handling various types of binary data, including but not limited to encrypted
or hashed content.
Parameters:
hash_function
: (Optional) A callable that hashes input data. The function should have a type annotation to indicate whether it expects astr
orbytes
as input. This annotation is crucial as it dictates whether theBinaryField
should encode the string before hashing. If the annotation indicatesstr
, the field will pass the string directly to thehash_function
. Ifbytes
, theBinaryField
will encode the string (using the providedencode
function or default UTF-8 encoding) before hashing.return_decoded
: (Optional) A boolean indicating whether to decode binary data when it is retrieved from the database. If set toTrue
, the stored binary data will be decoded back into a string using the provideddecode
function or default UTF-8 decoding. This is useful for data that was encoded but not hashed, as hashed data cannot be meaningfully decoded.encode
: (Optional) A function to encode a string to bytes before storage. If not provided, the class defaults to UTF-8 encoding. This function is used when the input data is a string and needs to be stored as binary data, or before hashing if thehash_function
expectsbytes
.decode
: (Optional) A function to decode bytes back to a string when data is retrieved from the database. This parameter is only relevant ifreturn_decoded
isTrue
. If not provided, the class defaults to UTF-8 decoding.
Important: For the hash_function
to work correctly with the BinaryField
, it must include type annotations for
its parameters. This enables the BinaryField
to determine the correct processing strategy (i.e., whether to encode the
string before hashing).
Example Usage:
from motormongo import Document, BinaryField, StringField
import bcrypt
# Hash function with type annotation indicating it expects a 'str'
def hash_password(password: str) -> bytes:
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
class User(Document):
username = StringField(min_length=3, max_length=50)
# Note: 'hash_function' requires a type annotation
password = BinaryField(hash_function=hash_password, return_decoded=False)
def verify_password(self, password: str) -> bool:
# Verifies if the provided password matches the stored hash
return bcrypt.checkpw(password.encode("utf-8"), self.password)
# Creating a user instance with a hashed password
user = User(username="johndoe", password="secret")
inserted_user = await user.save()
# Authentication checks
is_authenticated = inserted_user.verify_password("wrongpassword") # Expected to return False
is_authenticated = inserted_user.verify_password("secret") # Expected to return True
BooleanField
The BooleanField
is used for storing boolean values (True or False). It ensures that the data stored in this field is
strictly boolean.
Parameters:
- There are no specific parameters unique to BooleanField other than those inherited from the base Field class.
from motormongo import Document, BooleanField, StringField
class Product(Document):
name = StringField(min_length=1, max_length=100)
is_available = BooleanField(default=False)
# Create a product indicating its availability
product = Product(name="Gadget", is_available=True)
await product.save()
DateTimeField
The DateTimeField
handles date and time values, with options to automatically update these values on document creation
or modification.
Parameters:
auto_now
: Automatically update the field to the current datetime when the document is saved.auto_now_add
: Automatically set the field to the current datetime when the document is created.datetime_formats
: List of string formats to parse datetime strings.
Example Usage:
from motormongo import Document, DateTimeField
class Event(Document):
start_time = DateTimeField(auto_now_add=True)
# Create an event with the current start time
event = Event()
await event.save()
EmbeddedDocumentField
The EmbeddedDocumentField
is used for embedding documents within a document, supporting nested document structures.
This field allows you to include complex data structures as part of your document.
Parameters:
document_type
: The class of the embedded document, which must be a subclass ofEmbeddedDocument
,BaseModel
from Pydantic, ordict
representation of theEmbeddedDocument
.
Example Usage:
from motormongo import Document, EmbeddedDocument, EmbeddedDocumentField, StringField
from pydantic import BaseModel
class Address(EmbeddedDocument):
street = StringField()
city = StringField()
class User(Document):
name = StringField()
address = EmbeddedDocumentField(document_type=Address)
class PydanticAddress(BaseModel):
street: str
city: str
# Create a user with an embedded address document
user = User(name="John Doe", address={"street": "123 Elm St", "city": "Springfield"})
# user = User(name="John Doe", address=Address(street="123 Elm St", city="Springfield")) # Also works
# user = User(name="John Doe", address=PydanticAddress(street="123 Elm St", city="Springfield")) # Also works
await user.save()
EnumField
The EnumField
is designed to store enumerated values, allowing for validation against a predefined set of options.
Parameters:
enum
: The enumeration class that defines valid values for the field.
Example Usage:
import enum
from motormongo import Document, EnumField
class UserStatus(enum.Enum):
ACTIVE = 'active'
INACTIVE = 'inactive'
BANNED = 'banned'
class User(Document):
status = EnumField(enum=UserStatus)
# Create a user and set their status using the EnumField
user = User(status=UserStatus.ACTIVE)
# user = User(status="active") # Also works
await user.save()
FloatField
The FloatField
handles floating-point numbers, with options to specify minimum and maximum values.
Parameters:
min_value
: (Optional) The minimum allowable value.max_value
: (Optional) The maximum allowable value.
Example Usage:
from motormongo import Document, FloatField
class Measurement(Document):
temperature = FloatField(min_value=-273.15) # Absolute zero constraint
# Record a temperature measurement
measurement = Measurement(temperature=25.5)
await measurement.save()
GeoJSONField
The GeoJSONField
is designed for storing geographical coordinates in GeoJSON format.
Parameters:
return_as_list
: (Optional) IfTrue
, returns the coordinates as a [longitude, latitude] list instead of a GeoJSON object.
Example Usage:
from motormongo import Document, GeoJSONField
class Location(Document):
point = GeoJSONField()
# Create a location point
location = Location(point={"type": "Point", "coordinates": [-73.856077, 40.848447]}) # Could also use
# location = Location(point=[-73.856077, 40.848447]) # This would also work
await location.save()
IntegerField
The IntegerField
is used for storing integer values, with optional validation for minimum and maximum values.
Parameters:
min_value
: (Optional) The minimum allowable value.max_value
: (Optional) The maximum allowable value.
Example Usage:
from motormongo import Document, IntegerField
class Product(Document):
quantity = IntegerField(min_value=0)
# Create a product with quantity validation
product = Product(quantity=10)
await product.save()
ListField
The ListField
is used for storing a list of items, optionally validating the type of items in the list.
Parameters:
field
: (Optional) AField
instance specifying the type of items in the list.
Example Usage:
from motormongo import Document, ListField, StringField
class ShoppingList(Document):
items = ListField(field=StringField())
# Create a shopping list with string items
shopping_list = ShoppingList(items=["Milk", "Eggs", "Bread"])
await shopping_list.save()
ReferenceField
The ReferenceField
is used to create a reference to another document, typically for creating relationships between
collections.
Parameters:
document
: The class of the document to which the field references.
Example Usage:
from motormongo import Document, ReferenceField, StringField
class User(Document):
name = StringField()
class Post(Document):
author = ReferenceField(document=User)
# Create a user and a post referencing the user
user = User(name="John Doe")
await user.save()
post = Post(author=user)
To fetch the referenced document, you must await the coroutine returned by accessing the reference field. This operation asynchronously retrieves the related document instance from the database.
# Assuming `post` is an instance of the Post document with a reference to a User
# Fetch the user referenced by the post's author field
referenced_user = await post.author
if referenced_user:
print("Referenced User:",
referenced_user.to_dict()) # Should print, {'_id': '65d8bf2dad3fa2e9169d2f94', 'name': 'John Doe'}
else:
print("User not found or failed to fetch.")
This example demonstrates how to access and asynchronously fetch the document referenced by a ReferenceField
. The
await
keyword is crucial because the operation is asynchronous, involving a database query to retrieve the referenced
document.
Note: Ensure that the fetching operation is performed within an asynchronous context, such as an async function. The ReferenceField provides a powerful way to manage relationships between documents, enabling complex data models with interconnected documents.
StringField
The StringField
is used for storing string data in a document. It supports validation for minimum and maximum length
and can enforce a specific regex pattern.
Parameters:
min_length
: (Optional) The minimum length of the string.max_length
: (Optional) The maximum length of the string.regex
: (Optional) A regex pattern that the string must match.
Example Usage:
from motormongo import Document, StringField
class UserProfile(Document):
username = StringField(min_length=3, max_length=50)
bio = StringField(max_length=200, regex=r'^[A-Za-z0-9 ]*$') # Alphanumeric and space only
# Create a user profile with validation
profile = UserProfile(username="user123", bio="I love coding.")
await profile.save()
Class methods
Operations
The following class methods are supported by motormongo
's Document
class:
All examples below assume User
is a subclass of motormongo
provided Document class.
Create
insert_one(document: dict, **kwargs) -> Document
Inserts a single document into the database.
user = await User.insert_one({
"name": "John",
"age": 24,
"alive": True
})
Alternatively, using **kwargs
:
user = await User.insert_one(
name="John",
age=24,
alive=True)
And similarly, with a dictionary:
user_document = {
"name": "John",
"age": 24,
"alive": True
}
user = await User.insert_one(**user_document)
insert_many(List[document]) -> tuple[List['Document'], Any]
users, user_ids = await User.insert_many(
[
{
"name": "John",
"age": 24,
"alive": True
},
{
"name": "Mary",
"age": 2,
"alive": False
}
]
)
or
docs_to_insert = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
inserted_docs, inserted_ids = await User.insert_many(docs_to_insert)
Read
find_one(query, **kwargs) -> Document
user = await User.find_one(
{
"_id": "655fc281c440f677fa1e117e"
}
)
Alternatively, using **kwargs
:
user = await User.find_one(_id="655fc281c440f677fa1e117e")
Note: The _id
string datatype here is automatically converted to a BSON ObjectID, however, motormongo
handles
the
scenario when a
BSON ObjectId is passed as the _id
datatype:
from bson import ObjectId
user = await User.find_one(
{
"_id": ObjectId("655fc281c440f677fa1e117e")
}
)
find_many(query, limit, **kwargs) -> List[Document]
users = await User.find_many(age={"$gt": 40}, alive=False, limit=20)
or
query = {"age": {"$gt": 40}, "alive": False}
users = await User.find_many(**query, limit=20)
Update
update_one(query, updated_fields) -> Document
updated_user = await User.update_one(
{
"_id": "655fc281c440f677fa1e117e"
},
{
"name": "new_name",
"age": 30
}
)
or
query_criteria = {"name": "old_name"}
update_data = {"name": "updated_name"}
updated_user = await User.update_one(query_criteria, update_data)
update_many(qeury, fields) -> Tuple[List[Any], int]
updated_users, modified_count = await User.update_many({'age': {'$gt': 40}}, {'category': 'senior'})
another example:
updated_users, modified_count = await User.update_many({'name': 'John Doe'}, {'$inc': {'age': 1}})
Delete
delete_one(query, **kwargs) -> bool
deleted = await User.delete_one({'_id': '507f191e810c19729de860ea'})
Alternatively, using **kwargs
:
deleted = await User.delete_one(name='John Doe')
delete_many(query, **kwargs) -> int
deleted_count = await User.delete_many({'age': {'$gt': 40}})
Another example:
# Delete all users with a specific status
deleted_count = await User.delete_many({'status': 'inactive'})
Alternatively, using **kwargs
:
deleted_count = await User.delete_many(status='inactive')
Mixed
find_one_or_create(query, defaults) -> Tuple['Document', bool]
user, created = await User.find_one_or_create({'username': 'johndoe'}, defaults={'age': 30})
find_one_and_replace(query, replacement) -> Document
replaced_user = await User.find_one_and_replace({'username': 'johndoe'}, {'username': 'johndoe', 'age': 35})
find_one_and_delete(query) -> Document
deleted_user = await User.find_one_and_delete({'username': 'johndoe'})
find_one_and_update_empty_fields(query, update_fields) -> Tuple['Document', bool]
updated_user, updated = await User.find_one_and_update_empty_fields(
{'username': 'johndoe'},
{'email': 'johndoe@example.com', 'age': 30}
)
Instance methods
motormongo
also supports the manipulation of fields on the object instance. This allows
users to programmatically achieve the same operations listed above through the object instance
itself.
Operations
The following are object instance methods are supported by motormongo
's Document
class:
CRUD Type | Operation |
---|---|
Create | save() -> None |
Delete | delete() -> None |
Note: All update operations can be manipulated on the fields in the Document class object itself.
user.save() -> None
# Find user by MongoDB _id
user = await User.find_one(
{
"_id": "655fc281c440f677fa1e117e"
}
)
# If there age is greater than 80, make them dead
if user.age > 80:
user.alive = False
# Persist update on User instance in MongoDB mongo
user.save()
In this example, User.find_one()
returns an instance of User
. If the age field
is greater than 80, the alive field is set to false. The instance of the document in the MongoDB
database is then updated by calling the .save()
method on the User
object instance.
Delete
user.delete() -> None
# Find all users where the user is not alive
users = await User.find_many(
{
"alive": False
}
)
# Recursively delete all User instances in the users list who are not alive
for user in users:
user.delete()
Aggregation
The aggregate
class method is designed to perform aggregation operations on the documents within the collection. It
allows the execution of a sequence of data aggregation operations defined by the pipeline
parameter. This method can
return the results either as a list of documents or as a cursor, based on the return_as_list
flag.
Parameters:
pipeline
: A list of dictionaries defining the aggregation operations to be performed on the collection.return_as_list
(optional): A boolean flag that determines the format of the returned results. If set toTrue
, the method returns a list of documents. IfFalse
(default), it returns a cursor.
Returns:
- If
return_as_list
isTrue
, returns a list of documents resulting from the aggregation pipeline. - If
return_as_list
isFalse
, returns a Cursor to iterate over the results.
Raises:
ValueError
: If an error occurs during the execution of the pipeline.
Example Usage:
from yourmodule import YourDocumentClass
# Connect to the database (Assuming the database connection is already set up)
# Define an aggregation pipeline
pipeline = [
{"$match": {"status": "active"}},
{"$project": {"_id": 0, "username": 1, "status": 1}},
{"$sort": {"username": 1}}
]
# Execute the aggregation without returning a list
cursor = await YourDocumentClass.aggregate(pipeline)
async for doc in cursor:
print(doc)
# Execute the aggregation and return results as a list
docs_list = await YourDocumentClass.aggregate(pipeline, return_as_list=True)
print(docs_list)
Polymorphism and Inheritance
This part of the documentation provides an overview of implementing and using polymorphism and inheritance using the
motormongo
framework, enabling flexible and organized data models for various use cases.
Base Model: Item
The Item
class serves as the base model for different types of items stored in a MongoDB collection. It defines common
fields and methods that are shared across all item types.
from motormongo import Document, StringField, FloatField
class Item(Document):
name = StringField()
cost = FloatField()
Subclass Models
Subclasses of Item
can introduce specific fields or override methods to cater to different item categories.
Book
A Book
represents a specific type of Item
with additional attributes related to books.
class Book(Item):
title = StringField()
author = StringField()
isbn = StringField()
Electronics
An Electronics
item represents electronic goods with attributes like warranty period and brand.
class Electronics(Item):
warranty_period = StringField() # E.g., "2 years"
brand = StringField()
Usage
Creating and Inserting Items
To insert items into the database, use the insert_one
method. The item's type is managed automatically.
# Insert a book
book = await Book.insert_one(title="1984", author="George Orwell", isbn="123456789", cost=20.0, name="Book")
# Insert an electronics item
electronics = await Electronics.insert_one(warranty_period="2 years", brand="TechBrand", cost=999.99, name="Laptop")
Querying Items
You can query items of any type using their base or specific models. Polymorphism allows retrieved instances to be of the correct subclass.
# Find a book by ISBN
found_book = await Book.find_one(isbn="123456789")
# Find an electronics item by brand
found_electronics = await Electronics.find_one(brand="TechBrand")
Polymorphic Behavior
The following operations are supported over the base Item
Document class, enabling complex querying over base Item
Document class and all of its subclasses (i.e Book
and Electronics
):
As well as aggregate
operations, see the Aggregation Operation section for more details.
Querying on the base Item
model returns items of all types, correctly instantiated as their specific subclasses. See
below for a logical example of polymorphic querying:
# Find all items with a cost over 50
expensive_items = await Item.find_many(cost={"$gt": 50})
for item in expensive_items:
print(type(item)) # Prints the subclass (Book, Electronics, etc.)
if isinstance(item, Book):
print(f"Book: {item.title} by {item.author}")
elif isinstance(item, Electronics):
print(f"Electronics: {item.brand} with {item.warranty_period} warranty")
Pooling Options Configuration
In motormongo
, you have the flexibility to customize the pooling options for the Motor client. This allows you to
fine-tune the behavior of database connections according to your application's needs. Below are some of the parameters
you can configure, along with their descriptions and example usage.
Configuration Parameters
- Max Pool Size: The maximum number of connections in the connection pool.
- Min Pool Size: The minimum number of connections in the connection pool.
- Max Idle Time: The maximum time (in milliseconds) a connection can remain idle in the pool before being closed.
- Wait Queue Timeout: The time (in milliseconds) a thread will wait for a connection to become available when the pool is exhausted.
- Connect Timeout: The time (in milliseconds) to wait for a connection to the MongoDB server to be established before timing out.
- Socket Timeout: The time (in milliseconds) to wait for a socket read or write to complete before timing out.
Example Configuration
import asyncio
from motormongo import DataBase
# Example pooling options
pooling_options = {
'maxPoolSize': 50,
'minPoolSize': 10,
'maxIdleTimeMS': 30000,
'waitQueueTimeoutMS': 5000,
'connectTimeoutMS': 10000,
'socketTimeoutMS': 20000
}
async def init_db():
# This 'connect' method needs to be called inside of an async function
await DataBase.connect(uri="<mongo_uri>", db="<mongo_database>", **pooling_options)
if __name__ == "__main__":
asyncio.run(init_db())
or in FastAPI:
from fastapi import FastAPI
from motormongo import DataBase
app = FastAPI()
# Example pooling options
pooling_options = {
'maxPoolSize': 50,
'minPoolSize': 10,
'maxIdleTimeMS': 30000,
'waitQueueTimeoutMS': 5000,
'connectTimeoutMS': 10000,
'socketTimeoutMS': 20000
}
@app.on_event("startup")
async def startup_db_client():
await DataBase.connect(uri="<mongodb_uri>", db="<mongodb_db>", **pooling_options)
This configuration demonstrates how to set up motormongo
with specific pooling options to optimize performance
and resource utilization in high-throughput environments.
For more information, consult the official documentation:
FastAPI integration
motormongo
can be easily integrated in FastAPI APIs to leverage the asynchronous ability of
FastAPI. To leverage motormongo
's ease-of-use, Pydantic model's should be created to represent the MongoDB
motormongo
Document as a Pydantic model. Below is a light-weight CRUD FastAPI application using motormongo
:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from motormongo import DataBase, Document, BinaryField, StringField
import re
import bcrypt
def hash_password(password: str) -> bytes:
# Example hashing function
return bcrypt.hashpw(password.encode('utf-8'), salt=bcrypt.gensalt())
class User(Document):
username = StringField(help_text="The username for the user", min_length=3, max_length=50)
email = StringField(help_text="The email for the user", regex=re.compile(r'^\S+@\S+\.\S+$')) # Simple email regex
password = BinaryField(help_text="The hashed password for the user", hash_function=hash_password)
def verify_password(self, password: str) -> bool:
return bcrypt.checkpw(password.encode("utf-8"), self.password)
class Meta:
collection = "users" # < If not provided, will default to class name (ex. User->user, UserDetails->user_details)
created_at_timestamp = True # < Provide a DateTimeField for document creation
updated_at_timestamp = True # < Provide a DateTimeField for document updates
class UserModelRequest(BaseModel):
username: str = Field(example="johndoe")
email: str = Field(example="johndoe@coldmail.com")
password: str = Field(example="password123")
app = FastAPI()
@app.on_event("startup")
async def startup_db_client():
await DataBase.connect(uri="<mongodb_uri>", db="<mongodb_db>")
@app.on_event("shutdown")
async def shutdown_db_client():
await DataBase.close()
@app.post("/users/", status_code=201)
async def create_user(user: UserModelRequest):
new_user = await User.insert_one(**user.dict())
return new_user.to_dict()
@app.post("/user/auth", status_code=200)
async def is_authenticated(username: str, password: str):
user = await User.find_one({"username": username})
if not user:
raise HTTPException(status_code=404, detail="User not found")
if not user.verify_password(password):
raise HTTPException(status_code=401, detail="Unauthorized")
else:
return "You are authenticated! You can see this!"
@app.get("/users")
async def get_users():
users = await User.find_many()
if not users:
raise HTTPException(status_code=404, detail="User not found")
return [user.to_dict() for user in users]
@app.get("/users/{user_id}")
async def get_user(user_id: str):
user = await User.find_one({"_id": user_id})
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user.to_dict()
@app.put("/users/{user_id}", status_code=200)
async def update_user(user_id: str, user_data: UserModelRequest):
updated_user = await User.update_one({"_id": user_id}, user_data.model_dump())
if not updated_user:
raise HTTPException(status_code=404, detail="User not found")
return updated_user.to_dict()
@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: str):
user = await User.find_one({"_id": user_id})
if not user:
raise HTTPException(status_code=404, detail="User not found")
await user.delete()
return {"status": "User deleted successfully"}
License
This project is licensed under the MIT License.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for motormongo-0.1.15-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0f0474e6a974913a309edc6dcbc7cced7cbf69cdbddbf0ecc6a7cc849bc7c8c1 |
|
MD5 | 4bbead116c4abc07fae8d51449f92878 |
|
BLAKE2b-256 | 073e98cbbd5d1466d13b48ab6955e6a6545e437dced7c947d7277fa658122f9b |