A fast and durable PubSub channel over Websockets (using fastapi-websockets-rpc).
Project description
⚡ FASTAPI Websocket Pub/Sub 🗞️
A fast and durable Pub/Sub channel over Websockets. The easiest way to create a live publish / subscribe multi-cast over the web.
Supports and tested on Python >= 3.7
Installation 🛠️
pip install fastapi_websocket_pubsub
Intro
The classic pub/sub pattern made easily accessible and scalable over the web and across your cloud in realtime; while enjoying the benefits of FastAPI (e.g. dependency injection).
FastAPI + WebSockets + PubSub == 💪 ❤️
-
Subscribe
- Clients subscribe to topics (arbitrary strings) and receive relevant events along with structured data (serialized with Pydantic).
# Callback to be called upon event being published on server async def on_event(data): print("We got an event! with data- ", data) # Subscribe for the event client.subscribe("my event", on_event)
- Clients subscribe to topics (arbitrary strings) and receive relevant events along with structured data (serialized with Pydantic).
-
Publish
- Directly from server code to connected clients.
app = FastAPI() endpoint = PubSubEndpoint() endpoint.register_route(app, "/pubsub") endpoint.publish(["my_event_topic"], data=["my", "data", 1])
- From client to client (through the servers)
async with PubSubClient(server_uri="ws://localhost/pubsub") as client: endpoint.publish(["my_event_topic"], data=["my", "data", 1])
- Across server instances (using broadcaster and a backend medium (e.g. Redis, Kafka, ...))
- No matter which server a client connected to - it will get the messages it subscribes too
app = FastAPI() endpoint = PubSubEndpoint(broadcaster="postgres://localhost:5432/") @app.websocket("/pubsub") async def websocket_rpc_endpoint(websocket: WebSocket): async with endpoint.broadcaster: await endpoint.main_loop(websocket)
see examples/pubsub_broadcaster_server_example.py for full usage example
- Directly from server code to connected clients.
Usage example (server publishing following HTTP trigger):
In the code below, a client connects to the server and subscribes to a topic named "triggered". Aside from PubSub websocket, the server also exposes a regular http route, which triggers publication of the event.
Server:
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter
from fastapi_websocket_pubsub import PubSubEndpoint
app = FastAPI()
# Init endpoint
endpoint = PubSubEndpoint()
# register the endpoint on the app
endpoint.register_route(app, "/pubsub")
# Register a regular HTTP route
@app.get("/trigger")
async def trigger_events():
# Upon request trigger an event
endpoint.publish(["triggered"])
Client:
from fastapi_websocket_pubsub import PubSubClient
# Callback to be called upon event being published on server
async def on_trigger(data):
print("Trigger URL was accessed")
async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
# Subscribe for the event
client.subscribe("triggered", on_event)
More Examples
- See the examples and tests folders for more server and client examples.
- See fastapi-websocket-rpc depends example to see how to combine with FASTAPI dependency injections
What can I do with this?
The combination of Websockets, and bi-directional Pub/Sub is ideal to create realtime data propagation solution at scale over the web.
- Update mechanism
- Remote control mechanism
- Data processing
- Distributed computing
- Realtime communications over the web
Foundations:
-
Based on fastapi-websocket-rpc for a robust realtime bidirectional channel
-
Based on broadcaster for syncing server instances
-
Server Endpoint:
-
Based on FAST-API: enjoy all the benefits of a full ASGI platform, including Async-io and dependency injections (for example to authenticate connections)
-
Based on Pydnatic: easily serialize structured data as part of RPC requests and responses. Simply Pass Pydantic data models as PubSub published data to have it available as part of an event.
-
-
Client :
-
Based on Tenacity: allowing configurable retries to keep to connection alive
- see WebSocketRpcClient.init's retry_config
-
Based on python websockets - a more comprehensive client than the one offered by Fast-api
-
Pull requests - welcome!
- Please include tests for new features
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distributions
Hashes for fastapi_websocket_pubsub-0.1.5.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | c718a1333cd036947058d5ff9cd9c10cf223bfb7c5e4fa0470a40c08885bc5c9 |
|
MD5 | 10905d60e7f0dfa85e46cdb21b63f1e9 |
|
BLAKE2b-256 | 5ed16641181a5e94552e0f45148f5bafe110864dc6ce88876aa918500e9d54f0 |
Hashes for fastapi_websocket_pubsub-0.1.5-py3.8.egg
Algorithm | Hash digest | |
---|---|---|
SHA256 | 11787f2300b297eb10db798defedc44c937486b5b53f8e6ffbf96fbc1307f681 |
|
MD5 | dd660bc1d3939e54d092f38bb43d56f6 |
|
BLAKE2b-256 | 86eb1edae57a5603f6eb8bd747f6de3486e45cdeee7936aedb5d3acf42cd052b |
Hashes for fastapi_websocket_pubsub-0.1.5-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | af4df048c7e3332075971f730446b355591dbd499f7a2d487d6054d0b713f520 |
|
MD5 | f29e17c1200d01852f9fecd7129fc829 |
|
BLAKE2b-256 | 37ecd915b069ac74ba157f832bbd3ed90c3ba00e92a591b2013d411b42cefa15 |