Skip to main content

Fast as websocket easy as http

Project description

X-raptor

banner

By: CenturyBoys

⚠️ Fast as a hell, CAUTION!!!

This package is being developed and is in the testing process. 🚨 NOT USE THIS PACKAGE IN PRODUCTION !!!

Fast as websocket easy as http, this package is an abstraction of websockets package to allow user to register get, post, sub, unsub asynchronous callbacks. For this all message must be a requests or a response object.

To allow multiple asynchronous responses on routes X-raptor use the request_id as antenna. Those antennas are pubsub channels that yield string messages.

Registering a route

To register a route you can use the xraptor.XRaptor.register to get the route instance and use the as_ (as_get, as_post, as_sub, as_unsub,) decorator. See below an example

import xraptor


@xraptor.XRaptor.register("/send_message_to_chat_room").as_post
async def send_message(
        request: xraptor.Request
) -> xraptor.Response:
    ...

Start server

import xraptor
import asyncio

_xraptor = xraptor.XRaptor("localhost", 8765)

xraptor.antennas.RedisAntenna.set_config({"url": "redis://:@localhost:6379/0"})

_xraptor.set_antenna(xraptor.antennas.RedisAntenna)

asyncio.run(_xraptor.load_routes().serve())

📡 Antenna

There is a default antenna (that use memory queue) configuration but is not recommended to use, you have two options implements your own antenna class using the interface or use one of the extra packages.

from abc import ABC, abstractmethod
from typing import AsyncIterator, Awaitable


class Antenna(ABC):

    @abstractmethod
    def subscribe(self, key: str) -> AsyncIterator[str]:
        """
        async generator that will yield message from the key's channel 
        :param key: pubsub channel
        :return: str message async generator
        """

    @abstractmethod
    def post(self, key: str, message: str) -> Awaitable:
        """
        async function that will publish a message to a key's channel 
        :param key: pubsub channel
        :param message: message
        :return: 
        """

    @abstractmethod
    def is_alive(self, antenna_id: str) -> Awaitable[bool]:
        """
        verify that antenna_id still alive
        :param antenna_id:
        :return:
        """

    @classmethod
    @abstractmethod
    def set_config(cls, config: dict):
        """
        set config map for this antenna
        :param config:
        :return:
        """

📤 Broadcast

The library provides a broadcast room implementation that enables users to register and receive messages within a shared space. This functionality is similar to a chat room where multiple users can join and automatically receive all messages posted without requiring constant polling.

This broadcast implementation use the registered antenna to handle request and (un)subscriptions

from typing import Self


class Broadcast:
    @classmethod
    def get(cls, broadcast_id: str) -> Self:
        """
        correct way to get a broadcast instance
        :param broadcast_id: string identifier
        :return: Broadcast object instance
        """

    def add_member(self, member: str):
        """
        add member on this chat room and if is the first to coming in, will open the room.
        :param member: member is an antenna id coming from request
        :return:
        """

    def remove_member(self, member: str):
        """
        remove member from this chat room and if is the last to coming out, will close the room.
        :param member: member is an antenna id coming from request
        :return:
        """

Extras

Redis

This extra add the redis package in version ^5.0.8.

How to install extra packages?

poetry add xraptor -E redis_version
OR
pip install 'xraptor[redis_version]'

Redis antenna need string connection that you will configure on his antenna using the set_config.

import xraptor

...

xraptor.antennas.RedisAntenna.set_config({"url": "redis://:@localhost:6379/0"})

...

🧮 Full Example

A very simple chat implementation was created to test sub, poss and unsub routes.

The test work using the redis_edition.

  • The server.py implementation can be found here.
  • The client.py implementation can be found here.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

xraptor-0.3.0.tar.gz (13.0 kB view details)

Uploaded Source

Built Distribution

xraptor-0.3.0-py3-none-any.whl (15.8 kB view details)

Uploaded Python 3

File details

Details for the file xraptor-0.3.0.tar.gz.

File metadata

  • Download URL: xraptor-0.3.0.tar.gz
  • Upload date:
  • Size: 13.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.12.6 Linux/6.1.0-25-amd64

File hashes

Hashes for xraptor-0.3.0.tar.gz
Algorithm Hash digest
SHA256 2c1c6da8f924c03384a0ce3eb827f575c5be6db963196354cc0765f7afca604f
MD5 e284bcaac72df6e41e8d31eca79748ac
BLAKE2b-256 66bccad031954880e67fe1a0b53ad13f5e3fe80a4c9fdc80d7f4e1bd3f6d2699

See more details on using hashes here.

File details

Details for the file xraptor-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: xraptor-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 15.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.12.6 Linux/6.1.0-25-amd64

File hashes

Hashes for xraptor-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1a94a5ac6cb75ceb25b63859ee1a8894a518556b6979f4c2a14e37300b401861
MD5 f084eed98a947b76b727479aaa6b26b4
BLAKE2b-256 11b1de7fe45da40f35dc77ee3345e78306c50cabcd06088f22e3080bdf67bdb3

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page