Skip to main content

Rhubarb is a library that simplifies realtime streaming for a number of backends into a single API

Project description

Rhubarb

Release Build Python Version Dependencies Status codecov Documentation Status Code style: black Security: bandit Pre-commit Semantic Versions Commit activity License

Rhubarb is a library that simplifies realtime streaming of events for a number of backends in to a single API. Currently supports Postgres, kafka, RabbitMQ, redis as well as an internal memory backend useful for testing.

Installation

There are a number of backends that can be used with Rhubarb:

Kafka Postgres Redis RabbitMQ

pip install rhubarb-py[kafka] pip install rhubarb-py[postgres] pip install rhubarb-py[redis] pip install rhubarb-py[rabbitmq]

Backends

  • Rhubarb("redis://localhost:6379/0")
  • Rhubarb("kafka://localhost:9092")
  • Rhubarb("postgres://postgres:postgres@localhost:5432/rhubarb")
  • Rhubarb("amqp://guest:guest@localhost/")
  • Rhubarb("memory://")

Quick start

Simple event consumer

async with Rhubarb("redis://localhost:6379/0") as events:
    async with events.subscribe(channel="CHATROOM") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)

Simple event producer

async with Rhubarb("redis://localhost:6379/0") as events:
    await events.publish("test message")

History retrieval

async with Rhubarb("redis://localhost:6379/0") as events:
    async with events.subscribe(channel="CHATROOM", history=10) as subscriber: # read the last 10 events published to the channel
        async for event in subscriber:
            await websocket.send_text(event.message)

Custom serializer & deserializer

async with Rhubarb("redis://localhost:6379/0", serializer=json.dumps, deserializer=json.loads) as events:
    async with events.subscribe(channel="CHATROOM", history=10) as subscriber: # read the last 10 events published to the channel
        async for event in subscriber:
            await websocket.send_text(event.message)

Group subscribing (at-most-once processing)

async with Rhubarb("redis://localhost:6379/0", serializer=json.dumps, deserializer=json.loads) as events:
    async with events.subscribe(
        "TEST-GROUP-CHANNEL", group_name="TEST_GROUP", consumer_name="sub_1"
    ) as subscriber_1:
        async for event in subscriber:
            await process_job(event)

Example

A minimal working example can be found in example directory.

import os

from starlette.applications import Starlette
from starlette.concurrency import run_until_first_complete
from starlette.responses import HTMLResponse
from starlette.routing import Route, WebSocketRoute

from rhubarb import Rhubarb

URL = os.environ.get("URL", "redis://localhost:6379/0")

events = Rhubarb(URL)

html = """
<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = new WebSocket("ws://localhost:8000/ws");
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""


async def homepage(_):
    return HTMLResponse(html)


async def room_consumer(websocket):
    async for message in websocket.iter_text():
        await events.publish(channel="chatroom", message=message)


async def room_producer(websocket):
    async with events.subscribe(channel="chatroom") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)


async def ws(websocket):
    await websocket.accept()
    await run_until_first_complete(
        (room_consumer, {"websocket": websocket}),
        (room_producer, {"websocket": websocket}),
    )


routes = [
    Route("/", homepage),
    WebSocketRoute("/ws", ws, name="chatroom_ws"),
]


app = Starlette(
    routes=routes,
    on_startup=[events.connect],
    on_shutdown=[events.disconnect],
)

🛡 License

License

This project is licensed under the terms of the MIT license. See LICENSE for more details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rhubarb_py-1.8.6.tar.gz (16.6 kB view hashes)

Uploaded Source

Built Distribution

rhubarb_py-1.8.6-py3-none-any.whl (18.8 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page