Skip to main content

Broadcast MongoDB change streams to WebSocket, Redis, HTTP, and more

Project description

MongoDB Change Broadcaster

PyPI Version Python Versions

A real-time change stream broadcaster for MongoDB, supporting multiple delivery channels (WebSocket, Redis, HTTP, etc.) with extensible architecture.

Read the blog post on the implementation here.

Building a Versatile Data Streaming System with Broadcaster Package

Features

  • 📡 Listen to MongoDB change streams
  • 🚀 Built-in channels: WebSocket, Redis, HTTP, and Database Logging
  • 🔌 Extensible architecture for custom channels
  • ⚡ Async-first implementation
  • 🛠️ Configurable pipelines and filtering

Installation

pip install mongo-broadcaster

# Optional dependencies for specific channels:
pip install mongo_broadcaster[fastapi]  # WebSocket
pip install mongo_broadcaster[redis]    # Redis Pub/Sub support

Basic Usage

from mongo_broadcaster import (
    MongoChangeBroadcaster,
    BroadcasterConfig,
    CollectionConfig
)
from mongo_broadcaster.channels import WebSocketChannel

# Initialize with MongoDB connection
config = BroadcasterConfig(
    mongo_uri="mongodb://localhost:27017",
    collections=[
        CollectionConfig(
            collection_name="users",
            fields_to_watch=["name", "email"],
            recipient_identifier="fullDocument._id"
        )
     ]
)

broadcaster = MongoChangeBroadcaster(config)
broadcaster.add_channel(WebSocketChannel())

# Start listening (typically in your app startup)
await broadcaster.start()

Built-in Channels

Channel Description Ideal For
WebSocketChannel Real-time browser updates Live dashboards
RedisPubSubChannel Pub/Sub messaging Microservices
HTTPCallbackChannel Webhook notifications Third-party integrations
DatabaseChannel Persistent change logging Audit trails

Extending with Custom Channels

Implement your own channel by subclassing BaseChannel:

from mongo_broadcaster.channels.base import BaseChannel
from typing import Any, Dict


class CustomMQTTChannel(BaseChannel):
    def __init__(self, broker_url: str):
        self.broker_url = broker_url
        self.client = None

    async def connect(self):
	"""Initialize your connection"""
	self.client = await setup_mqtt_client(self.broker_url)

    async def send(self, recipient: str, message: Dict[str, Any]):
	"""Send to specific recipient"""
	await self.client.publish(f"changes/{recipient}", message)

    async def broadcast(self, message: Dict[str, Any]):
	"""Send to all subscribers"""
	await self.client.publish("changes/all", message)

    async def disconnect(self):
	"""Clean up resources"""
	await self.client.disconnect()

# Usage:
broadcaster.add_channel(CustomMQTTChannel("mqtt://localhost"))

Configuration Options

CollectionConfig

CollectionConfig(
    collection_name: str,
    database_name: Optional[str] = None,
	# Fields to include in change events
    fields_to_watch: List[str] = [],
	# Dot-notation path to identify recipients (e.g., "fullDocument._id")
    recipient_identifier: Optional[str] = None,
	# MongoDB change stream options
    change_stream_config: ChangeStreamConfig = ChangeStreamConfig()
)

Examples

FastAPI WebSocket Endpoint

from fastapi import FastAPI, WebSocket

app = FastAPI()
ws_channel = WebSocketChannel()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await ws_channel.connect(client_id, websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
	await ws_channel.disconnect(client_id)

Please see the examples folder for more.

Contributing

To add new channels:

  1. Create a subclass of BaseChannel
  2. Implement required methods:
  • connect()
  • send()
  • broadcast()
  • disconnect()
  1. Submit a PR!

License

MIT

TODO

  • Write tests

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mongo_broadcaster-0.1.3.tar.gz (12.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mongo_broadcaster-0.1.3-py3-none-any.whl (15.8 kB view details)

Uploaded Python 3

File details

Details for the file mongo_broadcaster-0.1.3.tar.gz.

File metadata

  • Download URL: mongo_broadcaster-0.1.3.tar.gz
  • Upload date:
  • Size: 12.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.7

File hashes

Hashes for mongo_broadcaster-0.1.3.tar.gz
Algorithm Hash digest
SHA256 7805015633198e239826d1b2e1be2527a5382ab816e342b0b547c6b4c1fada14
MD5 43dbdff118328d8424ba02eda61dc3ad
BLAKE2b-256 f6c455a8d761f24b357da56722e41352ca5bbf0c909b5f500ea95e28d36d25dc

See more details on using hashes here.

File details

Details for the file mongo_broadcaster-0.1.3-py3-none-any.whl.

File metadata

File hashes

Hashes for mongo_broadcaster-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 a30e2f3858e2c508adc93ed0cab66b4bc84aca8e8d53f64f37bae66c266caa29
MD5 b6fc845b92eaf651e412afffdd4dc3c2
BLAKE2b-256 487ae7dfe9b5f7fc9e0d8006e244b08fddc175b2f069af476f1d43cda4b00b2c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page