Skip to main content

No project description provided

Project description

Mongo Observer

Mongo Observer aims to provide an easy way to asynchronously subscribe to state change events on a given mongo collection.

Installation

pip install mongo_observer

Usage

The Observer

The Observer handles state change observation on a given collection and dispatches events, delegating the responsibility to a handler.

Handlers

Handlers are objects that implement the OperationHandler protocol and implement the following abstract methods:

async def on_insert(self, operation: Dict[str, Any])

Where operation is a dict containing a document corresponding to an operation on oplog. It will contain the following keys:

  • ts: Timestamp of the operationa

  • h: An unique signed long identifier of the operation

  • op: A character representing the type of the operation

  • ns: A namespace string formed with the concatenation of ‘database.collection’

  • o: The inserted document

async def on_update(self, operation: Dict[str, Any])

Where operation is a dict containing a document corresponding to an operation on oplog. It will contain the following keys:

  • ts: Timestamp of the operation

  • h: An unique signed long identifier of the operation

  • op: A character representing the type of the operation

  • ns: A namespace string formed with the concatenation of ‘database.collection’

  • o: The operation data performed on the document

  • o2: A dict with a single _id key of the document to be updated

async def on_delete(self, operation: Dict[str, Any])

Where operation is a dict containing a document corresponding to an operation on oplog. It will contain the following keys:

  • ts: Timestamp of the operation

  • h: An unique signed long identifier of the operation

  • op: A character representing the type of the operation

  • ns: A namespace string formed with the concatenation of ‘database.collection’

  • o: A dict with a single _id key, of the ddeleted document

ReactiveCollection

A ReactiveCollection is a read-only, in-memory, non-persistent replica of a remote mongo collection. It reacts to state changes caused by write operations (inserts, updates and deletes) on the remote collection.

import asyncio

from motor.motor_asyncio import AsyncIOMotorClient

from mongo_observer.observer import Observer
from mongo_observer.operation_handlers import ReactiveCollection


async def run(loop):
    client = AsyncIOMotorClient('mongodb://127.0.0.1')

    collection_to_observe = client['your_db']['your_collection']
    reactive_collection = await ReactiveCollection.init_async(collection_to_observe)

    observer = await Observer.init_async(oplog=client['local']['oplog.rs'],
                                         operation_handler=reactive_collection,
                                         namespace_filter='your_db.your_collection')

    loop.create_task(observer.observe_changes())


loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()

Once ReactiveCollection.init_async runs, it will perform an initial sync with the remote collection. We then connect it as the operation_handler of an Observer, and we are ready to observe_changes to the current state of the collection.

ReactivePartialCollection

to-do

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mongo_observer-0.0.1.tar.gz (4.9 kB view details)

Uploaded Source

File details

Details for the file mongo_observer-0.0.1.tar.gz.

File metadata

File hashes

Hashes for mongo_observer-0.0.1.tar.gz
Algorithm Hash digest
SHA256 bf948966477feb8e73364a5d47862f96ece3ca0e6955778a7f07ae1c0e13be8a
MD5 f3a4f36e2e784e0814fc7990a831c1c0
BLAKE2b-256 b917b13d50987b88b4520624514b305acfa9670161767086aa5fd94303991115

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page