Skip to main content

A pytest plugin and CLI for validating Kafka JSON and Avro messages against contracts.

Project description

pytest-kafka-contract

Pytest plugin and CLI for contract testing and flow testing Kafka event payloads.

pytest-kafka-contract removes Kafka test boilerplate and lets teams validate real JSON/Avro Kafka messages, Schema Registry subjects, and black-box event flows directly in pytest.

pytest-kafka-contract helps QA and backend teams validate Kafka messages against explicit contracts and verify that events flow correctly through services — all from normal pytest tests.

It supports:

  • JSON payload validation against YAML contracts
  • Avro record validation against .avsc schemas
  • Confluent Schema Registry checks
  • Real Kafka message validation
  • Real Kafka Avro message decoding
  • Kafka black-box flow testing — produce → consume → assert
  • Negative flow testing — assert that an event is NOT forwarded
  • YAML-driven flow specs via the flow-run CLI command
  • Pytest fixture API
  • CLI validation commands
  • Markdown and JSON reports

Why

Kafka messages can break silently.

A producer can rename a field, remove a required value, change a number into a string, or publish a payload that no longer matches what downstream consumers expect. A routing service can silently drop events it should forward, or forward events it should filter.

This package catches issues like:

  • Missing required fields
  • Wrong field types
  • Invalid constants
  • Invalid enum values
  • Unexpected extra fields
  • Null values where null is not allowed
  • Invalid Avro records
  • Schema Registry subject issues
  • Kafka messages that do not match their expected contract
  • Events that fail to reach a destination topic
  • Events that reach a destination topic when they should not

The goal is simple:

Make Kafka event contracts and flows testable in normal pytest workflows.


Why Not Just Normal Assertions?

Normal assertions are still useful.

This package does not replace pytest assertions. It removes the Kafka boilerplate around them.

Manual Kafka tests usually repeat the same setup:

  • create producer
  • create consumer
  • create unique consumer group
  • subscribe to topic
  • produce test message
  • poll with timeout
  • decode JSON or Avro
  • match correlation ID
  • validate schema or contract
  • format readable failures

pytest-kafka-contract standardizes that part.

Then you can still write normal pytest assertions for business-specific logic.

Use the package for:

  • Kafka producing
  • Kafka consuming
  • polling
  • timeout handling
  • correlation matching
  • JSON contract validation
  • Avro validation
  • Schema Registry checks
  • readable result objects

Use normal pytest assertions for:

  • business math
  • field relationships
  • custom transformation rules
  • enrichment logic
  • domain-specific expectations

Package Levels

You can use this package at several levels:

Level 1: validate local JSON file
Level 2: validate local Avro record
Level 3: check Schema Registry subject
Level 4: consume latest Kafka message and validate
Level 5: produce source message, consume destination message, validate full flow
Level 6: run custom pytest assertions on consumed event

Contract Testing vs Flow Testing

Contract test Flow test
Question Does this event look right? Does this app move/transform/filter events correctly?
How Validate payload shape/values Produce to source topic, assert what arrives on destination topic
When On any payload, offline Requires running Kafka and a live app
Fixture method validate_payload, validate_avro_record run_json_flow, expect_no_json_flow, run_avro_flow

Install

Basic JSON contract validation (no Kafka required):

pip install pytest-kafka-contract

Install everything, including Kafka, Avro, Schema Registry, and flow testing support:

pip install "pytest-kafka-contract[all]"

Optional extras:

pip install "pytest-kafka-contract[kafka]"     # real Kafka validation + flow testing
pip install "pytest-kafka-contract[avro]"      # Avro schema validation
pip install "pytest-kafka-contract[registry]"  # Schema Registry checks

Quick Start: JSON Contract Validation

Create a contract file:

mkdir -p contracts

Create contracts/order-created.yaml:

version: 1
name: order-created-v1
topic: orders.created

message:
  type: object
  required:
    - event_id
    - event_type
    - order
  properties:
    event_id:
      type: string
      nullable: false

    event_type:
      type: string
      const: OrderCreated
      nullable: false

    order:
      type: object
      required:
        - order_id
        - total
      properties:
        order_id:
          type: string

        total:
          type: number

