Skip to main content

A pytest plugin and CLI for validating Kafka JSON and Avro messages against contracts.

Project description

pytest-kafka-contract

pytest-kafka-contract is a pytest plugin and CLI for validating Kafka event payloads against JSON/YAML contracts, Avro schemas, Schema Registry subjects, and real Kafka messages.

It is lightweight pytest-native contract checking for QA automation, SDET, backend, and Kafka teams. It complements Schema Registry by making contract checks easy to run in unit tests, CI jobs, and local developer workflows.

Why this exists

Kafka messages can drift silently: producers rename fields, remove required data, change types, or publish payloads that consumers cannot handle. This package catches those failures in normal pytest runs before they reach production.

Features

  • JSON payload validation with YAML contracts
  • Avro record validation with .avsc schemas
  • Confluent Schema Registry subject checks
  • Real Kafka JSON message validation
  • Real Kafka Avro message validation with Confluent wire format
  • kafka_contract pytest fixture
  • CLI commands for local and CI usage
  • Markdown and JSON reports

Install

Basic JSON support:

pip install pytest-kafka-contract

All features:

pip install "pytest-kafka-contract[all]"

Individual extras:

pip install "pytest-kafka-contract[avro]"
pip install "pytest-kafka-contract[registry]"
pip install "pytest-kafka-contract[kafka]"

The base install supports JSON contracts and does not import Kafka, Avro, or Schema Registry clients at import time.

Beginner Flow

Step 1: Install:

pip install "pytest-kafka-contract[all]"

Step 2: Generate starter files:

pytest-kafka-contract init

Step 3: Validate sample JSON:

pytest-kafka-contract validate-file contracts/order-created.yaml samples/order-created.json

Step 4: Validate sample Avro record:

pytest-kafka-contract avro-validate-file schemas/order-created.avsc samples/order-created-avro.json

Step 5: Write pytest test:

def test_order_created(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "OrderCreated",
            "order": {"order_id": "ord_1", "total": 20.0},
        },
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues

Step 6: Run pytest:

pytest -q

Step 7: Optional Schema Registry check:

pytest-kafka-contract registry-check \
  --registry-url http://localhost:8081 \
  --subject orders.created-value \
  --schema schemas/order-created.avsc

Step 8: Optional real Kafka check:

pytest-kafka-contract kafka-validate-json \
  --bootstrap-servers localhost:19092 \
  --topic orders.created \
  --contract contracts/order-created.yaml \
  --auto-offset-reset earliest

Quick Start: JSON Contract

Create folders:

mkdir -p contracts samples tests

Create contracts/order-created.yaml:

version: 1
name: order-created-v1
topic: orders.created
message:
  type: object
  required:
    - event_id
    - event_type
    - order
  properties:
    event_id:
      type: string
      nullable: false
    event_type:
      type: string
      const: OrderCreated
      nullable: false
    order:
      type: object
      nullable: false
      required:
        - order_id
        - total
      properties:
        order_id:
          type: string
          nullable: false
        total:
          type: number
          nullable: false
rules:
  allow_extra_fields: false

Create samples/order-created.json:

{
  "event_id": "evt_1",
  "event_type": "OrderCreated",
  "order": {
    "order_id": "ord_1",
    "total": 20.0
  }
}

Create tests/test_order_created.py:

import json


def test_order_created_json(kafka_contract):
    payload = json.loads(open("samples/order-created.json", encoding="utf-8").read())
    result = kafka_contract.validate_payload(
        payload=payload,
        contract_path="contracts/order-created.yaml",
    )
    assert result.passed, result.issues

Run:

pytest -q

CLI:

pytest-kafka-contract validate-file contracts/order-created.yaml samples/order-created.json

Quick Start: Avro Schema

Create schemas/order-created.avsc:

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "fields": [
    { "name": "event_id", "type": "string" },
    { "name": "order_id", "type": "string" },
    { "name": "total", "type": "double" },
    {
      "name": "currency",
      "type": {
        "type": "enum",
        "name": "Currency",
        "symbols": ["USD", "CAD", "EUR"]
      },
      "default": "USD"
    }
  ]
}

