Skip to main content

Extension of FastAPI with Kafka event handlers

Project description

fast-kafka-api

This file will become your README and also the index of your documentation.

Install

pip install fast_kafka_api

How to use

Fill me in please! Don’t forget code examples:

from os import environ

title = "Example for FastKafkaAPI"
description = "A simple example on how to use FastKafkaAPI"
version = "0.0.1"
openapi_url = "/openapi.json"
favicon_url = "/assets/images/favicon.ico"

contact = dict(name="airt.ai", url="https://airt.ai", email="info@airt.ai")

kafka_brokers = {
    "localhost": {
        "url": "kafka",
        "description": "local development kafka",
        "port": 9092,
    },
    "staging": {
        "url": "kafka.staging.acme.com",
        "description": "staging kafka",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
    "production": {
        "url": "kafka.acme.com",
        "description": "production kafka",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_server_url = "kafka"
kafka_server_port = "9092"

kafka_config = {
    "bootstrap.servers": f"{kafka_server_url}:{kafka_server_port}",
    "group.id": f"{kafka_server_url}:{kafka_server_port}_group",
    "auto.offset.reset": "earliest",
}
if "KAFKA_API_KEY" in environ:
    kafka_config = {
        **kafka_config,
        **{
            "security.protocol": "SASL_SSL",
            "sasl.mechanisms": "PLAIN",
            "sasl.username": environ["KAFKA_API_KEY"],
            "sasl.password": environ["KAFKA_API_SECRET"],
        },
    }

app = FastKafkaAPI(
    title=title,
    contact=contact,
    kafka_brokers=kafka_brokers,
    kafka_config=kafka_config,
    description=description,
    version=version,
    docs_url=None,
    redoc_url=None,
)
from typing import *
from datetime import datetime
from fast_kafka_api.application import KafkaMessage
from pydantic import NonNegativeInt, Field

class EventData(KafkaMessage):
    definition_id: str = Field(
        ...,
        example="appLaunch",
        description="name of the event",
        min_length=1,
    )
    occurred_time: datetime = Field(
        ...,
        example="2021-03-28T00:34:08",
        description="local time of the event",
    )
    user_id: NonNegativeInt = Field(
        ..., example=12345678, description="ID of a person"
    )
        
class TrainingDataStatus(KafkaMessage):
    no_of_records: NonNegativeInt = Field(
        ...,
        example=12_345,
        description="number of records (rows) ingested",
    )
    total_no_of_records: NonNegativeInt = Field(
        ...,
        example=1_000_000,
        description="total number of records (rows) to be ingested",
    )
        
_total_no_of_records = 0
_no_of_records_received = 0

@app.consumes("training_data")  # type: ignore
async def on_training_data(msg: EventData, produce: Callable[[str, TrainingDataStatus], None]) -> None:
    global _total_no_of_records
    global _no_of_records_received
    _no_of_records_received = _no_of_records_received + 1

    if _no_of_records_received % 100 == 0:
        training_data_status = TrainingDataStatus(
            no_of_records=_no_of_records_received,
            total_no_of_records=_total_no_of_records,
        )
        produce(topic="training_data_status", msg=training_data_status)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fast-kafka-api-0.0.1rc0.tar.gz (22.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fast_kafka_api-0.0.1rc0-py3-none-any.whl (23.9 kB view details)

Uploaded Python 3

File details

Details for the file fast-kafka-api-0.0.1rc0.tar.gz.

File metadata

  • Download URL: fast-kafka-api-0.0.1rc0.tar.gz
  • Upload date:
  • Size: 22.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.6

File hashes

Hashes for fast-kafka-api-0.0.1rc0.tar.gz
Algorithm Hash digest
SHA256 77d1da0a7ede3d0ea4b2644aa830968fc0ec65f3e92c13d2314b6183fa7d0262
MD5 ad02e249bb6ca8346d35d75a070ccae4
BLAKE2b-256 eb473ed41ecfdd4a00a8d411885369422e3be9cdf630bc542c5dbee3206cc84f

See more details on using hashes here.

File details

Details for the file fast_kafka_api-0.0.1rc0-py3-none-any.whl.

File metadata

File hashes

Hashes for fast_kafka_api-0.0.1rc0-py3-none-any.whl
Algorithm Hash digest
SHA256 a04016fe57c11fea0d38a46fa9e16d350f0276de7de9c4d492067611fbb766ff
MD5 7212baf6fb458dbbf93ba5d163224b29
BLAKE2b-256 0574b982f00e0b43e26a2e76609c26f47efaea2599026a9fe350e16239a2ec21

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page