A pytest plugin and CLI for validating Kafka JSON and Avro messages against contracts.
pytest-kafka-contract
Pytest plugin and CLI for contract testing and flow testing Kafka event payloads.
pytest-kafka-contract removes Kafka test boilerplate so QA and backend teams can validate real JSON/Avro Kafka messages against explicit contracts, check Schema Registry subjects, and verify black-box event flows through services, all from normal pytest tests.
It supports:
- JSON payload validation against YAML contracts
- Avro record validation against .avsc schemas
- Confluent Schema Registry checks
- Real Kafka message validation
- Real Kafka Avro message decoding
- Kafka black-box flow testing — produce → consume → assert
- Negative flow testing — assert that an event is NOT forwarded
- YAML-driven flow specs via the flow-run CLI command
- Pytest fixture API
- CLI validation commands
- Markdown and JSON reports
Why
Kafka messages can break silently.
A producer can rename a field, remove a required value, change a number into a string, or publish a payload that no longer matches what downstream consumers expect. A routing service can silently drop events it should forward, or forward events it should filter.
This package catches issues like:
- Missing required fields
- Wrong field types
- Invalid constants
- Invalid enum values
- Unexpected extra fields
- Null values where null is not allowed
- Invalid Avro records
- Schema Registry subject issues
- Kafka messages that do not match their expected contract
- Events that fail to reach a destination topic
- Events that reach a destination topic when they should not
The goal is simple:
Make Kafka event contracts and flows testable in normal pytest workflows.
Why Not Just Normal Assertions?
Normal assertions are still useful.
This package does not replace pytest assertions. It removes the Kafka boilerplate around them.
Manual Kafka tests usually repeat the same setup:
- create producer
- create consumer
- create unique consumer group
- subscribe to topic
- produce test message
- poll with timeout
- decode JSON or Avro
- match correlation ID
- validate schema or contract
- format readable failures
pytest-kafka-contract standardizes that part.
Then you can still write normal pytest assertions for business-specific logic.
Use the package for:
- Kafka producing
- Kafka consuming
- polling
- timeout handling
- correlation matching
- JSON contract validation
- Avro validation
- Schema Registry checks
- readable result objects
Use normal pytest assertions for:
- business math
- field relationships
- custom transformation rules
- enrichment logic
- domain-specific expectations
Package Levels
You can use this package at several levels:
Level 1: validate local JSON file
Level 2: validate local Avro record
Level 3: check Schema Registry subject
Level 4: consume latest Kafka message and validate
Level 5: produce source message, consume destination message, validate full flow
Level 6: run custom pytest assertions on consumed event
Contract Testing vs Flow Testing
| | Contract test | Flow test |
|---|---|---|
| Question | Does this event look right? | Does this app move/transform/filter events correctly? |
| How | Validate payload shape/values | Produce to source topic, assert what arrives on destination topic |
| When | On any payload, offline | Requires running Kafka and a live app |
| Fixture method | validate_payload, validate_avro_record | run_json_flow, expect_no_json_flow, run_avro_flow |
Install
Basic JSON contract validation (no Kafka required):
pip install pytest-kafka-contract
Install everything, including Kafka, Avro, Schema Registry, and flow testing support:
pip install "pytest-kafka-contract[all]"
Optional extras:
pip install "pytest-kafka-contract[kafka]" # real Kafka validation + flow testing
pip install "pytest-kafka-contract[avro]" # Avro schema validation
pip install "pytest-kafka-contract[registry]" # Schema Registry checks
Quick Start: JSON Contract Validation
Create a contract file:
mkdir -p contracts
Create contracts/order-created.yaml:
version: 1
name: order-created-v1
topic: orders.created
message:
type: object
required:
- event_id
- event_type
- order
properties:
event_id:
type: string
nullable: false
event_type:
type: string
const: OrderCreated
nullable: false
order:
type: object
required:
- order_id
- total
properties:
order_id:
type: string
total:
type: number
rules:
allow_extra_fields: false
Write a pytest test:
def test_order_created_contract(kafka_contract):
result = kafka_contract.validate_payload(
payload={
"event_id": "evt_1",
"event_type": "OrderCreated",
"order": {
"order_id": "ord_1",
"total": 20.0,
},
},
contract_path="contracts/order-created.yaml",
)
assert result.passed, result.issues
Run:
pytest
JSON Contract Failure Example
If the payload is wrong (here, a const mismatch on event_type and a string where a number is expected), the assertion fails and result.issues explains why:
def test_order_created_contract_fails(kafka_contract):
result = kafka_contract.validate_payload(
payload={
"event_id": "evt_1",
"event_type": "WrongEvent",
"order": {
"order_id": "ord_1",
"total": "20.0",
},
},
contract_path="contracts/order-created.yaml",
)
assert result.passed, result.issues
The result contains readable issues such as:
CONST_MISMATCH
TYPE_MISMATCH
Avro Validation
Install Avro support:
pip install "pytest-kafka-contract[avro]"
Create schemas/order-created.avsc:
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.orders",
"fields": [
{
"name": "event_id",
"type": "string"
},
{
"name": "order_id",
"type": "string"
},
{
"name": "total",
"type": "double"
},
{
"name": "currency",
"type": {
"type": "enum",
"name": "Currency",
"symbols": ["USD", "CAD", "EUR"]
},
"default": "USD"
}
]
}
Validate a Python dictionary against the Avro schema:
def test_order_created_avro(kafka_contract):
result = kafka_contract.validate_avro_record(
record={
"event_id": "evt_1",
"order_id": "ord_1",
"total": 20.0,
"currency": "USD",
},
schema_path="schemas/order-created.avsc",
)
assert result.passed, result.issues
Schema Registry Checks
Install Schema Registry support:
pip install "pytest-kafka-contract[registry]"
Validate that a subject exists and is compatible with a local schema:
def test_schema_registry_subject(kafka_contract):
result = kafka_contract.validate_schema_registry_subject(
registry_url="http://localhost:8081",
subject="orders.created-value",
schema_path="schemas/order-created.avsc",
compatibility="BACKWARD",
)
assert result.passed, result.issues
This can check:
- Registry reachability
- Subject existence
- Latest schema lookup
- Local schema comparison
- Compatibility result
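Under the hood, checks like these map onto the standard Confluent Schema Registry REST API. The sketch below only builds the two relevant endpoint URLs; the helper names are illustrative and not part of this package.

```python
# The endpoint paths below follow the standard Confluent Schema Registry
# REST API; the helper functions themselves are illustrative only.

def subject_latest_url(registry_url: str, subject: str) -> str:
    """URL that returns the latest registered schema for a subject."""
    return f"{registry_url.rstrip('/')}/subjects/{subject}/versions/latest"

def compatibility_url(registry_url: str, subject: str) -> str:
    """URL to POST a candidate schema to for a compatibility verdict."""
    return f"{registry_url.rstrip('/')}/compatibility/subjects/{subject}/versions/latest"

assert subject_latest_url("http://localhost:8081/", "orders.created-value") == \
    "http://localhost:8081/subjects/orders.created-value/versions/latest"
```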
Schema Registry vs this package
Schema Registry governs schemas. pytest-kafka-contract validates real app behavior in tests.
For example, Schema Registry can say OrderCreated is compatible. This package can prove checkout actually emitted OrderCreated with the expected order_id, total, Kafka topic, wire format, and downstream flow behavior.
Real Kafka JSON Message Validation
Install Kafka support:
pip install "pytest-kafka-contract[kafka]"
Validate the latest JSON message from a Kafka topic:
def test_latest_json_message(kafka_contract):
result = kafka_contract.validate_latest_json(
topic="orders.created",
contract_path="contracts/order-created.yaml",
bootstrap_servers="localhost:9092",
timeout_ms=10000,
auto_offset_reset="latest", # "latest" or "earliest"
)
assert result.passed, result.issues
This flow:
1. consume the Kafka message
2. decode JSON
3. validate against the YAML contract
4. return a detailed result
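The decode step is where raw message bytes either become a payload or a readable issue. A minimal standalone sketch, with the issue code mirroring this package's KAFKA_DESERIALIZATION_FAILED (the helper name is illustrative):

```python
import json

def decode_json_value(raw: bytes):
    """Return (payload, issues); issues is non-empty when decoding fails."""
    try:
        return json.loads(raw.decode("utf-8")), []
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        return None, [("KAFKA_DESERIALIZATION_FAILED", str(exc))]

payload, issues = decode_json_value(b'{"event_type": "OrderCreated"}')
assert payload == {"event_type": "OrderCreated"} and not issues
```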
Real Kafka Avro Message Validation
Install all optional dependencies:
pip install "pytest-kafka-contract[all]"
Validate the latest Avro message from Kafka using Schema Registry:
def test_latest_avro_message(kafka_contract):
result = kafka_contract.validate_latest_avro(
topic="orders.created",
registry_url="http://localhost:8081",
subject="orders.created-value",
bootstrap_servers="localhost:9092",
timeout_ms=10000,
auto_offset_reset="latest", # "latest" or "earliest"
)
assert result.passed, result.issues
This flow:
1. consume the Kafka message
2. extract the Confluent schema ID
3. fetch the schema from Schema Registry
4. decode the Avro payload
5. validate the decoded record
6. return a detailed result
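The "extract schema ID" step relies on the Confluent Avro wire format, which prefixes every message with a magic byte (0x00) and a 4-byte big-endian schema ID before the Avro body. A self-contained sketch of that framing:

```python
import struct

def extract_schema_id(raw: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_body)."""
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not Confluent Avro wire format")
    (schema_id,) = struct.unpack(">I", raw[1:5])  # 4-byte big-endian ID
    return schema_id, raw[5:]

schema_id, body = extract_schema_id(b"\x00\x00\x00\x00\x2a" + b"avro-bytes")
assert schema_id == 42 and body == b"avro-bytes"
```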
Kafka Flow Testing
Flow testing lets you verify that your service correctly moves, transforms, or filters Kafka events — without touching its internals.
Install Kafka support:
pip install "pytest-kafka-contract[kafka]"
flow-run runs a full Kafka black-box flow from YAML:
kafka-contract flow-run flows/paid-order-flow.yaml
It does this in order:
1. read flow YAML
2. subscribe to destination topic first
3. produce payload to source topic
4. wait for the real app/service to process the event
5. consume destination topic
6. match the correct message by correlation_id
7. validate expected fields
8. optionally validate a JSON contract or Avro schema
9. print pass/fail
10. exit 0 on pass, exit 1 on fail
flow-run does not start your app. Your service must already be running and connected to Kafka.
Positive flow: assert an event reaches the destination
def test_paid_order_reaches_destination(kafka_contract):
cid = kafka_contract.new_correlation_id()
result = kafka_contract.run_json_flow(
source_topic="src.rds.orders",
destination_topic="dest.filtered.orders",
payload={
"correlation_id": cid,
"order_id": "ord_123",
"status": "PAID",
"amount": 49.99,
},
expect={
"correlation_id": cid,
"status": "PAID",
},
contract_path="contracts/paid-order.yaml",
bootstrap_servers="localhost:9092",
timeout_ms=10000,
)
assert result.passed, result.issues
run_json_flow does the following in order:
1. subscribe to destination_topic
2. produce payload to source_topic (with injected correlation_id)
3. poll destination_topic until the correlated message arrives
4. assert expected subset fields match (if expect= given)
5. validate against contract (if contract_path= given)
6. return FlowResult
Subscribing before producing prevents missing fast messages.
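Step 3, matching by correlation ID, can be pictured as a filter over whatever arrives on the destination topic: only the message carrying this test run's ID counts. An illustrative sketch (the helper name is not part of the package API):

```python
def match_correlated(values, correlation_field, correlation_id):
    """Return the first decoded message value that carries our ID."""
    for value in values:
        if isinstance(value, dict) and value.get(correlation_field) == correlation_id:
            return value
    return None

# Other tests (or old messages) may share the topic; only ours matches.
inflight = [
    {"correlation_id": "someone-else", "status": "PAID"},
    {"correlation_id": "cid-1", "status": "PAID"},
]
assert match_correlated(inflight, "correlation_id", "cid-1")["status"] == "PAID"
```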
Negative flow: assert an event does NOT reach the destination
def test_cancelled_order_is_filtered_out(kafka_contract):
cid = kafka_contract.new_correlation_id()
result = kafka_contract.expect_no_json_flow(
source_topic="src.rds.orders",
destination_topic="dest.filtered.orders",
payload={
"correlation_id": cid,
"order_id": "ord_123",
"status": "CANCELLED",
},
bootstrap_servers="localhost:9092",
timeout_ms=5000,
)
assert result.passed, result.issues
expect_no_json_flow passes when no correlated message arrives on the destination topic within timeout_ms.
CLI Flow Assertions
The CLI supports simple expected-field assertions through expect.
name: paid-order-reaches-destination
format: json
bootstrap_servers: localhost:9092
timeout_ms: 10000
source_topic: src.rds.orders
destination_topic: dest.filtered.orders
correlation_field: correlation_id
payload:
order_id: ord_123
status: PAID
amount: 49.99
expect:
order_id: ord_123
status: PAID
amount: 49.99
contract_path: contracts/paid-order.yaml
Run:
kafka-contract flow-run flows/paid-order.yaml
Use CLI flow specs for simple checks. Use pytest flow helpers when you need custom Python assertions.
Avro flow spec:
name: avro-paid-order-flow
format: avro
bootstrap_servers: localhost:9092
schema_registry_url: http://localhost:8081
timeout_ms: 10000
source_topic: src.rds.orders
destination_topic: dest.filtered.orders
source_subject: src.rds.orders-value
destination_subject: dest.filtered.orders-value
correlation_field: correlation_id
payload:
correlation_id: test-123
order_id: ord_123
status: PAID
expect:
correlation_id: test-123
status: PAID
Run it the same way:
kafka-contract flow-run flows/avro-paid-order-flow.yaml
Negative flow spec using mode: expect_no_message:
# examples/flows/cancelled-order-filtered-out.yaml
name: cancelled-order-filtered-out
format: json
mode: expect_no_message
bootstrap_servers: localhost:9092
timeout_ms: 10000
source_topic: src.rds.orders
destination_topic: dest.filtered.orders
correlation_field: correlation_id
payload:
order_id: ord_456
status: CANCELLED
amount: 19.99
Correlation ID
By default, a uuid4 is generated and injected into the payload under correlation_field (default: correlation_id). The consumer polls for the message whose value contains the matching correlation ID.
You can supply a fixed ID:
result = kafka_contract.run_json_flow(
...,
correlation_id="my-fixed-id",
)
If the payload already contains correlation_field, it is not overwritten.
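The injection rule above can be sketched as a small pure function: generate a uuid4 unless a fixed ID was supplied, and never overwrite an existing field (the function name is illustrative):

```python
import uuid

def inject_correlation(payload: dict, field="correlation_id", correlation_id=None) -> str:
    """Ensure payload[field] is set; return the effective correlation ID."""
    if field in payload:
        return payload[field]  # existing value wins, never overwritten
    payload[field] = correlation_id or str(uuid.uuid4())
    return payload[field]

p = {"order_id": "ord_1", "correlation_id": "keep-me"}
assert inject_correlation(p) == "keep-me"  # pre-existing ID preserved

q = {"order_id": "ord_2"}
assert inject_correlation(q, correlation_id="fixed-id") == "fixed-id"
```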
Custom Assertions After a Flow
The flow helpers return a result object.
You can first let the package validate the Kafka flow, then write your own pytest assertions against the consumed message.
def test_paid_order_flow_has_correct_business_logic(kafka_contract):
result = kafka_contract.run_json_flow(
source_topic="src.rds.orders",
destination_topic="dest.filtered.orders",
payload={
"order_id": "ord_123",
"status": "PAID",
"subtotal": 100.00,
"tax": 8.25,
},
expect={
"order_id": "ord_123",
"status": "PAID",
},
contract_path="contracts/filtered-order.yaml",
bootstrap_servers="localhost:9092",
timeout_ms=30000,
)
# Package-level checks
assert result.passed, result.issues
assert result.consumed_message is not None
# User-level custom business checks
event = result.consumed_message.value
assert event is not None
assert event["total"] == 108.25
assert event["total"] == event["subtotal"] + event["tax"]
assert event["status"] == "PAID"
assert event["processed_by"] == "order-filter-service"
Recommended pattern:
Package proves Kafka flow worked.
Your pytest assertions prove business logic is correct.
What Custom Assertions Are Good For
Use custom assertions when contract shape is not enough.
# Field relationship
assert event["total"] == event["subtotal"] + event["tax"]
# Business rule
assert event["status"] in ["PAID", "SETTLED"]
# Enrichment check
assert event["processed_by"] == "order-filter-service"
# Value copied from input
assert event["order_id"] == input_order_id
# Timestamp generated
assert event["processed_at"] is not None
# Domain rule
assert event["discount"] <= event["subtotal"]
CLI Flow vs Pytest Flow
| Need | Use |
|---|---|
| Quick contract check from terminal | CLI |
| CI smoke check | CLI |
| YAML-defined flow test | CLI |
| Simple expected fields | CLI expect |
| Business math assertions | Pytest |
| Field relationship checks | Pytest |
| Reusing test fixtures/data setup | Pytest |
| Complex app-specific assertions | Pytest |
Rule of thumb:
CLI = simple portable flow check
pytest = full test logic with custom assertions
Flow Result Object
Flow helpers return a result object.
Useful fields:
result.passed
result.issues
result.correlation_id
result.produced_message
result.consumed_message
result.metadata
Example:
result = kafka_contract.run_json_flow(...)
assert result.passed, result.issues
assert result.consumed_message is not None
event = result.consumed_message.value
assert event is not None
assert event["status"] == "PAID"
Example: Test a Kafka Filter Service
Suppose your service does this:
Consumes: src.rds.orders
Publishes: dest.filtered.orders
Rule: only PAID orders should be forwarded
Rule: CANCELLED orders should be dropped
Positive flow:
def test_paid_order_reaches_destination(kafka_contract):
result = kafka_contract.run_json_flow(
source_topic="src.rds.orders",
destination_topic="dest.filtered.orders",
payload={
"order_id": "ord_paid_1",
"status": "PAID",
"amount": 49.99,
},
expect={
"order_id": "ord_paid_1",
"status": "PAID",
},
contract_path="contracts/paid-order.yaml",
bootstrap_servers="localhost:9092",
)
assert result.passed, result.issues
Negative flow:
def test_cancelled_order_is_not_forwarded(kafka_contract):
cid = kafka_contract.new_correlation_id()
result = kafka_contract.expect_no_json_flow(
source_topic="src.rds.orders",
destination_topic="dest.filtered.orders",
payload={
"correlation_id": cid,
"order_id": "ord_cancelled_1",
"status": "CANCELLED",
"amount": 49.99,
},
correlation_id=cid,
bootstrap_servers="localhost:9092",
timeout_ms=5000,
)
assert result.passed, result.issues
Example: Avro Flow With Schema Registry
Use this when your Kafka messages use Confluent Avro wire format.
def test_paid_order_avro_flow(kafka_contract):
result = kafka_contract.run_avro_flow(
source_topic="src.rds.orders",
destination_topic="dest.filtered.orders",
payload={
"order_id": "ord_123",
"status": "PAID",
"amount": 49.99,
},
expect={
"order_id": "ord_123",
"status": "PAID",
},
registry_url="http://localhost:8081",
source_subject="src.rds.orders-value",
destination_subject="dest.filtered.orders-value",
bootstrap_servers="localhost:9092",
timeout_ms=30000,
)
assert result.passed, result.issues
assert result.consumed_message is not None
event = result.consumed_message.value
assert event is not None
assert event["amount"] > 0
Pytest Fixture API
The plugin provides a kafka_contract fixture.
def test_with_fixture(kafka_contract):
result = kafka_contract.validate_payload(
payload={"event_type": "OrderCreated"},
contract_path="contracts/order-created.yaml",
)
assert result.passed, result.issues
Supported fixture methods:
# Contract validation (no Kafka required)
kafka_contract.new_correlation_id()
kafka_contract.load_contract(contract_path)
kafka_contract.validate_payload(
payload,
contract_path,
)
kafka_contract.validate_avro_record(
record,
schema_path,
)
kafka_contract.validate_schema_registry_subject(
registry_url,
subject,
schema_path,
compatibility=None,
)
# Real Kafka – consume latest message and validate
kafka_contract.validate_latest_json(
topic,
contract_path,
bootstrap_servers="localhost:9092",
timeout_ms=10000,
auto_offset_reset="latest",
)
kafka_contract.validate_latest_avro(
topic,
registry_url,
subject,
bootstrap_servers="localhost:9092",
timeout_ms=10000,
auto_offset_reset="latest",
)
# Flow testing – produce to source, assert on destination
kafka_contract.run_json_flow(
source_topic,
destination_topic,
payload,
expect=None, # optional: subset of destination message to assert
contract_path=None, # optional: YAML contract to validate destination message
correlation_field="correlation_id",
correlation_id=None, # auto-generated as uuid4 if not given
bootstrap_servers=None,
timeout_ms=None,
)
kafka_contract.expect_no_json_flow(
source_topic,
destination_topic,
payload,
correlation_field="correlation_id",
correlation_id=None,
bootstrap_servers=None,
timeout_ms=None,
)
kafka_contract.run_avro_flow(
source_topic,
destination_topic,
payload,
registry_url,
source_subject,
destination_subject,
expect=None,
expected_destination_schema_id=None,
bootstrap_servers=None,
timeout_ms=None,
)
CLI Usage
Initialize example contracts and samples:
kafka-contract init
Validate a JSON sample file against a YAML contract:
kafka-contract validate-file \
contracts/order-created.yaml \
samples/order-created.json
Validate a JSON sample file against an Avro schema:
kafka-contract avro-validate-file \
schemas/order-created.avsc \
samples/order-created-avro.json
Check Schema Registry:
kafka-contract registry-check \
--registry-url http://localhost:8081 \
--subject orders.created-value \
--schema schemas/order-created.avsc \
--compatibility BACKWARD
Validate the latest JSON message from Kafka:
kafka-contract kafka-validate-json \
--bootstrap-servers localhost:9092 \
--topic orders.created \
--contract contracts/order-created.yaml \
--timeout-ms 10000
Validate the latest Avro message from Kafka:
kafka-contract kafka-validate-avro \
--bootstrap-servers localhost:9092 \
--registry-url http://localhost:8081 \
--topic orders.created \
--subject orders.created-value \
--timeout-ms 10000
Run a YAML-defined flow test:
kafka-contract flow-run examples/flows/paid-order-flow.yaml
This produces output like:
PASS paid-order-reaches-destination
source: src.rds.orders
destination: dest.filtered.orders
correlation: a1b2c3d4-...
Or on failure:
FAIL cancelled-order-filtered-out
FLOW_UNEXPECTED_MESSAGE at $: Expected no destination message but found one...
Show help:
kafka-contract --help
Pytest CLI Options
pytest \
--kafka-contract contracts/order-created.yaml \
--kafka-bootstrap-servers localhost:9092 \
--kafka-timeout-ms 10000
Report options:
pytest \
--kafka-contract-report .reports/kafka-contract.md \
--kafka-contract-json-report .reports/kafka-contract.json
Available options:
--kafka-contract
--kafka-bootstrap-servers
--kafka-timeout-ms
--kafka-strict
--kafka-contract-report
--kafka-contract-json-report
--schema-registry-url
--kafka-avro-subject
--kafka-format
Contract Rules
Supported JSON contract types:
string
number
integer
boolean
object
array
null
Supported validation rules:
required
properties
nullable
const
enum
format: datetime
items
allow_extra_fields
Example:
message:
type: object
required:
- event_id
- status
properties:
event_id:
type: string
nullable: false
status:
type: string
enum:
- CREATED
- UPDATED
- CANCELLED
rules:
allow_extra_fields: false
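To make the semantics of a few of these rules concrete, here is a hand-rolled sketch covering required, type, nullable, enum, and extra-field checks. The real package implements the full rule set with richer issue objects; only the issue code names below deliberately mirror those listed under "Result Model".

```python
# Maps contract type names to Python types for the sketch.
TYPE_MAP = {"string": str, "number": (int, float), "integer": int,
            "boolean": bool, "object": dict, "array": list}

def check_object(payload: dict, spec: dict, allow_extra=False):
    """Return a list of (issue_code, field) tuples; empty means pass."""
    issues = []
    props = spec.get("properties", {})
    for field in spec.get("required", []):
        if field not in payload:
            issues.append(("MISSING_REQUIRED_FIELD", field))
    for field, value in payload.items():
        if field not in props:
            if not allow_extra:
                issues.append(("EXTRA_FIELD", field))
            continue
        rule = props[field]
        expected = TYPE_MAP.get(rule.get("type"))
        if value is None:
            if not rule.get("nullable", True):
                issues.append(("NULL_NOT_ALLOWED", field))
        elif expected and not isinstance(value, expected):
            issues.append(("TYPE_MISMATCH", field))
        elif "enum" in rule and value not in rule["enum"]:
            issues.append(("ENUM_MISMATCH", field))
    return issues

spec = {"required": ["event_id", "status"],
        "properties": {"event_id": {"type": "string", "nullable": False},
                       "status": {"type": "string",
                                  "enum": ["CREATED", "UPDATED", "CANCELLED"]}}}
assert check_object({"event_id": "e1", "status": "CREATED"}, spec) == []
assert ("ENUM_MISMATCH", "status") in check_object({"event_id": "e1", "status": "PAID"}, spec)
```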
Result Model
Contract validators return a ContractResult:
result.passed # bool
result.issues # list of ContractIssue
result.metadata # dict – topic, offset, schema_id, etc.
Flow methods return a FlowResult:
result.passed # bool
result.source_topic # str
result.destination_topic # str
result.correlation_id # str
result.produced_message # FlowMessage | None
result.consumed_message # FlowMessage | None (None on FLOW_TIMEOUT)
result.issues # list of ContractIssue
result.metadata # dict – partition, offset
Inspecting issues:
for issue in result.issues:
print(issue.code, issue.path, issue.message)
Contract issue codes:
MISSING_REQUIRED_FIELD
TYPE_MISMATCH
CONST_MISMATCH
ENUM_MISMATCH
EXTRA_FIELD
NULL_NOT_ALLOWED
AVRO_RECORD_INVALID
REGISTRY_SUBJECT_NOT_FOUND
REGISTRY_COMPATIBILITY_FAILED
KAFKA_NO_MESSAGE
KAFKA_DESERIALIZATION_FAILED
Flow issue codes:
FLOW_TIMEOUT – no matching message arrived within timeout_ms
FLOW_EXPECTATION_FAILED – destination message did not match expected subset
FLOW_CONTRACT_FAILED – destination message failed contract validation
FLOW_UNEXPECTED_MESSAGE – message arrived when expect_no_json_flow expected silence
FLOW_DESERIALIZATION_FAILED – destination message could not be decoded
FLOW_SCHEMA_ID_MISMATCH – Avro destination schema_id did not match expected
Reports
Markdown and JSON reports can be created from pytest runs:
pytest \
--kafka-contract-report .reports/kafka-contract.md \
--kafka-contract-json-report .reports/kafka-contract.json
Markdown report includes:
summary
passed checks
failed checks
issue codes
topic metadata
schema metadata
JSON report includes structured output for CI pipelines, artifacts, and Slack summaries.
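A CI step might consume the JSON report roughly as follows. The exact report schema is not documented here, so the keys used below ("checks", "passed") are ASSUMED purely for illustration:

```python
import json

def count_failures(report_text: str) -> int:
    """Count failed checks in a report; keys 'checks'/'passed' are assumed."""
    report = json.loads(report_text)
    return sum(1 for check in report.get("checks", []) if not check.get("passed"))

sample = '{"checks": [{"name": "order-created", "passed": true}, {"name": "flow", "passed": false}]}'
assert count_failures(sample) == 1  # CI could fail the build when > 0
```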
Local Kafka Example
Start local Kafka-compatible infrastructure:
docker compose -f docker-compose.integration.yml up -d
Run integration tests:
PKCT_RUN_INTEGRATION=1 pytest tests/integration -q
Stop services:
docker compose -f docker-compose.integration.yml down -v
What Can This Test?
This package is a general-purpose tool for Kafka event testing.
Use it for:
- producer event contract tests
- consumer output tests
- Kafka filter services
- CDC pipeline validation
- Debezium output checks
- enrichment pipeline tests
- event transformation tests
- Avro message validation
- Schema Registry subject checks
- CI smoke tests against Kafka topics
- black-box integration tests
It does not care what app framework you use.
Your app can be:
- Java Spring Boot
- Node.js
- Python
- Go
- .NET
- anything that reads/writes Kafka
The package only cares about Kafka inputs, Kafka outputs, contracts, and test results.
It is especially useful when your team already uses pytest and wants lightweight Kafka contract checks without adopting a larger contract-testing platform immediately.
Limitations
This package is focused on pytest-native Kafka contract testing.
It is not:
- A full replacement for Confluent Schema Registry
- A full Pact Broker replacement
- A Kafka monitoring platform
- A production schema governance system
Recommended use:
- Schema Registry for schema governance
- pytest-kafka-contract for automated test validation
License
MIT License
Copyright (c) 2026 Dharsan Guruparan
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.