FastKafka is a powerful and easy-to-use Python library for building asynchronous web services that interact with Kafka topics. Built on top of FastAPI, Starlette, Pydantic, AIOKafka and AsyncAPI, FastKafka simplifies the process of writing producers and consumers for Kafka topics.

FastKafka

Effortless Kafka integration for your web services



FastKafka is a powerful and easy-to-use Python library for building asynchronous services that interact with Kafka topics. Built on top of Pydantic, AIOKafka and AsyncAPI, FastKafka simplifies the process of writing producers and consumers for Kafka topics, handling all the parsing, networking, task scheduling and data generation automatically. With FastKafka, you can quickly prototype and develop high-performance Kafka-based services with minimal code, making it an ideal choice for developers looking to streamline their workflow and accelerate their projects.

Install

FastKafka works on macOS, Linux, and most Unix-style operating systems. You can install it with pip as usual:

pip install fastkafka

Writing server code

Here is an example Python script using FastKafka that takes data from a Kafka topic, makes a prediction using a predictive model, and outputs the prediction to another Kafka topic.

Messages

FastKafka uses Pydantic to parse input JSON-encoded data into Python objects, making it easy to work with structured data in your Kafka-based applications. Pydantic’s BaseModel class allows you to define messages using a declarative syntax, making it easy to specify the fields and types of your messages.

This example defines two message classes for use in a FastKafka application:

  • The InputData class is used to represent input data for a predictive model. It has three fields: user_id, feature_1, and feature_2. The user_id field is of type NonNegativeInt, which is a subclass of int that only allows non-negative integers. The feature_1 field is a list of floating-point numbers and the feature_2 field is a list of integers.

  • The Prediction class is used to represent the output of the predictive model. It has two fields: user_id and score. The score field is a floating-point number and it represents the prediction made by the model, such as the probability of churn in the next 28 days.

These message classes will be used to parse and validate incoming data in Kafka consumers and producers.

from typing import List
from pydantic import BaseModel, Field, NonNegativeInt

class InputData(BaseModel):
    user_id: NonNegativeInt = Field(..., example=202020, description="ID of a user")
    feature_1: List[float] = Field(
        ...,
        example=[1.2, 2.3, 4.5, 6.7, 0.1],
        description="input feature 1",
    )
    feature_2: List[int] = Field(
        ...,
        example=[2, 4, 3, 1, 0],
        description="input feature 2",
    )

class Prediction(BaseModel):
    user_id: NonNegativeInt = Field(..., example=202020, description="ID of a user")
    score: float = Field(
        ...,
        example=0.4321,
        description="Prediction score (e.g. the probability of churn in the next 28 days)",
        ge=0.0,
        le=1.0,
    )

These message classes will be used to parse and validate incoming data in a Kafka consumer and to produce a JSON-encoded message in a producer. Using Pydantic’s BaseModel in combination with FastKafka makes it easy to work with structured data in your Kafka-based applications.
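
To make the parse-and-validate step concrete, here is a minimal standard-library sketch of roughly what happens to a JSON payload before it reaches your code. It is not how Pydantic or FastKafka are implemented. The `PredictionSketch` dataclass and the `parse_prediction` helper are hypothetical names that only mimic the constraints declared on `Prediction` above (a non-negative `user_id` and a `score` between 0.0 and 1.0).

```python
import json
from dataclasses import dataclass


@dataclass
class PredictionSketch:
    """Hypothetical stdlib stand-in for the Prediction model."""

    user_id: int
    score: float

    def __post_init__(self) -> None:
        # Mirrors NonNegativeInt: reject negative user IDs.
        if self.user_id < 0:
            raise ValueError("user_id must be non-negative")
        # Mirrors ge=0.0 / le=1.0 on the score field.
        if not 0.0 <= self.score <= 1.0:
            raise ValueError("score must be between 0.0 and 1.0")


def parse_prediction(raw: bytes) -> PredictionSketch:
    """Decode a JSON-encoded message and validate its fields."""
    data = json.loads(raw.decode("utf-8"))
    return PredictionSketch(user_id=int(data["user_id"]), score=float(data["score"]))


# A well-formed message becomes a typed object.
valid = parse_prediction(b'{"user_id": 1, "score": 0.87}')

# A score outside [0.0, 1.0] is rejected before any handler would see it.
try:
    parse_prediction(b'{"user_id": 1, "score": 1.5}')
    rejected = False
except ValueError:
    rejected = True
```

With Pydantic the same checks come from the field declarations, so no hand-written `__post_init__` is needed.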

Application

This example shows how to initialize a FastKafka application.

It starts by defining a dictionary called kafka_brokers, which contains two entries, "localhost" and "production", describing the local development and production Kafka brokers. Each entry specifies the URL, port, and other details of a Kafka broker. This dictionary is used only for generating the documentation; the running service does not check it.

Next, an object of the FastKafka class is initialized with the minimum set of arguments:

  • kafka_brokers: a dictionary used for generation of documentation

  • bootstrap_servers: a host[:port] string or list of host[:port] strings that a consumer or a producer should contact to bootstrap initial cluster metadata

from fastkafka.application import FastKafka

def create_app(bootstrap_servers: str) -> FastKafka:
    kafka_brokers = {
        "localhost": {
            "url": "localhost",
            "description": "local development kafka broker",
            "port": 9092,
        },
        "production": {
            "url": "kafka.airt.ai",
            "description": "production kafka broker",
            "port": 9092,
            "protocol": "kafka-secure",
            "security": {"type": "plain"},
        },
    }

    kafka_app = FastKafka(
        kafka_brokers=kafka_brokers,
        bootstrap_servers=bootstrap_servers,
    )
    
    return kafka_app
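
The host[:port] format accepted by bootstrap_servers can be illustrated with a small helper that normalizes either a single string or a list of strings into (host, port) pairs. This is only a sketch: `split_bootstrap_servers` and `DEFAULT_KAFKA_PORT` are hypothetical names, the fallback to port 9092 is an assumption based on Kafka's conventional default, and IPv6 addresses are not handled.

```python
from typing import List, Tuple, Union

DEFAULT_KAFKA_PORT = 9092  # assumption: Kafka's conventional default port


def split_bootstrap_servers(
    bootstrap_servers: Union[str, List[str]]
) -> List[Tuple[str, int]]:
    """Normalize host[:port] strings into (host, port) tuples."""
    if isinstance(bootstrap_servers, str):
        bootstrap_servers = [bootstrap_servers]

    result: List[Tuple[str, int]] = []
    for server in bootstrap_servers:
        # partition() returns an empty separator when no ":" is present.
        host, sep, port = server.partition(":")
        result.append((host, int(port) if sep else DEFAULT_KAFKA_PORT))
    return result


servers = split_bootstrap_servers(["localhost:9092", "kafka.airt.ai"])
single = split_bootstrap_servers("127.0.0.1:9893")
```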

Function decorators

FastKafka provides the convenient function decorators @kafka_app.consumes and @kafka_app.produces, which let you delegate to the framework the actual process of

  • consuming and producing data to Kafka, and

  • decoding and encoding JSON-encoded messages.

The FastKafka framework in turn hands these jobs to the AIOKafka and Pydantic libraries.

These decorators make it easy to specify the processing logic for your Kafka consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying Kafka integration.

The following example shows how to use the @kafka_app.consumes and @kafka_app.produces decorators in a FastKafka application:

  • The @kafka_app.consumes decorator is applied to the on_input_data function, which specifies that this function should be called whenever a message is received on the “input_data” Kafka topic. The on_input_data function takes a single argument, which is expected to be an instance of the InputData message class. The type annotation on that argument tells the framework to call InputData.parse_raw() on the consumed message before passing the result to the user-defined function on_input_data.

  • The @kafka_app.produces decorator is applied to the to_predictions function, which specifies that this function should produce a message to the “predictions” Kafka topic whenever it is called. The to_predictions function takes two arguments: user_id and score. It creates a new Prediction message with these values and returns it. The framework then calls .json().encode("utf-8") on the returned Prediction and produces the result to the specified topic.

def decorate_app(kafka_app: FastKafka):
    @kafka_app.consumes(topic="input_data", auto_offset_reset="latest", group_id="my_group")
    async def on_input_data(msg: InputData):
        # this is a mock-up for testing; replace it with the real model
        class Model:
            # type hints match InputData: feature_1 is List[float], feature_2 is List[int]
            async def predict(self, feature_1: List[float], feature_2: List[int]) -> float:
                return 0.87

        model = Model()

        score = await model.predict(feature_1=msg.feature_1, feature_2=msg.feature_2)
        await to_predictions(user_id=msg.user_id, score=score)


    @kafka_app.produces(topic="predictions")
    async def to_predictions(user_id: int, score: float) -> Prediction:
        prediction = Prediction(user_id=user_id, score=score)
        return prediction
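
To show the kind of work the decorators take off your hands, here is a self-contained sketch that imitates them with plain Python and in-memory lists instead of Kafka. It is not FastKafka's implementation: `consumes`, `produces`, `deliver`, `topics`, `handlers`, `InputSketch` and `PredictionSketch` are all hypothetical stand-ins, and dataclasses replace the Pydantic models.

```python
import asyncio
import functools
import inspect
import json
from dataclasses import asdict, dataclass
from typing import Callable, Dict, List, Tuple, get_type_hints

# In-memory stand-ins for Kafka: topic name -> encoded messages.
topics: Dict[str, List[bytes]] = {"predictions": []}
# Topic name -> (message class, handler), filled in by @consumes.
handlers: Dict[str, Tuple[type, Callable]] = {}


def consumes(topic: str) -> Callable:
    """Register a handler; the message class is read from its type hint."""
    def decorator(func: Callable) -> Callable:
        first_param = next(iter(inspect.signature(func).parameters))
        handlers[topic] = (get_type_hints(func)[first_param], func)
        return func
    return decorator


def produces(topic: str) -> Callable:
    """JSON-encode whatever the function returns and append it to the topic."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            msg = await func(*args, **kwargs)
            topics[topic].append(json.dumps(asdict(msg)).encode("utf-8"))
            return msg
        return wrapper
    return decorator


async def deliver(topic: str, raw: bytes) -> None:
    """Simulate a message arriving: decode it, then call the handler."""
    msg_type, handler = handlers[topic]
    await handler(msg_type(**json.loads(raw.decode("utf-8"))))


@dataclass
class InputSketch:
    user_id: int
    feature_1: List[float]
    feature_2: List[int]


@dataclass
class PredictionSketch:
    user_id: int
    score: float


@produces(topic="predictions")
async def to_predictions(user_id: int, score: float) -> PredictionSketch:
    return PredictionSketch(user_id=user_id, score=score)


@consumes(topic="input_data")
async def on_input_data(msg: InputSketch) -> None:
    # Fixed score, like the mock model in the example above.
    await to_predictions(user_id=msg.user_id, score=0.87)


asyncio.run(
    deliver("input_data", b'{"user_id": 1, "feature_1": [0.1], "feature_2": [2]}')
)
```

After the call, `topics["predictions"]` holds one JSON-encoded prediction. The handler bodies never touch bytes or JSON, which is the point of the decorators.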

Testing the service

The service can be tested using the LocalKafkaBroker and Tester instances.

from fastkafka.testing import LocalKafkaBroker
from fastkafka.application import Tester
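from fastkafka.helpers import create_missing_topics

# Top-level "async with" works in a notebook or an async REPL;
# in a regular script, wrap this block in an async function.
async with LocalKafkaBroker(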
    zookeeper_port=9892, listener_port=9893
) as bootstrap_servers:
    create_missing_topics(
        ["input_data", "predictions"],
        bootstrap_servers=bootstrap_servers,
        num_partitions=1,
    )

    # Creating the KafkaApp object
    kafka_app = create_app(bootstrap_servers=bootstrap_servers)
    decorate_app(kafka_app=kafka_app)

    # Creating the Tester object
    tester = Tester(app=kafka_app)

    async with tester:
        # Send message to input_data topic
        await tester.to_input_data(
            InputData(user_id=1, feature_1=[0.1, 0.2], feature_2=[1, -2])
        )
        # Assert that the "kafka_app" service reacted to sent message with a Prediction message in predictions topic
        await tester.awaited_mocks.on_predictions.assert_awaited_with(
            Prediction(user_id=1, score=0.87), timeout=5
        )

Running the test prints output similar to the following (the first run also installs Java and Kafka locally):

[INFO] fastkafka.testing: Installing Java...
[INFO] fastkafka.testing:  - installing install-jdk...
Defaulting to user installation because normal site-packages is not writeable
Collecting install-jdk
  Downloading install-jdk-0.3.0.tar.gz (3.8 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: install-jdk
  Building wheel for install-jdk (setup.py): started
  Building wheel for install-jdk (setup.py): finished with status 'done'
  Created wheel for install-jdk: filename=install_jdk-0.3.0-py3-none-any.whl size=3741 sha256=d7d47386fbf6806f67acf3b4f8668eef131305e43eeb33a0dff719b72838f00c
  Stored in directory: /home/tvrtko/.cache/pip/wheels/79/7a/47/9a4619174f7ca0f1068edb7a5412730a37365b6d183b0b3847
Successfully built install-jdk
Installing collected packages: install-jdk
Successfully installed install-jdk-0.3.0
[INFO] fastkafka.testing:  - installing jdk...


[notice] A new release of pip is available: 23.0 -> 23.0.1
[notice] To update, run: python3 -m pip install --upgrade pip

/home/tvrtko/.jdk/jdk-11.0.18+10
[INFO] fastkafka.testing: Java installed.
[INFO] fastkafka.testing: Installing Kafka...

  0%|          | 0/832968 [00:00<?, ?it/s]

[INFO] fastkafka.testing: Kafka installed in /home/tvrtko/.local/kafka_2.13-3.3.2.
[INFO] fastkafka.testing: Starting zookeeper...
[INFO] fastkafka.testing: Zookeeper started, sleeping for 5 seconds...
[INFO] fastkafka.testing: Starting Kafka broker...
[INFO] fastkafka.testing: Kafka broker started, sleeping for 5 seconds...
[INFO] fastkafka.testing: Local Kafka broker up and running on 127.0.0.1:9893
[INFO] fastkafka.helpers: create_missing_topics(['input_data', 'predictions']): new_topics = [NewTopic(topic=input_data,num_partitions=1), NewTopic(topic=predictions,num_partitions=1)]
[INFO] fastkafka.application: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9893'}'
[INFO] fastkafka.application: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9893'}'
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9893', 'auto_offset_reset': 'latest', 'max_poll_records': 100, 'group_id': 'my_group'}
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9893', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'predictions'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'predictions'}
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'predictions': 1}. 
[ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[INFO] aiokafka.consumer.group_coordinator: Discovered coordinator 0 for group my_group
[INFO] aiokafka.consumer.group_coordinator: Revoking previously assigned partitions set() for group my_group
[INFO] aiokafka.consumer.group_coordinator: (Re-)joining group my_group
[INFO] aiokafka.consumer.group_coordinator: Joined group 'my_group' (generation 1) with member_id aiokafka-0.8.0-d4b67e81-1947-4aa5-8549-98c3254bcb4a
[INFO] aiokafka.consumer.group_coordinator: Elected group leader -- performing partition assignments using roundrobin
[INFO] aiokafka.consumer.group_coordinator: Successfully synced group my_group with generation 1
[INFO] aiokafka.consumer.group_coordinator: Setting newly assigned partitions {TopicPartition(topic='input_data', partition=0)} for group my_group
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
[INFO] aiokafka.consumer.group_coordinator: LeaveGroup request succeeded
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 630...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 630 terminated.
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 263...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 263 terminated.
ok
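
The assertion at the end of the test resembles `assert_awaited_with` on the standard library's `unittest.mock.AsyncMock`. The sketch below shows only that standard-library behavior, with a hypothetical `on_predictions` mock and a hypothetical `fake_service` coroutine. The standard-library method checks the most recent await and has no timeout parameter, so the `timeout=5` argument in the test above appears to be a FastKafka extension that waits for the message to arrive.

```python
import asyncio
from unittest.mock import AsyncMock

# Hypothetical stand-in for a handler that a test harness replaced with a mock.
on_predictions = AsyncMock()


async def fake_service() -> None:
    # Pretend the service produced a prediction that reached the mocked handler.
    await on_predictions({"user_id": 1, "score": 0.87})


asyncio.run(fake_service())

# Passes: the most recent await used exactly these arguments.
on_predictions.assert_awaited_with({"user_id": 1, "score": 0.87})

# A mismatch raises AssertionError, which is what makes the test fail.
try:
    on_predictions.assert_awaited_with({"user_id": 2, "score": 0.87})
    mismatch_detected = False
except AssertionError:
    mismatch_detected = True
```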

Running the service

The service can be started using the built-in fastkafka run CLI command.

We will concatenate the code snippets from above and save them in a file "server.py".

This example contains the content of the file "server.py":

```python

from typing import List
from pydantic import BaseModel, Field, NonNegativeInt

class InputData(BaseModel):
    user_id: NonNegativeInt = Field(..., example=202020, description="ID of a user")
    feature_1: List[float] = Field(
        ...,
        example=[1.2, 2.3, 4.5, 6.7, 0.1],
        description="input feature 1",
    )
    feature_2: List[int] = Field(
        ...,
        example=[2, 4, 3, 1, 0],
        description="input feature 2",
    )

class Prediction(BaseModel):
    user_id: NonNegativeInt = Field(..., example=202020, description="ID of a user")
    score: float = Field(
        ...,
        example=0.4321,
        description="Prediction score (e.g. the probability of churn in the next 28 days)",
        ge=0.0,
        le=1.0,
    )



from os import environ

from fastkafka.application import FastKafka

kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

bootstrap_servers = f"{environ['KAFKA_HOSTNAME']}:{environ['KAFKA_PORT']}"

kafka_app = FastKafka(
    kafka_brokers=kafka_brokers,
    bootstrap_servers=bootstrap_servers,
)



@kafka_app.consumes(topic="input_data", auto_offset_reset="latest", group_id="my_group")
async def on_input_data(msg: InputData):
    global model
    score = await model.predict(feature_1=msg.feature_1, feature_2=msg.feature_2)
    await to_predictions(user_id=msg.user_id, score=score)


@kafka_app.produces(topic="predictions")
async def to_predictions(user_id: int, score: float) -> Prediction:
    prediction = Prediction(user_id=user_id, score=score)
    return prediction


# this is a mock-up for testing; replace it with the real model
class Model:
    # type hints match InputData: feature_1 is List[float], feature_2 is List[int]
    async def predict(self, feature_1: List[float], feature_2: List[int]) -> float:
        return 0.87


model = Model()

```

Notice the bootstrap_servers = f"{environ['KAFKA_HOSTNAME']}:{environ['KAFKA_PORT']}" line. This enables us to pass the Kafka bootstrap server address to the app through the environment variables.
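
Note that `environ['KAFKA_HOSTNAME']` raises a KeyError when the variable is not set. If you prefer a fallback for local development, a variant along these lines could be used. This is only a sketch: the helper name `bootstrap_servers_from_env` and the defaults "localhost" and "9092" are assumptions, not part of FastKafka.

```python
import os
from typing import Mapping


def bootstrap_servers_from_env(env: Mapping[str, str] = os.environ) -> str:
    """Build a host:port string, falling back to local-development defaults."""
    # Assumed defaults; the original snippet has none and fails fast instead.
    host = env.get("KAFKA_HOSTNAME", "localhost")
    port = env.get("KAFKA_PORT", "9092")
    if not port.isdigit():
        raise ValueError(f"KAFKA_PORT must be a number, got {port!r}")
    return f"{host}:{port}"


# With nothing set, the local defaults apply.
local = bootstrap_servers_from_env({})

# Explicit values override the defaults.
custom = bootstrap_servers_from_env(
    {"KAFKA_HOSTNAME": "kafka.airt.ai", "KAFKA_PORT": "9092"}
)
```

Failing fast, as the original line does, is the safer choice in production because a missing variable cannot silently point the service at the wrong broker.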

Then, we start the FastKafka service by running the following command in the folder where the server.py file is located:

fastkafka run --num-workers=1 server:kafka_app

After running the command, you should see an output like the one below:

[INFO] fastkafka.testing: Java is already installed.
[INFO] fastkafka.testing: Kafka is already installed.
[INFO] fastkafka.testing: Starting zookeeper...
[INFO] fastkafka.testing: Zookeeper started, sleeping for 5 seconds...
[INFO] fastkafka.testing: Starting Kafka broker...
[INFO] fastkafka.testing: Kafka broker started, sleeping for 5 seconds...
[INFO] fastkafka.testing: Local Kafka broker up and running on 127.0.0.1:9893
[2648]: [INFO] fastkafka.application: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9893'}'
[2648]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[2648]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9893', 'auto_offset_reset': 'latest', 'max_poll_records': 100, 'group_id': 'my_group'}
[2648]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[2648]: [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
[2648]: [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
[2648]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[2648]: [ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[2648]: [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization
[2648]: [ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[2648]: [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization
[2648]: [ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[2648]: [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization
[2648]: [ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[2648]: [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization
[2648]: [ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[2648]: [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization
[2648]: [ERROR] aiokafka.consumer.group_coordinator: Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
[2648]: [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization
[2648]: [INFO] aiokafka.consumer.group_coordinator: Discovered coordinator 0 for group my_group
[2648]: [INFO] aiokafka.consumer.group_coordinator: Revoking previously assigned partitions set() for group my_group
[2648]: [INFO] aiokafka.consumer.group_coordinator: (Re-)joining group my_group
[2648]: [INFO] aiokafka.consumer.group_coordinator: Joined group 'my_group' (generation 1) with member_id aiokafka-0.8.0-d01c8ae6-0888-4b80-bd50-a8c9e2358e28
[2648]: [INFO] aiokafka.consumer.group_coordinator: Elected group leader -- performing partition assignments using roundrobin
[2648]: [WARNING] kafka.coordinator.assignors.roundrobin: No partition metadata for topic input_data
[2648]: [INFO] aiokafka.consumer.group_coordinator: Successfully synced group my_group with generation 1
[2648]: [INFO] aiokafka.consumer.group_coordinator: Setting newly assigned partitions set() for group my_group
Starting process cleanup, this may take a few seconds...
[INFO] fastkafka.server: terminate_asyncio_process(): Terminating the process 2648...
[2648]: [INFO] aiokafka.consumer.group_coordinator: LeaveGroup request succeeded
[2648]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[2648]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
[INFO] fastkafka.server: terminate_asyncio_process(): Process 2648 terminated.

[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 2232...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 2232 terminated.
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 1867...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 1867 terminated.

When the service is started, several log messages are printed to the console, including information about the application startup, AsyncAPI specification generation, and consumer loop status.

During the lifetime of the service, incoming messages are processed by the FastKafka application according to the defined Kafka consumers and producers. For example, when a message is received on the “input_data” Kafka topic, the on_input_data function is called to process it, and whenever the to_predictions function is called, it produces a message to the “predictions” Kafka topic. The service keeps running until it is shut down, at which point the application shutdown process is initiated and the service stops.
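
The run-until-shutdown behavior can be pictured as a loop that waits for messages and is cancelled when the service stops. The following is a generic asyncio sketch of that pattern, not FastKafka's actual consumer loop. An `asyncio.Queue` stands in for a Kafka topic, and `consumer_loop` and `main` are hypothetical names.

```python
import asyncio
from typing import List


async def consumer_loop(queue: "asyncio.Queue[str]", seen: List[str]) -> None:
    """Process messages one by one until the task is cancelled."""
    while True:
        msg = await queue.get()
        seen.append(msg)  # stands in for calling a handler such as on_input_data
        queue.task_done()


async def main() -> List[str]:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    seen: List[str] = []
    task = asyncio.create_task(consumer_loop(queue, seen))

    for msg in ("first", "second"):
        await queue.put(msg)
    await queue.join()  # wait until both messages have been handled

    # Shutdown: cancel the loop and wait for it to finish.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen


processed = asyncio.run(main())
```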

Checking out the documentation

To generate and serve the documentation locally, you can use the built-in fastkafka docs serve CLI command, which does all the work for you. In the folder where the server.py file is located, run the following command:

fastkafka docs serve server:kafka_app

After running the command you should see the following output:

[INFO] fastkafka._components.asyncapi: Old async specifications at '/tmp/tmp8ld9d1yo/asyncapi/spec/asyncapi.yml' does not exist.
[INFO] fastkafka._components.asyncapi: New async specifications generated at: '/tmp/tmp8ld9d1yo/asyncapi/spec/asyncapi.yml'
[INFO] fastkafka._components.asyncapi: Async docs generated at 'asyncapi/docs'
[INFO] fastkafka._components.asyncapi: Output of '$ npx -y -p @asyncapi/generator ag asyncapi/spec/asyncapi.yml @asyncapi/html-template -o asyncapi/docs --force-write'

Done! ✨
Check out your shiny new generated files at /tmp/tmp8ld9d1yo/asyncapi/docs.


Serving documentation on http://127.0.0.1:8000
Interupting serving of documentation and cleaning up...

The generated documentation is as follows:

(Screenshot: Kafka_servers)

Next, you can see the documentation generated from the @kafka_app.consumes decorator when it is used on the function on_input_data with a single parameter of type InputData:

class InputData(BaseModel):
    user_id: NonNegativeInt = Field(..., example=202020, description="ID of a user")
    feature_1: List[float] = Field(
        ...,
        example=[1.2, 2.3, 4.5, 6.7, 0.1],
        description="input feature 1",
    )
    feature_2: List[int] = Field(
        ...,
        example=[2, 4, 3, 1, 0],
        description="input feature 2",
    )


@kafka_app.consumes(topic="input_data", auto_offset_reset="latest", group_id="my_group")
async def on_input_data(msg: InputData):
    global model
    score = await model.predict(feature_1=msg.feature_1, feature_2=msg.feature_2)
    await to_predictions(user_id=msg.user_id, score=score)

The resulting documentation is generated as follows:

(Screenshot: Kafka_consumer)