rules:
  allow_extra_fields: false

Write a pytest test:

def test_order_created_contract(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "OrderCreated",
            "order": {
                "order_id": "ord_1",
                "total": 20.0,
            },
        },
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues

Run:

pytest

JSON Contract Failure Example

If the payload is wrong:

def test_order_created_contract_fails(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "WrongEvent",
            "order": {
                "order_id": "ord_1",
                "total": "20.0",
            },
        },
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues

The result contains readable issues such as:

CONST_MISMATCH
TYPE_MISMATCH

Avro Validation

Install Avro support:

pip install "pytest-kafka-contract[avro]"

Create schemas/order-created.avsc:

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "fields": [
    {
      "name": "event_id",
      "type": "string"
    },
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "total",
      "type": "double"
    },
    {
      "name": "currency",
      "type": {
        "type": "enum",
        "name": "Currency",
        "symbols": ["USD", "CAD", "EUR"]
      },
      "default": "USD"
    }
  ]
}

Validate a Python dictionary against the Avro schema:

def test_order_created_avro(kafka_contract):
    result = kafka_contract.validate_avro_record(
        record={
            "event_id": "evt_1",
            "order_id": "ord_1",
            "total": 20.0,
            "currency": "USD",
        },
        schema_path="schemas/order-created.avsc",
    )

    assert result.passed, result.issues

Schema Registry Checks

Install Schema Registry support:

pip install "pytest-kafka-contract[registry]"

Validate that a subject exists and is compatible with a local schema:

def test_schema_registry_subject(kafka_contract):
    result = kafka_contract.validate_schema_registry_subject(
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        schema_path="schemas/order-created.avsc",
        compatibility="BACKWARD",
    )

    assert result.passed, result.issues

This can check:

  • Registry reachability
  • Subject existence
  • Latest schema lookup
  • Local schema comparison
  • Compatibility result

Schema Registry vs this package

Schema Registry governs schemas. pytest-kafka-contract validates real app behavior in tests.

For example, Schema Registry can say OrderCreated is compatible. This package can prove checkout actually emitted OrderCreated with the expected order_id, total, Kafka topic, wire format, and downstream flow behavior.


Real Kafka JSON Message Validation

Install Kafka support:

pip install "pytest-kafka-contract[kafka]"

Validate the latest JSON message from a Kafka topic:

def test_latest_json_message(kafka_contract):
    result = kafka_contract.validate_latest_json(
        topic="orders.created",
        contract_path="contracts/order-created.yaml",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
        auto_offset_reset="latest",  # "latest" or "earliest"
    )

    assert result.passed, result.issues

This flow:

consume Kafka message
decode JSON
validate against YAML contract
return detailed result

Real Kafka Avro Message Validation

Install all optional dependencies:

pip install "pytest-kafka-contract[all]"

Validate the latest Avro message from Kafka using Schema Registry:

def test_latest_avro_message(kafka_contract):
    result = kafka_contract.validate_latest_avro(
        topic="orders.created",
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
        auto_offset_reset="latest",  # "latest" or "earliest"
    )

    assert result.passed, result.issues

This flow:

consume Kafka message
extract Confluent schema ID
fetch schema from Schema Registry
decode Avro payload
validate decoded record
return detailed result

Kafka Flow Testing

Flow testing lets you verify that your service correctly moves, transforms, or filters Kafka events — without touching its internals.

Install Kafka support:

pip install "pytest-kafka-contract[kafka]"

flow-run runs a full Kafka black-box flow from YAML:

kafka-contract flow-run flows/paid-order-flow.yaml

It does this in order:

1. read flow YAML
2. subscribe to destination topic first
3. produce payload to source topic
4. wait for the real app/service to process the event
5. consume destination topic
6. match the correct message by correlation_id
7. validate expected fields
8. optionally validate a JSON contract or Avro schema
9. print pass/fail
10. exit 0 on pass, exit 1 on fail

flow-run does not start your app. Your service must already be running and connected to Kafka.

Positive flow: assert an event reaches the destination

