
Asynchronous Python ORM for MongoDB. Objects are Pydantic models.

Project description

Aio Mongo Pydantic ORM (ampo)

Features:

  • The ORM is based on pydantic
  • Asynchronous
  • Many to many relationships
  • One to many relationships
  • Support MongoDB from 4.2
  • Python 3.8+

Usage

All examples are meant to be run in the asyncio REPL:

python -m asyncio

Create and get object

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initialize the DB before calling any db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    field2: int

    model_config = ORMConfig(
        orm_collection="test"
    )

await init_collection()

# Pydantic models accept keyword arguments only
inst_a = ModelA(field1="test", field2=123)
await inst_a.save()

# Get object
inst_a = await ModelA.get(field1="test")

Get all objects

This method supports additional options; see find().

Example:

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initialize the DB before calling any db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    field2: int
    
    model_config = ORMConfig(
        orm_collection="test"
    )

await init_collection()

# Pydantic models accept keyword arguments only
inst_a = ModelA(field1="test", field2=123)
await inst_a.save()

# Get all objects
insts = await ModelA.get_all()

# Get all objects matching a filter, with additional options
insts = await ModelA.get_all(
    filter={"field1": "test"},
    sort=[("field2", 1)],
    limit=10,
    skip=0
)
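
To make the option semantics concrete, here is a minimal in-memory sketch of how 'filter', 'sort', 'skip' and 'limit' combine. It assumes the options are passed through to MongoDB's find(), where sorting is applied first, then skip, then limit. The helper 'find_in_memory' and the sample data are hypothetical and not part of ampo; only equality filters and ascending/descending sort are modelled.

```python
from typing import Any, Dict, List, Optional, Tuple


def find_in_memory(
    docs: List[Dict[str, Any]],
    filter: Optional[Dict[str, Any]] = None,
    sort: Optional[List[Tuple[str, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> List[Dict[str, Any]]:
    """Hypothetical helper that mimics find() options on a list of dicts."""
    # 1. Keep only documents whose fields equal every value in the filter.
    matched = [
        d for d in docs
        if all(d.get(k) == v for k, v in (filter or {}).items())
    ]
    # 2. Sort; apply keys right-to-left so the first key has highest priority.
    for key, direction in reversed(sort or []):
        matched.sort(key=lambda d: d[key], reverse=(direction == -1))
    # 3. Skip, then limit (0 means "no limit", as in MongoDB).
    matched = matched[skip:]
    return matched[:limit] if limit else matched


docs = [
    {"field1": "test", "field2": 3},
    {"field1": "other", "field2": 1},
    {"field1": "test", "field2": 2},
]
result = find_in_memory(
    docs, filter={"field1": "test"}, sort=[("field2", 1)], limit=10
)
print(result)
```

The parameter names mirror the keyword arguments of 'get_all' shown above, which is why 'filter' shadows the Python builtin inside the helper.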

Id

To search by id, use either '_id' or 'id' as the filter key.

For fields of type ObjectId, use 'PydanticObjectId'.

Example:

from bson.objectid import ObjectId
from ampo import (
    CollectionWorker,
    AMPODatabase,
    ORMConfig,
    init_collection,
    PydanticObjectId
)


# Initialize the DB before calling any db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: PydanticObjectId
    
    model_config = ORMConfig(orm_collection="test")

await init_collection()

inst_a = ModelA(field1=ObjectId("63538168e94461001215836a"))
await inst_a.save()

Indexes

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initialize the DB before calling any db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "options": {
                    "unique": True
                }
            }
        ]
    )

# This method creates the indexes.
# Call it only once.
await init_collection()

Supported options:

  • unique
  • expireAfterSeconds

'keys' is a list of field names.
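
The shape of an 'orm_indexes' entry can be summarised by a small checker. This is only a sketch of the rules stated above (a 'keys' list of field names plus the optional 'unique' and 'expireAfterSeconds' options). The function 'check_index_spec' is hypothetical, and ampo may validate entries differently or not at all.

```python
from typing import Any, Dict

SUPPORTED_OPTIONS = {"unique", "expireAfterSeconds"}  # as listed above


def check_index_spec(spec: Dict[str, Any]) -> None:
    """Hypothetical checker; raises ValueError for a malformed entry."""
    keys = spec.get("keys")
    if not isinstance(keys, list) or not keys:
        raise ValueError("'keys' must be a non-empty list of field names")
    if not all(isinstance(k, str) for k in keys):
        raise ValueError("every entry in 'keys' must be a field name (str)")
    # Reject options outside the documented set.
    unknown = set(spec.get("options", {})) - SUPPORTED_OPTIONS
    if unknown:
        raise ValueError(f"unsupported options: {sorted(unknown)}")


# A well-formed entry passes silently.
check_index_spec({"keys": ["field1"], "options": {"unique": True}})
```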

TTL Index

A TTL index works only on a single field (see TTL Indexes).

Set the 'expireAfterSeconds' option, and make sure 'keys' contains exactly one field.

Example:

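from datetime import datetime
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initialize the DB before calling any db methods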
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: datetime

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "options": {
                    "expireAfterSeconds": 10
                }
            }
        ]
    )

# Optional: set a new expiration value for the index on 'field1'
ModelA.update_expiration_value("field1", 20)

await init_collection()

If you want 'expireAfterSeconds' to be set only through the 'update_expiration_value' method, set it to '-1' in the index options. If you want to leave the existing index unchanged, call the 'expiration_index_skip' method before 'init_collection'.
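
For intuition about 'expireAfterSeconds': MongoDB treats a document as expired once the indexed date field plus that many seconds lies in the past. The sketch below reproduces this arithmetic in plain Python. 'is_expired' is a hypothetical helper, not part of ampo, and the actual deletion is done by a MongoDB background task that runs periodically, so documents may outlive their expiry time slightly.

```python
from datetime import datetime, timedelta, timezone


def is_expired(field_value: datetime, expire_after_seconds: int, now: datetime) -> bool:
    """Hypothetical helper: True once field_value + TTL is no longer in the future."""
    return field_value + timedelta(seconds=expire_after_seconds) <= now


created = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(is_expired(created, 10, created + timedelta(seconds=5)))   # False
print(is_expired(created, 10, created + timedelta(seconds=15)))  # True
```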

Indexes in replica set cluster

A replica set cluster behaves in a specific way when creating indexes: if one of the nodes in the cluster is not reachable, index creation waits until that node becomes available. See Index Builds in Replicated Environments. To change this behavior, set the 'commit_quorum' option to 'majority'. See createIndexes.

This option is supported only from MongoDB version 4.4.

Example:

from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Initialize the DB before calling any db methods
AMPODatabase(url="mongodb://test")

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_indexes=[
            {
                "keys": ["field1"],
                "commit_quorum": "majority",
            }
        ]
    )

await init_collection()
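
The effect of the quorum setting can be illustrated with simple arithmetic. The sketch assumes MongoDB's documented meanings: the default quorum 'votingMembers' requires every data-bearing voting member, while 'majority' requires a simple majority of them. The function 'required_members' is hypothetical and models only these two values.

```python
def required_members(voting_members: int, commit_quorum: str = "votingMembers") -> int:
    """Hypothetical helper: members that must be ready before the index build commits."""
    if commit_quorum == "votingMembers":
        return voting_members
    if commit_quorum == "majority":
        return voting_members // 2 + 1
    raise ValueError(f"unsupported commit quorum: {commit_quorum!r}")


reachable = 2  # one node of a three-node replica set is down
print(reachable >= required_members(3))              # False: the build waits
print(reachable >= required_members(3, "majority"))  # True: the build can commit
```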

Relationships between documents

Embedded

Embedded documents are supported by default. Define the embedded document as a Pydantic 'BaseModel' class; it is stored in the db as a nested object.

Example:

from pydantic import BaseModel
from ampo import CollectionWorker, ORMConfig

# Embedded document: a plain Pydantic model
class Embedded(BaseModel):
    name: str

class ModelA(CollectionWorker):
    field1: str
    field2: Embedded

    model_config = ORMConfig(
        orm_collection="test"
    )

Lock Record

This mechanism lets you retrieve a record and lock it in one step. It is based on findOneAndUpdate(). When a record is found, the field named by "lock_field" is set to True, and subsequent searches skip that record.

Example:

import datetime
from typing import Optional
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str
    lfield: bool = False
    field_dt_start: Optional[datetime.datetime] = None

    model_config = ORMConfig(
        orm_collection="test",
        orm_lock_record={
            "lock_field": "lfield",
            "lock_field_time_start": "field_dt_start",
        }
    )

await init_collection()

# Pydantic models accept keyword arguments only
inst_a = ModelA(field1="test")
await inst_a.save()

inst_a = await ModelA.get_and_lock(field1="test")
# process
await inst_a.reset_lock()

# as context
async with ModelA.get_and_lock_context(field1="test") as inst_a:
    pass
    # process

# as context, version II:
# wait for the lock, with a timeout
try:
    async with ModelA.get_lock_wait_context(
        filter={"field1": "test"}, timeout=10
    ) as inst_a:
        pass
except (TimeoutError, ValueError) as e:
    print("Error:", e)

Lock reset mechanism: if a lock is held for longer than the period set in 'lock_max_period_sec', the lock is reset. The default value is 15 minutes.
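
The locking behaviour, including the automatic reset, can be modelled in memory. This is a sketch of the idea only: 'get_and_lock' below is a hypothetical function working on a list of dicts, whereas ampo performs the search and update in MongoDB with findOneAndUpdate(). The field names follow the example above, the 900-second default mirrors the 15 minutes mentioned for 'lock_max_period_sec', and treating a stale lock as free at search time is an assumption.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional


def get_and_lock(
    docs: List[Dict[str, Any]],
    now: datetime,
    lock_max_period_sec: int = 15 * 60,
    **filter_fields: Any,
) -> Optional[Dict[str, Any]]:
    """Hypothetical in-memory model: return the first free matching record, locked."""
    stale_before = now - timedelta(seconds=lock_max_period_sec)
    for doc in docs:
        if any(doc.get(k) != v for k, v in filter_fields.items()):
            continue  # does not match the filter
        started = doc.get("field_dt_start")
        stale = started is not None and started < stale_before
        if doc["lfield"] and not stale:
            continue  # locked by someone else and the lock is still fresh
        doc["lfield"] = True
        doc["field_dt_start"] = now
        return doc
    return None


docs = [{"field1": "test", "lfield": False, "field_dt_start": None}]
t0 = datetime(2024, 1, 1, 12, 0, 0)
first = get_and_lock(docs, t0, field1="test")                          # acquires the lock
second = get_and_lock(docs, t0 + timedelta(minutes=1), field1="test")  # None: still locked
third = get_and_lock(docs, t0 + timedelta(minutes=16), field1="test")  # stale lock is taken over
```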

Hooks

Example:

import datetime
from typing import Optional
from ampo import CollectionWorker, AMPODatabase, ORMConfig, init_collection

# hooks
async def hook(obj, context: dict):
    print("Call hook for", obj)

# Pydantic Model
class ModelA(CollectionWorker):
    field1: str

    model_config = ORMConfig(
        orm_collection="test",
        orm_hooks={
            "pre_save": [hook],
            "post_save": [],
            "pre_delete": [],
            "post_delete": [],
        }
    )

a = ModelA(field1="test")
await a.save(context={"any": "any"})
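
How the hook lists might be invoked around a save can be sketched without a database. Everything here is an assumption made for illustration: 'run_hooks' and 'fake_save' are hypothetical, and the sketch assumes hooks are awaited in list order with the object and the context dict, as the 'hook(obj, context)' signature above suggests.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Hook = Callable[[Any, dict], Awaitable[None]]
calls: List[str] = []


async def pre_hook(obj: Any, context: dict) -> None:
    calls.append(f"pre_save:{obj}:{context['any']}")


async def post_hook(obj: Any, context: dict) -> None:
    calls.append(f"post_save:{obj}")


async def run_hooks(hooks: List[Hook], obj: Any, context: dict) -> None:
    """Hypothetical dispatcher: await each hook in list order."""
    for hook in hooks:
        await hook(obj, context)


async def fake_save(obj: Any, orm_hooks: Dict[str, List[Hook]], context: dict) -> None:
    """Hypothetical stand-in for save(): pre hooks, write, post hooks."""
    await run_hooks(orm_hooks.get("pre_save", []), obj, context)
    calls.append("write")  # placeholder for the actual database write
    await run_hooks(orm_hooks.get("post_save", []), obj, context)


asyncio.run(
    fake_save("a", {"pre_save": [pre_hook], "post_save": [post_hook]}, {"any": "any"})
)
print(calls)
```

Unlike the other examples, this sketch uses asyncio.run() so that it can be run as a plain script rather than in the asyncio REPL.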

Development

Style:

Run tests:

env TEST_MONGO_URL=mongodb://localhost/test pytest
