
Extension of FastAPI with Kafka event handlers


FastKafkaAPI

Install

pip install fast-kafka-api

How to use

We’ll create a simple example of using FastKafkaAPI to consume input data from one topic and produce predictions based on it to another topic.

First, we define the message types used in the Kafka topics as follows:

from typing import List
from pydantic import BaseModel, Field, NonNegativeInt

class InputData(BaseModel):
    user_id: NonNegativeInt = Field(
        ..., example=202020, description="ID of a user"
    )
    feature_1: List[float] = Field(
        ...,
        example=[1.2, 2.3, 4.5, 6.7, 0.1],
        description="input feature 1",
    )
    feature_2: List[int] = Field(
        ...,
        example=[2, 4, 3, 1, 0],
        description="input feature 2",
    )

class Prediction(BaseModel):
    user_id: NonNegativeInt = Field(
        ..., example=202020, description="ID of a user"
    )
    score: float = Field(
        ...,
        example=0.4321,
        description="Prediction score (e.g. the probability of churn in the next 28 days)",
        ge=0.0,
        le=1.0,
    )
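
Both classes are ordinary pydantic models, so you can quickly check what a message will look like on the wire by serializing an example to JSON (a small sanity check, not required by FastKafkaAPI):

msg = InputData(
    user_id=202020,
    feature_1=[1.2, 2.3, 4.5, 6.7, 0.1],
    feature_2=[2, 4, 3, 1, 0],
)
print(msg.json())
# e.g. {"user_id": 202020, "feature_1": [1.2, 2.3, 4.5, 6.7, 0.1], "feature_2": [2, 4, 3, 1, 0]}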

Next, we create a FastKafkaAPI object initialized to use our Kafka brokers:

from os import environ
from fast_kafka_api.application import FastKafkaAPI

title = "Example for FastKafkaAPI"
description = "A simple example on how to use FastKafkaAPI"
version = "0.0.1"
openapi_url = "/openapi.json"

contact = dict(name="airt.ai", url="https://airt.ai", email="info@airt.ai")

kafka_brokers = {
    "localhost": {
        "url": "kafka",
        "description": "local development kafka",
        "port": 9092,
    },
    "production": {
        "url": "kafka.acme.com",
        "description": "production kafka",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_server_url = environ["KAFKA_HOSTNAME"]
kafka_server_port = environ["KAFKA_PORT"]
kafka_config = {
    "bootstrap_servers": f"{kafka_server_url}:{kafka_server_port}",
}

app = FastKafkaAPI(
    title=title,
    contact=contact,
    version=version,
    kafka_brokers=kafka_brokers,
    kafka_config=kafka_config,
    description=description,
    docs_url=None,
    redoc_url=None,
)
[INFO] fast_kafka_api._components.asyncapi: ok
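
Note that the snippet above reads KAFKA_HOSTNAME and KAFKA_PORT from the environment and will raise a KeyError if they are not set. For local experiments you could fall back to defaults before building kafka_config (a hypothetical convenience, not part of FastKafkaAPI):

from os import environ

# Hypothetical defaults for local development; adjust to your broker.
kafka_server_url = environ.get("KAFKA_HOSTNAME", "localhost")
kafka_server_port = environ.get("KAFKA_PORT", "9092")
kafka_config = {
    "bootstrap_servers": f"{kafka_server_url}:{kafka_server_port}",
}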

Next, we use decorators to define which functions are called when messages arrive on a topic (consumers) and which functions produce messages to a topic (producers):

@app.consumes(topic="input_data")
async def on_input_data(msg: InputData):
    print(f"msg={msg}")
    score = await model.predict(feature_1=msg.feature_1, feature_2=msg.feature_2)
    await to_predictions(user_id=msg.user_id, score=score)

@app.produces(topic="predictions")
async def to_predictions(user_id: int, score: float) -> Prediction:
    prediction = Prediction(user_id=user_id, score=score)
    print(f"prediction={prediction}")
    return prediction
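
The consumer above calls model.predict, but model itself is not defined in this snippet; it stands for whatever prediction object your application provides. A minimal stub you could use to run the example end to end might look like this (purely hypothetical, not part of FastKafkaAPI):

class DummyModel:
    """Stand-in for a real ML model with an async predict() method."""

    async def predict(self, feature_1: List[float], feature_2: List[int]) -> float:
        # Return a fixed score so the pipeline can be exercised end to end.
        return 0.87

model = DummyModel()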

Finally, start the service using uvicorn:

import uvicorn

uvicorn.run(app, host="0.0.0.0", port=4000)
INFO:     Started server process [4860]
INFO:     Waiting for application startup.

[INFO] fast_kafka_api._components.asyncapi: Async specifications generated at: 'asyncapi/spec/asyncapi.yml'
[INFO] fast_kafka_api._components.asyncapi: Async docs generated at 'asyncapi/docs'
[INFO] fast_kafka_api._components.asyncapi: Output of '$ npx -y -p @asyncapi/generator ag asyncapi/spec/asyncapi.yml @asyncapi/html-template -o asyncapi/docs --force-write'

Done! ✨
Check out your shiny new generated files at /work/fast-kafka-api/nbs/asyncapi/docs.


[INFO] fast_kafka_api._components.aiokafka_loop: aiokafka_consumer_loop() starting..
[INFO] fast_kafka_api._components.aiokafka_loop: aiokafka_consumer_loop(): Consumer created.

INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:4000 (Press CTRL+C to quit)

[INFO] fast_kafka_api._components.aiokafka_loop: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
[INFO] fast_kafka_api._components.aiokafka_loop: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'input_data': 1}. 

INFO:     Shutting down
INFO:     Waiting for application shutdown.

[INFO] fast_kafka_api._components.aiokafka_loop: aiokafka_consumer_loop(): Consumer stopped.
[INFO] fast_kafka_api._components.aiokafka_loop: aiokafka_consumer_loop() finished.

INFO:     Application shutdown complete.
INFO:     Finished server process [4860]
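
Once the service is running, you could exercise it by publishing a JSON-encoded InputData message (the model defined above) to the input_data topic, for example with aiokafka. This is a sketch; the broker address and topic name are taken from the example above:

import asyncio

from aiokafka import AIOKafkaProducer

async def send_test_message() -> None:
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        msg = InputData(
            user_id=202020,
            feature_1=[1.2, 2.3, 4.5, 6.7, 0.1],
            feature_2=[2, 4, 3, 1, 0],
        )
        # Messages are sent as raw bytes; encode the pydantic JSON payload.
        await producer.send_and_wait("input_data", msg.json().encode("utf-8"))
    finally:
        await producer.stop()

asyncio.run(send_test_message())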

