Extension of FastAPI with Kafka event handlers
Project description
fast-kafka-api
`fast-kafka-api` extends FastAPI applications with Kafka event handlers. This page serves as both the project README and the index of the documentation.
Install
pip install fast_kafka_api
How to use
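The example below configures the Kafka brokers and client settings, creates a `FastKafkaAPI` application, defines the message schemas, and registers a consumer for the `training_data` topic: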
# Example application setup: application metadata, broker descriptions,
# Kafka client configuration and, finally, the FastKafkaAPI instance itself.
from os import environ

# NOTE(review): FastKafkaAPI was used below without ever being imported, so
# the example failed with NameError. It is assumed to live next to
# KafkaMessage in fast_kafka_api.application -- confirm against the package.
from fast_kafka_api.application import FastKafkaAPI

# Application metadata handed to FastKafkaAPI below.
title = "Example for FastKafkaAPI"
description = "A simple example on how to use FastKafkaAPI"
version = "0.0.1"
contact = dict(name="airt.ai", url="https://airt.ai", email="info@airt.ai")

# Defined for completeness; neither value is passed to FastKafkaAPI in this
# example.
openapi_url = "/openapi.json"
favicon_url = "/assets/images/favicon.ico"

# Named broker descriptions handed to FastKafkaAPI as `kafka_brokers`.
kafka_brokers = {
    "localhost": {
        "url": "kafka",
        "description": "local development kafka",
        "port": 9092,
    },
    "staging": {
        "url": "kafka.staging.acme.com",
        "description": "staging kafka",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
    "production": {
        "url": "kafka.acme.com",
        "description": "production kafka",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

# Host and port of the broker this example connects to; both the bootstrap
# address and the consumer group id are derived from them.
kafka_server_url = "kafka"
kafka_server_port = "9092"

kafka_config = {
    "bootstrap.servers": f"{kafka_server_url}:{kafka_server_port}",
    "group.id": f"{kafka_server_url}:{kafka_server_port}_group",
    "auto.offset.reset": "earliest",
}

# Add SASL_SSL / PLAIN credentials only when KAFKA_API_KEY is present in the
# environment.
if "KAFKA_API_KEY" in environ:
    kafka_config = {
        **kafka_config,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": environ["KAFKA_API_KEY"],
        # KAFKA_API_SECRET must accompany KAFKA_API_KEY: a missing secret
        # raises KeyError here.
        "sasl.password": environ["KAFKA_API_SECRET"],
    }

# docs_url / redoc_url are explicitly set to None.
app = FastKafkaAPI(
    title=title,
    contact=contact,
    kafka_brokers=kafka_brokers,
    kafka_config=kafka_config,
    description=description,
    version=version,
    docs_url=None,
    redoc_url=None,
)
from typing import *
from datetime import datetime
from fast_kafka_api.application import KafkaMessage
from pydantic import NonNegativeInt, Field
class EventData(KafkaMessage):
    # Schema of a single event message. Every field is required (`...`).
    # Documentation is kept in comments rather than a docstring so that the
    # class attributes seen at runtime stay exactly as before.

    # Name of the event; must contain at least one character.
    definition_id: str = Field(
        ..., example="appLaunch", description="name of the event", min_length=1
    )

    # Local time at which the event occurred.
    occurred_time: datetime = Field(
        ..., example="2021-03-28T00:34:08", description="local time of the event"
    )

    # Non-negative identifier of the person the event belongs to.
    user_id: NonNegativeInt = Field(
        ...,
        example=12345678,
        description="ID of a person",
    )
class TrainingDataStatus(KafkaMessage):
    # Schema of a progress message: how many records have been ingested out
    # of how many are expected. Both fields are required and non-negative.
    # Documentation is kept in comments rather than a docstring so that the
    # class attributes seen at runtime stay exactly as before.

    # Records (rows) ingested so far.
    no_of_records: NonNegativeInt = Field(
        ..., example=12_345, description="number of records (rows) ingested"
    )

    # Records (rows) expected in total.
    total_no_of_records: NonNegativeInt = Field(
        ...,
        example=1_000_000,
        description="total number of records (rows) to be ingested",
    )
# Module-level counters shared by every invocation of the handler below.
# `_total_no_of_records` is only ever read in this example.
_total_no_of_records = 0
_no_of_records_received = 0


# Consumer for the "training_data" topic: counts each incoming message and,
# on every 100th one, publishes a TrainingDataStatus to the
# "training_data_status" topic through the supplied `produce` callable.
@app.consumes("training_data")  # type: ignore
async def on_training_data(
    msg: EventData, produce: Callable[[str, TrainingDataStatus], None]
) -> None:
    global _no_of_records_received

    _no_of_records_received += 1

    # Nothing to report until the running count reaches a multiple of 100.
    if _no_of_records_received % 100 != 0:
        return

    produce(
        topic="training_data_status",
        msg=TrainingDataStatus(
            no_of_records=_no_of_records_received,
            total_no_of_records=_total_no_of_records,
        ),
    )
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
fast-kafka-api-0.0.1rc0.tar.gz
(22.2 kB, view hashes)
Built Distribution
Close
Hashes for fast_kafka_api-0.0.1rc0-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | a04016fe57c11fea0d38a46fa9e16d350f0276de7de9c4d492067611fbb766ff |
| MD5 | 7212baf6fb458dbbf93ba5d163224b29 |
| BLAKE2b-256 | 0574b982f00e0b43e26a2e76609c26f47efaea2599026a9fe350e16239a2ec21 |