Broadcast MongoDB change streams to WebSocket, Redis, HTTP, and more

MongoDB Change Broadcaster

A real-time change stream broadcaster for MongoDB, supporting multiple delivery channels (WebSocket, Redis, HTTP, etc.) with extensible architecture.

Read the blog post on the implementation: "Building a Versatile Data Streaming System with Broadcaster Package".

Features

  • 📡 Listen to MongoDB change streams
  • 🚀 Built-in channels: WebSocket, Redis, HTTP, and Database Logging
  • 🔌 Extensible architecture for custom channels
  • ⚡ Async-first implementation
  • 🛠️ Configurable pipelines and filtering
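
As a rough illustration of what a filtering pipeline for a change stream can look like, the sketch below builds a standard MongoDB aggregation pipeline with a single `$match` stage on the event's `operationType`. The helper name `build_pipeline` is hypothetical and not part of this package; how a pipeline is passed to the broadcaster depends on its change stream configuration.

```python
from typing import Any, Dict, List


def build_pipeline(operations: List[str]) -> List[Dict[str, Any]]:
    """Return a change stream pipeline that keeps only the given operation types.

    Hypothetical helper for illustration; not part of mongo_broadcaster.
    """
    # Change events carry an "operationType" field such as "insert",
    # "update", "replace", or "delete"; $match filters on it server-side.
    return [{"$match": {"operationType": {"$in": list(operations)}}}]


pipeline = build_pipeline(["insert", "update"])
print(pipeline)
```

Filtering in the pipeline rather than in application code means unwanted events never leave the database server.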

Installation

pip install mongo-broadcaster

# Optional dependencies for specific channels
# (quotes keep shells such as zsh from expanding the brackets):
pip install "mongo-broadcaster[fastapi]"  # WebSocket support
pip install "mongo-broadcaster[redis]"    # Redis Pub/Sub support

Basic Usage

from mongo_broadcaster import (
    MongoChangeBroadcaster,
    BroadcasterConfig,
    CollectionConfig
)
from mongo_broadcaster.channels import WebSocketChannel

# Initialize with MongoDB connection
config = BroadcasterConfig(
    mongo_uri="mongodb://localhost:27017",
    collections=[
        CollectionConfig(
            collection_name="users",
            fields_to_watch=["name", "email"],
            recipient_identifier="fullDocument._id"
        )
    ]
)

broadcaster = MongoChangeBroadcaster(config)
broadcaster.add_channel(WebSocketChannel())

# Start listening. This must be awaited inside a coroutine,
# typically in your app's startup hook.
await broadcaster.start()
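
To make the effect of `fields_to_watch` more concrete, here is a minimal sketch of the kind of projection it implies: keeping only the watched keys of a changed document. The function `pick_fields`, the sample document, and the rule that an empty list keeps every field are assumptions for illustration; the package's actual filtering may differ.

```python
from typing import Any, Dict, List


def pick_fields(document: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:
    """Keep only the watched top-level fields.

    Assumption: an empty list means "no filtering" and keeps everything.
    """
    if not fields:
        return dict(document)
    return {key: value for key, value in document.items() if key in fields}


# Sample document (made up for this example)
doc = {"_id": "u1", "name": "Ada", "email": "ada@example.com", "password": "x"}
print(pick_fields(doc, ["name", "email"]))
```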

Built-in Channels

| Channel | Description | Ideal For |
|---|---|---|
| WebSocketChannel | Real-time browser updates | Live dashboards |
| RedisPubSubChannel | Pub/Sub messaging | Microservices |
| HTTPCallbackChannel | Webhook notifications | Third-party integrations |
| DatabaseChannel | Persistent change logging | Audit trails |

Extending with Custom Channels

Implement your own channel by subclassing BaseChannel:

from mongo_broadcaster.channels.base import BaseChannel
from typing import Any, Dict


class CustomMQTTChannel(BaseChannel):
    def __init__(self, broker_url: str):
        self.broker_url = broker_url
        self.client = None

    async def connect(self):
        """Initialize your connection"""
        # setup_mqtt_client is a placeholder for your MQTT library's connect call
        self.client = await setup_mqtt_client(self.broker_url)

    async def send(self, recipient: str, message: Dict[str, Any]):
        """Send to specific recipient"""
        await self.client.publish(f"changes/{recipient}", message)

    async def broadcast(self, message: Dict[str, Any]):
        """Send to all subscribers"""
        await self.client.publish("changes/all", message)

    async def disconnect(self):
        """Clean up resources"""
        await self.client.disconnect()

# Usage:
broadcaster.add_channel(CustomMQTTChannel("mqtt://localhost"))
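
The same four-method shape can be used for a channel that only records messages, which may be handy when testing. The sketch below is self-contained: `ChannelInterface` is a hypothetical stand-in that mirrors the methods listed for `BaseChannel` so the code runs without the package installed, and `InMemoryChannel` is likewise an illustrative name, not something the package provides.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple


class ChannelInterface(ABC):
    """Stand-in for BaseChannel (assumption: same four async methods)."""

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def send(self, recipient: str, message: Dict[str, Any]) -> None: ...

    @abstractmethod
    async def broadcast(self, message: Dict[str, Any]) -> None: ...

    @abstractmethod
    async def disconnect(self) -> None: ...


class InMemoryChannel(ChannelInterface):
    """Records messages in a list instead of sending them anywhere."""

    def __init__(self) -> None:
        self.connected = False
        self.sent: List[Tuple[str, Dict[str, Any]]] = []

    async def connect(self) -> None:
        self.connected = True

    async def send(self, recipient: str, message: Dict[str, Any]) -> None:
        self.sent.append((recipient, message))

    async def broadcast(self, message: Dict[str, Any]) -> None:
        # "*" marks a message addressed to every subscriber.
        self.sent.append(("*", message))

    async def disconnect(self) -> None:
        self.connected = False


async def demo() -> InMemoryChannel:
    channel = InMemoryChannel()
    await channel.connect()
    await channel.send("user-1", {"op": "update"})
    await channel.broadcast({"op": "insert"})
    await channel.disconnect()
    return channel


channel = asyncio.run(demo())
print(channel.sent)
```

In a real project the class would subclass `BaseChannel` directly instead of the stand-in.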

Configuration Options

CollectionConfig

CollectionConfig(
    collection_name: str,
    database_name: Optional[str] = None,
    # Fields to include in change events
    fields_to_watch: List[str] = [],
    # Dot-notation path to identify recipients (e.g., "fullDocument._id")
    recipient_identifier: Optional[str] = None,
    # MongoDB change stream options
    change_stream_config: ChangeStreamConfig = ChangeStreamConfig()
)
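
A dot-notation path such as "fullDocument._id" names a value nested inside the change event. The sketch below shows one plausible way to resolve such a path against an event dictionary. `resolve_path` and the sample event are assumptions for illustration and do not reflect the package's internal code.

```python
from typing import Any, Dict, Optional


def resolve_path(event: Dict[str, Any], path: str) -> Optional[Any]:
    """Follow a dot-separated path through nested dicts; None if a key is missing."""
    current: Any = event
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


# Sample change event (made up for this example)
event = {"operationType": "update", "fullDocument": {"_id": "u1", "name": "Ada"}}
print(resolve_path(event, "fullDocument._id"))
```

Returning None for a missing key lets the caller decide whether to broadcast to everyone or drop the event.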

Examples

FastAPI WebSocket Endpoint

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from mongo_broadcaster.channels import WebSocketChannel

app = FastAPI()
ws_channel = WebSocketChannel()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await ws_channel.connect(client_id, websocket)
    try:
        while True:
            # Keep the connection open; incoming messages are ignored
            await websocket.receive_text()
    except WebSocketDisconnect:
        await ws_channel.disconnect(client_id)

Please see the examples folder for more.

Contributing

To add new channels:

  1. Create a subclass of BaseChannel
  2. Implement the required methods:
       • connect()
       • send()
       • broadcast()
       • disconnect()
  3. Submit a PR!

License

MIT

TODO

  • Write tests
