Rhubarb is a library that simplifies realtime streaming for a number of backends into a single API

Rhubarb

Rhubarb is a library that simplifies realtime streaming of events for a number of backends into a single API. It currently supports Postgres, Kafka, RabbitMQ, and Redis, as well as an internal memory backend useful for testing.

Installation

There are a number of backends that can be used with Rhubarb:

  • Kafka
  • Postgres
  • Redis
  • RabbitMQ

pip install rhubarb-py[kafka]
pip install rhubarb-py[postgres]
pip install rhubarb-py[redis]
pip install rhubarb-py[rabbitmq]

Backends

  • Rhubarb("redis://localhost:6379/0")
  • Rhubarb("kafka://localhost:9092")
  • Rhubarb("postgres://postgres:postgres@localhost:5432/rhubarb")
  • Rhubarb("amqp://guest:guest@localhost/")
  • Rhubarb("memory://")
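The backend is chosen by the scheme of the connection URL. As a rough illustration only (this is not Rhubarb's own dispatch code), the hypothetical helper `extra_for` below maps a connection URL to the pip extra listed under Installation. The helper name and the mapping table are assumptions made for this sketch, and `memory://` is assumed to need no extra because it is described as an internal backend.

```python
from urllib.parse import urlparse

# Hypothetical mapping from URL scheme to the pip extra named in the
# Installation section. Illustration only; not part of Rhubarb.
SCHEME_TO_EXTRA = {
    "redis": "redis",
    "kafka": "kafka",
    "postgres": "postgres",
    "amqp": "rabbitmq",
    "memory": None,  # assumed: the internal memory backend needs no extra
}


def extra_for(url: str):
    """Return the pip extra assumed to be needed for a backend URL."""
    scheme = urlparse(url).scheme
    if scheme not in SCHEME_TO_EXTRA:
        raise ValueError(f"unsupported backend scheme: {scheme!r}")
    return SCHEME_TO_EXTRA[scheme]


if __name__ == "__main__":
    for url in ("redis://localhost:6379/0", "amqp://guest:guest@localhost/", "memory://"):
        print(url, "->", extra_for(url))
```

A check like this can run at application startup to fail early with a readable message when the configured URL points at a backend whose extra was never installed.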

Quick start

Simple event consumer

async with Rhubarb("redis://localhost:6379/0") as events:
    async with events.subscribe(channel="CHATROOM") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)

Simple event producer

async with Rhubarb("redis://localhost:6379/0") as events:
    await events.publish(channel="CHATROOM", message="test message")

History retrieval

async with Rhubarb("redis://localhost:6379/0") as events:
    async with events.subscribe(channel="CHATROOM", history=10) as subscriber: # read the last 10 events published to the channel
        async for event in subscriber:
            await websocket.send_text(event.message)
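To make the meaning of `history=10` concrete, here is a toy model of "replay the last N events, then continue". It is a conceptual sketch using only the standard library, not how any Rhubarb backend stores events; the class `ChannelLog` and its `retain` parameter are hypothetical.

```python
from collections import deque


class ChannelLog:
    """Toy in-memory channel that remembers recently published events."""

    def __init__(self, retain: int = 100):
        # A bounded deque silently drops the oldest event once full.
        self._events = deque(maxlen=retain)

    def publish(self, message):
        self._events.append(message)

    def replay(self, history: int):
        """Return up to `history` most recent events, oldest first."""
        if history <= 0:
            return []
        return list(self._events)[-history:]


if __name__ == "__main__":
    log = ChannelLog(retain=5)
    for n in range(8):
        log.publish(f"event-{n}")
    print(log.replay(3))
```

In this model a subscriber can never receive more history than the channel retains, however large a value it asks for.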

Custom serializer & deserializer

import json

async with Rhubarb("redis://localhost:6379/0", serializer=json.dumps, deserializer=json.loads) as events:
    async with events.subscribe(channel="CHATROOM", history=10) as subscriber: # read the last 10 events published to the channel
        async for event in subscriber:
            await websocket.send_text(event.message)
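The `serializer` and `deserializer` arguments shown above are plain callables. A minimal sketch of a custom pair follows, assuming the serializer receives a Python object and returns a string, and the deserializer does the reverse (the same shape as `json.dumps` and `json.loads`). The names `dumps_event` and `loads_event` are hypothetical.

```python
import json
from datetime import datetime, timezone


def dumps_event(obj) -> str:
    """Serialize to compact JSON, encoding datetimes as ISO 8601 strings."""

    def default(value):
        # json.dumps calls this only for objects it cannot encode itself.
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"cannot serialize {type(value).__name__}")

    return json.dumps(obj, default=default, separators=(",", ":"))


def loads_event(raw: str):
    """Deserialize JSON text; datetimes come back as ISO 8601 strings."""
    return json.loads(raw)


if __name__ == "__main__":
    payload = {"user": "ada", "sent_at": datetime(2024, 1, 1, tzinfo=timezone.utc)}
    raw = dumps_event(payload)
    print(raw)
    print(loads_event(raw))
```

If Rhubarb calls these the way it calls `json.dumps` and `json.loads`, they could be passed as `serializer=dumps_event, deserializer=loads_event`. Note that the round trip is lossy for datetimes: subscribers receive strings and must parse them.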

Group subscribing (at-most-once processing)

async with Rhubarb("redis://localhost:6379/0", serializer=json.dumps, deserializer=json.loads) as events:
    async with events.subscribe(
        "TEST-GROUP-CHANNEL", group_name="TEST_GROUP", consumer_name="sub_1"
    ) as subscriber_1:
        async for event in subscriber_1:
            await process_job(event)
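The idea behind a consumer group is that each event goes to one member of the group rather than to every subscriber. The sketch below models that with a shared `asyncio.Queue` from the standard library; it does not use Rhubarb or any broker, and the `worker` and `run_group` names are hypothetical.

```python
import asyncio


async def worker(name, queue, handled):
    """Take jobs from the shared queue; each job is taken by one worker."""
    while True:
        job = await queue.get()
        try:
            handled.append((name, job))
            await asyncio.sleep(0)  # yield so other workers get a turn
        finally:
            queue.task_done()


async def run_group(jobs, consumer_names=("sub_1", "sub_2")):
    """Distribute jobs across the named consumers and return who got what."""
    queue = asyncio.Queue()
    handled = []
    for job in jobs:
        queue.put_nowait(job)
    workers = [
        asyncio.create_task(worker(name, queue, handled))
        for name in consumer_names
    ]
    await queue.join()  # wait until every job has been marked done
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return handled


if __name__ == "__main__":
    for name, job in asyncio.run(run_group(range(6))):
        print(name, "handled", job)
```

Because a job leaves the queue as soon as a worker takes it, a worker that crashes mid-job loses that job. That is the trade-off the "at-most-once" label describes.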

Example

A minimal working example can be found in the example directory.

import os

from starlette.applications import Starlette
from starlette.concurrency import run_until_first_complete
from starlette.responses import HTMLResponse
from starlette.routing import Route, WebSocketRoute

from rhubarb import Rhubarb

URL = os.environ.get("URL", "redis://localhost:6379/0")

events = Rhubarb(URL)

html = """
<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = new WebSocket("ws://localhost:8000/ws");
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""


async def homepage(_):
    return HTMLResponse(html)


async def room_consumer(websocket):
    async for message in websocket.iter_text():
        await events.publish(channel="chatroom", message=message)


async def room_producer(websocket):
    async with events.subscribe(channel="chatroom") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)


async def ws(websocket):
    await websocket.accept()
    await run_until_first_complete(
        (room_consumer, {"websocket": websocket}),
        (room_producer, {"websocket": websocket}),
    )


routes = [
    Route("/", homepage),
    WebSocketRoute("/ws", ws, name="chatroom_ws"),
]


app = Starlette(
    routes=routes,
    on_startup=[events.connect],
    on_shutdown=[events.disconnect],
)

🛡 License

This project is licensed under the terms of the MIT license. See LICENSE for more details.
