Mongo Observer

Mongo Observer aims to provide an easy way to asynchronously subscribe to state change events on a given mongo collection.

Installation

pip install mongo_observer

Usage

The Observer

The Observer watches a given collection for state changes and dispatches each event to a handler, which is responsible for processing it.
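
As a rough illustration of this dispatching, the sketch below routes an oplog entry to a handler method based on its op character ('i' for insert, 'u' for update, 'd' for delete). The dispatch helper and OP_TO_METHOD mapping are hypothetical names used only for this example; they are not taken from the library, and the Observer's actual internals may differ. The handler method names are the ones described in the Handlers section.

```python
from typing import Any, Dict

# Oplog operation codes mapped to the handler method that should receive them.
OP_TO_METHOD = {'i': 'on_insert', 'u': 'on_update', 'd': 'on_delete'}


async def dispatch(handler: Any, operation: Dict[str, Any]) -> bool:
    """Route one oplog entry to the matching handler method (hypothetical helper).

    Returns True if the entry was delivered, False if it was skipped.
    """
    method_name = OP_TO_METHOD.get(operation.get('op'))
    if method_name is None:
        # Other entry types, such as 'n' (no-op) or 'c' (command), are skipped.
        return False
    await getattr(handler, method_name)(operation)
    return True
```

Keeping the routing in a small lookup table means the handler never has to inspect the op character itself.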

Handlers

Handlers are objects that implement the OperationHandler protocol, which defines the following abstract methods:

async def on_insert(self, operation: Dict[str, Any])

Where operation is a dict containing a document corresponding to an operation on the oplog. It will contain the following keys:

  • ts: Timestamp of the operation
  • h: A unique signed long identifier of the operation
  • op: A character representing the type of the operation
  • ns: A namespace string formed with the concatenation of ‘database.collection’
  • o: The inserted document

async def on_update(self, operation: Dict[str, Any])

Where operation is a dict containing a document corresponding to an operation on the oplog. It will contain the following keys:

  • ts: Timestamp of the operation
  • h: A unique signed long identifier of the operation
  • op: A character representing the type of the operation
  • ns: A namespace string formed with the concatenation of ‘database.collection’
  • o: The operation data performed on the document
  • o2: A dict with a single _id key of the document to be updated

async def on_delete(self, operation: Dict[str, Any])

Where operation is a dict containing a document corresponding to an operation on the oplog. It will contain the following keys:

  • ts: Timestamp of the operation
  • h: A unique signed long identifier of the operation
  • op: A character representing the type of the operation
  • ns: A namespace string formed with the concatenation of ‘database.collection’
  • o: A dict with a single _id key, that of the deleted document
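
To make the three methods concrete, here is a minimal sketch of a handler that records which document each operation touched. RecordingHandler is a hypothetical class written for this example. It only mirrors the method signatures listed above; whether a real handler must also subclass OperationHandler, and where that class is imported from, is not stated here, so the sketch leaves it out.

```python
from typing import Any, Dict, List, Tuple


class RecordingHandler:
    """Hypothetical handler that records (event, namespace, document id) tuples."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, str, Any]] = []

    async def on_insert(self, operation: Dict[str, Any]) -> None:
        # For inserts, 'o' holds the full inserted document.
        self.events.append(('insert', operation['ns'], operation['o']['_id']))

    async def on_update(self, operation: Dict[str, Any]) -> None:
        # For updates, the target _id is in 'o2'; 'o' describes the change.
        self.events.append(('update', operation['ns'], operation['o2']['_id']))

    async def on_delete(self, operation: Dict[str, Any]) -> None:
        # For deletes, 'o' holds only the _id of the removed document.
        self.events.append(('delete', operation['ns'], operation['o']['_id']))
```

Note that the document id is read from a different key depending on the operation: o for inserts and deletes, o2 for updates.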

ReactiveCollection

A ReactiveCollection is a read-only, in-memory, non-persistent replica of a remote mongo collection. It reacts to state changes caused by write operations (inserts, updates and deletes) on the remote collection.
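
The idea of an in-memory replica that reacts to write operations can be sketched as follows. InMemoryReplica is a hypothetical stand-in written for illustration, not the library's ReactiveCollection, and it performs no initial sync. It assumes update entries either carry top-level $set / $unset operators or a full replacement document; update entries can take other shapes depending on the server version, and dotted field paths are not handled.

```python
from typing import Any, Dict


class InMemoryReplica:
    """Hypothetical illustration only; not the library's ReactiveCollection."""

    def __init__(self) -> None:
        # Documents keyed by their _id.
        self.documents: Dict[Any, Dict[str, Any]] = {}

    async def on_insert(self, operation: Dict[str, Any]) -> None:
        doc = operation['o']
        self.documents[doc['_id']] = dict(doc)

    async def on_update(self, operation: Dict[str, Any]) -> None:
        doc_id = operation['o2']['_id']
        change = operation['o']
        if '$set' in change or '$unset' in change:
            # Assumption: only top-level field names appear in the operators.
            current = self.documents.setdefault(doc_id, {'_id': doc_id})
            current.update(change.get('$set', {}))
            for field in change.get('$unset', {}):
                current.pop(field, None)
        else:
            # No update operators: treat 'o' as a full replacement document.
            self.documents[doc_id] = {'_id': doc_id, **change}

    async def on_delete(self, operation: Dict[str, Any]) -> None:
        self.documents.pop(operation['o']['_id'], None)
```

Because the replica is only ever written to by these three methods, readers can treat documents as a read-only view of the remote collection.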

import asyncio

from motor.motor_asyncio import AsyncIOMotorClient

from mongo_observer.observer import Observer
from mongo_observer.operation_handlers import ReactiveCollection


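async def run(loop):
    client = AsyncIOMotorClient('mongodb://127.0.0.1')

    # Build the in-memory replica; init_async performs the initial sync.
    collection_to_observe = client['your_db']['your_collection']
    reactive_collection = await ReactiveCollection.init_async(collection_to_observe)

    # The oplog (local.oplog.rs) exists only when MongoDB runs as a replica set.
    observer = await Observer.init_async(oplog=client['local']['oplog.rs'],
                                         operation_handler=reactive_collection,
                                         namespace_filter='your_db.your_collection')

    # Observe changes in the background on the given event loop.
    loop.create_task(observer.observe_changes())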


loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()

When ReactiveCollection.init_async runs, it performs an initial sync with the remote collection. We then pass it as the operation_handler of an Observer and schedule observe_changes, so that subsequent writes to the remote collection are reflected in the replica.

ReactivePartialCollection

to-do
