Skip to main content

⛓️ Combination of asyncapi(documentation) & socketio pub/sub using aiokafka as the client manager multinode backend services

Project description

Sample image

aiosox: repo link

Quick example

can be installed using pip/poetry:

poetry shell
 
poetry run uvicorn example:app --port=8001
from typing import List

from fastapi import FastAPI
from pydantic import BaseModel

from aiosox import SioAuth, SioNamespace, SocketIoServer


def get_app():
    applitcation = FastAPI(title="tester")
    return applitcation


app = get_app()

sio_server: SocketIoServer = SocketIoServer(app=app, kafka_url="localhost:29092")
user_namespapce: SioNamespace = SioNamespace("/user", socket_io_server=sio_server)
sio_server._sio.register_namespace(user_namespapce)


@app.on_event("startup")
async def on_start():
    await sio_server.start()


@app.on_event("shutdown")
async def on_shutdown():
    await sio_server.shutdown()


class UserY(BaseModel):
    name: str


class UserT(BaseModel):
    name: str
    what: List[UserY]


class OfferT(BaseModel):
    title: str


on_failed_emmiter = user_namespapce.create_emitter("failed", model=OfferT | UserT)


@user_namespapce.on(
    "submit",
    description="when user submits a form",
    payload_model=UserT | UserY,
    response_model=List[UserT],
    auth=SioAuth.jwt,
)
async def on_submit(sid, data):
    print(
        sid,
        data,
    )

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiosox-0.3.0.tar.gz (19.1 kB view hashes)

Uploaded Source

Built Distribution

aiosox-0.3.0-py3-none-any.whl (27.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page