
Asynchronous Python ORM for MongoDB. Objects are Pydantic models.


Aio Mongo Pydantic ORM (ampo)

Features:

  • The ORM is based on pydantic
  • Asynchronous
  • Many to many relationships
  • One to many relationships
  • Supports MongoDB 4.2 and later
  • Python 3.8+

Usage

All examples are meant to be run in the asyncio REPL:

python -m asyncio

Create and get object

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initialize the DB before calling any DB methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    field2: int

    model_config = ORMConfig(
        orm_collection="test"
    )

await init_collection()

inst_a = ModelA(field1="test", field2=123)
await inst_a.save()

# Get object
inst_a = await ModelA.get(field1="test")

Get all objects

This method supports additional options; see find().

Example:

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initialize the DB before calling any DB methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    field2: int
    
    model_config = ORMConfig(
        orm_collection="test"
    )

await init_collection()

inst_a = ModelA(field1="test", field2=123)
await inst_a.save()

# Get all objects
insts = await ModelA.get_all()

# Get all objects matching a filter, with additional options
insts = await ModelA.get_all(
    filter={"field1": "test"},
    sort=[("field2", 1)],
    limit=10,
    skip=0
)
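
The options in the second call follow the usual MongoDB cursor semantics, where the sort is applied first, then skip, then limit. The following is a minimal in-memory sketch of that order, not ampo's implementation: `apply_find_options` and the sample documents are hypothetical, and only equality filters are handled.

```python
from typing import Any, Dict, List, Optional, Tuple


def apply_find_options(
    docs: List[Dict[str, Any]],
    filter: Optional[Dict[str, Any]] = None,
    sort: Optional[List[Tuple[str, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: emulate filter/sort/skip/limit on a list of dicts."""
    filter = filter or {}
    # Keep only documents whose fields equal every value in the filter.
    result = [d for d in docs if all(d.get(k) == v for k, v in filter.items())]
    # Apply sort keys from last to first so the first key has the highest priority.
    for key, direction in reversed(sort or []):
        result.sort(key=lambda d: d[key], reverse=(direction == -1))
    # Skip comes after sorting; limit=0 means "no limit", as in MongoDB.
    result = result[skip:]
    return result[:limit] if limit else result


docs = [
    {"field1": "test", "field2": 3},
    {"field1": "other", "field2": 1},
    {"field1": "test", "field2": 2},
]
page = apply_find_options(docs, filter={"field1": "test"}, sort=[("field2", 1)], limit=10)
print(page)
```

Because skip and limit are applied after sorting, paging through results stays stable as long as the sort key is deterministic.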

Id

To search by id, use either '_id' or 'id' as the field name in the filter.

For fields of type ObjectId, use 'PydanticObjectId'.

Example:

from bson.objectid import ObjectId
from ampo import (
    CollectionWorker,
    AMPODatabase,
    ORMConfig,
    init_collection,
    PydanticObjectId
)


# Initialize the DB before calling any DB methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: PydanticObjectId
    
    model_config = ORMConfig(orm_collection="test")

await init_collection()

inst_a = ModelA(field1=ObjectId("63538168e94461001215836a"))
await inst_a.save()

Indexes

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initialize the DB before calling any DB methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "options": {
                    "unique": True
                }
            }
        ]
    )

# This method creates the indexes
# Call it only once
await init_collection()

Supported options:

  • unique
  • expireAfterSeconds

'keys' is a list of field names.

TTL Index

A TTL index works only on a single field (see TTL Indexes).

Set the option 'expireAfterSeconds', and make sure 'keys' contains exactly one field.

Example:

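from datetime import datetime
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initialize the DB before calling any DB methods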
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: datetime

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "options": {
                    "expireAfterSeconds": 10
                }
            }
        ]
    )

# optional, set new value
ModelA.update_expiration_value("field1", 20)

await init_collection()

To set 'expireAfterSeconds' only through the 'update_expiration_value' method, set it to '-1' in the index options. To skip changing the index, call the 'expiration_index_skip' method before init_collection.
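
As background for the option: MongoDB treats a document as expired once the value of the indexed date field plus 'expireAfterSeconds' lies in the past, and a background task removes expired documents periodically, so deletion is not instantaneous. A minimal sketch of that rule in plain Python follows; `is_expired` is a hypothetical helper for illustration and not part of ampo.

```python
from datetime import datetime, timedelta, timezone


def is_expired(field_value: datetime, expire_after_seconds: int, now: datetime) -> bool:
    """Hypothetical helper: True once field_value + expire_after_seconds has passed."""
    return field_value + timedelta(seconds=expire_after_seconds) <= now


created = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
# 5 seconds after creation the document is still kept with expireAfterSeconds=10.
print(is_expired(created, 10, created + timedelta(seconds=5)))
# 15 seconds after creation it is eligible for removal.
print(is_expired(created, 10, created + timedelta(seconds=15)))
```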

Indexes in replica set cluster

The replica set cluster has a specific behavior when creating indexes. If one of the nodes in the cluster is not reachable, the index creation will wait for the node to become available. See Index Builds in Replicated Environments. Change this behavior by setting the 'commit_quorum' option to 'majority'. See createIndexes.

The 'commit_quorum' option is supported only from MongoDB 4.4 onward.

Example:

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initialize the DB before calling any DB methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "commit_quorum": "majority",
            }
        ]
    )

await init_collection()
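
For orientation, the option corresponds to the 'commitQuorum' field of MongoDB's createIndexes database command. The sketch below only builds that command document as a plain dict; `build_create_indexes_command` is a hypothetical helper, and how ampo passes the option to the driver is not covered here.

```python
from typing import Any, Dict, List


def build_create_indexes_command(
    collection: str, keys: List[str], commit_quorum: Any = "votingMembers"
) -> Dict[str, Any]:
    """Hypothetical helper: build a createIndexes command with a commit quorum."""
    key_doc = {name: 1 for name in keys}  # ascending index on every listed field
    index_name = "_".join(f"{name}_1" for name in keys)
    return {
        "createIndexes": collection,
        "indexes": [{"key": key_doc, "name": index_name}],
        # "votingMembers" is the server default; "majority" waits only for a majority.
        "commitQuorum": commit_quorum,
    }


command = build_create_indexes_command("test", ["field1"], commit_quorum="majority")
print(command)
```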

Relationships between documents

Embedded

Embedded documents are supported by default. Define the embedded document as a pydantic 'BaseModel' class; it is stored in the database as a nested object.

Example:

from pydantic import BaseModel
from ampo import CollectionWorker, ORMConfig

class Embedded(BaseModel):
    name: str

class ModelA(CollectionWorker):
    field1: str
    field2: Embedded

    model_config = ORMConfig(
        orm_collection="test"
    )

Lock Record

Record locking lets you retrieve a record and lock it in one step. It is based on findOneAndUpdate(). When a record is found, the field named by "lock_field" is set to True, so subsequent searches skip this record.

Example:

import datetime
from typing import Optional
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    lfield: bool = False
    field_dt_start: Optional[datetime.datetime] = None

    model_config = ORMConfig(
        orm_collection="test",
        orm_lock_record={
            "lock_field": "lfield",
            "lock_field_time_start": "field_dt_start",
        }
    )

await init_collection()

inst_a = ModelA(field1="test")
await inst_a.save()

inst_a = await ModelA.get_and_lock(field1="test")
# process
await inst_a.reset_lock()

# as context
async with ModelA.get_and_lock_context(field1="test") as inst_a:
    pass
    # process

# as context, version II
# timeout on wait
try:
    async with ModelA.get_lock_wait_context(
        filter={"field1": "test"}, timeout=10
    ) as inst_a:
        pass
except (TimeoutError, ValueError) as e:
    print("Error:", e)

Lock reset mechanism: if a lock is held longer than the period set in 'lock_max_period_sec', the lock is reset. The default value is 15 minutes.
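
The locking and reset behaviour described in this section can be sketched with an in-memory list of documents. This is only an illustration under stated assumptions: the function `get_and_lock` below is a hypothetical stand-in, not ampo's method, and unlike findOneAndUpdate() it is not atomic across processes.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

LOCK_MAX_PERIOD = timedelta(minutes=15)  # mirrors the documented default


def get_and_lock(
    docs: List[Dict[str, Any]], now: datetime, **criteria: Any
) -> Optional[Dict[str, Any]]:
    """Hypothetical in-memory stand-in for a find-and-lock operation."""
    for doc in docs:
        if any(doc.get(k) != v for k, v in criteria.items()):
            continue  # does not match the filter
        started = doc.get("field_dt_start")
        stale = started is not None and now - started > LOCK_MAX_PERIOD
        if doc["lfield"] and not stale:
            continue  # locked by someone else and the lock is still fresh
        # Take (or take over) the lock and remember when it started.
        doc["lfield"] = True
        doc["field_dt_start"] = now
        return doc
    return None


docs = [{"field1": "test", "lfield": False, "field_dt_start": None}]
t0 = datetime(2024, 1, 1, 12, 0, 0)
first = get_and_lock(docs, t0, field1="test")
second = get_and_lock(docs, t0 + timedelta(minutes=1), field1="test")
third = get_and_lock(docs, t0 + timedelta(minutes=16), field1="test")
print(first is not None, second is None, third is not None)
```

The second call finds nothing because the record is locked, while the third call succeeds because the lock has been held for longer than the maximum period.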

Hooks

Example:

import datetime
from typing import Optional
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Hooks are async functions that receive the object and a context dict
async def hook(obj, context: dict):
    print("Call hook for", obj)

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_hooks={
            "pre_save": [hook],
            "post_save": [],
            "pre_delete": [],
            "post_delete": [],
        }
    )

a = ModelA(field1="test")
await a.save(context={"any": "any"})
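
To make the call order concrete, here is a minimal sketch of how pre and post hooks could be dispatched around a save. It is an assumption about the general pattern, not ampo's implementation; `save_with_hooks`, `pre_hook`, `post_hook` and the `calls` list are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Hook = Callable[[Any, Dict[str, Any]], Awaitable[None]]
calls: List[str] = []


async def pre_hook(obj: Any, context: Dict[str, Any]) -> None:
    calls.append(f"pre_save:{obj}:{context['any']}")


async def post_hook(obj: Any, context: Dict[str, Any]) -> None:
    calls.append(f"post_save:{obj}")


async def save_with_hooks(
    obj: Any, hooks: Dict[str, List[Hook]], context: Dict[str, Any]
) -> None:
    """Hypothetical stand-in for save(): run pre hooks, 'store', run post hooks."""
    for hook in hooks.get("pre_save", []):
        await hook(obj, context)
    calls.append(f"stored:{obj}")  # placeholder for the actual database write
    for hook in hooks.get("post_save", []):
        await hook(obj, context)


asyncio.run(
    save_with_hooks(
        "doc-1",
        {"pre_save": [pre_hook], "post_save": [post_hook]},
        context={"any": "any"},
    )
)
print(calls)
```

The same context dict is handed to every hook, which is what allows a caller to pass request-specific data through `save(context=...)`.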

Development

Style:

Run tests:

env TEST_MONGO_URL=mongodb://localhost/test pytest

