Skip to main content

Python library async ORM for mongodb. Object as pyDantic

Project description

Aio Mongo Pydantic ORM (ampo)

Features:

  • The ORM is based on pydantic
  • Asynchronous
  • Many to many relationships
  • One to many relationships
  • Support MongoDB from 4.2
  • Python 3.8+

Usage

All example run into:

python -m asyncio

Create and get object

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    field2: int

    model_config = ORMConfig(
        orm_collection="test"
    )

await init_collection()

inst_a = ModelA("test", 123)
await inst_a.save()

# Get object
inst_a = await ModelA.get(field1="test")

Get all objects

Support additional options for this method. See find().

Example:

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    field2: int
    
    model_config = ORMConfig(
        orm_collection="test"
    )

await init_collection()

inst_a = ModelA("test", 123)
await inst_a.save()

# Get all objects
insts = await ModelA.get_all()

# Get all objects, by filter, and addional options
insts = await ModelA.get_all(
    filter={"field1": "test"},
    sort=[("field2", 1)],
    limit=10,
    skip=0
)

Id

For search by 'id' usages in filter '_id' or 'id' name.

For type ObjectId, use 'PydanticObjectId'.

Example:

from bson.objectid import ObjectId
from ampo import (
    CollectionWorker,
    AMPODatabase,
    ORMConfig,
    init_collection,
    PydanticObjectId
)


# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: PydanticObjectId
    
    model_config = ORMConfig(orm_collection="test")

await init_collection()

inst_a = ModelA(field1=ObjectId("63538168e94461001215836a"))
await inst_a.save()

Indexes

# import

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "options": {
                    "unique": True
                }
            }
        ]
    )

# This method create indexes
# Call only one time
await init_collection()

Suppport options:

  • unique
  • expireAfterSeconds

Keys is list of fields.

TTL Index

It works only with single field (TTL Indexes).

You should set the option 'expireAfterSeconds', and field 'keys' should have only single field.

Example:

# import

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: datetime

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "options": {
                    "expireAfterSeconds": 10
                }
            }
        ]
    )

# optional, set new value
ModelA.update_expiration_value("field1", 20)

await init_collection()

if you want to set the 'expireAfterSeconds' only from method 'update_expiration_value', set it to '-1'. if you want skip the index changed, call method 'expiration_index_skip' before init_collection.

Indexes in replica set cluster

The replica set cluster has a specific behavior when creating indexes. If one of the nodes in the cluster is not reachable, the index creation will wait for the node to become available. See Index Builds in Replicated Environments. Change this behavior by setting the 'commit_quorum' option to 'majority'. See createIndexes.

Supported only from MongoDB version 4.4.

Example:

# import

# Initilize DB before calls db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "commit_quorum": "majority",
            }
        ]
    )

await init_collection()

Relationships between documents

Embeded

It is supported by default. Just, you need create the embedded document as class of pydantic - 'BaseModel'. It will be stored into db as object.

Example:

from pydantic import BaseModel

class Embeded(BaseModel):
    name: str

class ModelA(CollectionWorker):
    field1: str
    field2: Embeded

    model_config = ORMConfig(
        orm_collection="test"
    )

Lock Record

It is a mechanism that allows you to retrieve a record with a lock. It is based on the findOneAndUpdate(). When the record is found, the field "lock_field" is set to True. And when the next search is made, this record will be skipped.

Example:

import datetime
from typing import Optional
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    lfield: bool = False
    field_dt_start: Optional[datetime.datetime] = None

    model_config = ORMConfig(
        orm_collection="test",
        orm_lock_record={
            "lock_field": "lfield",
            "lock_field_time_start": "field_dt_start",
        }
    )

await init_collection()

inst_a = ModelA("test", 123)
await inst_a.save()

inst_a = await ModelA.get_and_lock(field1="test")
# process
await inst_a.reset_lock()

# as context
async with ModelA.get_and_lock_context(field1="test") as inst_a:
    pass
    # process

# as context, version II
# timeout on wait
try:
  async with ModelA.get_lock_wait_context(
      filter={"field1": "test"}, timeout=10
  ) as inst_a:
      pass
except (TimeoutError, ValueError) as e:
    print("Error:", e)

Mehanism reset lock. If lock exist more than time, set 'lock_max_period_sec', lock will be reset. Default value is 15 minutes.

Hooks

Example:

import datetime
from typing import Optional
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# hooks
await def hook(obj, context: dict):
    print("Call hook for", obj)

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_hooks={
            "pre_save": [hook],
            "post_save": [],
            "pre_delete": [],
            "post_delete": [],
        }
    )

a = ModelA(field1="test")
await a.save(context={"any": "any"})

Development

Style:

Run tests:

env TEST_MONGO_URL=mongodb://localhost/test pytest

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ampo-0.6.0.tar.gz (13.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ampo-0.6.0-py3-none-any.whl (13.5 kB view details)

Uploaded Python 3

File details

Details for the file ampo-0.6.0.tar.gz.

File metadata

  • Download URL: ampo-0.6.0.tar.gz
  • Upload date:
  • Size: 13.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ampo-0.6.0.tar.gz
Algorithm Hash digest
SHA256 f2b852a18926567443b735964b61a8e4560d815405a2d1f86e4c6f31e61ef487
MD5 6e1f33b26587ef24a4f54ae5bddb9d6c
BLAKE2b-256 7a7d3d4fe0791b2175648c2ea4e77e4df344c0cbcfe8175f7ffaaf033a6e5e48

See more details on using hashes here.

File details

Details for the file ampo-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: ampo-0.6.0-py3-none-any.whl
  • Upload date:
  • Size: 13.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ampo-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f270f402b288ba879596e823d84236de0a0d501b5ee4f428fb4302ad4ea386ea
MD5 00258cd6201546e193025ed1779c7db1
BLAKE2b-256 15febd09456d3b515fbcdc491edbd4eeb531d9414fe216c94e759c7af5186ea1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page