Skip to main content

A toolkit for Event-Driven websocket management

Project description

Zaptools

A toolkit for Event-Driven websocket management

PyPI version

Also Supported

Lang Side View Source
python Client/Server zaptools_dart
python Client/Server zaptools_go

Getting Started

Zaptools provides tools for building event-driven websocket integration. It includes pre-existing classes to seamless integration with FastApi and Sanic.

installation

pip install zaptools # windows
pip3 install zaptools # mac

FastAPI

from fastapi import FastAPI, WebSocket
from zaptools.tools import EventRegister, EventContext
from zaptools.connectors import FastApiConnector

app:FastAPI = FastAPI()
register: EventRegister = EventRegister() 

@register.on_event("hello") 
async def hello_trigger(ctx: EventContext):
    conn = ctx.connection
    await conn.send("hello", "HELLO FROM SERVER !!!") 


@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    connector = FastApiConnector(reg, ws)
    await connector.start()

Firstly create a FastAPI and EventRegister instance. EventRegister has the responsability to create events.

from fastapi import FastAPI, WebSocket
from zaptools.tools import EventRegister, EventContext, Connector
from zaptools.adapters import FastApiAdapter

app:FastAPI = FastAPI()
register: EventRegister = EventRegister() 

For Creating events use the decorator syntax. This will creates an event named "hello" and it will call hello_trigger function when an event named "hello" is received.

@register.on_event("hello") 
async def hello_trigger(ctx: EventContext):
    conn = ctx.connection
    await conn.send("hello", "HELLO FROM SERVER !!!") 

Event it is a class with name("hello") and the callback(hello_trigger)

For connecting EventRegister with the websocket class provided by FastAPI framework, there is a FastApiConnector, use the plug_and_start static method of the FastApiConnector, it will start to receive events.

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    connector = FastApiConnector(reg, ws)
    await connector.start()

It's the same way for Sanic Framework

Sanic

from sanic import Sanic, Request, Websocket

from zaptools.tools import EventRegister, EventContext
from zaptools.connectors import SanicConnector

app = Sanic("MyHelloWorldApp")
register: EventRegister = EventRegister()

@register.on_event("hello") 
async def hello_trigger(ctx: EventContext):
    conn = ctx.connection
    await conn.send("hello", "HELLO FROM SERVER !!!") 

@app.websocket("/")
async def websocket(request: Request, ws: Websocket):
    connector = SanicConnector(reg, ws)
    await connector.start()

EventContext object

Each element is triggered with a EventContext object. This EventContext object contains information about the current event and which WebSocketConnection is invoking it.

EventContext.event_name # name of current event
EventContext.payload # payload the data from the connection
EventContext.connection # WebSocketConnection 

Sending Events

In order to response to the client use the WebSocketConnection.send(event:str, payload:Any), this object is provided by the Context.

@register.on_event("hello") 
async def hello_trigger(ctx: EventContext):
    conn = ctx.connection
    conn.send("hello", "HELLO FROM SERVER !!!") # sending "hello" event to client with a payload.

WebSocketConnection

WebSocketConnection provides a easy interaction with the websocket.

WebSocketConnection.id # ID of connection

await WebSocketConnection.send(event:str, payload:Any) #Send Event to the client

await WebSocketConnection.close() # Close the websocket connection

Coroutines need to be awaited.

Events

The "connected", "disconnected" and "error" events can be used to trigger an action when a connection is started and after it is closed or when a error ocurred in a event.

@register.on_event("connected")
async def connected_trigger(ctx: EventContext):
    print("Connection started")

@register.on_event("disconnected")
async def disconnected_trigger(ctx: EventContext):
    print("Connection closed")

@register.on_event("error")
async def disconnected_trigger(ctx: EventContext):
    print("An error ocurred in a event")
    print(ctx.payload) # display error details

Error details in payload

Client

Zaptools provides a python client to connect with others zaptools server

from zaptools.client import ZapClient

client = ZapClient()
await client.connect("ws://localhost:8000/") #Connect to the server

