Skip to main content

Build data streaming pipelines using NATS or NATS streaming

Project description

FastSTAN

Pipeline status Coverage report Packaging: poetry Style: flake8 Format: black Packaging: pytest PyPI Documentation License: MIT Gitpod ready-to-code

Easily deploy NATS and NATS Streaming subscribers using Python.

Features

  • Define subscribers using sync and async python functions
  • Automatic data parsing and validation using type annotations and pydantic
  • Support all subscription configuration available in stan.py and nats.py
  • Start subscriptions or services from command line
  • Publish messages from command line

Quick start

  • Install the package from pypi:
pip install faststan

Using the command line

Create your first NATS subscriber:

  • Create a file named app.py and write the following lines:
from pydantic import BaseModel

class NewEvent(BaseModel):
    name: str
    datetime: int

def on_event(event: NewEvent):
    print(f"INFO :: Received new message: {event}")
  • Start your subscriber:
nats sub start demo --function app:on_event
  • Publish a message:
nats pub demo --name "John Doe" --datetime 1602661983

NATS Streaming behave the same way:

  • Define your subscription:
from pydantic import BaseModel


class Greetings(BaseModel):
    message: str

def on_event(event: NewEvent) -> Greetings:
    print(f"Info :: Received new request.")
    return Greetings(message=f"Welcome to {event.name}!"
  • Start it using stan sub start command:
stan sub start demo --function app:on_event
  • And publish message using stan pub command:
stan pub demo --name "John Doe"

Using Python API

In this example, we will build a machine learning service that perform a prediction using an ONNX model. This service will be impletended using the [request/reply] pattern.

Before running the example, make sure you have the dependencies installed:

  • onnxruntime
  • numpy
  • httpx
import asyncio
from typing import List, Dict
from faststan.nats import FastNATS
from pydantic import BaseModel, validator

from httpx import AsyncClient
import numpy as np
import onnxruntime as rt


async def load_predictor(
    app: FastNATS,
    url: str = "https://s3-per-grenoble.ams3.digitaloceanspaces.com/models/rf_iris.onnx",
) -> None:
    """Load an ONNX model and return a predictor for this model."""

    async with AsyncClient() as http_client:
        http_response = await http_client.get(url)

    sess = rt.InferenceSession(http_response.content)

    input_name = sess.get_inputs()[0].name
    label_name = sess.get_outputs()[0].name
    proba_name = sess.get_outputs()[1].name

    def predict(data: np.ndarray):
        """Perform prediction for given data."""
        return sess.run([label_name, proba_name], {input_name: data})

    app.state["predictor"] = predict


class Event(BaseModel):
    """Incoming data expected by the predictor."""

    values: np.ndarray
    timestamp: int

    @validator("values", pre=True)
    def validate_array(cls, value):
        """Cast data to numpy array with float32 precision.

        A ValidationError will be raise if any error is raised in this function.
        """
        return np.array(value, dtype=np.float32)

    class Config:
        # This must be set to True in order to let pydantic handle numpy types
        arbitrary_types_allowed = True


class Result(BaseModel):
    """Result returned by the predictor."""

    probabilities: List[
        Dict[int, float]
    ]  # Example: [{ 0: 0.25, 1:0.75}, {0: 0.15, 1: 0.85}]
    labels: List[int]  # Example: [1, 1]


app = FastNATS()
app.state = {}

await load_predictor(app)
await app.connect()


@app.reply("predict")
def predict(event: Event) -> Result:
    print(f"{event.timestamp} :: Received new event data")
    labels, probas = app.state["predictor"](event.values)
    return {"probabilities": probas, "labels": labels.tolist()}


await app.start()
  • You can now publish messages on the service:
from faststan import FastNATS


async with FastNATS() as nats_client:
    reply_msg = await nats_client.request_json(
        "predict", {"values": [[0, 0, 0, 0]], "timestamp": 1602661983}
    )

print(f"Received a reply: {reply_msg}")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

faststan-0.20.0.tar.gz (15.4 kB view hashes)

Uploaded source

Built Distribution

faststan-0.20.0-py3-none-any.whl (20.3 kB view hashes)

Uploaded py3

Supported by

AWS AWS Cloud computing Datadog Datadog Monitoring Facebook / Instagram Facebook / Instagram PSF Sponsor Fastly Fastly CDN Google Google Object Storage and Download Analytics Huawei Huawei PSF Sponsor Microsoft Microsoft PSF Sponsor NVIDIA NVIDIA PSF Sponsor Pingdom Pingdom Monitoring Salesforce Salesforce PSF Sponsor Sentry Sentry Error logging StatusPage StatusPage Status page