
A Kafka mock library that is designed to be used in integration tests for applications using librdkafka.

Project description

Embedded Kafka (Kafka Simulator) for Python


Embedded Kafka is a mocking library for the confluent_kafka library used for Apache Kafka. Its goal is to reduce the effort of writing integration tests that use Producer and/or Consumer instances. Of course, you can always spin up your own Kafka cluster just for testing purposes, but that is not always the best solution.

With kafka_mocha you no longer need a running Kafka cluster to test your Kafka-related code. Instead, you can use KProducer and KConsumer (by simply decorating your code with @mock_producer/@mock_consumer) and check the behavior of your code - or even inspect the produced and consumed messages in the browser!

Inspiration for this project comes from the moto library, which provides a similar feature for AWS SDK.

Support me with

Buy Me A Coffee

Project Overview

The main component of this project is a process called KafkaSimulator which simulates the behavior of an actual Kafka Cluster, within the bounds of implementation limitations. The current version includes a KProducer class that acts as a mock for the Producer from the confluent_kafka package. A KConsumer class is still under development.
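Conceptually, a mocked producer is an in-memory stand-in that records messages instead of sending them over the network. The following is a simplified, hypothetical sketch of that idea (the names FakeProducer and sent are illustrative, not the library's actual API):

```python
class FakeProducer:
    """Hypothetical in-memory stand-in for confluent_kafka.Producer."""

    def __init__(self, config):
        self.config = config
        self.sent = []  # (topic, key, value) tuples, in produce order

    def produce(self, topic, value=None, key=None):
        # Record the message instead of sending it to a broker.
        self.sent.append((topic, key, value))

    def flush(self, timeout=None):
        # The real flush() returns the number of messages still queued;
        # nothing is ever queued here, so it is always 0.
        return 0


producer = FakeProducer({"bootstrap.servers": "localhost:9092"})
producer.produce("test-topic", "some value".encode(), "key".encode())
```

A real KProducer goes much further (buffering, acks, transactions), but the in-memory recording principle is the same.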

Additionally, the project includes a SchemaRegistryMock class that acts as a mock for the SchemaRegistryClient from the confluent_kafka package.


Installation

Official Release
pip install kafka_mocha

or use your favorite package manager, e.g. Poetry:

poetry add kafka_mocha

Prerelease or Development Version

From GitHub (development version):

pip install git+https://github.com/Effiware/kafka-mocha@develop

or as a published (prerelease) version:

poetry add kafka_mocha --allow-prereleases

Usage

Starting Kafka Simulator

Kafka Simulator is started automatically whenever an instance of either KProducer or KConsumer is created (e.g. via mock_producer or mock_consumer), so there is no need to start it manually.

With the default logging settings, custom start-up messages may be visible (note that the default logging level is WARNING):

INFO     ticking_thread  > Buffer for KProducer(4409519920): ticking initialized
INFO     buffer_handler  > Buffer for KProducer(4409519920) has been primed, length: 5, timeout: 300
INFO     kafka_simulator > Handle producers has been primed
INFO     kafka_simulator > Kafka Simulator initialized, id: 4399382400
DEBUG    kafka_simulator > Registered topics: [KTopic(name='_schemas', partition_no=1, config=None), KTopic(name='__consumer_offsets', partition_no=1, config=None)]
INFO     ticking_thread  > Buffer for KProducer(4409519920): ticking started
DEBUG    kproducer       > KProducer(4409519920): received ack: BUFFERED
DEBUG    buffer_handler  > Buffer for KProducer(4409519920): received done (or manual flush) signal...
DEBUG    kafka_simulator > Appended message: (partition=0, offset=1000, key=b'4df33f7a-fcee-4b3b-b176-a4e650540401', value=b'\x00\x00\x00\x00\x01H4df33f7a-fcee-4b3b-b176-a4e650540401\x08John\x06Doe\x01\x04\xbe\xfd\x83\x8b\xa2e\x00\x00\x00\x00\x00\x00\x00\x00He5df0ed3-765c-4f58-b18c-aea4400dfce4\xbe\xfd\x83\x8b\xa2e(kafka_mocha_examples\n1.0.0', headers=[]))
INFO     buffer_handler  > Buffer for KProducer(4409519920): Kafka response: SUCCESS
INFO     ticking_thread  > Buffer for KProducer(4409519920): stop event
DEBUG    buffer_handler  > Buffer for KProducer(4409519920): received done (or manual flush) signal...
INFO     buffer_handler  > Buffer for KProducer(4409519920): nothing to send...

Additionally, all messages produced by KProducer instances are stored in the KafkaSimulator instance. The messages can be dumped to an HTML or CSV file by passing the output parameter; see KProducer and outputs for more details.

KProducer

To use the KProducer class in your tests, import the decorator from the kafka_mocha package:

import confluent_kafka

from kafka_mocha import mock_producer


@mock_producer()
def handle_produce():
    """Most basic usage of the KProducer class. For more go to `examples` directory."""
    producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
    producer.produce("test-topic", "some value".encode(), "key".encode())
    producer.flush()

The KProducer class replicates the interface and behavior of the Producer class from the confluent_kafka library. For more examples, see the examples directory.

Parameters for mock_producer
No | Parameter name                 | Parameter type | Comment
1  | loglevel                       | Literal        | See available levels in the logging library
2  | output                         | dict           | Dictionary with the output configuration
3  | output.format                  | Literal        | html, csv or int - output format of the emitted messages
4  | output.name                    | str            | Name of the output file (HTML only), e.g. kafka-dump.html
5  | output.include_internal_topics | bool           | Flag to include internal topics in the output
6  | output.include_markers         | bool           | Flag to include transaction markers in the output

KConsumer

The KConsumer class is still under development. It will replicate the interface and behavior of the Consumer class from the confluent_kafka library.

Parameters for mock_consumer
No | Parameter name | Parameter type | Comment
1  | loglevel       | Literal        | See available levels in the logging library

Schema Registry Mock

The Schema Registry mock is part of the kafka_mocha package. It is heavily inspired by the confluent-kafka-python/mock_schema_registry_client implementation and is hence licensed under the Apache License, Version 2.0.

It provides a fully compatible implementation of the SchemaRegistryClient class from the confluent_kafka library and, similarly to the KProducer class, it can be used as a decorator:

import confluent_kafka.schema_registry
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

from kafka_mocha.schema_registry import mock_schema_registry


@mock_schema_registry()
def quick_start():
    schema_registry = confluent_kafka.schema_registry.SchemaRegistryClient({"url": "http://localhost:8081"})
    avro_serializer = AvroSerializer(schema_registry, conf={"auto.register.schemas": False})
    avro_serializer({"foo": "bar"}, SerializationContext("topic", MessageField.VALUE))

For more examples, see the examples directory.
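Conceptually, a Schema Registry mock only needs an in-memory mapping from subjects to ordered schema versions. A minimal, hypothetical sketch of that idea (FakeSchemaRegistry is illustrative, not the package's actual class) might look like:

```python
class FakeSchemaRegistry:
    """Hypothetical in-memory registry: subject -> ordered list of schema strings."""

    def __init__(self):
        self._subjects = {}

    def register_schema(self, subject, schema_str):
        versions = self._subjects.setdefault(subject, [])
        if schema_str not in versions:  # re-registering the same schema is idempotent
            versions.append(schema_str)
        return versions.index(schema_str) + 1  # 1-based version number

    def get_latest_version(self, subject):
        versions = self._subjects[subject]
        return len(versions), versions[-1]


registry = FakeSchemaRegistry()
version = registry.register_schema("test-topic-value", '{"type": "string"}')
```

The real mock additionally handles schema IDs, compatibility checks, and serializer integration, but the subject/version bookkeeping above is the core of it.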

Parameters for mock_schema_registry
No | Parameter name   | Parameter type | Comment
1  | loglevel         | Literal        | See available levels in the logging library
2  | register_schemas | list[str]      | List of schemas (as relative paths) to load into the Schema Registry mock on launch

Absolute imports

Due to the specific nature of Python imports, it is recommended (and enforced) to use absolute imports in your code. This is especially important when using the mock_producer and mock_consumer decorators, as they rely on the patching mechanism from the unittest.mock module.

Imports that will work:

import confluent_kafka
import confluent_kafka.schema_registry

producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})  # OK
schema_registry = confluent_kafka.schema_registry.SchemaRegistryClient({"url": "http://localhost:8081"})  # OK

Imports that will not work:

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient

producer = Producer({"bootstrap.servers": "localhost:9092"})  # Will not work
schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})  # Will not work
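The reason is standard unittest.mock behavior: patching replaces an attribute on the module object, so only lookups that go through the module see the patch, while a name bound locally via from ... import keeps pointing at the original object. This can be demonstrated with the standard library alone:

```python
import json
import unittest.mock

from json import dumps  # name bound at import time

with unittest.mock.patch("json.dumps", return_value="patched"):
    assert json.dumps({}) == "patched"  # lookup resolves through the module -> patched
    assert dumps({}) == "{}"            # local binding still points at the original
```

The same mechanics apply to confluent_kafka.Producer and SchemaRegistryClient, which is why only module-level access works with the decorators.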

Contributing

We welcome contributions! Before posting your first PR, please see our contributing guidelines for more details.

Also, bear in mind that this project uses Poetry for dependency management. If you are not familiar with it, please read the Poetry documentation first and:

  1. Set up the Poetry environment (recommended)
  2. Don't overwrite the pyproject.toml file manually (Poetry will do it for you)
  3. Don't recreate the poetry.lock file (unless you know what you are doing)
Cloning the repository
git clone git@github.com:Effiware/kafka-mocha.git
cd kafka-mocha

Installing dependencies

Default (and recommended) way:

poetry install --with test

Standard way:

poetry export -f requirements.txt --output requirements.txt
pip install -r requirements.txt

Running tests

Currently, the test configuration is set up to run with pytest and is kept in the pytest.ini file. You can run the tests with:

poetry run pytest

License

This project is primarily licensed under the MIT License - see the LICENSE file for details.

Parts of this project (specifically the Schema Registry mock implementation) contain code from Confluent Inc., licensed under the Apache License, Version 2.0. See LICENSE.APACHE for details.
