Skip to main content

Python library async ORM for mongodb. Object as pyDantic

Project description

Aio Mongo Pydantic ORM (ampo)

Features:

  • The ORM is based on pydantic
  • Asynchronous
  • Many to many relationships
  • One to many relationships
  • Support MongoDB from 4.2
  • Python 3.9+

Usage

All example run into:

python -m asyncio

Create and get object

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    field2: int

    model_config = ORMConfig(
        orm_collection="test"
    )

await init_collection()

inst_a = ModelA(field1="test", field2=123)
await inst_a.save()

# Get object
inst_a = await ModelA.get(filter={"field1": "test"})

Get all objects

Support additional options for this method. See find().

Example:

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    field2: int
    
    model_config = ORMConfig(
        orm_collection="test"
    )

await init_collection()

inst_a = ModelA("test", 123)
await inst_a.save()

# Get all objects
insts = await ModelA.get_all()

# Get all objects, by filter, and addional options
insts = await ModelA.get_all(
    filter={"field1": "test"},
    sort=[("field2", 1)],
    limit=10,
    skip=0
)

Id

For search by 'id' usages in filter '_id' or 'id' name.

For type ObjectId, use 'PydanticObjectId'.

Example:

from bson.objectid import ObjectId
from ampo import (
    CollectionWorker,
    AMPODatabase,
    ORMConfig,
    init_collection,
    PydanticObjectId
)


# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: PydanticObjectId
    
    model_config = ORMConfig(orm_collection="test")

await init_collection()

inst_a = ModelA(field1=ObjectId("63538168e94461001215836a"))
await inst_a.save()

Indexes

# import

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "options": {
                    "unique": True
                }
            }
        ]
    )

# This method create indexes
# Call only one time
await init_collection()

Suppport options:

  • unique
  • expireAfterSeconds

Keys is list of fields.

TTL Index

It works only with single field (TTL Indexes).

You should set the option 'expireAfterSeconds', and field 'keys' should have only single field.

Example:

# import

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: datetime

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "options": {
                    "expireAfterSeconds": 10
                }
            }
        ]
    )

# optional, set new value
ModelA.update_expiration_value("field1", 20)

await init_collection()

if you want to set the 'expireAfterSeconds' only from method 'update_expiration_value', set it to '-1'. if you want skip the index changed, call method 'expiration_index_skip' before init_collection.

Indexes in replica set cluster

The replica set cluster has a specific behavior when creating indexes. If one of the nodes in the cluster is not reachable, the index creation will wait for the node to become available. See Index Builds in Replicated Environments. Change this behavior by setting the 'commit_quorum' option to 'majority'. See createIndexes.

Supported only from MongoDB version 4.4.

Example:

# import

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "commit_quorum": "majority",
            }
        ]
    )

await init_collection()

Relationships between documents

Embeded

It is supported by default. Just, you need create the embedded document as class of pydantic - 'BaseModel'. It will be stored into db as object.

Example:

from pydantic import BaseModel

class Embeded(BaseModel):
    name: str

class ModelA(CollectionWorker):
    field1: str
    field2: Embeded

    model_config = ORMConfig(
        orm_collection="test"
    )

Lock Record

It is a mechanism that allows you to retrieve a record with a lock. It is based on the findOneAndUpdate(). When the record is found, the field "lock_field" is set to True. And when the next search is made, this record will be skipped.

Example:

import datetime
from typing import Optional
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    lfield: bool = False
    field_dt_start: Optional[datetime.datetime] = None

    model_config = ORMConfig(
        orm_collection="test",
        orm_lock_record={
            "lock_field": "lfield",
            "lock_field_time_start": "field_dt_start",
        }
    )

await init_collection()

inst_a = ModelA("test", 123)
await inst_a.save()

inst_a = await ModelA.get_and_lock(field1="test")
# process
await inst_a.reset_lock()

# as context
async with ModelA.get_and_lock_context(field1="test") as inst_a:
    pass
    # process

# as context, version II
# timeout on wait
try:
  async with ModelA.get_lock_wait_context(
      filter={"field1": "test"}, timeout=10
  ) as inst_a:
      pass
except (TimeoutError, ValueError) as e:
    print("Error:", e)

Mehanism reset lock. If lock exist more than time, set 'lock_max_period_sec', lock will be reset. Default value is 15 minutes.

Hooks

Example:

import datetime
from typing import Optional
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# hooks
await def hook(obj, context: dict):
    print("Call hook for", obj)

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_hooks={
            "pre_save": [hook],
            "post_save": [],
            "pre_delete": [],
            "post_delete": [],
        }
    )

a = ModelA(field1="test")
await a.save(context={"any": "any"})

Development

Style:

Run tests:

env TEST_MONGO_URL=mongodb://localhost/test pytest

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ampo-0.7.1.tar.gz (14.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ampo-0.7.1-py3-none-any.whl (13.7 kB view details)

Uploaded Python 3

File details

Details for the file ampo-0.7.1.tar.gz.

File metadata

  • Download URL: ampo-0.7.1.tar.gz
  • Upload date:
  • Size: 14.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ampo-0.7.1.tar.gz
Algorithm Hash digest
SHA256 e2fb05f7ff02a60570bca1327180fd4fb4ded9761cb5e706e9ad002a768e2294
MD5 8e82fe0facf37ecb0d947e7b721cc31c
BLAKE2b-256 755762b001c6ebb986c62091c7bae01d0612f955f65b2fe540fd4ff71e53b847

See more details on using hashes here.

File details

Details for the file ampo-0.7.1-py3-none-any.whl.

File metadata

  • Download URL: ampo-0.7.1-py3-none-any.whl
  • Upload date:
  • Size: 13.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ampo-0.7.1-py3-none-any.whl
Algorithm Hash digest
SHA256 05b2ce648281b85dee3b4e2294d3b4e84a0d0df2e90dda76c72e82a574f3acd4
MD5 69aa72db1519465909cc5fd63c787312
BLAKE2b-256 fabec1793ab3157d5ba8be2e1a4dbc2b56e1cd98a46a842238fd1c31ad4f2bc1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page