A Kafka mock library that is designed to be used in integration tests for applications using librdkafka.
Embedded Kafka (Kafka Simulator) for Python
Embedded Kafka is a mocking library for the confluent_kafka library used with Apache Kafka. Its goal is to ease the effort of writing integration tests that use Producer and/or Consumer instances. You can always spin up your own Kafka cluster just for testing purposes, but that is not always the best solution.
With kafka_mocha you no longer need a running Kafka cluster to test your Kafka-related code. Instead, you can use KProducer and KConsumer (by simply decorating your code with @mock_producer/@mock_consumer) and check the behavior of your code, or even inspect the produced and consumed messages in the browser!
Inspiration for this project comes from the moto library, which provides a similar feature for the AWS SDK.
Project Overview
The main component of this project is a process called KafkaSimulator which simulates the behavior of an actual Kafka
Cluster, within the bounds of implementation limitations. The current version includes a KProducer class that acts as
a mock for the Producer from the confluent_kafka package. A KConsumer class is still under development.
Additionally, the project includes a SchemaRegistryMock class that acts as a mock for the SchemaRegistryClient from
the confluent_kafka package.
Installation
Official Release
```shell
pip install kafka_mocha
```
or using your favorite package manager, e.g. Poetry:
```shell
poetry add kafka_mocha
```
Prerelease or Development Version
From GitHub (development version):
```shell
pip install git+https://github.com/Effiware/kafka-mocha@develop
```
or as a published (prerelease) version:
```shell
poetry add kafka_mocha --allow-prereleases
```
Usage
Starting Kafka Simulator
The Kafka Simulator is started automatically whenever an instance of either KProducer or KConsumer is created (e.g. via mock_producer or mock_consumer), so there is no need to start it manually.
With default logging settings, custom start-up messages may be visible (although the default logging level is set to WARNING):
```
INFO ticking_thread > Buffer for KProducer(4409519920): ticking initialized
INFO buffer_handler > Buffer for KProducer(4409519920) has been primed, length: 5, timeout: 300
INFO kafka_simulator > Handle producers has been primed
INFO kafka_simulator > Kafka Simulator initialized, id: 4399382400
DEBUG kafka_simulator > Registered topics: [KTopic(name='_schemas', partition_no=1, config=None), KTopic(name='__consumer_offsets', partition_no=1, config=None)]
INFO ticking_thread > Buffer for KProducer(4409519920): ticking started
DEBUG kproducer > KProducer(4409519920): received ack: BUFFERED
DEBUG buffer_handler > Buffer for KProducer(4409519920): received done (or manual flush) signal...
DEBUG kafka_simulator > Appended message: (partition=0, offset=1000, key=b'4df33f7a-fcee-4b3b-b176-a4e650540401', value=b'\x00\x00\x00\x00\x01H4df33f7a-fcee-4b3b-b176-a4e650540401\x08John\x06Doe\x01\x04\xbe\xfd\x83\x8b\xa2e\x00\x00\x00\x00\x00\x00\x00\x00He5df0ed3-765c-4f58-b18c-aea4400dfce4\xbe\xfd\x83\x8b\xa2e(kafka_mocha_examples\n1.0.0', headers=[]))
INFO buffer_handler > Buffer for KProducer(4409519920): Kafka response: SUCCESS
INFO ticking_thread > Buffer for KProducer(4409519920): stop event
DEBUG buffer_handler > Buffer for KProducer(4409519920): received done (or manual flush) signal...
INFO buffer_handler > Buffer for KProducer(4409519920): nothing to send...
```
Additionally, all the messages produced by KProducer instances are stored in the KafkaSimulator instance. The messages can be dumped to either an HTML or a CSV file by passing the output parameter; see KProducer and outputs for more details.
KProducer
To use the KProducer class in your tests, import the mock_producer decorator from the kafka_mocha package:
```python
import confluent_kafka

from kafka_mocha import mock_producer


@mock_producer()
def handle_produce():
    """Most basic usage of the KProducer class. For more, go to the `examples` directory."""
    producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
    producer.produce("test-topic", "some value".encode(), "key".encode())
    producer.flush()
```
The KProducer class replicates the interface and behavior of the Producer class from the confluent_kafka library.
For more examples, see the examples directory.
Parameters for mock_producer
| No | Parameter name | Parameter type | Comment |
|----|----------------|----------------|---------|
| 1 | loglevel | Literal | See available levels in the logging library |
| 2 | output | dict | Dictionary with the output configuration |
| 3 | output.format | Literal | Output format of emitted messages: html or csv |
| 4 | output.name | str | Name of the output file (HTML only), e.g. kafka-dump.html |
| 5 | output.include_internal_topics | bool | Flag to include internal topics in the output |
| 6 | output.include_markers | bool | Flag to include transaction markers in the output |
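As an illustration only, the parameters in the table above map onto a decorator call roughly like the following sketch. The parameter names follow the table; the exact accepted values and defaults are defined by kafka_mocha itself, so treat this as a configuration outline rather than the authoritative schema:

```python
from kafka_mocha import mock_producer


# Sketch based on the parameter table above; verify against the `examples` directory.
@mock_producer(
    loglevel="DEBUG",
    output={
        "format": "html",                  # html or csv
        "name": "kafka-dump.html",         # output file name (HTML only)
        "include_internal_topics": False,  # skip topics such as __consumer_offsets
        "include_markers": False,          # skip transaction markers
    },
)
def produce_with_output():
    ...
```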
KConsumer
The KConsumer class is still under development. It will replicate the interface and behavior of the Consumer class
from the confluent_kafka library.
Parameters for mock_consumer
| No | Parameter name | Parameter type | Comment |
|----|----------------|----------------|---------|
| 1 | loglevel | Literal | See available levels in the logging library |
Schema Registry Mock
The Schema Registry mock is part of the kafka_mocha package. It is heavily inspired by the confluent-kafka-python/mock_schema_registry_client implementation and is hence licensed under the Apache License, Version 2.0.
It provides a fully compatible implementation of the SchemaRegistryClient class from the confluent_kafka library and, similarly to the KProducer class, it can be used as a decorator:
```python
import confluent_kafka.schema_registry

from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

from kafka_mocha.schema_registry import mock_schema_registry


@mock_schema_registry()
def quick_start():
    schema_registry = confluent_kafka.schema_registry.SchemaRegistryClient({"url": "http://localhost:8081"})
    avro_serializer = AvroSerializer(schema_registry, conf={"auto.register.schemas": False})
    avro_serializer({"foo": "bar"}, SerializationContext("topic", MessageField.VALUE))
```
For more examples, see the examples directory.
Parameters for mock_schema_registry
| No | Parameter name | Parameter type | Comment |
|----|----------------|----------------|---------|
| 1 | loglevel | Literal | See available levels in the logging library |
| 2 | register_schemas | list[str] | List of schemas (as relative paths) to load into the Schema Registry mock on launch |
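For illustration, preloading schemas at launch might look like the sketch below. The path "schemas/user.avsc" is hypothetical and used only to show the shape of the register_schemas argument; use paths to your own schema files:

```python
from kafka_mocha.schema_registry import mock_schema_registry


# "schemas/user.avsc" is a hypothetical relative path, shown only for illustration.
@mock_schema_registry(register_schemas=["schemas/user.avsc"])
def serialize_with_preloaded_schema():
    ...
```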
Absolute imports
Due to the specific nature of Python imports, it is recommended (and enforced) to use absolute imports in your code. This is especially important when using the mock_producer and mock_consumer decorators, as they rely on the patching mechanism from the unittest.mock module.
Imports that will work:
```python
import confluent_kafka
import confluent_kafka.schema_registry

producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})  # OK
schema_registry = confluent_kafka.schema_registry.SchemaRegistryClient({"url": "http://localhost:8081"})  # OK
```
Imports that will not work:
```python
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient

producer = Producer({"bootstrap.servers": "localhost:9092"})  # Will not work
schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})  # Will not work
```
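The reason is standard Python name binding, not anything specific to kafka_mocha: `from module import name` copies a reference into the importing namespace at import time, so later patching of the module attribute is invisible to that copy. A minimal stdlib sketch (using json.dumps in place of confluent_kafka) demonstrates the effect:

```python
import json
from unittest import mock

from json import dumps as early_bound_dumps  # reference captured before patching

with mock.patch("json.dumps", return_value="patched"):
    via_module = json.dumps({})          # attribute resolved at call time -> sees the mock
    via_binding = early_bound_dumps({})  # early-bound reference -> still the real function

print(via_module)   # patched
print(via_binding)  # {}
```

The same applies to confluent_kafka: accessing `confluent_kafka.Producer` at call time picks up the patched class, while an early-bound `Producer` name does not.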
Contributing
We welcome contributions! Before posting your first PR, please see our contributing guidelines for more details.
Also, bear in mind that this project uses Poetry for dependency management. If you are not familiar with it, please first read the Poetry documentation and:
- Set up the Poetry environment (recommended)
- Don't overwrite the pyproject.toml file manually (Poetry will do it for you)
- Don't recreate poetry.lock (unless you know what you are doing)
Cloning the repository
```shell
git clone git@github.com:Effiware/kafka-mocha.git
cd kafka-mocha
```
Installing dependencies
Default (and recommended) way:
```shell
poetry install --with test
```
Standard way:
```shell
poetry export -f requirements.txt --output requirements.txt
pip install -r requirements.txt
```
Running tests
Currently, the test configuration is set up to run with pytest and is kept in the pytest.ini file. You can run the tests with:
```shell
poetry run pytest
```
License
This project is primarily licensed under the MIT License - see the LICENSE file for details.
Parts of this project (specifically the Schema Registry mock implementation) contain code from Confluent Inc., licensed under the Apache License, Version 2.0. See LICENSE.APACHE for details.