Build data streaming pipelines using faststan

FastSTAN

Packaging: poetry | Style: flake8 | Format: black | Tests: pytest | License: MIT

Easily deploy NATS Streaming subscribers using Python.

Features

  • Define subscribers using sync or async Python functions
  • Automatic data parsing and validation using type annotations and pydantic
  • Support for custom validation using any function
  • Several subscribers allowed on the same channel
  • Support for all subscription configuration options available in stan.py
  • Health check available through an HTTP GET request to monitor the application
  • (TODO) Metrics available through HTTP GET requests to monitor subscription status
  • All FastAPI features

Quick start

  • Install the package from PyPI:
pip install faststan
  • Create your first subscriber. Create a file named app.py and write the following lines:
from faststan import FastSTAN

app = FastSTAN()

@app.stan.subscribe("demo")
def on_event(message: str):
    print(f"INFO :: Received new message: {message}")
  • Start your subscriber:
uvicorn app:app
  • Or, if you are in a Jupyter notebook environment, start the subscriptions directly:
await app.stan.run()
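
The quick-start handler is a plain function, but the feature list says async functions are accepted as well. The sketch below shows one common way a framework can call both kinds of handler from an event loop. It is an illustration only, not FastSTAN's actual implementation; `call_handler`, `sync_handler` and `async_handler` are hypothetical names.

```python
import asyncio
import inspect


async def call_handler(handler, message):
    """Call a handler whether it is a plain function or a coroutine function.

    Hypothetical helper for illustration; not part of FastSTAN.
    """
    if inspect.iscoroutinefunction(handler):
        # Async handlers are awaited directly on the event loop
        return await handler(message)
    # Sync handlers run in a worker thread so they do not block the loop
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, handler, message)


def sync_handler(message: str) -> str:
    return f"sync:{message}"


async def async_handler(message: str) -> str:
    await asyncio.sleep(0)  # stand-in for real async work
    return f"async:{message}"


async def main():
    return [
        await call_handler(sync_handler, "a"),
        await call_handler(async_handler, "b"),
    ]


results = asyncio.run(main())
print(results)
```

Running sync handlers in an executor keeps a slow handler from stalling other subscriptions, at the cost of a thread hop per message.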

Advanced features

Using error callbacks

from faststan import FastSTAN


app = FastSTAN()


def handle_error(error):
    print(f"ERROR: {error}")


@app.stan.subscribe("demo", error_cb=handle_error)
def on_event(message: str):
    print(f"INFO :: Received new message: {message}")
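
The README does not say exactly when `error_cb` is invoked. Assuming it receives the exception raised while a message is being processed, the pattern can be sketched with the standard library alone. `with_error_callback` is a hypothetical decorator written for this illustration and is not part of FastSTAN.

```python
import functools


def with_error_callback(error_cb):
    """Hypothetical decorator: route exceptions from a handler to error_cb."""
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(message):
            try:
                return handler(message)
            except Exception as error:
                # Hand the exception to the callback instead of letting it propagate
                error_cb(error)
                return None
        return wrapper
    return decorator


errors = []


def handle_error(error):
    # Collect errors in a list so the behaviour is easy to inspect
    errors.append(f"ERROR: {error}")


@with_error_callback(handle_error)
def on_event(message: str):
    if not message:
        raise ValueError("empty message")
    return message.upper()


ok_result = on_event("hello")
failed_result = on_event("")
print(ok_result, failed_result, errors)
```

The point of the pattern is that one bad message is reported to the callback while the subscriber keeps processing later messages.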

Using pydantic models

You can use pydantic models in order to automatically parse incoming messages:

from pydantic import BaseModel
from faststan import FastSTAN


class Event(BaseModel):
    timestamp: int
    temperature: float
    humidity: float


app = FastSTAN()


@app.stan.subscribe("event")
def on_event(event: Event):
    msg = f"INFO :: {event.timestamp} :: Temperature: {event.temperature} | Humidity: {event.humidity}"
    print(msg)
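
To see what the `Event` annotation buys you, the sketch below parses a raw payload with pydantic directly, outside of FastSTAN. The JSON payload is a made-up example, and decoding it with `json.loads` before building the model is an assumption about the message format, not a description of FastSTAN internals.

```python
import json

from pydantic import BaseModel, ValidationError


class Event(BaseModel):
    timestamp: int
    temperature: float
    humidity: float


# Hypothetical raw payload as it might arrive on the "event" channel
payload = b'{"timestamp": 1600000000, "temperature": 21.5, "humidity": 0.4}'
event = Event(**json.loads(payload))
print(event.timestamp, event.temperature, event.humidity)

# A payload with a missing or malformed field is rejected
try:
    Event(**json.loads(b'{"timestamp": 1600000000, "temperature": "hot"}'))
    rejected = False
except ValidationError:
    rejected = True
print(rejected)
```

With a typed subscriber, the handler body only ever sees a validated `Event`, so it does not need its own checks for missing or mistyped fields.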

Using pydantic models with numpy or pandas

import numpy as np
from pydantic import BaseModel, validator
from faststan import FastSTAN


class NumpyEvent(BaseModel):
    values: np.ndarray
    timestamp: int

    # Convert the incoming nested list into a float32 array before validation
    @validator("values", pre=True)
    def validate_array(cls, value):
        return np.array(value, dtype=np.float32)

    class Config:
        # Required because np.ndarray is not a type pydantic knows natively
        arbitrary_types_allowed = True


app = FastSTAN()


@app.stan.subscribe("event")
def on_event(event: NumpyEvent):
    print(
        f"INFO :: {event.timestamp} :: Temperature values: {event.values[0]} | Humidity values: {event.values[1]}"
    )
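
The handler above reads temperatures from `values[0]` and humidities from `values[1]`, which suggests a two-row array. The sketch below shows the conversion step on its own, using only numpy and a made-up payload; the exact payload layout is an assumption.

```python
import json

import numpy as np

# Hypothetical payload: first row holds temperatures, second row holds humidities
payload = b'{"timestamp": 1600000000, "values": [[21.5, 21.75], [0.5, 0.25]]}'
data = json.loads(payload)

# Same conversion a pre-validator would perform on the "values" field
values = np.array(data["values"], dtype=np.float32)
temperature, humidity = values[0], values[1]

print(values.shape, values.dtype)
print(temperature, humidity)
```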
