A pytest plugin and CLI for validating Kafka JSON and Avro messages against contracts.
Project description
pytest-kafka-contract
pytest-kafka-contract is a pytest plugin and CLI for validating Kafka event payloads against JSON/YAML contracts, Avro schemas, Schema Registry subjects, and real Kafka messages.
It provides lightweight, pytest-native contract checking for QA automation, SDET, backend, and Kafka teams. It complements Schema Registry by making contract checks easy to run in unit tests, CI jobs, and local developer workflows.
Why this exists
Kafka messages can drift silently: producers rename fields, remove required data, change types, or publish payloads that consumers cannot handle. This package catches those failures in normal pytest runs before they reach production.
Features
- JSON payload validation with YAML contracts
- Avro record validation with .avsc schemas
- Confluent Schema Registry subject checks
- Real Kafka JSON message validation
- Real Kafka Avro message validation with Confluent wire format
- kafka_contract pytest fixture
- CLI commands for local and CI usage
- Markdown and JSON reports
Install
Basic JSON support:
pip install pytest-kafka-contract
All features:
pip install "pytest-kafka-contract[all]"
Individual extras:
pip install "pytest-kafka-contract[avro]"
pip install "pytest-kafka-contract[registry]"
pip install "pytest-kafka-contract[kafka]"
The base install supports JSON contracts and does not import Kafka, Avro, or Schema Registry clients at import time.
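Deferring optional imports like this is a common pattern for pip extras. A generic sketch of the idea follows; the helper name is hypothetical and not part of this package's API:

```python
import importlib


def require_extra(module_name: str, extra: str):
    """Import an optional dependency on first use, or fail with a hint
    pointing at the matching pip extra (illustrative helper, not the
    plugin's actual code)."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"'{module_name}' is required for this feature; "
            f"install it with: pip install 'pytest-kafka-contract[{extra}]'"
        ) from exc
```

Because the import happens inside the function, merely importing the plugin never pulls in Kafka, Avro, or Schema Registry clients.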
Beginner Flow
Step 1: Install:
pip install "pytest-kafka-contract[all]"
Step 2: Generate starter files:
pytest-kafka-contract init
Step 3: Validate sample JSON:
pytest-kafka-contract validate-file contracts/order-created.yaml samples/order-created.json
Step 4: Validate sample Avro record:
pytest-kafka-contract avro-validate-file schemas/order-created.avsc samples/order-created-avro.json
Step 5: Write pytest test:
def test_order_created(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "OrderCreated",
            "order": {"order_id": "ord_1", "total": 20.0},
        },
        contract_path="contracts/order-created.yaml",
    )
    assert result.passed, result.issues
Step 6: Run pytest:
pytest -q
Step 7: Optional Schema Registry check:
pytest-kafka-contract registry-check \
--registry-url http://localhost:8081 \
--subject orders.created-value \
--schema schemas/order-created.avsc
Step 8: Optional real Kafka check:
pytest-kafka-contract kafka-validate-json \
--bootstrap-servers localhost:19092 \
--topic orders.created \
--contract contracts/order-created.yaml \
--auto-offset-reset earliest
Quick Start: JSON Contract
Create folders:
mkdir -p contracts samples tests
Create contracts/order-created.yaml:
version: 1
name: order-created-v1
topic: orders.created
message:
  type: object
  required:
    - event_id
    - event_type
    - order
  properties:
    event_id:
      type: string
      nullable: false
    event_type:
      type: string
      const: OrderCreated
      nullable: false
    order:
      type: object
      nullable: false
      required:
        - order_id
        - total
      properties:
        order_id:
          type: string
          nullable: false
        total:
          type: number
          nullable: false
rules:
  allow_extra_fields: false
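The contract rules above (required, type, nullable, const, allow_extra_fields) are small enough to sketch in plain Python. This is an illustration of the semantics only, not the plugin's implementation:

```python
# Illustrative sketch of the contract rules; not the plugin's actual code.
TYPES = {"object": dict, "string": str, "number": (int, float)}


def validate(payload, spec, path="$", allow_extra=False):
    """Return a list of issue strings; an empty list means the payload passes."""
    if payload is None:
        # Assumption: fields are nullable unless the contract says otherwise.
        return [] if spec.get("nullable", True) else [f"{path}: null not allowed"]
    # bool is a subclass of int, so reject it explicitly for "number".
    if isinstance(payload, bool) or not isinstance(payload, TYPES[spec["type"]]):
        return [f"{path}: expected {spec['type']}"]
    issues = []
    if "const" in spec and payload != spec["const"]:
        issues.append(f"{path}: expected const {spec['const']!r}")
    if spec["type"] == "object":
        props = spec.get("properties", {})
        issues += [f"{path}.{f}: required field missing"
                   for f in spec.get("required", []) if f not in payload]
        if not allow_extra:
            issues += [f"{path}.{f}: extra field not allowed"
                       for f in payload if f not in props]
        for name, sub in props.items():
            if name in payload:
                issues += validate(payload[name], sub, f"{path}.{name}", allow_extra)
    return issues
```

Running such a validator against the sample payload below yields an empty issue list; dropping a required field or adding an unexpected one produces a path-prefixed issue.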
Create samples/order-created.json:
{
  "event_id": "evt_1",
  "event_type": "OrderCreated",
  "order": {
    "order_id": "ord_1",
    "total": 20.0
  }
}
Create tests/test_order_created.py:
import json

def test_order_created_json(kafka_contract):
    with open("samples/order-created.json", encoding="utf-8") as fh:
        payload = json.load(fh)
    result = kafka_contract.validate_payload(
        payload=payload,
        contract_path="contracts/order-created.yaml",
    )
    assert result.passed, result.issues
Run:
pytest -q
CLI:
pytest-kafka-contract validate-file contracts/order-created.yaml samples/order-created.json
Quick Start: Avro Schema
Create schemas/order-created.avsc:
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "fields": [
    { "name": "event_id", "type": "string" },
    { "name": "order_id", "type": "string" },
    { "name": "total", "type": "double" },
    {
      "name": "currency",
      "type": {
        "type": "enum",
        "name": "Currency",
        "symbols": ["USD", "CAD", "EUR"]
      },
      "default": "USD"
    }
  ]
}
Create samples/order-created-avro.json:
{
  "event_id": "evt_1",
  "order_id": "ord_1",
  "total": 20.0,
  "currency": "USD"
}
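Before reaching for a full Avro library, the sample can be sanity-checked against the schema with nothing but the standard library. The helper below is a deliberately shallow sketch (no unions, logical types, or nested records), not real Avro validation:

```python
# Tiny structural check of an Avro record against its parsed .avsc dict.
# NOT full Avro validation; illustration only.
def check_record_fields(schema: dict, record: dict) -> list:
    issues = []
    for field in schema["fields"]:
        name, ftype = field["name"], field["type"]
        if name not in record:
            # Fields with a schema default may be omitted from the record.
            if "default" not in field:
                issues.append(f"missing field: {name}")
            continue
        if isinstance(ftype, dict) and ftype.get("type") == "enum":
            if record[name] not in ftype["symbols"]:
                issues.append(f"{name}: {record[name]!r} is not a valid symbol")
    return issues
```

An enum value outside the declared symbols, or a missing field without a default, shows up as an issue string.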
Use the fixture:
def test_order_created_avro(kafka_contract):
    result = kafka_contract.validate_avro_record(
        record={
            "event_id": "evt_1",
            "order_id": "ord_1",
            "total": 20.0,
            "currency": "USD",
        },
        schema_path="schemas/order-created.avsc",
    )
    assert result.passed, result.issues
CLI:
pytest-kafka-contract avro-validate-file schemas/order-created.avsc samples/order-created-avro.json
Schema Registry Check
Schema Registry already manages schemas. This package checks it from pytest or the CLI, which is useful for CI and QA automation.
pytest-kafka-contract registry-check \
--registry-url http://localhost:8081 \
--subject orders.created-value \
--schema schemas/order-created.avsc \
--compatibility BACKWARD
def test_registry_subject(kafka_contract):
    result = kafka_contract.validate_schema_registry_subject(
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        schema_path="schemas/order-created.avsc",
        compatibility="BACKWARD",
    )
    assert result.passed, result.issues
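A compatibility check like the one above maps to a single Schema Registry REST call: POST /compatibility/subjects/&lt;subject&gt;/versions/latest with the candidate schema in the body. A sketch that only builds the request (the helper name is mine, not this package's API):

```python
import json


def compatibility_request(registry_url: str, subject: str, schema_str: str):
    """Build the Confluent Schema Registry compatibility-check request:
    POST /compatibility/subjects/<subject>/versions/latest.
    Returns (url, body, headers) for any HTTP client to send."""
    url = f"{registry_url.rstrip('/')}/compatibility/subjects/{subject}/versions/latest"
    body = json.dumps({"schema": schema_str})
    headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
    return url, body, headers
```

The registry replies with an is_compatible flag, which is what the CLI and fixture ultimately surface as pass/fail.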
Real Kafka JSON Validation
Requires a running Kafka broker and an install with the [kafka] or [all] extra.
pytest-kafka-contract kafka-validate-json \
--bootstrap-servers localhost:19092 \
--topic orders.created \
--contract contracts/order-created.yaml \
--timeout-ms 10000 \
--auto-offset-reset earliest
def test_latest_json_message(kafka_contract):
    result = kafka_contract.validate_latest_json(
        topic="orders.created",
        contract_path="contracts/order-created.yaml",
        bootstrap_servers="localhost:19092",
        timeout_ms=10000,
        auto_offset_reset="earliest",
    )
    assert result.passed, result.issues
Real Kafka Avro Validation
Requires a running Kafka broker, Schema Registry, and an install with the [all] extra (or [kafka], [avro], and [registry] together).
pytest-kafka-contract kafka-validate-avro \
--bootstrap-servers localhost:19092 \
--registry-url http://localhost:8081 \
--topic orders.created \
--subject orders.created-value \
--timeout-ms 10000 \
--auto-offset-reset earliest
def test_latest_avro_message(kafka_contract):
    result = kafka_contract.validate_latest_avro(
        topic="orders.created",
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        bootstrap_servers="localhost:19092",
        timeout_ms=10000,
        auto_offset_reset="earliest",
    )
    assert result.passed, result.issues
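The Confluent wire format referenced above is simple: one zero magic byte, a 4-byte big-endian schema id, then the Avro-encoded body. A minimal stdlib parser sketch (the function name is illustrative, not part of this package's API):

```python
import struct


def split_confluent_frame(raw: bytes):
    """Split a Confluent wire-format message into (schema_id, avro_payload).

    Layout: 1 magic byte (0x00) + 4-byte big-endian schema id + Avro body.
    """
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not a Confluent wire-format message")
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]
```

The schema id is then resolved against Schema Registry before the Avro body can be decoded, which is why the Avro checks need both a broker and a registry.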
CLI Reference
pytest-kafka-contract --help
pytest-kafka-contract init
pytest-kafka-contract validate-file contracts/order-created.yaml samples/order-created.json
pytest-kafka-contract avro-validate-file schemas/order-created.avsc samples/order-created-avro.json
pytest-kafka-contract registry-check --registry-url http://localhost:8081 --subject orders.created-value --schema schemas/order-created.avsc --compatibility BACKWARD
pytest-kafka-contract kafka-validate-json --bootstrap-servers localhost:19092 --topic orders.created --contract contracts/order-created.yaml --timeout-ms 10000 --auto-offset-reset earliest
pytest-kafka-contract kafka-validate-avro --bootstrap-servers localhost:19092 --registry-url http://localhost:8081 --topic orders.created --subject orders.created-value --timeout-ms 10000 --auto-offset-reset earliest
kafka-contract is also available as a compatibility alias.
Pytest Fixture API
kafka_contract.load_contract("contracts/order-created.yaml")
kafka_contract.validate_payload(payload, contract_path="contracts/order-created.yaml")
kafka_contract.validate_avro_record(record, schema_path="schemas/order-created.avsc")
kafka_contract.validate_schema_registry_subject(registry_url, subject, schema_path, compatibility="BACKWARD")
kafka_contract.validate_latest_json(topic, contract_path, bootstrap_servers="localhost:19092", auto_offset_reset="earliest")
kafka_contract.validate_latest_avro(topic, registry_url, subject, bootstrap_servers="localhost:19092", auto_offset_reset="earliest")
Pytest Options
pytest --kafka-contract contracts/order-created.yaml
pytest --kafka-bootstrap-servers localhost:9092
pytest --kafka-timeout-ms 10000
pytest --kafka-strict
pytest --kafka-contract-report .reports/kafka-contract.md
pytest --kafka-contract-json-report .reports/kafka-contract.json
pytest --schema-registry-url http://localhost:8081
pytest --kafka-avro-subject orders.created-value
pytest --kafka-format json
Reports
Markdown and JSON reports include total checks, passed checks, failed checks, format, topic, subject, schema id, issues, and metadata.
pytest --kafka-contract-report .reports/kafka-contract.md \
--kafka-contract-json-report .reports/kafka-contract.json
Example Markdown:
# Kafka Contract Report
| Total | Passed | Failed |
|---:|---:|---:|
| 3 | 2 | 1 |
## Failed Checks
### orders.created
- Format: avro
- Subject: orders.created-value
- Schema ID: 12
- Issue: AVRO_RECORD_INVALID at $.total
Local Integration Testing
Docker integration tests are optional and are not run by default.
The compose stack exposes Kafka on localhost:19092, Schema Registry on localhost:8081,
and Redpanda admin on ${PKCT_REDPANDA_ADMIN_PORT:-19644} to avoid common local 9644
conflicts.
./scripts/run-integration.sh
Equivalent manual commands:
docker compose -f docker-compose.integration.yml up -d
PKCT_RUN_INTEGRATION=1 \
PKCT_BOOTSTRAP_SERVERS=localhost:19092 \
PKCT_SCHEMA_REGISTRY_URL=http://localhost:8081 \
pytest tests/integration -q -s
docker compose -f docker-compose.integration.yml down -v
CI Example
- run: python -m pip install -e ".[dev]"
- run: ruff check .
- run: mypy src
- run: pytest --cov=pytest_kafka_contract --cov-report=term-missing
- run: python -m build
- run: python -m twine check dist/*
Limitations
- Alpha package.
- JSON contract syntax is intentionally small.
- Avro support expects standard .avsc schemas.
- Kafka Avro validation expects Confluent wire format.
- Schema Registry auth is not included in the MVP.
- This does not replace Pact or Confluent governance tools.
Roadmap
- Protobuf support
- Schema Registry auth
- Batch topic validation
- Compatibility diff output
- Kafka key and header validation
License
MIT.
File details
Details for the file pytest_kafka_contract-0.1.0.tar.gz.
File metadata
- Download URL: pytest_kafka_contract-0.1.0.tar.gz
- Upload date:
- Size: 29.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2ada980ff10a5f89b68bf1e3b21a38b30b7a2fe49835f1738f5b2c01b47ccef4 |
| MD5 | d63cd2c4cd1d5e533fb58ad5bc8ab2d6 |
| BLAKE2b-256 | 748de3b51bf6dedfe108336d2d821abb6447102be47ea07fe46f51454152bf67 |
File details
Details for the file pytest_kafka_contract-0.1.0-py3-none-any.whl.
File metadata
- Download URL: pytest_kafka_contract-0.1.0-py3-none-any.whl
- Upload date:
- Size: 24.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d12290a525a7b65b3f2fc75f2ba28cff57a3359da779757f941b74617fa12d95 |
| MD5 | da7eb20ceaf50f43d23a7090612c2461 |
| BLAKE2b-256 | 82f1caa9188541803ff13b6b09371db32576fc0eb00dc0905f4f9976aba8e858 |