Build data streaming pipelines using faststan
Project description
FastSTAN
Easily deploy NATS Streaming subscribers using Python.
Features
- Define subscribers using sync and async python functions
- Automatic data parsing and validation using type annotations and pydantic
- Support custom validation using any function
- Allow several subscribers on same channel
- Support all subscription configuration available in stan.py
- Healthcheck available using HTTP GET request to monitor the applications
- (TODO) Metrics available using HTTP GET requests to monitor subsriptions status
- All of FastAPI features
Quick start
- Install the package from pypi:
pip install faststan
- Create your first subscriber. Create a file named
app.py
and write the following lines:
from faststan import FastSTAN
app = FastSTAN()
@app.stan.subscribe("demo")
def on_event(message: str):
print(f"INFO :: Received new message: {message}")
- Start your subscriber:
uvicorn app:app
- Or if you are in a jupyter notebook environment, start the subscriptions:
await app.stan.run()
Advanced features
Using error callbacks
from faststan import FastSTAN
app = FastSTAN()
def handle_error(error):
print("ERROR: {error}")
@app.stan.subscribe("demo", error_cb=handle_error)
def on_event(message: str):
print(f"INFO :: Received new message: {message}")
Using pydantic models
You can use pydantic models in order to automatically parse incoming messages:
from pydantic import BaseModel
from faststan import FastSTAN
class Event(BaseModel):
timestamp: int
temperature: float
humidity: float
app = FastSTAN()
@app.stan.subscribe("event")
def on_event(event):
msg = f"INFO :: {event.timestamp} :: Temperature: {event.temperature} | Humidity: {event.humidity}"
print(msg)
Using pydantic models with numpy or pandas
import numpy as np
from pydantic import BaseModel
from faststan import FastSTAN
class NumpyEvent(BaseModel):
values: np.ndarray
timestamp: int
@validator("temperature", pre=True)
def validate_array(cls, value):
return np.array(value, dtype=np.float32)
@validator("humidity", pre=True)
def validate_array(cls, value):
return np.array(value, dtype=np.float32)
class Config:
arbitrary_types_allowed = True
@app.stan.subscribe("event")
def on_event(event: NumpyEvent):
print(
f"INFO :: {event.timestamp} :: Temperature values: {event.values[0]} | Humidity values: {event.values[1]}"
)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
faststan-0.1.6.tar.gz
(7.6 kB
view hashes)