Mongo Observer

Mongo Observer aims to provide an easy way to asynchronously subscribe to state change events on a given mongo collection.

Installation

pip install mongo_observer

Usage

The Observer

The Observer watches a given collection for state changes and dispatches each change event to a handler, which is responsible for processing it.
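As a rough illustration of the dispatch step described above, the sketch below routes a single oplog entry to a handler coroutine based on its op code. It is not the library's actual implementation: the dispatch function and the OP_TO_METHOD table are hypothetical names, and the only assumptions are the standard MongoDB op codes ('i', 'u', 'd') and a handler exposing on_insert, on_update and on_delete.

```python
import asyncio
from typing import Any, Dict

# Standard MongoDB oplog `op` codes for the three write operations.
OP_TO_METHOD = {"i": "on_insert", "u": "on_update", "d": "on_delete"}


async def dispatch(handler: Any, operation: Dict[str, Any]) -> bool:
    """Route one oplog entry to the matching handler coroutine.

    Hypothetical helper. Returns False for entries it does not route,
    such as no-ops ('n') or commands ('c').
    """
    method_name = OP_TO_METHOD.get(operation.get("op"))
    if method_name is None:
        return False
    await getattr(handler, method_name)(operation)
    return True
```

Entries with any other op code are skipped rather than raising, so a stream that mixes writes with no-ops can be fed through the same function.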

Handlers

Handlers are objects that implement the OperationHandler protocol, which defines the following abstract methods:

async def on_insert(self, operation: Dict[str, Any])

Where operation is a dict holding the oplog entry for the operation. It contains the following keys:

  • ts: Timestamp of the operation

  • h: A unique signed long identifier of the operation

  • op: A character representing the type of the operation

  • ns: A namespace string of the form ‘database.collection’

  • o: The inserted document

async def on_update(self, operation: Dict[str, Any])

Where operation is a dict holding the oplog entry for the operation. It contains the following keys:

  • ts: Timestamp of the operation

  • h: A unique signed long identifier of the operation

  • op: A character representing the type of the operation

  • ns: A namespace string of the form ‘database.collection’

  • o: The update applied to the document

  • o2: A dict with a single _id key identifying the document to be updated

async def on_delete(self, operation: Dict[str, Any])

Where operation is a dict holding the oplog entry for the operation. It contains the following keys:

  • ts: Timestamp of the operation

  • h: A unique signed long identifier of the operation

  • op: A character representing the type of the operation

  • ns: A namespace string of the form ‘database.collection’

  • o: A dict with a single _id key identifying the deleted document
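To make the three callbacks concrete, here is a minimal sketch of a custom handler that records a one-line summary per operation. AuditTrailHandler and demo are hypothetical names, and the class is duck-typed rather than derived from OperationHandler, since the import path of that protocol is not shown here. The sample entries are hand-written and omit ts and h for brevity.

```python
import asyncio
from typing import Any, Dict, List


class AuditTrailHandler:
    """Hypothetical handler that keeps a one-line summary per operation."""

    def __init__(self) -> None:
        self.lines: List[str] = []

    async def on_insert(self, operation: Dict[str, Any]) -> None:
        # For inserts, `o` is the full inserted document.
        self.lines.append(f"{operation['ns']} insert {operation['o']['_id']}")

    async def on_update(self, operation: Dict[str, Any]) -> None:
        # For updates, the target _id is in `o2`; `o` describes the change.
        self.lines.append(f"{operation['ns']} update {operation['o2']['_id']}")

    async def on_delete(self, operation: Dict[str, Any]) -> None:
        # For deletes, `o` holds only the _id of the removed document.
        self.lines.append(f"{operation['ns']} delete {operation['o']['_id']}")


async def demo() -> List[str]:
    handler = AuditTrailHandler()
    ns = "your_db.your_collection"
    await handler.on_insert({"op": "i", "ns": ns, "o": {"_id": 1, "name": "a"}})
    await handler.on_update({"op": "u", "ns": ns,
                             "o": {"$set": {"name": "b"}}, "o2": {"_id": 1}})
    await handler.on_delete({"op": "d", "ns": ns, "o": {"_id": 1}})
    return handler.lines
```

Calling asyncio.run(demo()) returns the recorded summaries in the order the operations were delivered. In real use, an instance like this would presumably be passed as the operation_handler of an Observer instead of being called by hand.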

ReactiveCollection

A ReactiveCollection is a read-only, in-memory, non-persistent replica of a remote mongo collection. It reacts to state changes caused by write operations (inserts, updates and deletes) on the remote collection.
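The idea of an in-memory replica that reacts to write operations can be sketched as a dict keyed by _id. This is an illustration only, not how ReactiveCollection is implemented: InMemoryReplica is a hypothetical class, and it assumes update entries carry either a full replacement document or top-level $set / $unset operators. Dotted field paths and other update formats (for example the diff-based entries written by newer MongoDB servers) are not handled.

```python
import asyncio
from typing import Any, Dict


class InMemoryReplica:
    """Hypothetical stand-in for the idea behind ReactiveCollection."""

    def __init__(self) -> None:
        self.documents: Dict[Any, Dict[str, Any]] = {}

    async def on_insert(self, operation: Dict[str, Any]) -> None:
        doc = operation["o"]
        self.documents[doc["_id"]] = dict(doc)

    async def on_update(self, operation: Dict[str, Any]) -> None:
        _id = operation["o2"]["_id"]
        change = operation["o"]
        if any(key.startswith("$") for key in change):
            # Operator-style update: apply top-level $set and $unset only.
            # Any other operator is ignored by this sketch.
            doc = self.documents.setdefault(_id, {"_id": _id})
            doc.update(change.get("$set", {}))
            for field in change.get("$unset", {}):
                doc.pop(field, None)
        else:
            # Replacement-style update: `o` is the new document body.
            self.documents[_id] = {**change, "_id": _id}

    async def on_delete(self, operation: Dict[str, Any]) -> None:
        self.documents.pop(operation["o"]["_id"], None)


async def replay() -> Dict[Any, Dict[str, Any]]:
    replica = InMemoryReplica()
    await replica.on_insert({"op": "i", "o": {"_id": 1, "name": "a", "tmp": 0}})
    await replica.on_insert({"op": "i", "o": {"_id": 2, "name": "z"}})
    await replica.on_update({"op": "u", "o2": {"_id": 1},
                             "o": {"$set": {"name": "b"}, "$unset": {"tmp": True}}})
    await replica.on_delete({"op": "d", "o": {"_id": 2}})
    return replica.documents
```

Replaying the four sample entries leaves a single document with _id 1 whose name has been updated and whose tmp field has been removed.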

import asyncio

from motor.motor_asyncio import AsyncIOMotorClient

from mongo_observer.observer import Observer
from mongo_observer.operation_handlers import ReactiveCollection


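async def run(loop):
    client = AsyncIOMotorClient('mongodb://127.0.0.1')

    # Build the in-memory replica; init_async performs the initial sync.
    collection_to_observe = client['your_db']['your_collection']
    reactive_collection = await ReactiveCollection.init_async(collection_to_observe)

    # Read from the replica set oplog, restricted to the observed namespace.
    observer = await Observer.init_async(oplog=client['local']['oplog.rs'],
                                         operation_handler=reactive_collection,
                                         namespace_filter='your_db.your_collection')

    # Schedule the observation coroutine as a background task on the loop.
    loop.create_task(observer.observe_changes())


# Run the setup, then keep the loop alive so the observer keeps running.
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()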

When ReactiveCollection.init_async runs, it performs an initial sync with the remote collection. The resulting object is then passed as the operation_handler of an Observer, and observe_changes keeps it up to date with subsequent changes to the collection.

ReactivePartialCollection

to-do
