⛓️ A combination of AsyncAPI (documentation) and Socket.IO pub/sub, using aiokafka as the client manager for multi-node backend services
Project description
aiosox: repo link
Quick example
aiosox can be installed using pip or Poetry:
poetry shell
poetry run uvicorn example:app --port=8001
from typing import List
from fastapi import FastAPI
from pydantic import BaseModel
from aiosox import SioAuth, SioNamespace, SocketIoServer
def get_app():
    """Create and return the FastAPI application titled "tester"."""
    return FastAPI(title="tester")
# Build the FastAPI application used by this example.
app = get_app()
# Socket.IO server constructed around the app; "localhost:29092" is the Kafka
# address handed to it via kafka_url.
sio_server: SocketIoServer = SocketIoServer(app=app, kafka_url="localhost:29092")
# Namespace object for "/user", tied to the server created above.
# The name is spelled "namespapce" (sic); later statements reference it as written.
user_namespapce: SioNamespace = SioNamespace("/user", socket_io_server=sio_server)
# NOTE(review): registration reaches into the private `_sio` attribute of the
# server — confirm whether aiosox exposes a public way to register a namespace.
sio_server._sio.register_namespace(user_namespapce)
@app.on_event("startup")
async def on_start():
    """Await ``sio_server.start()`` when the app fires its "startup" event."""
    await sio_server.start()
@app.on_event("shutdown")
async def on_shutdown():
    """Await ``sio_server.shutdown()`` when the app fires its "shutdown" event."""
    await sio_server.shutdown()
class UserY(BaseModel):
    # Pydantic model with a single string field; used below as one of the
    # accepted "submit" payloads and as the element type of UserT.what.
    name: str
class UserT(BaseModel):
    # Pydantic model with a string name plus a list of nested UserY entries.
    name: str
    what: List[UserY]
class OfferT(BaseModel):
    # Pydantic model with a single string field, the offer title.
    title: str
# Emitter created on the user namespace for the "failed" event; its model is
# declared as either OfferT or UserT.
# NOTE(review): the `A | B` union between classes needs Python 3.10+ — confirm
# the supported interpreter versions.
on_failed_emmiter = user_namespapce.create_emitter("failed", model=OfferT | UserT)
@user_namespapce.on(
    "submit",
    description="when user submits a form",
    payload_model=UserT | UserY,
    response_model=List[UserT],
    auth=SioAuth.jwt,
)
async def on_submit(sid, data):
    # Handler registered for the "submit" event on the user namespace.
    # It only echoes the two arguments it receives to stdout.
    print(sid, data)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aiosox-0.3.0.tar.gz
(19.1 kB
view hashes)
Built Distribution
aiosox-0.3.0-py3-none-any.whl
(27.2 kB
view hashes)