
Broadcast MongoDB change streams to WebSocket, Redis, HTTP, and more


MongoDB Change Broadcaster


A real-time change stream broadcaster for MongoDB that supports multiple delivery channels (WebSocket, Redis, HTTP, etc.) and has an extensible architecture.

Read the blog post on the implementation here.

Building a Versatile Data Streaming System with Broadcaster Package

Features

  • 📡 Listen to MongoDB change streams
  • 🚀 Built-in channels: WebSocket, Redis, HTTP, and Database Logging
  • 🔌 Extensible architecture for custom channels
  • ⚡ Async-first implementation
  • 🛠️ Configurable pipelines and filtering

Installation

pip install mongo-broadcaster

# Optional dependencies for specific channels
# (quoted so that shells such as zsh do not expand the brackets):
pip install "mongo-broadcaster[fastapi]"  # WebSocket support
pip install "mongo-broadcaster[redis]"    # Redis Pub/Sub support

Basic Usage

from mongo_broadcaster import (
    MongoChangeBroadcaster,
    BroadcasterConfig,
    CollectionConfig
)
from mongo_broadcaster.channels import WebSocketChannel

# Initialize with MongoDB connection
config = BroadcasterConfig(
    mongo_uri="mongodb://localhost:27017",
    collections=[
        CollectionConfig(
            collection_name="users",
            fields_to_watch=["name", "email"],
            recipient_identifier="fullDocument._id"
        )
    ]
)

broadcaster = MongoChangeBroadcaster(config)
broadcaster.add_channel(WebSocketChannel())

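# Start listening (typically in your app startup).
# `await` only works inside a coroutine, for example an async startup
# handler or a function passed to asyncio.run().
await broadcaster.start()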
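
To make the effect of fields_to_watch in the configuration above more concrete, here is a minimal sketch of how an update event could be narrowed to the watched fields. The event shape follows MongoDB's change event format (operationType, documentKey, updateDescription.updatedFields). The helper name filter_watched_fields, and the rule that an empty list keeps every field, are assumptions made for illustration and are not taken from the package's implementation.

```python
from typing import Any, Dict, List


def filter_watched_fields(event: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:
    """Return the changed fields of an update event, limited to `fields`.

    Hypothetical helper: an empty `fields` list is treated as "keep all".
    """
    updated = event.get("updateDescription", {}).get("updatedFields", {})
    if not fields:
        return dict(updated)
    return {key: value for key, value in updated.items() if key in fields}


# A trimmed-down update event in MongoDB's change event format.
sample_event = {
    "operationType": "update",
    "documentKey": {"_id": "u1"},
    "updateDescription": {
        "updatedFields": {"email": "ada@example.com", "last_login": "2024-01-01"},
        "removedFields": [],
    },
}

changed = filter_watched_fields(sample_event, ["name", "email"])
print(changed)  # {'email': 'ada@example.com'}
```

Only email survives here because last_login is not in the watched list and name was not part of this update.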

Built-in Channels

Channel             | Description               | Ideal For
--------------------|---------------------------|-------------------------
WebSocketChannel    | Real-time browser updates | Live dashboards
RedisPubSubChannel  | Pub/Sub messaging         | Microservices
HTTPCallbackChannel | Webhook notifications     | Third-party integrations
DatabaseChannel     | Persistent change logging | Audit trails

Extending with Custom Channels

Implement your own channel by subclassing BaseChannel:

from mongo_broadcaster.channels.base import BaseChannel
from typing import Any, Dict


class CustomMQTTChannel(BaseChannel):
    def __init__(self, broker_url: str):
        self.broker_url = broker_url
        self.client = None

    async def connect(self):
        """Initialize your connection"""
        # setup_mqtt_client is a placeholder for your own MQTT client setup
        self.client = await setup_mqtt_client(self.broker_url)

    async def send(self, recipient: str, message: Dict[str, Any]):
        """Send to specific recipient"""
        await self.client.publish(f"changes/{recipient}", message)

    async def broadcast(self, message: Dict[str, Any]):
        """Send to all subscribers"""
        await self.client.publish("changes/all", message)

    async def disconnect(self):
        """Clean up resources"""
        await self.client.disconnect()

# Usage:
broadcaster.add_channel(CustomMQTTChannel("mqtt://localhost"))
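
While developing a custom channel, it can help to have a stand-in that records messages instead of sending them anywhere. The sketch below mirrors the four methods used by CustomMQTTChannel above. InMemoryChannel is a hypothetical name, and it deliberately does not subclass BaseChannel so that the snippet runs without the package installed; in a real project you would subclass BaseChannel as shown above.

```python
import asyncio
from typing import Any, Dict, List, Optional, Tuple


class InMemoryChannel:
    """Hypothetical test double exposing the same four coroutine methods."""

    def __init__(self) -> None:
        self.connected = False
        # Each entry is (recipient, message); recipient is None for broadcasts.
        self.sent: List[Tuple[Optional[str], Dict[str, Any]]] = []

    async def connect(self) -> None:
        self.connected = True

    async def send(self, recipient: str, message: Dict[str, Any]) -> None:
        self.sent.append((recipient, message))

    async def broadcast(self, message: Dict[str, Any]) -> None:
        self.sent.append((None, message))

    async def disconnect(self) -> None:
        self.connected = False


async def demo() -> InMemoryChannel:
    channel = InMemoryChannel()
    await channel.connect()
    await channel.send("user-1", {"operation": "update"})
    await channel.broadcast({"operation": "insert"})
    await channel.disconnect()
    return channel


channel = asyncio.run(demo())
print(channel.sent)
```

Recording messages in a list keeps the example free of network dependencies, which also makes it a convenient starting point for unit tests.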

Configuration Options

CollectionConfig

CollectionConfig(
    collection_name: str,
    database_name: Optional[str] = None,
    # Fields to include in change events
    fields_to_watch: List[str] = [],
    # Dot-notation path to identify recipients (e.g., "fullDocument._id")
    recipient_identifier: Optional[str] = None,
    # MongoDB change stream options
    change_stream_config: ChangeStreamConfig = ChangeStreamConfig()
)
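
The recipient_identifier option is described as a dot-notation path. As a rough illustration of what resolving such a path against a change event involves, the sketch below walks nested dictionaries one key at a time. resolve_path is a hypothetical helper, and returning None for a missing key is an assumption; the package may handle this case differently.

```python
from typing import Any, Dict, Optional


def resolve_path(event: Dict[str, Any], path: str) -> Optional[Any]:
    """Follow a dot-separated path through nested dicts; None if a key is missing."""
    current: Any = event
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


event = {"operationType": "insert", "fullDocument": {"_id": "u1", "name": "Ada"}}

print(resolve_path(event, "fullDocument._id"))    # u1
print(resolve_path(event, "fullDocument.email"))  # None
```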

Examples

FastAPI WebSocket Endpoint

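from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from mongo_broadcaster.channels import WebSocketChannel

app = FastAPI()
ws_channel = WebSocketChannel()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    # Register this client's socket with the channel
    await ws_channel.connect(client_id, websocket)
    try:
        # Keep the connection open; incoming messages are ignored
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Unregister the client once the socket closes
        await ws_channel.disconnect(client_id)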

Please see the examples folder for more.

Contributing

To add new channels:

  1. Create a subclass of BaseChannel
  2. Implement the required methods:
       • connect()
       • send()
       • broadcast()
       • disconnect()
  3. Submit a PR!

License

MIT

TODO

  • Write tests
