Skip to main content

Python library async ORM for mongodb. Object as pyDantic

Project description

Aio Mongo Pydantic ORM (ampo)

Features:

  • The ORM is based on pydantic
  • Asynchronous
  • Many to many relationships
  • One to many relationships
  • Support MongoDB from 4.2
  • Python 3.9+

Usage

All example run into:

python -m asyncio

Create and get object

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    field2: int

    model_config = ORMConfig(
        orm_collection="test"
    )

await init_collection()

inst_a = ModelA("test", 123)
await inst_a.save()

# Get object
inst_a = await ModelA.get(field1="test")

Get all objects

Support additional options for this method. See find().

Example:

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    field2: int
    
    model_config = ORMConfig(
        orm_collection="test"
    )

await init_collection()

inst_a = ModelA("test", 123)
await inst_a.save()

# Get all objects
insts = await ModelA.get_all()

# Get all objects, by filter, and addional options
insts = await ModelA.get_all(
    filter={"field1": "test"},
    sort=[("field2", 1)],
    limit=10,
    skip=0
)

Id

For search by 'id' usages in filter '_id' or 'id' name.

For type ObjectId, use 'PydanticObjectId'.

Example:

from bson.objectid import ObjectId
from ampo import (
    CollectionWorker,
    AMPODatabase,
    ORMConfig,
    init_collection,
    PydanticObjectId
)


# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: PydanticObjectId
    
    model_config = ORMConfig(orm_collection="test")

await init_collection()

inst_a = ModelA(field1=ObjectId("63538168e94461001215836a"))
await inst_a.save()

Indexes

# import

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "options": {
                    "unique": True
                }
            }
        ]
    )

# This method create indexes
# Call only one time
await init_collection()

Suppport options:

  • unique
  • expireAfterSeconds

Keys is list of fields.

TTL Index

It works only with single field (TTL Indexes).

You should set the option 'expireAfterSeconds', and field 'keys' should have only single field.

Example:

# import

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: datetime

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "options": {
                    "expireAfterSeconds": 10
                }
            }
        ]
    )

# optional, set new value
ModelA.update_expiration_value("field1", 20)

await init_collection()

if you want to set the 'expireAfterSeconds' only from method 'update_expiration_value', set it to '-1'. if you want skip the index changed, call method 'expiration_index_skip' before init_collection.

Indexes in replica set cluster

The replica set cluster has a specific behavior when creating indexes. If one of the nodes in the cluster is not reachable, the index creation will wait for the node to become available. See Index Builds in Replicated Environments. Change this behavior by setting the 'commit_quorum' option to 'majority'. See createIndexes.

Supported only from MongoDB version 4.4.

Example:

# import

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "commit_quorum": "majority",
            }
        ]
    )

await init_collection()

Relationships between documents

Embeded

It is supported by default. Just, you need create the embedded document as class of pydantic - 'BaseModel'. It will be stored into db as object.

Example:

from pydantic import BaseModel

class Embeded(BaseModel):
    name: str

class ModelA(CollectionWorker):
    field1: str
    field2: Embeded

    model_config = ORMConfig(
        orm_collection="test"
    )

Lock Record

It is a mechanism that allows you to retrieve a record with a lock. It is based on the findOneAndUpdate(). When the record is found, the field "lock_field" is set to True. And when the next search is made, this record will be skipped.

Example:

import datetime
from typing import Optional
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    lfield: bool = False
    field_dt_start: Optional[datetime.datetime] = None

    model_config = ORMConfig(
        orm_collection="test",
        orm_lock_record={
            "lock_field": "lfield",
            "lock_field_time_start": "field_dt_start",
        }
    )

await init_collection()

inst_a = ModelA("test", 123)
await inst_a.save()

inst_a = await ModelA.get_and_lock(field1="test")
# process
await inst_a.reset_lock()

# as context
async with ModelA.get_and_lock_context(field1="test") as inst_a:
    pass
    # process

# as context, version II
# timeout on wait
try:
  async with ModelA.get_lock_wait_context(
      filter={"field1": "test"}, timeout=10
  ) as inst_a:
      pass
except (TimeoutError, ValueError) as e:
    print("Error:", e)

Mehanism reset lock. If lock exist more than time, set 'lock_max_period_sec', lock will be reset. Default value is 15 minutes.

Hooks

Example:

import datetime
from typing import Optional
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# hooks
await def hook(obj, context: dict):
    print("Call hook for", obj)

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_hooks={
            "pre_save": [hook],
            "post_save": [],
            "pre_delete": [],
            "post_delete": [],
        }
    )

a = ModelA(field1="test")
await a.save(context={"any": "any"})

Development

Style:

Run tests:

env TEST_MONGO_URL=mongodb://localhost/test pytest

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ampo-0.7.0.tar.gz (14.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ampo-0.7.0-py3-none-any.whl (13.6 kB view details)

Uploaded Python 3

File details

Details for the file ampo-0.7.0.tar.gz.

File metadata

  • Download URL: ampo-0.7.0.tar.gz
  • Upload date:
  • Size: 14.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ampo-0.7.0.tar.gz
Algorithm Hash digest
SHA256 6a6d73935781c336d4dbe786fccfc99ce4ed79bd3811124e7d51bf89d11a3fed
MD5 bffafa882a8fc034977d496a8375e3e4
BLAKE2b-256 60aba86d765cc1b32fda2a35d287e42bcd22c954d18ccb65eb858ed23f7ee4a8

See more details on using hashes here.

File details

Details for the file ampo-0.7.0-py3-none-any.whl.

File metadata

  • Download URL: ampo-0.7.0-py3-none-any.whl
  • Upload date:
  • Size: 13.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ampo-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7d319c7d55ed5d36d675988782c09807938826bd9cad40df08022e3deab93b6e
MD5 b7c5894fa782238ebff764f90b709052
BLAKE2b-256 17ad2f1bee0218077633b2a13a921c3d15475e0a0cebc8e79e0e6b43325264b0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page