def test_paid_order_reaches_destination(kafka_contract):
    cid = kafka_contract.new_correlation_id()

    result = kafka_contract.run_json_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "correlation_id": cid,
            "order_id": "ord_123",
            "status": "PAID",
            "amount": 49.99,
        },
        expect={
            "correlation_id": cid,
            "status": "PAID",
        },
        contract_path="contracts/paid-order.yaml",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
    )

    assert result.passed, result.issues

run_json_flow does the following in order:

1. subscribe to destination_topic
2. produce payload to source_topic (with injected correlation_id)
3. poll destination_topic until the correlated message arrives
4. assert expected subset fields match (if expect= given)
5. validate against contract (if contract_path= given)
6. return FlowResult

Subscribing before producing prevents missing fast messages.

Negative flow: assert an event does NOT reach the destination

def test_cancelled_order_is_filtered_out(kafka_contract):
    cid = kafka_contract.new_correlation_id()

    result = kafka_contract.expect_no_json_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "correlation_id": cid,
            "order_id": "ord_123",
            "status": "CANCELLED",
        },
        bootstrap_servers="localhost:9092",
        timeout_ms=5000,
    )

    assert result.passed, result.issues

expect_no_json_flow passes when no correlated message arrives on the destination topic within timeout_ms.

CLI Flow Assertions

The CLI supports simple expected-field assertions through expect.

name: paid-order-reaches-destination
format: json

bootstrap_servers: localhost:9092
timeout_ms: 10000

source_topic: src.rds.orders
destination_topic: dest.filtered.orders
correlation_field: correlation_id

payload:
  order_id: ord_123
  status: PAID
  amount: 49.99

expect:
  order_id: ord_123
  status: PAID
  amount: 49.99

contract_path: contracts/paid-order.yaml

Run:

kafka-contract flow-run flows/paid-order.yaml

Use CLI flow specs for simple checks. Use pytest flow helpers when you need custom Python assertions.

Avro flow spec:

name: avro-paid-order-flow
format: avro
bootstrap_servers: localhost:9092
schema_registry_url: http://localhost:8081
timeout_ms: 10000

source_topic: src.rds.orders
destination_topic: dest.filtered.orders
source_subject: src.rds.orders-value
destination_subject: dest.filtered.orders-value
correlation_field: correlation_id

payload:
  correlation_id: test-123
  order_id: ord_123
  status: PAID

expect:
  correlation_id: test-123
  status: PAID

Run it the same way:

kafka-contract flow-run flows/avro-paid-order-flow.yaml

Negative flow spec using mode: expect_no_message:

# examples/flows/cancelled-order-filtered-out.yaml
name: cancelled-order-filtered-out
format: json
mode: expect_no_message
bootstrap_servers: localhost:9092
timeout_ms: 10000

source_topic: src.rds.orders
destination_topic: dest.filtered.orders
correlation_field: correlation_id

payload:
  order_id: ord_456
  status: CANCELLED
  amount: 19.99

Correlation ID

By default, a uuid4 is generated and injected into the payload under correlation_field (default: correlation_id). The consumer polls for the message whose value contains the matching correlation ID.

You can supply a fixed ID:

result = kafka_contract.run_json_flow(
    ...,
    correlation_id="my-fixed-id",
)

If the payload already contains correlation_field, it is not overwritten.


Custom Assertions After a Flow

The flow helpers return a result object.

You can first let the package validate the Kafka flow, then write your own pytest assertions against the consumed message.

def test_paid_order_flow_has_correct_business_logic(kafka_contract):
    result = kafka_contract.run_json_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "order_id": "ord_123",
            "status": "PAID",
            "subtotal": 100.00,
            "tax": 8.25,
        },
        expect={
            "order_id": "ord_123",
            "status": "PAID",
        },
        contract_path="contracts/filtered-order.yaml",
        bootstrap_servers="localhost:9092",
        timeout_ms=30000,
    )

    # Package-level checks
    assert result.passed, result.issues
    assert result.consumed_message is not None

    # User-level custom business checks
    event = result.consumed_message.value
    assert event is not None

    assert event["total"] == 108.25
    assert event["total"] == event["subtotal"] + event["tax"]
    assert event["status"] == "PAID"
    assert event["processed_by"] == "order-filter-service"

Recommended pattern:

