Build data streaming pipelines using NATS or NATS streaming
Project description
FastSTAN
Easily deploy NATS and NATS Streaming subscribers using Python.
Features
- Define subscribers using sync and async python functions
- Automatic data parsing and validation using type annotations and pydantic
- Support all subscription configuration available in stan.py and nats.py
- Start subscriptions or services from command line
- Publish messages from command line
Quick start
- Install the package from pypi:
pip install faststan
Using the command line
Create your first NATS subscriber:
- Create a file named
app.py
and write the following lines:
from pydantic import BaseModel
class NewEvent(BaseModel):
name: str
datetime: int
def on_event(event: NewEvent):
print(f"INFO :: Received new message: {event}")
- Start your subscriber:
nats sub start demo --function app:on_event
- Publish a message:
nats pub demo --name "John Doe" --datetime 1602661983
NATS Streaming behave the same way:
- Define your subscription:
from pydantic import BaseModel
class Greetings(BaseModel):
message: str
def on_event(event: NewEvent) -> Greetings:
print(f"Info :: Received new request.")
return Greetings(message=f"Welcome to {event.name}!"
- Start it using
stan sub start
command:
stan sub start demo --function app:on_event
- And publish message using
stan pub
command:
stan pub demo --name "John Doe"
Using Python API
In this example, we will build a machine learning service that perform a prediction using an ONNX model. This service will be impletended using the [request/reply] pattern.
Before running the example, make sure you have the dependencies installed:
onnxruntime
numpy
httpx
import asyncio
from typing import List, Dict
from faststan.nats import FastNATS
from pydantic import BaseModel, validator
from httpx import AsyncClient
import numpy as np
import onnxruntime as rt
async def load_predictor(
app: FastNATS,
url: str = "https://s3-per-grenoble.ams3.digitaloceanspaces.com/models/rf_iris.onnx",
) -> None:
"""Load an ONNX model and return a predictor for this model."""
async with AsyncClient() as http_client:
http_response = await http_client.get(url)
sess = rt.InferenceSession(http_response.content)
input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name
proba_name = sess.get_outputs()[1].name
def predict(data: np.ndarray):
"""Perform prediction for given data."""
return sess.run([label_name, proba_name], {input_name: data})
app.state["predictor"] = predict
class Event(BaseModel):
"""Incoming data expected by the predictor."""
values: np.ndarray
timestamp: int
@validator("values", pre=True)
def validate_array(cls, value):
"""Cast data to numpy array with float32 precision.
A ValidationError will be raise if any error is raised in this function.
"""
return np.array(value, dtype=np.float32)
class Config:
# This must be set to True in order to let pydantic handle numpy types
arbitrary_types_allowed = True
class Result(BaseModel):
"""Result returned by the predictor."""
probabilities: List[
Dict[int, float]
] # Example: [{ 0: 0.25, 1:0.75}, {0: 0.15, 1: 0.85}]
labels: List[int] # Example: [1, 1]
app = FastNATS()
app.state = {}
await load_predictor(app)
await app.connect()
@app.reply("predict")
def predict(event: Event) -> Result:
print(f"{event.timestamp} :: Received new event data")
labels, probas = app.state["predictor"](event.values)
return {"probabilities": probas, "labels": labels.tolist()}
await app.start()
- You can now publish messages on the service:
from faststan import FastNATS
async with FastNATS() as nats_client:
reply_msg = await nats_client.request_json(
"predict", {"values": [[0, 0, 0, 0]], "timestamp": 1602661983}
)
print(f"Received a reply: {reply_msg}")
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
faststan-0.20.0.tar.gz
(15.4 kB
view details)
Built Distribution
faststan-0.20.0-py3-none-any.whl
(20.3 kB
view details)
File details
Details for the file faststan-0.20.0.tar.gz
.
File metadata
- Download URL: faststan-0.20.0.tar.gz
- Upload date:
- Size: 15.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.2 CPython/3.8.5 Windows/10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0fd6b65893d5d2ec8de6174396c1a1363c316034be3a92a7960f5324ff0f4738 |
|
MD5 | cde1ed2c259b1c789300b1e363837874 |
|
BLAKE2b-256 | 1e7b663dd537e21f93e8dd8e0bd9371d9e39faec2d7513919e36c12097b65ad9 |
File details
Details for the file faststan-0.20.0-py3-none-any.whl
.
File metadata
- Download URL: faststan-0.20.0-py3-none-any.whl
- Upload date:
- Size: 20.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.2 CPython/3.8.5 Windows/10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 2f9799a4ac961660b3b480edd65f93c210daaea97ed33a6df532625cf3fbac89 |
|
MD5 | 670bd7984c3aef1d162c5b9e5a5d9f45 |
|
BLAKE2b-256 | 8471a58a476c6dbc2caca043f0d6cb061ce8e66b0deb69d7bdf36be4560d2f65 |