Skip to main content

Extension of FastAPI with Kafka event handlers

Project description

fast-kafka-api

This file will become your README and also the index of your documentation.

Install

pip install fast_kafka_api

How to use

Fill me in please! Don’t forget code examples:

from os import environ

title = "Example for FastKafkaAPI"
description = "A simple example on how to use FastKafkaAPI"
version = "0.0.1"
openapi_url = "/openapi.json"
favicon_url = "/assets/images/favicon.ico"

contact = dict(name="airt.ai", url="https://airt.ai", email="info@airt.ai")

kafka_brokers = {
    "localhost": {
        "url": "kafka",
        "description": "local development kafka",
        "port": 9092,
    },
    "staging": {
        "url": "kafka.staging.acme.com",
        "description": "staging kafka",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
    "production": {
        "url": "kafka.acme.com",
        "description": "production kafka",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_server_url = "kafka"
kafka_server_port = "9092"

kafka_config = {
    "bootstrap.servers": f"{kafka_server_url}:{kafka_server_port}",
    "group.id": f"{kafka_server_url}:{kafka_server_port}_group",
    "auto.offset.reset": "earliest",
}
if "KAFKA_API_KEY" in environ:
    kafka_config = {
        **kafka_config,
        **{
            "security.protocol": "SASL_SSL",
            "sasl.mechanisms": "PLAIN",
            "sasl.username": environ["KAFKA_API_KEY"],
            "sasl.password": environ["KAFKA_API_SECRET"],
        },
    }

app = FastKafkaAPI(
    title=title,
    contact=contact,
    kafka_brokers=kafka_brokers,
    kafka_config=kafka_config,
    description=description,
    version=version,
    docs_url=None,
    redoc_url=None,
)
from typing import *
from datetime import datetime
from fast_kafka_api.application import KafkaMessage
from pydantic import NonNegativeInt, Field

class EventData(KafkaMessage):
    definition_id: str = Field(
        ...,
        example="appLaunch",
        description="name of the event",
        min_length=1,
    )
    occurred_time: datetime = Field(
        ...,
        example="2021-03-28T00:34:08",
        description="local time of the event",
    )
    user_id: NonNegativeInt = Field(
        ..., example=12345678, description="ID of a person"
    )
        
class TrainingDataStatus(KafkaMessage):
    no_of_records: NonNegativeInt = Field(
        ...,
        example=12_345,
        description="number of records (rows) ingested",
    )
    total_no_of_records: NonNegativeInt = Field(
        ...,
        example=1_000_000,
        description="total number of records (rows) to be ingested",
    )
        
_total_no_of_records = 0
_no_of_records_received = 0

@app.consumes("training_data")  # type: ignore
async def on_training_data(msg: EventData, produce: Callable[[str, TrainingDataStatus], None]) -> None:
    global _total_no_of_records
    global _no_of_records_received
    _no_of_records_received = _no_of_records_received + 1

    if _no_of_records_received % 100 == 0:
        training_data_status = TrainingDataStatus(
            no_of_records=_no_of_records_received,
            total_no_of_records=_total_no_of_records,
        )
        produce(topic="training_data_status", msg=training_data_status)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fast-kafka-api-0.0.1rc0.tar.gz (22.2 kB view hashes)

Uploaded Source

Built Distribution

fast_kafka_api-0.0.1rc0-py3-none-any.whl (23.9 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page