Package proves Kafka flow worked.
Your pytest assertions prove business logic is correct.

What Custom Assertions Are Good For

Use custom assertions when contract shape is not enough.

# Field relationship
assert event["total"] == event["subtotal"] + event["tax"]

# Business rule
assert event["status"] in ["PAID", "SETTLED"]

# Enrichment check
assert event["processed_by"] == "order-filter-service"

# Value copied from input
assert event["order_id"] == input_order_id

# Timestamp generated
assert event["processed_at"] is not None

# Domain rule
assert event["discount"] <= event["subtotal"]

CLI Flow vs Pytest Flow

Need Use
Quick contract check from terminal CLI
CI smoke check CLI
YAML-defined flow test CLI
Simple expected fields CLI expect
Business math assertions Pytest
Field relationship checks Pytest
Reusing test fixtures/data setup Pytest
Complex app-specific assertions Pytest

Rule of thumb:

CLI = simple portable flow check
pytest = full test logic with custom assertions

Flow Result Object

Flow helpers return a result object.

Useful fields:

result.passed
result.issues
result.correlation_id
result.produced_message
result.consumed_message
result.metadata

Example:

result = kafka_contract.run_json_flow(...)

assert result.passed, result.issues
assert result.consumed_message is not None

event = result.consumed_message.value
assert event is not None
assert event["status"] == "PAID"

Example: Test a Kafka Filter Service

Suppose your service does this:

Consumes: src.rds.orders
Publishes: dest.filtered.orders
Rule: only PAID orders should be forwarded
Rule: CANCELLED orders should be dropped

Positive flow:

def test_paid_order_reaches_destination(kafka_contract):
    result = kafka_contract.run_json_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "order_id": "ord_paid_1",
            "status": "PAID",
            "amount": 49.99,
        },
        expect={
            "order_id": "ord_paid_1",
            "status": "PAID",
        },
        contract_path="contracts/paid-order.yaml",
        bootstrap_servers="localhost:9092",
    )

    assert result.passed, result.issues

Negative flow:

def test_cancelled_order_is_not_forwarded(kafka_contract):
    cid = kafka_contract.new_correlation_id()

    result = kafka_contract.expect_no_json_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "correlation_id": cid,
            "order_id": "ord_cancelled_1",
            "status": "CANCELLED",
            "amount": 49.99,
        },
        correlation_id=cid,
        bootstrap_servers="localhost:9092",
        timeout_ms=5000,
    )

    assert result.passed, result.issues

Example: Avro Flow With Schema Registry

Use this when your Kafka messages use Confluent Avro wire format.

def test_paid_order_avro_flow(kafka_contract):
    result = kafka_contract.run_avro_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "order_id": "ord_123",
            "status": "PAID",
            "amount": 49.99,
        },
        expect={
            "order_id": "ord_123",
            "status": "PAID",
        },
        registry_url="http://localhost:8081",
        source_subject="src.rds.orders-value",
        destination_subject="dest.filtered.orders-value",
        bootstrap_servers="localhost:9092",
        timeout_ms=30000,
    )

    assert result.passed, result.issues
    assert result.consumed_message is not None

    event = result.consumed_message.value
    assert event is not None
    assert event["amount"] > 0

Pytest Fixture API

The plugin provides a kafka_contract fixture.