await client.send("event1", {"hello":"from client"}, {}) # send a event

# A generator with all event stream
# Receive all events
async for event in client.event_stream(): 
        print(event.payload)


# A generator with all connection state
# Receive connection state
# ONLINE, OFFLINE, CONNNECTING and ERROR state
async for state in client.connection_state(): 
        print(state)

Connection Router

The ConnectionRouter class allows managing multiple WebSocket connections, providing methods to add, remove, retrieve, and send events to specific connections or all connections.

# Add a connectin to the router
ConnectionRouter.add_connection(connection: WebSocketConnection) -> None

# Remove a connection from the router
ConnectionRouter.remove_connection(connection: WebSocketConnection) -> None

# Retrieve a connection by its ID
ConnectionRouter.get_connection(id: str) -> WebSocketConnection | None

# Retrieve all connections
ConnectionRouter.get_all_connections() -> list[WebSocketConnection]

# Broadcast an event to all connections could be have exclusion list
ConnectionRouter.broadcast(event: str, payload: Any, exclude: list[WebSocketConnection] = [])

# Send an event to a specific connection
ConnectionRouter.send_to_connection(id: str, event: str, payload: Any) -> bool

Example:

from fastapi import FastAPI, WebSocket
from zaptools import EventRegister, EventContext, ConnectionRouter
from zaptools.connectors import FastApiConnector

app: FastAPI = FastAPI()
reg: EventRegister = EventRegister()

connection_router = ConnectionRouter()


@reg.on_event("connected")
async def on_connected_trigger(ctx: EventContext):
    # add the connection to the router
    connection_router.add_connection(ctx.connection)
    # send a event to all connections in the router
    await connection_router.broadcast(
        "new-user",
        "new user connected",
        exclude=[ctx.connection],  # exclude the sender
    )


@reg.on_event("disconnected")
async def on_disconnected_trigger(ctx: EventContext):
    # remove the connection from the router
    connection_router.remove_connection(ctx.connection)
    # send a event to all connections in the router
    await connection_router.broadcast(
        "user-disconnected",
        "user disconnected",
        exclude=[ctx.connection],
    )


@app.websocket("/")
async def websocket_endpoint(ws: WebSocket):
    conn = FastApiConnector(register=reg, websocket=ws)
    await conn.start()

Contributions are wellcome

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zaptools-0.5.2.tar.gz (13.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

zaptools-0.5.2-py3-none-any.whl (13.0 kB view details)

Uploaded Python 3

File details

Details for the file zaptools-0.5.2.tar.gz.

File metadata

  • Download URL: zaptools-0.5.2.tar.gz
  • Upload date:
  • Size: 13.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for zaptools-0.5.2.tar.gz
Algorithm Hash digest
SHA256 1f49c356f4cefb4c7566a198502b48142e1a6705059ab4a1e50da950d0035e1e
MD5 cbf49bb2b1b18f879c1457765cec38d2
BLAKE2b-256 6c00da01b8ab1ef6fa1e78dfa8f1b066b7a938a06970e44ac337b53e580c5865

See more details on using hashes here.

Provenance

The following attestation bundles were made for zaptools-0.5.2.tar.gz:

Publisher: publish.yaml on NathanDraco22/zaptools-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file zaptools-0.5.2-py3-none-any.whl.

File metadata

  • Download URL: zaptools-0.5.2-py3-none-any.whl
  • Upload date:
  • Size: 13.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for zaptools-0.5.2-py3-none-any.whl
Algorithm Hash digest
SHA256 333859e9eb64fd95f7a2b8bdd5eec02718e8b94e97b61235a9f7a4e988ac3108
MD5 0c46586ec65fdddfbf6851a341cc98b8
BLAKE2b-256 e09d2f2e954166610eae086b02c5bb4c715b1f488e61b82fe3d2d0d84a0a12ac

See more details on using hashes here.

Provenance

The following attestation bundles were made for zaptools-0.5.2-py3-none-any.whl:

Publisher: publish.yaml on NathanDraco22/zaptools-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page