Fast as websocket easy as http

X-raptor

By: CenturyBoys

⚠️ Fast as hell, CAUTION!!!

This package is under development and still being tested. 🚨 DO NOT USE THIS PACKAGE IN PRODUCTION !!!

Fast as websocket, easy as http: this package is an abstraction over the websockets package that lets users register asynchronous get, post, sub, and unsub callbacks. For this to work, every message must be a request or a response object.

To allow multiple asynchronous responses on a route, X-raptor uses the request_id as an antenna. Antennas are pubsub channels that yield string messages.

Registering a route

To register a route, call xraptor.XRaptor.register to get the route instance, then apply one of its as_ decorators (as_get, as_post, as_sub, as_unsub). For example:

import xraptor


@xraptor.XRaptor.register("/send_message_to_chat_room").as_post
async def send_message(
        request: xraptor.Request
) -> xraptor.Response:
    ...

Start server

import xraptor
import asyncio

_xraptor = xraptor.XRaptor("localhost", 8765)

xraptor.antennas.RedisAntenna.set_config({"url": "redis://:@localhost:6379/0"})

_xraptor.set_antenna(xraptor.antennas.RedisAntenna)

asyncio.run(_xraptor.load_routes().serve())

📡 Antenna

There is a default antenna configuration (which uses an in-memory queue), but it is not recommended. You have two options: implement your own antenna class using the interface below, or use one of the extra packages.

from abc import ABC, abstractmethod
from typing import AsyncIterator, Awaitable


class Antenna(ABC):

    @abstractmethod
    def subscribe(self, key: str) -> AsyncIterator[str]:
        """
        async generator that will yield message from the key's channel 
        :param key: pubsub channel
        :return: str message async generator
        """

    @abstractmethod
    def post(self, key: str, message: str) -> Awaitable:
        """
        async function that will publish a message to a key's channel 
        :param key: pubsub channel
        :param message: message
        :return: 
        """

    @abstractmethod
    def is_alive(self, antenna_id: str) -> Awaitable[bool]:
        """
        verify that antenna_id is still alive
        :param antenna_id:
        :return:
        """

    @classmethod
    @abstractmethod
    def set_config(cls, config: dict):
        """
        set config map for this antenna
        :param config:
        :return:
        """

📤 Broadcast

The library provides a broadcast room implementation that enables users to register and receive messages within a shared space. This functionality is similar to a chat room where multiple users can join and automatically receive all messages posted without requiring constant polling.

This broadcast implementation uses the registered antenna to handle requests and (un)subscriptions.

from typing import Self


class Broadcast:
    @classmethod
    def get(cls, broadcast_id: str) -> Self:
        """
        correct way to get a broadcast instance
        :param broadcast_id: string identifier
        :return: Broadcast object instance
        """

    def add_member(self, member: str):
        """
        add a member to this chat room; if it is the first to come in, the room is opened.
        :param member: member is an antenna id coming from request
        :return:
        """

    def remove_member(self, member: str):
        """
        remove a member from this chat room; if it is the last to leave, the room is closed.
        :param member: member is an antenna id coming from request
        :return:
        """

Extras

Redis

This extra adds the redis package at version ^5.0.8.

How to install extra packages?

poetry add xraptor -E redis_version
OR
pip install 'xraptor[redis_version]'

The Redis antenna needs a connection string, which you configure on the antenna using set_config.

import xraptor

...

xraptor.antennas.RedisAntenna.set_config({"url": "redis://:@localhost:6379/0"})

...
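
To see what each part of that connection string carries, the sketch below splits it with the standard library. It assumes the common redis://[username]:[password]@host:port/database layout; describe_redis_url is a hypothetical helper and is not part of xraptor or redis.

```python
from urllib.parse import urlsplit


def describe_redis_url(url: str) -> dict:
    """Break a redis:// URL into its parts (hypothetical helper)."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        # Empty credentials (as in "redis://:@host") are reported as None.
        "username": parts.username or None,
        "password": parts.password or None,
        "host": parts.hostname,
        "port": parts.port,
        # The path segment selects the database number; default to 0.
        "database": int(parts.path.lstrip("/") or 0),
    }


info = describe_redis_url("redis://:@localhost:6379/0")
```

For the URL used above, info reports host localhost, port 6379, database 0, and no credentials.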

🧮 Full Example

A very simple chat implementation was created to test the sub, post and unsub routes.

The test works using the redis_edition.

  • The server.py implementation can be found here.
  • The client.py implementation can be found here.