def test_with_fixture(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={"event_type": "OrderCreated"},
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues

Supported fixture methods:

# Contract validation (no Kafka required)
kafka_contract.new_correlation_id()

kafka_contract.load_contract(contract_path)

kafka_contract.validate_payload(
    payload,
    contract_path,
)

kafka_contract.validate_avro_record(
    record,
    schema_path,
)

kafka_contract.validate_schema_registry_subject(
    registry_url,
    subject,
    schema_path,
    compatibility=None,
)

# Real Kafka – consume latest message and validate
kafka_contract.validate_latest_json(
    topic,
    contract_path,
    bootstrap_servers="localhost:9092",
    timeout_ms=10000,
    auto_offset_reset="latest",
)

kafka_contract.validate_latest_avro(
    topic,
    registry_url,
    subject,
    bootstrap_servers="localhost:9092",
    timeout_ms=10000,
    auto_offset_reset="latest",
)

# Flow testing – produce to source, assert on destination
kafka_contract.run_json_flow(
    source_topic,
    destination_topic,
    payload,
    expect=None,           # optional: subset of destination message to assert
    contract_path=None,    # optional: YAML contract to validate destination message
    correlation_field="correlation_id",
    correlation_id=None,   # auto-generated as uuid4 if not given
    bootstrap_servers=None,
    timeout_ms=None,
)

kafka_contract.expect_no_json_flow(
    source_topic,
    destination_topic,
    payload,
    correlation_field="correlation_id",
    correlation_id=None,
    bootstrap_servers=None,
    timeout_ms=None,
)

kafka_contract.run_avro_flow(
    source_topic,
    destination_topic,
    payload,
    registry_url,
    source_subject,
    destination_subject,
    expect=None,
    expected_destination_schema_id=None,
    bootstrap_servers=None,
    timeout_ms=None,
)

CLI Usage

Initialize example contracts and samples:

kafka-contract init

Validate a JSON sample file against a YAML contract:

kafka-contract validate-file \
  contracts/order-created.yaml \
  samples/order-created.json

Validate a JSON sample file against an Avro schema:

kafka-contract avro-validate-file \
  schemas/order-created.avsc \
  samples/order-created-avro.json

Check Schema Registry:

kafka-contract registry-check \
  --registry-url http://localhost:8081 \
  --subject orders.created-value \
  --schema schemas/order-created.avsc \
  --compatibility BACKWARD

Validate the latest JSON message from Kafka:

kafka-contract kafka-validate-json \
  --bootstrap-servers localhost:9092 \
  --topic orders.created \
  --contract contracts/order-created.yaml \
  --timeout-ms 10000

Validate the latest Avro message from Kafka:

kafka-contract kafka-validate-avro \
  --bootstrap-servers localhost:9092 \
  --registry-url http://localhost:8081 \
  --topic orders.created \
  --subject orders.created-value \
  --timeout-ms 10000

Run a YAML-defined flow test:

kafka-contract flow-run examples/flows/paid-order-flow.yaml

This produces output like:

PASS  paid-order-reaches-destination
      source:      src.rds.orders
      destination: dest.filtered.orders
      correlation: a1b2c3d4-...

Or on failure:

FAIL  cancelled-order-filtered-out
      FLOW_UNEXPECTED_MESSAGE at $: Expected no destination message but found one...

Show help:

kafka-contract --help

Pytest CLI Options

pytest \
  --kafka-contract contracts/order-created.yaml \
  --kafka-bootstrap-servers localhost:9092 \
  --kafka-timeout-ms 10000

Report options:

pytest \
  --kafka-contract-report .reports/kafka-contract.md \
  --kafka-contract-json-report .reports/kafka-contract.json

Available options:

--kafka-contract
--kafka-bootstrap-servers
--kafka-timeout-ms
--kafka-strict
--kafka-contract-report
--kafka-contract-json-report
--schema-registry-url
--kafka-avro-subject
--kafka-format

Contract Rules

Supported JSON contract types:

string
number
integer
boolean
object
array
null

Supported validation rules:

required
properties
nullable
const
enum
format: datetime
items
allow_extra_fields

Example:

message:
  type: object
  required:
    - event_id
    - status
  properties:
    event_id:
      type: string
      nullable: false

    status:
      type: string
      enum:
        - CREATED
        - UPDATED
        - CANCELLED

rules:
  allow_extra_fields: false

Result Model

Contract validators return a ContractResult:

result.passed      # bool
result.issues      # list of ContractIssue
result.metadata    # dict – topic, offset, schema_id, etc.

Flow methods return a FlowResult:

result.passed              # bool
result.source_topic        # str
result.destination_topic   # str
result.correlation_id      # str
result.produced_message    # FlowMessage | None
result.consumed_message    # FlowMessage | None  (None on FLOW_TIMEOUT)
result.issues              # list of ContractIssue
result.metadata            # dict – partition, offset

Inspecting issues:

for issue in result.issues:
    print(issue.code, issue.path, issue.message)

Contract issue codes:

MISSING_REQUIRED_FIELD
TYPE_MISMATCH
CONST_MISMATCH
ENUM_MISMATCH
EXTRA_FIELD
NULL_NOT_ALLOWED
AVRO_RECORD_INVALID
REGISTRY_SUBJECT_NOT_FOUND
REGISTRY_COMPATIBILITY_FAILED
KAFKA_NO_MESSAGE
KAFKA_DESERIALIZATION_FAILED

Flow issue codes:

FLOW_TIMEOUT               – no matching message arrived within timeout_ms
FLOW_EXPECTATION_FAILED    – destination message did not match expected subset
FLOW_CONTRACT_FAILED       – destination message failed contract validation
FLOW_UNEXPECTED_MESSAGE    – message arrived when expect_no_json_flow expected silence
FLOW_DESERIALIZATION_FAILED – destination message could not be decoded
FLOW_SCHEMA_ID_MISMATCH    – Avro destination schema_id did not match expected

Reports

Markdown and JSON reports can be created from pytest runs:

pytest \
  --kafka-contract-report .reports/kafka-contract.md \
  --kafka-contract-json-report .reports/kafka-contract.json

Markdown report includes:

summary
passed checks
failed checks
issue codes
topic metadata
schema metadata

JSON report includes structured output for CI pipelines, artifacts, and Slack summaries.


Local Kafka Example

Start local Kafka-compatible infrastructure:

docker compose -f docker-compose.integration.yml up -d

Run integration tests:

PKCT_RUN_INTEGRATION=1 pytest tests/integration -q

Stop services:

docker compose -f docker-compose.integration.yml down -v

What Can This Test?

This package is generalized for Kafka event testing.

Use it for:

  • producer event contract tests
  • consumer output tests
  • Kafka filter services
  • CDC pipeline validation
  • Debezium output checks
  • enrichment pipeline tests
  • event transformation tests
  • Avro message validation
  • Schema Registry subject checks
  • CI smoke tests against Kafka topics
  • black-box integration tests

It does not care what app framework you use.

Your app can be:

  • Java Spring Boot
  • Node.js
  • Python
  • Go
  • .NET
  • anything that reads/writes Kafka

The package only cares about Kafka inputs, Kafka outputs, contracts, and test results.

It is especially useful when your team already uses pytest and wants lightweight Kafka contract checks without adopting a larger contract-testing platform immediately.


Limitations

This package is focused on pytest-native Kafka contract testing.

It is not:

  • A full replacement for Confluent Schema Registry
  • A full Pact Broker replacement
  • A Kafka monitoring platform
  • A production schema governance system

Recommended use:

use Schema Registry for schema governance
use pytest-kafka-contract for automated test validation

License

MIT License

Copyright (c) 2026 Dharsan Guruparan

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pytest_kafka_contract-0.2.4.tar.gz (65.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pytest_kafka_contract-0.2.4-py3-none-any.whl (37.0 kB view details)

Uploaded Python 3

File details

Details for the file pytest_kafka_contract-0.2.4.tar.gz.

File metadata

  • Download URL: pytest_kafka_contract-0.2.4.tar.gz
  • Upload date:
  • Size: 65.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.6

File hashes

Hashes for pytest_kafka_contract-0.2.4.tar.gz
Algorithm Hash digest
SHA256 d9ec29d00eab1e752a9fcfd419d0a3b1327801952e2922116bf9a7bfb4c9a0e3
MD5 106804bfadba69b1e218ead65d01c3af
BLAKE2b-256 f98ef8d965ae27bf95338b473b12b9a03f0646eef6ea1682f73ca753d8ac6e2d

See more details on using hashes here.

File details

Details for the file pytest_kafka_contract-0.2.4-py3-none-any.whl.

File metadata

File hashes

Hashes for pytest_kafka_contract-0.2.4-py3-none-any.whl
Algorithm Hash digest
SHA256 17ba98bca8b9a7c4946a54c3e3ced88f6ed45866d0fab776ed7bf3cee5ff79fe
MD5 61accaefc7af38677146677c1945ca40
BLAKE2b-256 16c26e1ca88bf7b65f6d73ce4e786743dd647d76cb35f79934ab938786ebd309

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page