Skip to main content

Broadcast MongoDB change streams to WebSocket, Redis, HTTP, and more

Project description

MongoDB Change Broadcaster

PyPI Version Python Versions

A real-time change stream broadcaster for MongoDB, supporting multiple delivery channels (WebSocket, Redis, HTTP, etc.) with extensible architecture.

Read the blog post on the implementation here.

Building a Versatile Data Streaming System with Broadcaster Package

Features

  • 📡 Listen to MongoDB change streams
  • 🚀 Built-in channels: WebSocket, Redis, HTTP, and Database Logging
  • 🔌 Extensible architecture for custom channels
  • ⚡ Async-first implementation
  • 🛠️ Configurable pipelines and filtering

Installation

pip install mongo-broadcaster

# Optional dependencies for specific channels:
pip install mongo_broadcaster[fastapi]  # WebSocket
pip install mongo_broadcaster[redis]    # Redis Pub/Sub support

Basic Usage

from mongo_broadcaster import (
    MongoChangeBroadcaster,
    BroadcasterConfig,
    CollectionConfig
)
from mongo_broadcaster.channels import WebSocketChannel

# Initialize with MongoDB connection
config = BroadcasterConfig(
    mongo_uri="mongodb://localhost:27017",
    collections=[
        CollectionConfig(
            collection_name="users",
            fields_to_watch=["name", "email"],
            recipient_identifier="fullDocument._id"
        )
     ]
)

broadcaster = MongoChangeBroadcaster(config)
broadcaster.add_channel(WebSocketChannel())

# Start listening (typically in your app startup)
await broadcaster.start()

Built-in Channels

Channel Description Ideal For
WebSocketChannel Real-time browser updates Live dashboards
RedisPubSubChannel Pub/Sub messaging Microservices
HTTPCallbackChannel Webhook notifications Third-party integrations
DatabaseChannel Persistent change logging Audit trails

Extending with Custom Channels

Implement your own channel by subclassing BaseChannel:

from mongo_broadcaster.channels.base import BaseChannel
from typing import Any, Dict


class CustomMQTTChannel(BaseChannel):
    def __init__(self, broker_url: str):
        self.broker_url = broker_url
        self.client = None

    async def connect(self):
	"""Initialize your connection"""
	self.client = await setup_mqtt_client(self.broker_url)

    async def send(self, recipient: str, message: Dict[str, Any]):
	"""Send to specific recipient"""
	await self.client.publish(f"changes/{recipient}", message)

    async def broadcast(self, message: Dict[str, Any]):
	"""Send to all subscribers"""
	await self.client.publish("changes/all", message)

    async def disconnect(self):
	"""Clean up resources"""
	await self.client.disconnect()

# Usage:
broadcaster.add_channel(CustomMQTTChannel("mqtt://localhost"))

Configuration Options

CollectionConfig

CollectionConfig(
    collection_name: str,
    database_name: Optional[str] = None,
	# Fields to include in change events
    fields_to_watch: List[str] = [],
	# Dot-notation path to identify recipients (e.g., "fullDocument._id")
    recipient_identifier: Optional[str] = None,
	# MongoDB change stream options
    change_stream_config: ChangeStreamConfig = ChangeStreamConfig()
)

Examples

FastAPI WebSocket Endpoint

from fastapi import FastAPI, WebSocket

app = FastAPI()
ws_channel = WebSocketChannel()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await ws_channel.connect(client_id, websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
	await ws_channel.disconnect(client_id)

Please see the examples folder for more.

Contributing

To add new channels:

  1. Create a subclass of BaseChannel
  2. Implement required methods:
  • connect()
  • send()
  • broadcast()
  • disconnect()
  1. Submit a PR!

License

MIT

TODO

  • Write tests

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mongo_broadcaster-0.1.4.tar.gz (12.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mongo_broadcaster-0.1.4-py3-none-any.whl (16.0 kB view details)

Uploaded Python 3

File details

Details for the file mongo_broadcaster-0.1.4.tar.gz.

File metadata

  • Download URL: mongo_broadcaster-0.1.4.tar.gz
  • Upload date:
  • Size: 12.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.7

File hashes

Hashes for mongo_broadcaster-0.1.4.tar.gz
Algorithm Hash digest
SHA256 f3b33c9483255f6541ffa4f10e9addb7e8ec3a89a9eb79a0e936b989cd2e0dfa
MD5 d02eb54f517dd4fc8693e15c76a5d71e
BLAKE2b-256 f10ac7dfc0661ec86b066e9e3c4a0257ddfbef5ad6b35aa2d71e67f50638c9bb

See more details on using hashes here.

File details

Details for the file mongo_broadcaster-0.1.4-py3-none-any.whl.

File metadata

File hashes

Hashes for mongo_broadcaster-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 119df2363d03b68d07f8e8070be3b643c64ecdb7848ce8cc29a7d87104eed87a
MD5 e8b5680c01afe5331dd5a59bd85d0503
BLAKE2b-256 dd0cb5a101f0313a03b4f0a987e5784af4f24b27329329c7b67c1ba79672cced

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page