Broadcast MongoDB change streams to WebSocket, Redis, HTTP, and more

MongoDB Change Broadcaster

A real-time change stream broadcaster for MongoDB that supports multiple delivery channels (WebSocket, Redis, HTTP, and more) and has an extensible architecture.

Read the blog post on the implementation: "Building a Versatile Data Streaming System with Broadcaster Package".

Features

  • 📡 Listen to MongoDB change streams
  • 🚀 Built-in channels: WebSocket, Redis, HTTP, and Database Logging
  • 🔌 Extensible architecture for custom channels
  • ⚡ Async-first implementation
  • 🛠️ Configurable pipelines and filtering

Installation

pip install mongo-broadcaster

# Optional dependencies for specific channels
# (quoted so that shells such as zsh do not expand the brackets):
pip install "mongo-broadcaster[fastapi]"  # WebSocket support
pip install "mongo-broadcaster[redis]"    # Redis Pub/Sub support

Basic Usage

from mongo_broadcaster import (
    MongoChangeBroadcaster,
    BroadcasterConfig,
    CollectionConfig
)
from mongo_broadcaster.channels import WebSocketChannel

# Initialize with MongoDB connection
config = BroadcasterConfig(
    mongo_uri="mongodb://localhost:27017",
    collections=[
        CollectionConfig(
            collection_name="users",
            fields_to_watch=["name", "email"],
            recipient_identifier="fullDocument._id"
        )
    ]
)

broadcaster = MongoChangeBroadcaster(config)
broadcaster.add_channel(WebSocketChannel())

# Start listening; run this inside an async function (typically your app's startup hook)
await broadcaster.start()

Built-in Channels

Channel              Description                Ideal For
WebSocketChannel     Real-time browser updates  Live dashboards
RedisPubSubChannel   Pub/Sub messaging          Microservices
HTTPCallbackChannel  Webhook notifications      Third-party integrations
DatabaseChannel      Persistent change logging  Audit trails

Extending with Custom Channels

Implement your own channel by subclassing BaseChannel:

from mongo_broadcaster.channels.base import BaseChannel
from typing import Any, Dict


class CustomMQTTChannel(BaseChannel):
    def __init__(self, broker_url: str):
        self.broker_url = broker_url
        self.client = None

    async def connect(self):
        """Initialize your connection"""
        # setup_mqtt_client is a placeholder; replace it with your MQTT library's connect call
        self.client = await setup_mqtt_client(self.broker_url)

    async def send(self, recipient: str, message: Dict[str, Any]):
        """Send to specific recipient"""
        await self.client.publish(f"changes/{recipient}", message)

    async def broadcast(self, message: Dict[str, Any]):
        """Send to all subscribers"""
        await self.client.publish("changes/all", message)

    async def disconnect(self):
        """Clean up resources"""
        await self.client.disconnect()


# Usage:
broadcaster.add_channel(CustomMQTTChannel("mqtt://localhost"))
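
For trying out a custom channel without a broker, a channel that only records what it receives can be handy. The sketch below is an illustration, not part of the package: the BaseChannel class in it is a local stand-in that assumes the four methods listed in this README (connect, send, broadcast, disconnect), and InMemoryChannel is a hypothetical name. In a real project you would subclass mongo_broadcaster.channels.base.BaseChannel instead.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple


class BaseChannel(ABC):
    """Local stand-in for the package's BaseChannel (assumed interface)."""

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def send(self, recipient: str, message: Dict[str, Any]) -> None: ...

    @abstractmethod
    async def broadcast(self, message: Dict[str, Any]) -> None: ...

    @abstractmethod
    async def disconnect(self) -> None: ...


class InMemoryChannel(BaseChannel):
    """Hypothetical channel that records messages instead of delivering them."""

    def __init__(self) -> None:
        self.connected = False
        self.sent: List[Tuple[str, Dict[str, Any]]] = []
        self.broadcasts: List[Dict[str, Any]] = []

    async def connect(self) -> None:
        self.connected = True

    async def send(self, recipient: str, message: Dict[str, Any]) -> None:
        # Keep the recipient next to the message so tests can check routing
        self.sent.append((recipient, message))

    async def broadcast(self, message: Dict[str, Any]) -> None:
        self.broadcasts.append(message)

    async def disconnect(self) -> None:
        self.connected = False


async def demo() -> InMemoryChannel:
    channel = InMemoryChannel()
    await channel.connect()
    await channel.send("user-1", {"operationType": "update"})
    await channel.broadcast({"operationType": "insert"})
    await channel.disconnect()
    return channel


channel = asyncio.run(demo())
```

After the demo runs, channel.sent holds the one targeted message and channel.broadcasts holds the one broadcast message, which makes the routing easy to assert on in a test.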

Configuration Options

CollectionConfig

CollectionConfig(
    collection_name: str,
    database_name: Optional[str] = None,
    # Fields to include in change events
    fields_to_watch: List[str] = [],
    # Dot-notation path to identify recipients (e.g., "fullDocument._id")
    recipient_identifier: Optional[str] = None,
    # MongoDB change stream options
    change_stream_config: ChangeStreamConfig = ChangeStreamConfig()
)
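
To make the dot-notation idea behind recipient_identifier concrete, here is a minimal sketch of how a path such as "fullDocument._id" could be resolved against a change event. The resolve_path helper and the sample event are hypothetical and written for this illustration; the package's own lookup may work differently.

```python
from typing import Any, Dict, Optional


def resolve_path(event: Dict[str, Any], path: str) -> Optional[Any]:
    """Walk a dot-notation path through nested dicts; return None if a key is missing."""
    current: Any = event
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


# Trimmed-down event shaped like a MongoDB change stream document
sample_event = {
    "operationType": "update",
    "fullDocument": {"_id": "42", "name": "Ada", "email": "ada@example.com"},
}

recipient = resolve_path(sample_event, "fullDocument._id")
missing = resolve_path(sample_event, "fullDocument.address.city")
```

Here recipient is the document's _id and missing is None, because the sample document has no address field. Returning None rather than raising is a design choice in this sketch, so that events lacking the field can be handled by the caller.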

Examples

FastAPI WebSocket Endpoint

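from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from mongo_broadcaster.channels import WebSocketChannel

app = FastAPI()
ws_channel = WebSocketChannel()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    # Register the socket under its client ID so change events can be routed to it
    await ws_channel.connect(client_id, websocket)
    try:
        # Keep the connection open until the client disconnects
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        await ws_channel.disconnect(client_id)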

Please see the examples folder for more.

Contributing

To add new channels:

  1. Create a subclass of BaseChannel
  2. Implement required methods:
       • connect()
       • send()
       • broadcast()
       • disconnect()
  3. Submit a PR!

License

MIT

TODO

  • Write tests
