pytest-kafka-contract
Pytest plugin and CLI for contract testing Kafka event payloads.
pytest-kafka-contract helps QA and backend teams validate Kafka messages against explicit contracts before broken event changes reach consumers.
It supports:
- JSON payload validation against YAML contracts
- Avro record validation against .avsc schemas
- Confluent Schema Registry checks
- Real Kafka message validation
- Real Kafka Avro message decoding
- Pytest fixture API
- CLI validation commands
- Markdown and JSON reports
Why
Kafka messages can break silently.
A producer can rename a field, remove a required value, change a number into a string, or publish a payload that no longer matches what downstream consumers expect.
This package catches issues like:
- Missing required fields
- Wrong field types
- Invalid constants
- Invalid enum values
- Unexpected extra fields
- Null values where null is not allowed
- Invalid Avro records
- Schema Registry subject issues
- Kafka messages that do not match their expected contract
The goal is simple:
Make Kafka event contracts testable in normal pytest workflows.
Install
Basic JSON contract validation:
pip install pytest-kafka-contract
Install everything, including Kafka, Avro, and Schema Registry support:
pip install "pytest-kafka-contract[all]"
Optional extras:
pip install "pytest-kafka-contract[avro]"
pip install "pytest-kafka-contract[kafka]"
pip install "pytest-kafka-contract[registry]"
Quick Start: JSON Contract Validation
Create a contract file:
mkdir -p contracts
Create contracts/order-created.yaml:
version: 1
name: order-created-v1
topic: orders.created
message:
  type: object
  required:
    - event_id
    - event_type
    - order
  properties:
    event_id:
      type: string
      nullable: false
    event_type:
      type: string
      const: OrderCreated
      nullable: false
    order:
      type: object
      required:
        - order_id
        - total
      properties:
        order_id:
          type: string
        total:
          type: number
rules:
  allow_extra_fields: false
Write a pytest test:
def test_order_created_contract(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "OrderCreated",
            "order": {
                "order_id": "ord_1",
                "total": 20.0,
            },
        },
        contract_path="contracts/order-created.yaml",
    )
    assert result.passed, result.issues
Run:
pytest
JSON Contract Failure Example
If the payload is wrong:
def test_order_created_contract_fails(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "WrongEvent",
            "order": {
                "order_id": "ord_1",
                "total": "20.0",
            },
        },
        contract_path="contracts/order-created.yaml",
    )
    assert not result.passed
    assert result.issues
The result contains readable issues such as:
CONST_MISMATCH
TYPE_MISMATCH
Avro Validation
Install Avro support:
pip install "pytest-kafka-contract[avro]"
Create schemas/order-created.avsc:
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "fields": [
    {
      "name": "event_id",
      "type": "string"
    },
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "total",
      "type": "double"
    },
    {
      "name": "currency",
      "type": {
        "type": "enum",
        "name": "Currency",
        "symbols": ["USD", "CAD", "EUR"]
      },
      "default": "USD"
    }
  ]
}
Validate a Python dictionary against the Avro schema:
def test_order_created_avro(kafka_contract):
    result = kafka_contract.validate_avro_record(
        record={
            "event_id": "evt_1",
            "order_id": "ord_1",
            "total": 20.0,
            "currency": "USD",
        },
        schema_path="schemas/order-created.avsc",
    )
    assert result.passed, result.issues
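Under the hood, an Avro enum check amounts to membership in the schema's `symbols` list. The stdlib-only sketch below illustrates that one check by parsing the `.avsc` JSON directly; it is not the plugin's actual implementation, which delegates to a full Avro validator:

```python
import json

# A trimmed copy of the schema above, parsed with the stdlib json module.
schema = json.loads("""
{
  "type": "record",
  "name": "OrderCreated",
  "fields": [
    {"name": "currency",
     "type": {"type": "enum", "name": "Currency",
              "symbols": ["USD", "CAD", "EUR"]},
     "default": "USD"}
  ]
}
""")

def enum_symbols(schema: dict) -> dict[str, list[str]]:
    """Map enum-typed field names to their allowed symbols."""
    return {
        f["name"]: f["type"]["symbols"]
        for f in schema["fields"]
        if isinstance(f["type"], dict) and f["type"].get("type") == "enum"
    }

symbols = enum_symbols(schema)
print(symbols["currency"])           # ['USD', 'CAD', 'EUR']
print("GBP" in symbols["currency"])  # False
```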
Schema Registry Checks
Install Schema Registry support:
pip install "pytest-kafka-contract[registry]"
Validate that a subject exists and is compatible with a local schema:
def test_schema_registry_subject(kafka_contract):
    result = kafka_contract.validate_schema_registry_subject(
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        schema_path="schemas/order-created.avsc",
        compatibility="BACKWARD",
    )
    assert result.passed, result.issues
This can check:
- Registry reachability
- Subject existence
- Latest schema lookup
- Local schema comparison
- Compatibility result
Real Kafka JSON Message Validation
Install Kafka support:
pip install "pytest-kafka-contract[kafka]"
Validate the latest JSON message from a Kafka topic:
def test_latest_json_message(kafka_contract):
    result = kafka_contract.validate_latest_json(
        topic="orders.created",
        contract_path="contracts/order-created.yaml",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
    )
    assert result.passed, result.issues
This flow:
1. Consume the latest Kafka message
2. Decode the JSON payload
3. Validate it against the YAML contract
4. Return a detailed result
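The decode-and-validate middle of this flow needs no broker to reason about. A simplified, stdlib-only sketch of steps 2 and 3 (`decode_and_check` and its single-rule validation are illustrative names, not the plugin's internals, which validate the full contract):

```python
import json

def decode_and_check(raw: bytes, required: set[str]) -> list[str]:
    """Decode a raw Kafka message value as JSON and report missing
    required top-level fields, using issue codes in the plugin's style.
    Illustrative only: the real plugin checks every contract rule."""
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return ["KAFKA_DESERIALIZATION_FAILED"]
    return [
        f"MISSING_REQUIRED_FIELD:{field}"
        for field in sorted(required - payload.keys())
    ]

print(decode_and_check(b'{"event_id": "evt_1"}', {"event_id", "order"}))
# ['MISSING_REQUIRED_FIELD:order']
print(decode_and_check(b"\xff\xfe", {"event_id"}))
# ['KAFKA_DESERIALIZATION_FAILED']
```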
Real Kafka Avro Message Validation
Install all optional dependencies:
pip install "pytest-kafka-contract[all]"
Validate the latest Avro message from Kafka using Schema Registry:
def test_latest_avro_message(kafka_contract):
    result = kafka_contract.validate_latest_avro(
        topic="orders.created",
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
    )
    assert result.passed, result.issues
This flow:
1. Consume the latest Kafka message
2. Extract the Confluent schema ID
3. Fetch the schema from Schema Registry
4. Decode the Avro payload
5. Validate the decoded record
6. Return a detailed result
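Step 2 relies on the Confluent wire format: each serialized value begins with a magic byte `0x00`, followed by a 4-byte big-endian schema ID, then the Avro-encoded body. A minimal parser sketch (the function name is illustrative):

```python
import struct

def parse_confluent_frame(raw: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_payload).
    Raises ValueError if the magic byte is wrong or the frame is short."""
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not a Confluent-framed message")
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]

# Build a frame with schema ID 42 and a dummy Avro body.
frame = b"\x00" + struct.pack(">I", 42) + b"avro-bytes"
schema_id, body = parse_confluent_frame(frame)
print(schema_id)  # 42
print(body)       # b'avro-bytes'
```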
Pytest Fixture API
The plugin provides a kafka_contract fixture.
def test_with_fixture(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={"event_type": "OrderCreated"},
        contract_path="contracts/order-created.yaml",
    )
    assert result.passed, result.issues
Supported fixture methods:
kafka_contract.load_contract(contract_path)
kafka_contract.validate_payload(
    payload,
    contract_path,
)

kafka_contract.validate_avro_record(
    record,
    schema_path,
)

kafka_contract.validate_schema_registry_subject(
    registry_url,
    subject,
    schema_path,
    compatibility=None,
)

kafka_contract.validate_latest_json(
    topic,
    contract_path,
    bootstrap_servers="localhost:9092",
    timeout_ms=10000,
)

kafka_contract.validate_latest_avro(
    topic,
    registry_url,
    subject,
    bootstrap_servers="localhost:9092",
    timeout_ms=10000,
)
CLI Usage
Initialize example contracts and samples:
kafka-contract init
Validate a JSON sample file against a YAML contract:
kafka-contract validate-file \
contracts/order-created.yaml \
samples/order-created.json
Validate a JSON sample file against an Avro schema:
kafka-contract avro-validate-file \
schemas/order-created.avsc \
samples/order-created-avro.json
Check Schema Registry:
kafka-contract registry-check \
--registry-url http://localhost:8081 \
--subject orders.created-value \
--schema schemas/order-created.avsc \
--compatibility BACKWARD
Validate the latest JSON message from Kafka:
kafka-contract kafka-validate-json \
--bootstrap-servers localhost:9092 \
--topic orders.created \
--contract contracts/order-created.yaml \
--timeout-ms 10000
Validate the latest Avro message from Kafka:
kafka-contract kafka-validate-avro \
--bootstrap-servers localhost:9092 \
--registry-url http://localhost:8081 \
--topic orders.created \
--subject orders.created-value \
--timeout-ms 10000
Show help:
kafka-contract --help
Pytest CLI Options
pytest \
--kafka-contract contracts/order-created.yaml \
--kafka-bootstrap-servers localhost:9092 \
--kafka-timeout-ms 10000
Report options:
pytest \
--kafka-contract-report .reports/kafka-contract.md \
--kafka-contract-json-report .reports/kafka-contract.json
Available options:
--kafka-contract
--kafka-bootstrap-servers
--kafka-timeout-ms
--kafka-strict
--kafka-contract-report
--kafka-contract-json-report
--schema-registry-url
--kafka-avro-subject
--kafka-format
Contract Rules
Supported JSON contract types:
string
number
integer
boolean
object
array
null
Supported validation rules:
required
properties
nullable
const
enum
format: datetime
items
allow_extra_fields
Example:
message:
  type: object
  required:
    - event_id
    - status
  properties:
    event_id:
      type: string
      nullable: false
    status:
      type: string
      enum:
        - CREATED
        - UPDATED
        - CANCELLED
rules:
  allow_extra_fields: false
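These rules compose into straightforward checks. A stripped-down, stdlib-only sketch of how required, nullable, enum, and allow_extra_fields might be evaluated (the contract layout is flattened here for brevity, and the plugin's real validator covers the full rule set):

```python
def check(payload: dict, spec: dict) -> list[str]:
    """Evaluate a small subset of contract rules against a payload,
    returning issue codes in the style the plugin reports."""
    issues = []
    props = spec.get("properties", {})
    for name in spec.get("required", []):
        if name not in payload:
            issues.append(f"MISSING_REQUIRED_FIELD:{name}")
    for name, value in payload.items():
        if name not in props:
            if not spec.get("allow_extra_fields", True):
                issues.append(f"EXTRA_FIELD:{name}")
            continue
        rules = props[name]
        if value is None:
            if not rules.get("nullable", True):
                issues.append(f"NULL_NOT_ALLOWED:{name}")
            continue
        if "enum" in rules and value not in rules["enum"]:
            issues.append(f"ENUM_MISMATCH:{name}")
    return issues

spec = {
    "required": ["event_id", "status"],
    "allow_extra_fields": False,
    "properties": {
        "event_id": {"type": "string", "nullable": False},
        "status": {"type": "string",
                   "enum": ["CREATED", "UPDATED", "CANCELLED"]},
    },
}
print(check({"event_id": None, "status": "ARCHIVED", "extra": 1}, spec))
```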
Result Model
All validators return a result object.
result.passed
result.issues
result.metadata
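The shape of such a result can be pictured as a pair of small dataclasses. These are illustrative stand-ins, not the plugin's actual classes:

```python
from dataclasses import dataclass, field

@dataclass
class Issue:
    code: str     # e.g. "TYPE_MISMATCH"
    path: str     # e.g. "order.total"
    message: str  # human-readable description

@dataclass
class ValidationResult:
    passed: bool
    issues: list[Issue] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

result = ValidationResult(
    passed=False,
    issues=[Issue("TYPE_MISMATCH", "order.total", "expected number, got str")],
    metadata={"contract": "order-created-v1"},
)
print(result.passed)          # False
print(result.issues[0].code)  # TYPE_MISMATCH
```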
Example:
result = kafka_contract.validate_payload(payload, "contracts/order-created.yaml")
if not result.passed:
    for issue in result.issues:
        print(issue.code, issue.path, issue.message)
Issue examples:
MISSING_REQUIRED_FIELD
TYPE_MISMATCH
CONST_MISMATCH
ENUM_MISMATCH
EXTRA_FIELD
NULL_NOT_ALLOWED
AVRO_RECORD_INVALID
REGISTRY_SUBJECT_NOT_FOUND
REGISTRY_COMPATIBILITY_FAILED
KAFKA_NO_MESSAGE
KAFKA_DESERIALIZATION_FAILED
Reports
Markdown and JSON reports can be created from pytest runs:
pytest \
--kafka-contract-report .reports/kafka-contract.md \
--kafka-contract-json-report .reports/kafka-contract.json
Markdown report includes:
summary
passed checks
failed checks
issue codes
topic metadata
schema metadata
JSON report includes structured output for CI pipelines, artifacts, and Slack summaries.
Local Kafka Example
Start local Kafka-compatible infrastructure:
docker compose -f docker-compose.integration.yml up -d
Run integration tests:
PKCT_RUN_INTEGRATION=1 pytest tests/integration -q
Stop services:
docker compose -f docker-compose.integration.yml down -v
Development
Create a virtual environment:
python3 -m venv .venv
source .venv/bin/activate
Install in editable mode:
python -m pip install --upgrade pip
python -m pip install -e ".[dev]"
Run tests:
pytest
Run full integration suite:
docker compose -f docker-compose.integration.yml up -d
PKCT_RUN_INTEGRATION=1 pytest -q
docker compose -f docker-compose.integration.yml down -v
Run quality checks:
ruff check .
mypy src
Build package:
python -m build
python -m twine check dist/*
Tested Workflows
The test suite covers:
- Unit validation
- JSON contract validation
- Avro schema validation
- Schema Registry client behavior
- Kafka validation logic
- Pytest plugin discovery
- Pytest fixture usage
- CLI workflows
- README command workflows
- Temporary user project workflow
- Wheel install workflow
- Real Kafka integration tests
Current local test suite result: 111 tests passed.
When To Use This
Use this package when you want to test:
- Kafka event payloads
- Redpanda event payloads
- JSON event contracts
- Avro records
- Schema Registry subjects
- Event-driven microservices
- CDC pipeline outputs
- Producer/consumer contract expectations
- QA automation checks in CI
It is especially useful when your team already uses pytest and wants lightweight Kafka contract checks without adopting a larger contract-testing platform immediately.
Limitations
This package is focused on pytest-native Kafka contract testing.
It is not:
- A full replacement for Confluent Schema Registry
- A full Pact Broker replacement
- A Kafka monitoring platform
- A production schema governance system
Recommended use:
- Schema Registry for schema governance
- pytest-kafka-contract for automated test validation
Roadmap
Planned improvements:
- Protobuf support
- Kafka header validation
- Kafka key validation
- JSON Schema export
- Better CI report summaries
- GitHub Actions examples
- More built-in contract examples
License
MIT License
Copyright (c) 2026 Dharsan Guruparan
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
File details
Details for the file pytest_kafka_contract-0.1.2.tar.gz.
File metadata
- Download URL: pytest_kafka_contract-0.1.2.tar.gz
- Upload date:
- Size: 32.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 88e2148782dd9db45bf5a11d70a5ed4dec278ba61052ffcd5e5a2483d2f43302 |
| MD5 | d41796c7d55cc39e3271156738ae79be |
| BLAKE2b-256 | 347dbb2368d9557bfac9e191f84b370b99bb12c236410649af5d26a20af51ff3 |
File details
Details for the file pytest_kafka_contract-0.1.2-py3-none-any.whl.
File metadata
- Download URL: pytest_kafka_contract-0.1.2-py3-none-any.whl
- Upload date:
- Size: 25.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2141f0f4421f45677a535e6a6008b1907a34bdf7b81f0fa4753fc7c876642532 |
| MD5 | a9f1789974e0108a638b0998cd7a9a8c |
| BLAKE2b-256 | 6419948a537f22189daac1dca9d74ef97c67de0772bcbfd3db8dc0aa56633d3a |