A pytest plugin and CLI for validating Kafka JSON and Avro messages against contracts.

pytest-kafka-contract

Pytest plugin and CLI for contract testing Kafka event payloads.

pytest-kafka-contract helps QA and backend teams validate Kafka messages against explicit contracts before broken event changes reach consumers.

It supports:

  • JSON payload validation against YAML contracts
  • Avro record validation against .avsc schemas
  • Confluent Schema Registry checks
  • Real Kafka message validation
  • Real Kafka Avro message decoding
  • Pytest fixture API
  • CLI validation commands
  • Markdown and JSON reports

Why

Kafka messages can break silently.

A producer can rename a field, remove a required value, change a number into a string, or publish a payload that no longer matches what downstream consumers expect.

This package catches issues like:

  • Missing required fields
  • Wrong field types
  • Invalid constants
  • Invalid enum values
  • Unexpected extra fields
  • Null values where null is not allowed
  • Invalid Avro records
  • Schema Registry subject issues
  • Kafka messages that do not match their expected contract

The goal is simple:

Make Kafka event contracts testable in normal pytest workflows.


Install

Basic JSON contract validation:

pip install pytest-kafka-contract

Install everything, including Kafka, Avro, and Schema Registry support:

pip install "pytest-kafka-contract[all]"

Optional extras:

pip install "pytest-kafka-contract[avro]"
pip install "pytest-kafka-contract[kafka]"
pip install "pytest-kafka-contract[registry]"

Quick Start: JSON Contract Validation

Create a directory for contracts:

mkdir -p contracts

Create contracts/order-created.yaml:

version: 1
name: order-created-v1
topic: orders.created

message:
  type: object
  required:
    - event_id
    - event_type
    - order
  properties:
    event_id:
      type: string
      nullable: false

    event_type:
      type: string
      const: OrderCreated
      nullable: false

    order:
      type: object
      required:
        - order_id
        - total
      properties:
        order_id:
          type: string

        total:
          type: number

rules:
  allow_extra_fields: false

Write a pytest test:

def test_order_created_contract(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "OrderCreated",
            "order": {
                "order_id": "ord_1",
                "total": 20.0,
            },
        },
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues

Run:

pytest

JSON Contract Failure Example

If the payload is wrong:

def test_order_created_contract_fails(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "WrongEvent",
            "order": {
                "order_id": "ord_1",
                "total": "20.0",
            },
        },
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues

This test fails, and the assertion message lists readable issues such as:

CONST_MISMATCH
TYPE_MISMATCH

Avro Validation

Install Avro support:

pip install "pytest-kafka-contract[avro]"

Create schemas/order-created.avsc:

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "fields": [
    {
      "name": "event_id",
      "type": "string"
    },
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "total",
      "type": "double"
    },
    {
      "name": "currency",
      "type": {
        "type": "enum",
        "name": "Currency",
        "symbols": ["USD", "CAD", "EUR"]
      },
      "default": "USD"
    }
  ]
}

Validate a Python dictionary against the Avro schema:

def test_order_created_avro(kafka_contract):
    result = kafka_contract.validate_avro_record(
        record={
            "event_id": "evt_1",
            "order_id": "ord_1",
            "total": 20.0,
            "currency": "USD",
        },
        schema_path="schemas/order-created.avsc",
    )

    assert result.passed, result.issues
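
To see why a record with an unlisted currency would be rejected, it can help to read the enum symbols straight from the schema. This sketch uses only the standard library and an inline, shortened copy of the schema above. `enum_symbols` is a hypothetical helper and says nothing about how the plugin validates records internally.

```python
import json

# Shortened inline copy of schemas/order-created.avsc, for illustration only.
SCHEMA_JSON = """
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "fields": [
    {"name": "event_id", "type": "string"},
    {
      "name": "currency",
      "type": {"type": "enum", "name": "Currency", "symbols": ["USD", "CAD", "EUR"]},
      "default": "USD"
    }
  ]
}
"""


def enum_symbols(schema: dict) -> dict:
    """Map each enum-typed field name to its list of allowed symbols."""
    symbols = {}
    for field in schema["fields"]:
        field_type = field["type"]
        # Primitive types are plain strings; enums are nested objects.
        if isinstance(field_type, dict) and field_type.get("type") == "enum":
            symbols[field["name"]] = field_type["symbols"]
    return symbols


schema = json.loads(SCHEMA_JSON)
allowed = enum_symbols(schema)
```

A record with `"currency": "JPY"` falls outside `allowed["currency"]`, so it does not conform to this schema.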

Schema Registry Checks

Install Schema Registry support:

pip install "pytest-kafka-contract[registry]"

Validate that a subject exists and is compatible with a local schema:

def test_schema_registry_subject(kafka_contract):
    result = kafka_contract.validate_schema_registry_subject(
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        schema_path="schemas/order-created.avsc",
        compatibility="BACKWARD",
    )

    assert result.passed, result.issues

This can check:

  • Registry reachability
  • Subject existence
  • Latest schema lookup
  • Local schema comparison
  • Compatibility result
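
For orientation, the latest-schema lookup and the compatibility check correspond to two Confluent Schema Registry REST endpoints. The sketch below only builds the requests with the standard library and does not send them. The helper names are hypothetical, and whether the plugin uses these exact calls is an assumption.

```python
import json
import urllib.request
from urllib.parse import quote

CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def latest_schema_request(registry_url: str, subject: str) -> urllib.request.Request:
    """Build a GET request for the latest registered version of a subject."""
    base = registry_url.rstrip("/")
    url = f"{base}/subjects/{quote(subject, safe='')}/versions/latest"
    return urllib.request.Request(url, method="GET")


def compatibility_request(
    registry_url: str, subject: str, schema_text: str
) -> urllib.request.Request:
    """Build a POST request that tests a local schema against the latest version."""
    base = registry_url.rstrip("/")
    url = f"{base}/compatibility/subjects/{quote(subject, safe='')}/versions/latest"
    # The registry expects the schema as a JSON-encoded string in the "schema" key.
    body = json.dumps({"schema": schema_text}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": CONTENT_TYPE},
        method="POST",
    )
```

Passing either request to `urllib.request.urlopen` would perform the call. The compatibility endpoint answers with a JSON body that contains an `is_compatible` boolean.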

Real Kafka JSON Message Validation

Install Kafka support:

pip install "pytest-kafka-contract[kafka]"

Validate the latest JSON message from a Kafka topic:

def test_latest_json_message(kafka_contract):
    result = kafka_contract.validate_latest_json(
        topic="orders.created",
        contract_path="contracts/order-created.yaml",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
    )

    assert result.passed, result.issues

This runs the following steps:

consume Kafka message
decode JSON
validate against YAML contract
return detailed result
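
The "decode JSON" step can be pictured as follows. This is a minimal sketch with the standard library, not the plugin's code. `decode_json_value` is a hypothetical name, and mapping decode errors to `KAFKA_DESERIALIZATION_FAILED` is an assumption based on the issue codes listed under Result Model.

```python
import json


def decode_json_value(raw: bytes):
    """Decode a Kafka message value as UTF-8 JSON.

    Returns a (payload, error_code) pair. Exactly one of the two is None.
    """
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Assumed mapping: undecodable bytes become a deserialization issue.
        return None, "KAFKA_DESERIALIZATION_FAILED"
    return payload, None


payload, error = decode_json_value(b'{"event_id": "evt_1"}')
```

Only a successfully decoded payload would then be handed to the contract validation step.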

Real Kafka Avro Message Validation

Install all optional dependencies:

pip install "pytest-kafka-contract[all]"

Validate the latest Avro message from Kafka using Schema Registry:

def test_latest_avro_message(kafka_contract):
    result = kafka_contract.validate_latest_avro(
        topic="orders.created",
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
    )

    assert result.passed, result.issues

This runs the following steps:

consume Kafka message
extract Confluent schema ID
fetch schema from Schema Registry
decode Avro payload
validate decoded record
return detailed result
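
The "extract Confluent schema ID" step relies on the Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded body. A minimal standard-library sketch of that split follows. `split_confluent_frame` is a hypothetical name, not the plugin's API.

```python
import struct

MAGIC_BYTE = 0
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def split_confluent_frame(raw: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message value into (schema_id, avro_body)."""
    if len(raw) < HEADER_SIZE:
        raise ValueError("message is too short to carry a Confluent header")
    # ">bI" reads a signed byte and a big-endian unsigned 32-bit integer.
    magic, schema_id = struct.unpack(">bI", raw[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, raw[HEADER_SIZE:]


schema_id, body = split_confluent_frame(b"\x00\x00\x00\x00\x2a" + b"avro-bytes")
```

The schema ID is what gets looked up in Schema Registry before the remaining bytes are decoded as Avro.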

Pytest Fixture API

The plugin provides a kafka_contract fixture.

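def test_with_fixture(kafka_contract):
    result = kafka_contract.validate_payload(
        # The payload includes every field the contract marks as required.
        payload={
            "event_id": "evt_1",
            "event_type": "OrderCreated",
            "order": {"order_id": "ord_1", "total": 20.0},
        },
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues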

Supported fixture methods:

kafka_contract.load_contract(contract_path)

kafka_contract.validate_payload(
    payload,
    contract_path,
)

kafka_contract.validate_avro_record(
    record,
    schema_path,
)

kafka_contract.validate_schema_registry_subject(
    registry_url,
    subject,
    schema_path,
    compatibility=None,
)

kafka_contract.validate_latest_json(
    topic,
    contract_path,
    bootstrap_servers="localhost:9092",
    timeout_ms=10000,
)

kafka_contract.validate_latest_avro(
    topic,
    registry_url,
    subject,
    bootstrap_servers="localhost:9092",
    timeout_ms=10000,
)

CLI Usage

Initialize example contracts and samples:

kafka-contract init

Validate a JSON sample file against a YAML contract:

kafka-contract validate-file \
  contracts/order-created.yaml \
  samples/order-created.json

Validate a JSON sample file against an Avro schema:

kafka-contract avro-validate-file \
  schemas/order-created.avsc \
  samples/order-created-avro.json

Check Schema Registry:

kafka-contract registry-check \
  --registry-url http://localhost:8081 \
  --subject orders.created-value \
  --schema schemas/order-created.avsc \
  --compatibility BACKWARD

Validate the latest JSON message from Kafka:

kafka-contract kafka-validate-json \
  --bootstrap-servers localhost:9092 \
  --topic orders.created \
  --contract contracts/order-created.yaml \
  --timeout-ms 10000

Validate the latest Avro message from Kafka:

kafka-contract kafka-validate-avro \
  --bootstrap-servers localhost:9092 \
  --registry-url http://localhost:8081 \
  --topic orders.created \
  --subject orders.created-value \
  --timeout-ms 10000

Show help:

kafka-contract --help

Pytest CLI Options

Contract and Kafka settings can be passed as pytest command-line options:

pytest \
  --kafka-contract contracts/order-created.yaml \
  --kafka-bootstrap-servers localhost:9092 \
  --kafka-timeout-ms 10000

Report options:

pytest \
  --kafka-contract-report .reports/kafka-contract.md \
  --kafka-contract-json-report .reports/kafka-contract.json

Available options:

--kafka-contract
--kafka-bootstrap-servers
--kafka-timeout-ms
--kafka-strict
--kafka-contract-report
--kafka-contract-json-report
--schema-registry-url
--kafka-avro-subject
--kafka-format

Contract Rules

Supported JSON contract types:

string
number
integer
boolean
object
array
null

Supported validation rules:

required
properties
nullable
const
enum
format: datetime
items
allow_extra_fields

Example:

message:
  type: object
  required:
    - event_id
    - status
  properties:
    event_id:
      type: string
      nullable: false

    status:
      type: string
      enum:
        - CREATED
        - UPDATED
        - CANCELLED

rules:
  allow_extra_fields: false
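
To make the rules concrete, here is a small standard-library sketch of how `required`, `nullable`, `enum`, type checks, and `allow_extra_fields` could be evaluated for a flat object. It is an illustration only, not the plugin's implementation. The contract is written as a Python dict mirroring the YAML above, `check_object` is a hypothetical name, and treating a missing `nullable` key as "not nullable" is an assumption.

```python
# Illustrative mapping from contract type names to Python checks.
# bool is excluded from number/integer because bool subclasses int in Python.
TYPE_CHECKS = {
    "string": lambda v: isinstance(v, str),
    "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "boolean": lambda v: isinstance(v, bool),
    "object": lambda v: isinstance(v, dict),
    "array": lambda v: isinstance(v, list),
    "null": lambda v: v is None,
}


def check_object(payload: dict, spec: dict, allow_extra_fields: bool) -> list:
    """Return (issue_code, field_name) pairs for a flat object spec."""
    issues = []
    properties = spec.get("properties", {})

    for name in spec.get("required", []):
        if name not in payload:
            issues.append(("MISSING_REQUIRED_FIELD", name))

    for name, value in payload.items():
        rule = properties.get(name)
        if rule is None:
            if not allow_extra_fields:
                issues.append(("EXTRA_FIELD", name))
            continue
        if value is None:
            # Assumption: null is rejected unless nullable is explicitly true.
            if not rule.get("nullable", False):
                issues.append(("NULL_NOT_ALLOWED", name))
            continue
        if not TYPE_CHECKS[rule["type"]](value):
            issues.append(("TYPE_MISMATCH", name))
            continue
        if "enum" in rule and value not in rule["enum"]:
            issues.append(("ENUM_MISMATCH", name))
    return issues


CONTRACT = {
    "message": {
        "type": "object",
        "required": ["event_id", "status"],
        "properties": {
            "event_id": {"type": "string", "nullable": False},
            "status": {"type": "string", "enum": ["CREATED", "UPDATED", "CANCELLED"]},
        },
    },
    "rules": {"allow_extra_fields": False},
}

issues = check_object(
    {"status": "SHIPPED", "trace": "abc"},
    CONTRACT["message"],
    CONTRACT["rules"]["allow_extra_fields"],
)
```

The sample payload omits `event_id`, uses a status outside the enum, and carries an undeclared `trace` field, so each of the three rules reports one issue.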

Result Model

All validators return a result object.

result.passed
result.issues
result.metadata

Example:

result = kafka_contract.validate_payload(payload, "contracts/order-created.yaml")

if not result.passed:
    for issue in result.issues:
        print(issue.code, issue.path, issue.message)

Issue examples:

MISSING_REQUIRED_FIELD
TYPE_MISMATCH
CONST_MISMATCH
ENUM_MISMATCH
EXTRA_FIELD
NULL_NOT_ALLOWED
AVRO_RECORD_INVALID
REGISTRY_SUBJECT_NOT_FOUND
REGISTRY_COMPATIBILITY_FAILED
KAFKA_NO_MESSAGE
KAFKA_DESERIALIZATION_FAILED
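
For assertion messages or CI logs, the issues can be rendered one per line. The sketch below relies only on the `passed`, `issues`, `code`, `path`, and `message` attributes shown above. `format_issues` is a hypothetical helper, not something the package provides.

```python
def format_issues(result) -> str:
    """Render a validation result as readable text, one issue per line."""
    if result.passed:
        return "contract check passed"
    lines = [
        f"{issue.code} at {issue.path}: {issue.message}"
        for issue in result.issues
    ]
    return "\n".join(lines)
```

It can be used as the assertion message, for example `assert result.passed, format_issues(result)`.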

Reports

Markdown and JSON reports can be created from pytest runs:

pytest \
  --kafka-contract-report .reports/kafka-contract.md \
  --kafka-contract-json-report .reports/kafka-contract.json

The Markdown report includes:

summary
passed checks
failed checks
issue codes
topic metadata
schema metadata

The JSON report provides structured output for CI pipelines, artifacts, and Slack summaries.


Local Kafka Example

Start local Kafka-compatible infrastructure:

docker compose -f docker-compose.integration.yml up -d

Run integration tests:

PKCT_RUN_INTEGRATION=1 pytest tests/integration -q

Stop services:

docker compose -f docker-compose.integration.yml down -v

Development

Create a virtual environment:

python3 -m venv .venv
source .venv/bin/activate

Install in editable mode:

python -m pip install --upgrade pip
python -m pip install -e ".[dev]"

Run tests:

pytest

Run full integration suite:

docker compose -f docker-compose.integration.yml up -d
PKCT_RUN_INTEGRATION=1 pytest -q
docker compose -f docker-compose.integration.yml down -v

Run quality checks:

ruff check .
mypy src

Build package:

python -m build
python -m twine check dist/*

Tested Workflows

The test suite covers:

  • Unit validation
  • JSON contract validation
  • Avro schema validation
  • Schema Registry client behavior
  • Kafka validation logic
  • Pytest plugin discovery
  • Pytest fixture usage
  • CLI workflows
  • README command workflows
  • Temporary user project workflow
  • Wheel install workflow
  • Real Kafka integration tests

Current local validation target:

111 passed

When To Use This

Use this package when you want to test:

  • Kafka event payloads
  • Redpanda event payloads
  • JSON event contracts
  • Avro records
  • Schema Registry subjects
  • Event-driven microservices
  • CDC pipeline outputs
  • Producer/consumer contract expectations
  • QA automation checks in CI

It is especially useful when your team already uses pytest and wants lightweight Kafka contract checks without adopting a larger contract-testing platform immediately.


Limitations

This package is focused on pytest-native Kafka contract testing.

It is not:

  • A full replacement for Confluent Schema Registry
  • A full Pact Broker replacement
  • A Kafka monitoring platform
  • A production schema governance system

Recommended use:

use Schema Registry for schema governance
use pytest-kafka-contract for automated test validation

Roadmap

Planned improvements:

  • Protobuf support
  • Kafka header validation
  • Kafka key validation
  • JSON Schema export
  • Better CI report summaries
  • GitHub Actions examples
  • More built-in contract examples

License

MIT License

Copyright (c) 2026 Dharsan Guruparan

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
