FastKafka is a powerful and easy-to-use Python library for building asynchronous web services that interact with Kafka topics. Built on top of FastAPI, Starlette, Pydantic, AIOKafka and AsyncAPI, FastKafka simplifies the process of writing producers and consumers for Kafka topics.
FastKafka
Effortless Kafka integration for your web services
FastKafka is a powerful and easy-to-use Python library for building asynchronous services that interact with Kafka topics. Built on top of Pydantic, AIOKafka and AsyncAPI, FastKafka simplifies the process of writing producers and consumers for Kafka topics, handling all the parsing, networking, task scheduling and data generation automatically. With FastKafka, you can quickly prototype and develop high-performance Kafka-based services with minimal code, making it an ideal choice for developers looking to streamline their workflow and accelerate their projects.
Install
FastKafka works on macOS, Linux, and most Unix-style operating systems.
You can install it with pip as usual:
pip install fastkafka
Writing server code
Here is an example Python script using FastKafka that takes data from a Kafka topic, makes a prediction using a predictive model, and outputs the prediction to another Kafka topic.
Messages
FastKafka uses Pydantic to parse input JSON-encoded data into Python objects, making it easy to work with structured data in your Kafka-based applications. Pydantic’s BaseModel class allows you to define messages using a declarative syntax, making it easy to specify the fields and types of your messages.
This example defines two message classes for use in a FastKafka application:
- The InputData class is used to represent input data for a predictive model. It has three fields: user_id, feature_1, and feature_2. The user_id field is of type NonNegativeInt, which is a subclass of int that only allows non-negative integers. The feature_1 and feature_2 fields are lists of floating-point numbers and integers, respectively.
- The Prediction class is used to represent the output of the predictive model. It has two fields: user_id and score. The score field is a floating-point number between 0.0 and 1.0 that represents the prediction made by the model, such as the probability of churn in the next 28 days.
These message classes will be used to parse and validate incoming data in Kafka consumers and producers.
from typing import List

from pydantic import BaseModel, Field, NonNegativeInt


class InputData(BaseModel):
    user_id: NonNegativeInt = Field(..., example=202020, description="ID of a user")
    feature_1: List[float] = Field(
        ...,
        example=[1.2, 2.3, 4.5, 6.7, 0.1],
        description="input feature 1",
    )
    feature_2: List[int] = Field(
        ...,
        example=[2, 4, 3, 1, 0],
        description="input feature 2",
    )


class Prediction(BaseModel):
    user_id: NonNegativeInt = Field(..., example=202020, description="ID of a user")
    score: float = Field(
        ...,
        example=0.4321,
        description="Prediction score (e.g. the probability of churn in the next 28 days)",
        ge=0.0,
        le=1.0,
    )
These message classes will be used to parse and validate incoming data in a Kafka consumer and to produce a JSON-encoded message in a producer. Using Pydantic’s BaseModel in combination with FastKafka makes it easy to work with structured data in your Kafka-based applications.
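As a rough illustration of the parsing and validation described above, the sketch below decodes a JSON payload into InputData and shows invalid values being rejected. It redefines trimmed-down copies of the two message classes (without the example and description metadata) so that it runs on its own. The payload bytes are made up for illustration, and decoding with json.loads is an assumption about how a consumed message could be turned into a model, not FastKafka's internal code.

```python
import json
from typing import List

from pydantic import BaseModel, Field, NonNegativeInt, ValidationError


class InputData(BaseModel):
    user_id: NonNegativeInt
    feature_1: List[float]
    feature_2: List[int]


class Prediction(BaseModel):
    user_id: NonNegativeInt
    score: float = Field(..., ge=0.0, le=1.0)


# A made-up JSON-encoded message, as it might arrive from a topic.
raw = b'{"user_id": 202020, "feature_1": [1.2, 2.3], "feature_2": [2, 4]}'
msg = InputData(**json.loads(raw))
print(msg.user_id, msg.feature_1, msg.feature_2)

# A negative user_id violates NonNegativeInt and is rejected.
try:
    InputData(**json.loads(b'{"user_id": -1, "feature_1": [], "feature_2": []}'))
    negative_id_rejected = False
except ValidationError:
    negative_id_rejected = True

# A score outside [0.0, 1.0] violates the ge/le constraints.
try:
    Prediction(user_id=msg.user_id, score=1.5)
    bad_score_rejected = False
except ValidationError:
    bad_score_rejected = True

print(negative_id_rejected, bad_score_rejected)
```

Because validation happens when the model is constructed, a handler that receives an InputData instance can rely on the field types and constraints already having been checked.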
Application
This example shows how to initialize a FastKafka application.
It starts by defining a dictionary called kafka_brokers, which contains two entries: "localhost" and "production", specifying local development and production Kafka brokers. Each entry specifies the URL, port, and other details of a Kafka broker. This dictionary is used for generating the documentation only and it is not being checked by the actual server.
Next, an object of the FastAPI class is created. Its role is to serve the documentation and to start and shut down FastKafka.
Finally, an object of the FastKafka class is initialized with the minimum set of arguments:
- app: a FastAPI application used for serving the documentation and starting/shutting down the service
- kafka_brokers: a dictionary used for generation of documentation
- bootstrap_servers: a host[:port] string or list of host[:port] strings that a consumer or a producer should contact to bootstrap initial cluster metadata
from os import environ

from fastkafka.application import FastKafka

kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

bootstrap_servers = f"{environ['KAFKA_HOSTNAME']}:{environ['KAFKA_PORT']}"

kafka_app = FastKafka(
    kafka_brokers=kafka_brokers,
    bootstrap_servers=bootstrap_servers,
)
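The snippet above reads KAFKA_HOSTNAME and KAFKA_PORT with environ[...], which raises KeyError when either variable is unset. The following is a minimal sketch of one way to build the same host:port string with fallbacks for local development. The helper name bootstrap_from_env and the defaults localhost and 9092 are assumptions made for this illustration and are not part of FastKafka.

```python
from os import environ
from typing import Mapping


def bootstrap_from_env(env: Mapping[str, str] = environ) -> str:
    """Build a host:port string, falling back to assumed local defaults."""
    host = env.get("KAFKA_HOSTNAME", "localhost")  # assumed default
    port = env.get("KAFKA_PORT", "9092")  # assumed default
    return f"{host}:{port}"


# With both variables set, the result matches the f-string used above.
print(bootstrap_from_env({"KAFKA_HOSTNAME": "kafka.airt.ai", "KAFKA_PORT": "9092"}))

# With nothing set, the assumed local defaults are used instead of failing.
print(bootstrap_from_env({}))
```

Passing the mapping as a parameter keeps the helper easy to test without modifying the real process environment.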
Function decorators
FastKafka provides convenient function decorators @kafka_app.consumes and @kafka_app.produces that let you delegate the following work from user-defined functions to the framework:
- consuming and producing data to Kafka, and
- decoding and encoding JSON-encoded messages.
The FastKafka framework in turn delegates these jobs to the AIOKafka and Pydantic libraries.
These decorators make it easy to specify the processing logic for your Kafka consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying Kafka integration.
The following example shows how to use the @kafka_app.consumes and @kafka_app.produces decorators in a FastKafka application:
- The @kafka_app.consumes decorator is applied to the on_input_data function, which specifies that this function should be called whenever a message is received on the “input_data” Kafka topic. The on_input_data function takes a single argument, which is expected to be an instance of the InputData message class. Specifying the type of the single argument instructs Pydantic to use InputData.parse_raw() on the consumed message before passing it to the user-defined function on_input_data.
- The @kafka_app.produces decorator is applied to the to_predictions function, which specifies that this function should produce a message to the “predictions” Kafka topic whenever it is called. The to_predictions function takes two arguments: user_id and score. It creates a new Prediction message with these values and then returns it. The framework will call json().encode("utf-8") on the returned Prediction and produce the result to the specified topic.
@kafka_app.consumes(topic="input_data", auto_offset_reset="latest", group_id="my_group")
async def on_input_data(msg: InputData):
    global model
    score = await model.predict(feature_1=msg.feature_1, feature_2=msg.feature_2)
    await to_predictions(user_id=msg.user_id, score=score)


@kafka_app.produces(topic="predictions")
async def to_predictions(user_id: int, score: float) -> Prediction:
    prediction = Prediction(user_id=user_id, score=score)
    return prediction


# this is a mock up for testing, should be replaced with the real model
class Model:
    # type hints match InputData: feature_1 holds floats, feature_2 holds ints
    async def predict(self, feature_1: List[float], feature_2: List[int]) -> float:
        return 0.87


model = Model()
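To make the division of labour between the decorators and the user-defined functions more concrete, here is a toy, in-memory sketch of the same pattern using only the standard library. It is not FastKafka's implementation: the ToyApp class, its deliver method, and the sent list are hypothetical names invented for this illustration, a dataclass stands in for the Pydantic model, and no Kafka broker is involved.

```python
import asyncio
import json
from dataclasses import asdict, dataclass
from functools import wraps
from typing import Awaitable, Callable, Dict, List, Tuple


@dataclass
class Prediction:
    user_id: int
    score: float


class ToyApp:
    """Hypothetical in-memory stand-in for a Kafka application."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[[dict], Awaitable[None]]] = {}
        self.sent: List[Tuple[str, bytes]] = []  # (topic, payload) pairs

    def consumes(self, topic: str):
        # Register the function as the handler for messages on `topic`.
        def decorator(func):
            self.handlers[topic] = func
            return func

        return decorator

    def produces(self, topic: str):
        # Wrap the function so its return value is encoded and "sent".
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                result = await func(*args, **kwargs)
                payload = json.dumps(asdict(result)).encode("utf-8")
                self.sent.append((topic, payload))
                return result

            return wrapper

        return decorator

    async def deliver(self, topic: str, raw: bytes) -> None:
        # Decode the raw bytes and pass the result to the registered handler.
        await self.handlers[topic](json.loads(raw))


app = ToyApp()


@app.consumes(topic="input_data")
async def on_input_data(msg: dict) -> None:
    await to_predictions(user_id=msg["user_id"], score=0.87)


@app.produces(topic="predictions")
async def to_predictions(user_id: int, score: float) -> Prediction:
    return Prediction(user_id=user_id, score=score)


asyncio.run(app.deliver("input_data", b'{"user_id": 7}'))
print(app.sent)
```

The handler bodies contain only business logic, while decoding, dispatching, and encoding live in the decorators, which mirrors the separation the text above describes.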
Running the service
The service can be started using the built-in fastkafka run CLI command. We will concatenate the code snippets from above and save them in a file "server.py".
This example contains the content of the file "server.py":
```python
from os import environ
from typing import List

from fastkafka.application import FastKafka
from pydantic import BaseModel, Field, NonNegativeInt


class InputData(BaseModel):
    user_id: NonNegativeInt = Field(..., example=202020, description="ID of a user")
    feature_1: List[float] = Field(
        ...,
        example=[1.2, 2.3, 4.5, 6.7, 0.1],
        description="input feature 1",
    )
    feature_2: List[int] = Field(
        ...,
        example=[2, 4, 3, 1, 0],
        description="input feature 2",
    )


class Prediction(BaseModel):
    user_id: NonNegativeInt = Field(..., example=202020, description="ID of a user")
    score: float = Field(
        ...,
        example=0.4321,
        description="Prediction score (e.g. the probability of churn in the next 28 days)",
        ge=0.0,
        le=1.0,
    )


kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

bootstrap_servers = f"{environ['KAFKA_HOSTNAME']}:{environ['KAFKA_PORT']}"

kafka_app = FastKafka(
    kafka_brokers=kafka_brokers,
    bootstrap_servers=bootstrap_servers,
)


@kafka_app.consumes(topic="input_data", auto_offset_reset="latest", group_id="my_group")
async def on_input_data(msg: InputData):
    global model
    score = await model.predict(feature_1=msg.feature_1, feature_2=msg.feature_2)
    await to_predictions(user_id=msg.user_id, score=score)


@kafka_app.produces(topic="predictions")
async def to_predictions(user_id: int, score: float) -> Prediction:
    prediction = Prediction(user_id=user_id, score=score)
    return prediction


# this is a mock up for testing, should be replaced with the real model
class Model:
    # type hints match InputData: feature_1 holds floats, feature_2 holds ints
    async def predict(self, feature_1: List[float], feature_2: List[int]) -> float:
        return 0.87


model = Model()
```
Then, we start the FastKafka service by running the following command in the folder where the server.py file is located:
fastkafka run --num-workers=1 server:kafka_app
After running the command, you should see an output like the one below:
[5047]: [INFO] fastkafka.application: _create_producer() : created producer using the config: '{'bootstrap_servers': 'tvrtko-fastkafka-kafka-1:9092'}'
[5047]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[5047]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'tvrtko-fastkafka-kafka-1:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100, 'group_id': 'my_group'}
[5047]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[5047]: [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
[5047]: [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
[5047]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[5047]: [INFO] aiokafka.consumer.group_coordinator: Discovered coordinator 1001 for group my_group
[5047]: [INFO] aiokafka.consumer.group_coordinator: Revoking previously assigned partitions set() for group my_group
[5047]: [INFO] aiokafka.consumer.group_coordinator: (Re-)joining group my_group
[5047]: [INFO] aiokafka.consumer.group_coordinator: Joined group 'my_group' (generation 3) with member_id aiokafka-0.8.0-4638ef56-bd7d-4581-95ad-42c90322442d
[5047]: [INFO] aiokafka.consumer.group_coordinator: Elected group leader -- performing partition assignments using roundrobin
[5047]: [INFO] aiokafka.consumer.group_coordinator: Successfully synced group my_group with generation 3
[5047]: [INFO] aiokafka.consumer.group_coordinator: Setting newly assigned partitions {TopicPartition(topic='input_data', partition=0)} for group my_group
Starting process cleanup, this may take a few seconds...
[INFO] fastkafka.server: terminate_asyncio_process(): Terminating the process 5047...
[5047]: [INFO] aiokafka.consumer.group_coordinator: LeaveGroup request succeeded
[5047]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[5047]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
[INFO] fastkafka.server: terminate_asyncio_process(): Process 5047 terminated.
When the service is started, several log messages are printed to the console, including information about the application startup, AsyncAPI specification generation, and consumer loop status.
During the lifetime of the service, incoming messages are processed by the FastKafka application according to the defined Kafka consumers and producers. For example, when a message is received on the “input_data” Kafka topic, the on_input_data function is called to process it, and whenever the to_predictions function is called, it produces a message to the “predictions” Kafka topic. The service continues to run until it is shut down, at which point the application shutdown process is initiated and the service stops.
Checking out the documentation
To generate and serve the documentation locally, you can use the built-in fastkafka docs serve command, which does all the work for you. In the folder where the server.py file is located, run the following command:
fastkafka docs serve server:kafka_app
After running the command you should see the following output:
{'localhost': {'description': 'local development kafka broker',
'port': 9092,
'url': 'localhost'},
'production': {'description': 'production kafka broker',
'port': 9092,
'protocol': 'kafka-secure',
'security': {'type': 'plain'},
'url': 'kafka.airt.ai'}}
The generated documentation is as follows:
Next, you can see the documentation generated from the @kafka_app.consumes decorator when used on the function on_input_data with a single parameter of type InputData:
class InputData(BaseModel):
    user_id: NonNegativeInt = Field(..., example=202020, description="ID of a user")
    feature_1: List[float] = Field(
        ...,
        example=[1.2, 2.3, 4.5, 6.7, 0.1],
        description="input feature 1",
    )
    feature_2: List[int] = Field(
        ...,
        example=[2, 4, 3, 1, 0],
        description="input feature 2",
    )


@kafka_app.consumes(topic="input_data", auto_offset_reset="latest", group_id="my_group")
async def on_input_data(msg: InputData):
    global model
    score = await model.predict(feature_1=msg.feature_1, feature_2=msg.feature_2)
    await to_predictions(user_id=msg.user_id, score=score)
The resulting documentation is generated as follows:
Testing the service
import anyio
import asyncer
from tqdm.notebook import trange

from fastkafka.helpers import (
    consumes_messages,
    create_missing_topics,
    produce_messages,
)
from fastkafka.testing import LocalKafkaBroker

# NOTE: `script` and `run_script_and_cancel` are not defined in this snippet.
# The top-level `async with` needs a notebook or an enclosing async function.

num_workers = 4
listener_port = 9913

msgs = [
    dict(user_id=i, feature_1=[(i / 1_000) ** 2], feature_2=[i % 177])
    for i in trange(5_000, desc="generating messages")
]

async with LocalKafkaBroker(
    zookeeper_port=9911, listener_port=listener_port
) as bootstrap_servers:
    create_missing_topics(
        ["input_data", "predictions"],
        bootstrap_servers=bootstrap_servers,
        num_partitions=4,
    )

    async with asyncer.create_task_group() as tg:
        # start the service with the broker address pointing at the local broker
        output = tg.soonify(run_script_and_cancel)(
            script=script.replace(
                "bootstrap_servers = f\"{environ['KAFKA_HOSTNAME']}:{environ['KAFKA_PORT']}\"",
                f'bootstrap_servers = "localhost:{listener_port}"',
            ),
            script_file="server.py",
            cmd=f"fastkafka run --num-workers={num_workers} server:kafka_app",
            cancel_after=120,
        )
        # wait for one prediction per input message
        tg.soonify(consumes_messages)(
            msgs_count=len(msgs),
            topic="predictions",
            bootstrap_servers=bootstrap_servers,
        )
        # give the service time to start before producing
        await anyio.sleep(10)
        tg.soonify(produce_messages)(
            msgs=msgs, topic="input_data", bootstrap_servers=bootstrap_servers
        )

    # the return value is only available after the task group block has exited
    print(output.value[1].decode("UTF-8"))
generating messages: 0%| | 0/20000 [00:00<?, ?it/s]
[INFO] fastkafka.testing: Java is already installed.
[INFO] fastkafka.testing: But not exported to PATH, exporting...
[INFO] fastkafka.testing: Kafka is already installed.
[INFO] fastkafka.testing: But not exported to PATH, exporting...
[INFO] fastkafka.helpers: create_missing_topics(['input_data', 'predictions']): new_topics = [NewTopic(topic=predictions,num_partitions=4)]
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'predictions'})
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'predictions': 4}.
consuming from 'predictions': 0%| | 0/20000 [00:00<?, ?it/s]
producing to 'input_data': 0%| | 0/20000 [00:00<?, ?it/s]
[INFO] fastkafka.components._subprocess: terminate_asyncio_process(): Terminating the process 5435...
[INFO] fastkafka.components._subprocess: terminate_asyncio_process(): Process 5435 was already terminated.
[INFO] fastkafka.components._subprocess: terminate_asyncio_process(): Terminating the process 5052...
[INFO] fastkafka.components._subprocess: terminate_asyncio_process(): Process 5052 was already terminated.
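A value returned by soonify in the test above only becomes available once the task group block has exited; reading it earlier raises asyncer's PendingValueException. The sketch below shows the same timing rule with the standard library's asyncio instead of asyncer, purely as an analogy: the run_server coroutine and its return value are made up for this illustration, and asyncio signals the too-early access with InvalidStateError rather than PendingValueException.

```python
import asyncio
from typing import Tuple


async def run_server() -> str:
    # Stand-in for a long-running task whose output we want to read later.
    await asyncio.sleep(0.01)
    return "server output"


async def main() -> Tuple[str, str]:
    task = asyncio.create_task(run_server())

    # Reading the result before the task has finished fails.
    try:
        task.result()
        early = "ready"
    except asyncio.InvalidStateError:
        early = "pending"

    # After waiting for completion, the result can be read safely.
    await task
    return early, task.result()


early, late = asyncio.run(main())
print(early, late)
```

The same ordering applies to the test code: wait for the concurrent work to finish first, then read and print its result.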
Hashes for fastkafka-0.1.0rc1-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | acc00331ffb715f245809e21a1da4e4b3be02a875a7126b6fb4717d7e29cb07b
MD5 | 873e25389d4320dae689f8e70594c01d
BLAKE2b-256 | f902d47c8719ed4556827a70842fe693cf767ebc97d98e08d532c7ca6800410a