A toolkit for Event-Driven websocket management
Project description
Zaptools
A toolkit for Event-Driven websocket management
Also Supported
| Lang | Side | View Source |
|---|---|---|
| |
Client/Server | zaptools_dart |
| |
Client/Server | zaptools_go |
Getting Started
Zaptools provides tools for building event-driven websocket integration. It includes pre-existing classes to seamless integration with FastApi and Sanic.
installation
pip install zaptools # windows
pip3 install zaptools # mac
FastAPI
from fastapi import FastAPI, WebSocket
from zaptools.tools import EventRegister, EventContext
from zaptools.connectors import FastApiConnector
app:FastAPI = FastAPI()
register: EventRegister = EventRegister()
@register.on_event("hello")
async def hello_trigger(ctx: EventContext):
conn = ctx.connection
await conn.send("hello", "HELLO FROM SERVER !!!")
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
connector = FastApiConnector(reg, ws)
await connector.start()
Firstly create a FastAPI and EventRegister instance. EventRegister has the responsability to create events.
from fastapi import FastAPI, WebSocket
from zaptools.tools import EventRegister, EventContext, Connector
from zaptools.adapters import FastApiAdapter
app:FastAPI = FastAPI()
register: EventRegister = EventRegister()
For Creating events use the decorator syntax.
This will creates an event named "hello" and it will call hello_trigger function when an event named "hello" is received.
@register.on_event("hello")
async def hello_trigger(ctx: EventContext):
conn = ctx.connection
await conn.send("hello", "HELLO FROM SERVER !!!")
Event it is a class with name("hello") and the callback(hello_trigger)
For connecting EventRegister with the websocket class provided by FastAPI framework, there is a FastApiConnector, use the plug_and_start static method of the FastApiConnector, it will start to receive events.
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
connector = FastApiConnector(reg, ws)
await connector.start()
It's the same way for Sanic Framework
Sanic
from sanic import Sanic, Request, Websocket
from zaptools.tools import EventRegister, EventContext
from zaptools.connectors import SanicConnector
app = Sanic("MyHelloWorldApp")
register: EventRegister = EventRegister()
@register.on_event("hello")
async def hello_trigger(ctx: EventContext):
conn = ctx.connection
await conn.send("hello", "HELLO FROM SERVER !!!")
@app.websocket("/")
async def websocket(request: Request, ws: Websocket):
connector = SanicConnector(reg, ws)
await connector.start()
EventContext object
Each element is triggered with a EventContext object. This EventContext object contains information about the current event and which WebSocketConnection is invoking it.
EventContext.event_name # name of current event
EventContext.payload # payload the data from the connection
EventContext.connection # WebSocketConnection
Sending Events
In order to response to the client use the WebSocketConnection.send(event:str, payload:Any), this object is provided by the Context.
@register.on_event("hello")
async def hello_trigger(ctx: EventContext):
conn = ctx.connection
conn.send("hello", "HELLO FROM SERVER !!!") # sending "hello" event to client with a payload.
WebSocketConnection
WebSocketConnection provides a easy interaction with the websocket.
WebSocketConnection.id # ID of connection
await WebSocketConnection.send(event:str, payload:Any) #Send Event to the client
await WebSocketConnection.close() # Close the websocket connection
Coroutines need to be awaited.
Events
The "connected", "disconnected" and "error" events can be used to trigger an action when a connection is started and after it is closed or when a error ocurred in a event.
@register.on_event("connected")
async def connected_trigger(ctx: EventContext):
print("Connection started")
@register.on_event("disconnected")
async def disconnected_trigger(ctx: EventContext):
print("Connection closed")
@register.on_event("error")
async def disconnected_trigger(ctx: EventContext):
print("An error ocurred in a event")
print(ctx.payload) # display error details
Error details in
payload
Client
Zaptools provides a python client to connect with others zaptools server
from zaptools.client import ZapClient
client = ZapClient()
await client.connect("ws://localhost:8000/") #Connect to the server
await client.send("event1", {"hello":"from client"}, {}) # send a event
# A generator with all event stream
# Receive all events
async for event in client.event_stream():
print(event.payload)
# A generator with all connection state
# Receive connection state
# ONLINE, OFFLINE, CONNNECTING and ERROR state
async for state in client.connection_state():
print(state)
Connection Router
The ConnectionRouter class allows managing multiple WebSocket connections, providing methods to add, remove, retrieve, and send events to specific connections or all connections.
# Add a connectin to the router
ConnectionRouter.add_connection(connection: WebSocketConnection) -> None
# Remove a connection from the router
ConnectionRouter.remove_connection(connection: WebSocketConnection) -> None
# Retrieve a connection by its ID
ConnectionRouter.get_connection(id: str) -> WebSocketConnection | None
# Retrieve all connections
ConnectionRouter.get_all_connections() -> list[WebSocketConnection]
# Broadcast an event to all connections could be have exclusion list
ConnectionRouter.broadcast(event: str, payload: Any, exclude: list[WebSocketConnection] = [])
# Send an event to a specific connection
ConnectionRouter.send_to_connection(id: str, event: str, payload: Any) -> bool
Example:
from fastapi import FastAPI, WebSocket
from zaptools import EventRegister, EventContext, ConnectionRouter
from zaptools.connectors import FastApiConnector
app: FastAPI = FastAPI()
reg: EventRegister = EventRegister()
connection_router = ConnectionRouter()
@reg.on_event("connected")
async def on_connected_trigger(ctx: EventContext):
# add the connection to the router
connection_router.add_connection(ctx.connection)
# send a event to all connections in the router
await connection_router.broadcast(
"new-user",
"new user connected",
exclude=[ctx.connection], # exclude the sender
)
@reg.on_event("disconnected")
async def on_disconnected_trigger(ctx: EventContext):
# remove the connection from the router
connection_router.remove_connection(ctx.connection)
# send a event to all connections in the router
await connection_router.broadcast(
"user-disconnected",
"user disconnected",
exclude=[ctx.connection],
)
@app.websocket("/")
async def websocket_endpoint(ws: WebSocket):
conn = FastApiConnector(register=reg, websocket=ws)
await conn.start()
Contributions are wellcome
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file zaptools-0.5.2.tar.gz.
File metadata
- Download URL: zaptools-0.5.2.tar.gz
- Upload date:
- Size: 13.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1f49c356f4cefb4c7566a198502b48142e1a6705059ab4a1e50da950d0035e1e
|
|
| MD5 |
cbf49bb2b1b18f879c1457765cec38d2
|
|
| BLAKE2b-256 |
6c00da01b8ab1ef6fa1e78dfa8f1b066b7a938a06970e44ac337b53e580c5865
|
Provenance
The following attestation bundles were made for zaptools-0.5.2.tar.gz:
Publisher:
publish.yaml on NathanDraco22/zaptools-python
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
zaptools-0.5.2.tar.gz -
Subject digest:
1f49c356f4cefb4c7566a198502b48142e1a6705059ab4a1e50da950d0035e1e - Sigstore transparency entry: 200478507
- Sigstore integration time:
-
Permalink:
NathanDraco22/zaptools-python@e60ebe9edbf55ffaaa62773cf151a7d8c0e37a10 -
Branch / Tag:
refs/tags/v0.5.2 - Owner: https://github.com/NathanDraco22
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yaml@e60ebe9edbf55ffaaa62773cf151a7d8c0e37a10 -
Trigger Event:
release
-
Statement type:
File details
Details for the file zaptools-0.5.2-py3-none-any.whl.
File metadata
- Download URL: zaptools-0.5.2-py3-none-any.whl
- Upload date:
- Size: 13.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
333859e9eb64fd95f7a2b8bdd5eec02718e8b94e97b61235a9f7a4e988ac3108
|
|
| MD5 |
0c46586ec65fdddfbf6851a341cc98b8
|
|
| BLAKE2b-256 |
e09d2f2e954166610eae086b02c5bb4c715b1f488e61b82fe3d2d0d84a0a12ac
|
Provenance
The following attestation bundles were made for zaptools-0.5.2-py3-none-any.whl:
Publisher:
publish.yaml on NathanDraco22/zaptools-python
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
zaptools-0.5.2-py3-none-any.whl -
Subject digest:
333859e9eb64fd95f7a2b8bdd5eec02718e8b94e97b61235a9f7a4e988ac3108 - Sigstore transparency entry: 200478508
- Sigstore integration time:
-
Permalink:
NathanDraco22/zaptools-python@e60ebe9edbf55ffaaa62773cf151a7d8c0e37a10 -
Branch / Tag:
refs/tags/v0.5.2 - Owner: https://github.com/NathanDraco22
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yaml@e60ebe9edbf55ffaaa62773cf151a7d8c0e37a10 -
Trigger Event:
release
-
Statement type: