Build data streaming pipelines using faststan
FastSTAN
Easily deploy NATS Streaming subscribers using Python.
Features
- Define subscribers using sync and async Python functions
- Automatic data parsing and validation using type annotations and pydantic
- Support for custom validation using any function
- Several subscribers allowed on the same channel
- Healthcheck available through an HTTP GET request to monitor the application
- (TODO) Metrics available through HTTP GET requests
- All FastAPI features
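To illustrate what accepting both sync and async functions involves, here is a minimal sketch using only the standard library. It is not FastSTAN's implementation: the `dispatch` helper and both handlers are hypothetical names introduced for this example, and running sync handlers in a worker thread is an assumption about one reasonable design.

```python
import asyncio
import inspect


async def dispatch(handler, message):
    """Call handler with message, awaiting it when it is a coroutine function."""
    if inspect.iscoroutinefunction(handler):
        return await handler(message)
    # Assumption: blocking handlers run in a worker thread so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, handler, message)


def sync_handler(message: str) -> str:
    return f"sync:{message}"


async def async_handler(message: str) -> str:
    return f"async:{message}"


async def main():
    # Both kinds of handler are invoked through the same entry point.
    return [
        await dispatch(sync_handler, "a"),
        await dispatch(async_handler, "b"),
    ]


results = asyncio.run(main())
print(results)
```

With a dispatcher of this shape, the caller does not need to know which kind of function was registered.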
Quick start
- Install the package from PyPI:
    pip install faststan
- Create your first subscriber:
    from faststan.faststan import FastSTAN

    app = FastSTAN()

    @app.subscribe("demo")
    def on_event(message: str):
        print(f"INFO :: Received new message: {message}")
- Start your subscriber (assuming the code above is saved as app.py):
    uvicorn app:app
Advanced features
Using error callbacks
from faststan.faststan import FastSTAN

app = FastSTAN()

def handle_error(error):
    # f-string so that the error is interpolated into the message
    print(f"ERROR: {error}")

@app.subscribe("demo", error_cb=handle_error)
def on_event(message: str):
    print(f"INFO :: Received new message: {message}")
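The error-callback pattern itself can be tried without a NATS Streaming server. The sketch below uses only the standard library; `with_error_callback` is a hypothetical decorator written for this example and is not part of FastSTAN, whose internal handling of `error_cb` may differ.

```python
import functools


def with_error_callback(error_cb):
    """Build a decorator that forwards any exception raised by the handler to error_cb."""
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(message):
            try:
                return handler(message)
            except Exception as error:
                # Hand the exception to the callback instead of letting it propagate.
                error_cb(error)
                return None
        return wrapper
    return decorator


errors = []


def handle_error(error):
    errors.append(error)
    print(f"ERROR: {error}")


@with_error_callback(handle_error)
def on_event(message: str):
    if not message:
        raise ValueError("empty message")
    print(f"INFO :: Received new message: {message}")


on_event("hello")  # handled normally
on_event("")       # raises inside the handler, so handle_error is called
```

Keeping the callback separate from the handler means the same error policy can be reused across several subscribers.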
Using pydantic models
You can use pydantic models in order to automatically parse incoming messages:
from pydantic import BaseModel
from faststan.faststan import FastSTAN

class Event(BaseModel):
    timestamp: int
    temperature: float
    humidity: float

app = FastSTAN()

# The type annotation tells FastSTAN which model to parse the message into
@app.subscribe("event")
def on_event(event: Event):
    msg = f"INFO :: {event.timestamp} :: Temperature: {event.temperature} | Humidity: {event.humidity}"
    print(msg)
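To see what parsing and validation with this model looks like outside of a subscriber, the following sketch uses pydantic directly. It assumes the incoming payload is JSON-encoded bytes, which the text above does not state; `parse_event` is a hypothetical helper and not a FastSTAN function.

```python
import json

from pydantic import BaseModel, ValidationError


class Event(BaseModel):
    timestamp: int
    temperature: float
    humidity: float


def parse_event(payload: bytes):
    """Decode a JSON payload and validate it against Event; return None when invalid."""
    try:
        return Event(**json.loads(payload))
    except (ValueError, ValidationError) as error:
        # Malformed JSON raises a ValueError subclass; bad or missing fields raise ValidationError.
        print(f"ERROR :: Invalid payload: {error}")
        return None


event = parse_event(b'{"timestamp": 1600000000, "temperature": 21.5, "humidity": 0.4}')
invalid = parse_event(b'{"timestamp": "not-a-number"}')
```

A handler that receives an `Event` can then rely on the field types declared in the model instead of checking them by hand.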
Source distribution: faststan-0.1.1.tar.gz (5.0 kB)