Create samples/order-created-avro.json:

{
  "event_id": "evt_1",
  "order_id": "ord_1",
  "total": 20.0,
  "currency": "USD"
}

Use the fixture:

def test_order_created_avro(kafka_contract):
    result = kafka_contract.validate_avro_record(
        record={
            "event_id": "evt_1",
            "order_id": "ord_1",
            "total": 20.0,
            "currency": "USD",
        },
        schema_path="schemas/order-created.avsc",
    )

    assert result.passed, result.issues

CLI:

pytest-kafka-contract avro-validate-file schemas/order-created.avsc samples/order-created-avro.json

Schema Registry Check

Schema Registry already manages schemas. This package checks it from pytest or the CLI, which is useful for CI and QA automation.

pytest-kafka-contract registry-check \
  --registry-url http://localhost:8081 \
  --subject orders.created-value \
  --schema schemas/order-created.avsc \
  --compatibility BACKWARD
def test_registry_subject(kafka_contract):
    result = kafka_contract.validate_schema_registry_subject(
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        schema_path="schemas/order-created.avsc",
        compatibility="BACKWARD",
    )
    assert result.passed, result.issues

Real Kafka JSON Validation

Requires Kafka running and install with [kafka] or [all].

pytest-kafka-contract kafka-validate-json \
  --bootstrap-servers localhost:19092 \
  --topic orders.created \
  --contract contracts/order-created.yaml \
  --timeout-ms 10000 \
  --auto-offset-reset earliest
def test_latest_json_message(kafka_contract):
    result = kafka_contract.validate_latest_json(
        topic="orders.created",
        contract_path="contracts/order-created.yaml",
        bootstrap_servers="localhost:19092",
        timeout_ms=10000,
        auto_offset_reset="earliest",
    )
    assert result.passed, result.issues

Real Kafka Avro Validation

Requires Kafka, Schema Registry, and install with [all] or [kafka], [avro], and [registry].

pytest-kafka-contract kafka-validate-avro \
  --bootstrap-servers localhost:19092 \
  --registry-url http://localhost:8081 \
  --topic orders.created \
  --subject orders.created-value \
  --timeout-ms 10000 \
  --auto-offset-reset earliest
def test_latest_avro_message(kafka_contract):
    result = kafka_contract.validate_latest_avro(
        topic="orders.created",
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        bootstrap_servers="localhost:19092",
        timeout_ms=10000,
        auto_offset_reset="earliest",
    )
    assert result.passed, result.issues

CLI Reference

pytest-kafka-contract --help
pytest-kafka-contract init
pytest-kafka-contract validate-file contracts/order-created.yaml samples/order-created.json
pytest-kafka-contract avro-validate-file schemas/order-created.avsc samples/order-created-avro.json
pytest-kafka-contract registry-check --registry-url http://localhost:8081 --subject orders.created-value --schema schemas/order-created.avsc --compatibility BACKWARD
pytest-kafka-contract kafka-validate-json --bootstrap-servers localhost:19092 --topic orders.created --contract contracts/order-created.yaml --timeout-ms 10000 --auto-offset-reset earliest
pytest-kafka-contract kafka-validate-avro --bootstrap-servers localhost:19092 --registry-url http://localhost:8081 --topic orders.created --subject orders.created-value --timeout-ms 10000 --auto-offset-reset earliest

kafka-contract is also available as a compatibility alias.

Pytest Fixture API

kafka_contract.load_contract("contracts/order-created.yaml")
kafka_contract.validate_payload(payload, contract_path="contracts/order-created.yaml")
kafka_contract.validate_avro_record(record, schema_path="schemas/order-created.avsc")
kafka_contract.validate_schema_registry_subject(registry_url, subject, schema_path, compatibility="BACKWARD")
kafka_contract.validate_latest_json(topic, contract_path, bootstrap_servers="localhost:19092", auto_offset_reset="earliest")
kafka_contract.validate_latest_avro(topic, registry_url, subject, bootstrap_servers="localhost:19092", auto_offset_reset="earliest")

