Skip to main content

Build data streaming pipelines using NATS or NATS streaming

Project description

FastSTAN

Pipeline status Coverage report Packaging: poetry Style: flake8 Format: black Packaging: pytest PyPI Documentation License: MIT Gitpod ready-to-code

Easily deploy NATS and NATS Streaming subscribers using Python.

Features

  • Define subscribers using sync and async python functions
  • Automatic data parsing and validation using type annotations and pydantic
  • Support all subscription configuration available in stan.py and nats.py
  • Start subscriptions or services from command line
  • Publish messages from command line

Quick start

  • Install the package from pypi:
pip install faststan

Using the command line

Create your first NATS subscriber:

  • Create a file named app.py and write the following lines:
from pydantic import BaseModel

class NewEvent(BaseModel):
    name: str
    datetime: int

def on_event(event: NewEvent):
    print(f"INFO :: Received new message: {event}")
  • Start your subscriber:
nats sub start demo --function app:on_event
  • Publish a message:
nats pub demo --name "John Doe" --datetime 1602661983

NATS Streaming behave the same way:

  • Define your subscription:
from pydantic import BaseModel


class Greetings(BaseModel):
    message: str

def on_event(event: NewEvent) -> Greetings:
    print(f"Info :: Received new request.")
    return Greetings(message=f"Welcome to {event.name}!"
  • Start it using stan sub start command:
stan sub start demo --function app:on_event
  • And publish message using stan pub command:
stan pub demo --name "John Doe"

Using Python API

In this example, we will build a machine learning service that perform a prediction using an ONNX model. This service will be impletended using the [request/reply] pattern.

Before running the example, make sure you have the dependencies installed:

  • onnxruntime
  • numpy
  • httpx
import asyncio
from typing import List, Dict
from faststan.nats import FastNATS
from pydantic import BaseModel, validator

from httpx import AsyncClient
import numpy as np
import onnxruntime as rt


async def load_predictor(
    app: FastNATS,
    url: str = "https://s3-per-grenoble.ams3.digitaloceanspaces.com/models/rf_iris.onnx",
) -> None:
    """Load an ONNX model and return a predictor for this model."""

    async with AsyncClient() as http_client:
        http_response = await http_client.get(url)

    sess = rt.InferenceSession(http_response.content)

    input_name = sess.get_inputs()[0].name
    label_name = sess.get_outputs()[0].name
    proba_name = sess.get_outputs()[1].name

    def predict(data: np.ndarray):
        """Perform prediction for given data."""
        return sess.run([label_name, proba_name], {input_name: data})

    app.state["predictor"] = predict


class Event(BaseModel):
    """Incoming data expected by the predictor."""

    values: np.ndarray
    timestamp: int

    @validator("values", pre=True)
    def validate_array(cls, value):
        """Cast data to numpy array with float32 precision.

        A ValidationError will be raise if any error is raised in this function.
        """
        return np.array(value, dtype=np.float32)

    class Config:
        # This must be set to True in order to let pydantic handle numpy types
        arbitrary_types_allowed = True


class Result(BaseModel):
    """Result returned by the predictor."""

    probabilities: List[
        Dict[int, float]
    ]  # Example: [{ 0: 0.25, 1:0.75}, {0: 0.15, 1: 0.85}]
    labels: List[int]  # Example: [1, 1]


app = FastNATS()
app.state = {}

await load_predictor(app)
await app.connect()


@app.reply("predict")
def predict(event: Event) -> Result:
    print(f"{event.timestamp} :: Received new event data")
    labels, probas = app.state["predictor"](event.values)
    return {"probabilities": probas, "labels": labels.tolist()}


await app.start()
  • You can now publish messages on the service:
from faststan import FastNATS


async with FastNATS() as nats_client:
    reply_msg = await nats_client.request_json(
        "predict", {"values": [[0, 0, 0, 0]], "timestamp": 1602661983}
    )

print(f"Received a reply: {reply_msg}")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

faststan-0.20.0.tar.gz (15.4 kB view details)

Uploaded Source

Built Distribution

faststan-0.20.0-py3-none-any.whl (20.3 kB view details)

Uploaded Python 3

File details

Details for the file faststan-0.20.0.tar.gz.

File metadata

  • Download URL: faststan-0.20.0.tar.gz
  • Upload date:
  • Size: 15.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.2 CPython/3.8.5 Windows/10

File hashes

Hashes for faststan-0.20.0.tar.gz
Algorithm Hash digest
SHA256 0fd6b65893d5d2ec8de6174396c1a1363c316034be3a92a7960f5324ff0f4738
MD5 cde1ed2c259b1c789300b1e363837874
BLAKE2b-256 1e7b663dd537e21f93e8dd8e0bd9371d9e39faec2d7513919e36c12097b65ad9

See more details on using hashes here.

File details

Details for the file faststan-0.20.0-py3-none-any.whl.

File metadata

  • Download URL: faststan-0.20.0-py3-none-any.whl
  • Upload date:
  • Size: 20.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.2 CPython/3.8.5 Windows/10

File hashes

Hashes for faststan-0.20.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2f9799a4ac961660b3b480edd65f93c210daaea97ed33a6df532625cf3fbac89
MD5 670bd7984c3aef1d162c5b9e5a5d9f45
BLAKE2b-256 8471a58a476c6dbc2caca043f0d6cb061ce8e66b0deb69d7bdf36be4560d2f65

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page