Pytest Options

pytest --kafka-contract contracts/order-created.yaml
pytest --kafka-bootstrap-servers localhost:9092
pytest --kafka-timeout-ms 10000
pytest --kafka-strict
pytest --kafka-contract-report .reports/kafka-contract.md
pytest --kafka-contract-json-report .reports/kafka-contract.json
pytest --schema-registry-url http://localhost:8081
pytest --kafka-avro-subject orders.created-value
pytest --kafka-format json

Reports

Markdown and JSON reports include total checks, passed checks, failed checks, format, topic, subject, schema id, issues, and metadata.

pytest --kafka-contract-report .reports/kafka-contract.md \
  --kafka-contract-json-report .reports/kafka-contract.json

Example Markdown:

# Kafka Contract Report

| Total | Passed | Failed |
|---:|---:|---:|
| 3 | 2 | 1 |

## Failed Checks

### orders.created

- Format: avro
- Subject: orders.created-value
- Schema ID: 12
- Issue: AVRO_RECORD_INVALID at $.total

Local Integration Testing

Docker integration tests are optional and are not run by default. The compose stack exposes Kafka on localhost:19092, Schema Registry on localhost:8081, and Redpanda admin on ${PKCT_REDPANDA_ADMIN_PORT:-19644} to avoid common local 9644 conflicts.

./scripts/run-integration.sh

Equivalent manual commands:

docker compose -f docker-compose.integration.yml up -d
PKCT_RUN_INTEGRATION=1 \
PKCT_BOOTSTRAP_SERVERS=localhost:19092 \
PKCT_SCHEMA_REGISTRY_URL=http://localhost:8081 \
pytest tests/integration -q -s
docker compose -f docker-compose.integration.yml down -v

CI Example

- run: python -m pip install -e ".[dev]"
- run: ruff check .
- run: mypy src
- run: pytest --cov=pytest_kafka_contract --cov-report=term-missing
- run: python -m build
- run: python -m twine check dist/*

Limitations

  • Alpha package.
  • JSON contract syntax is intentionally small.
  • Avro support expects standard .avsc schemas.
  • Kafka Avro validation expects Confluent wire format.
  • Schema Registry auth is not included in the MVP.
  • This does not replace Pact or Confluent governance tools.

Roadmap

  • Protobuf support
  • Schema Registry auth
  • Batch topic validation
  • Compatibility diff output
  • Kafka key and header validation

License

MIT.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pytest_kafka_contract-0.1.0.tar.gz (29.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pytest_kafka_contract-0.1.0-py3-none-any.whl (24.0 kB view details)

Uploaded Python 3

File details

Details for the file pytest_kafka_contract-0.1.0.tar.gz.

File metadata

  • Download URL: pytest_kafka_contract-0.1.0.tar.gz
  • Upload date:
  • Size: 29.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.6

File hashes

Hashes for pytest_kafka_contract-0.1.0.tar.gz
Algorithm Hash digest
SHA256 2ada980ff10a5f89b68bf1e3b21a38b30b7a2fe49835f1738f5b2c01b47ccef4
MD5 d63cd2c4cd1d5e533fb58ad5bc8ab2d6
BLAKE2b-256 748de3b51bf6dedfe108336d2d821abb6447102be47ea07fe46f51454152bf67

See more details on using hashes here.

File details

Details for the file pytest_kafka_contract-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for pytest_kafka_contract-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d12290a525a7b65b3f2fc75f2ba28cff57a3359da779757f941b74617fa12d95
MD5 da7eb20ceaf50f43d23a7090612c2461
BLAKE2b-256 82f1caa9188541803ff13b6b09371db32576fc0eb00dc0905f4f9976aba8e858

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page