A mock library for confluent kafka
Mockafka-py is a Python library designed for in-memory mocking of Kafka.
Mockafka: Fake Version for confluent-kafka-python & aiokafka
Features
- Compatible with confluent-kafka
- Compatible with aiokafka (async support)
- Supports Produce, Consume, and AdminClient operations with ease.
Getting Started
Installing via pip or poetry
pip install mockafka-py
# or using poetry
poetry add mockafka-py
Usage
Multi-Decorator Examples for confluent-kafka-python
In the following examples, we showcase the usage of multiple decorators to simulate different scenarios in a Mockafka environment. These scenarios include producing, consuming, and setting up Kafka topics using the provided decorators.
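Stacked decorators run from the outermost (top) one inward, so their order matters in the examples that follow. The plain-Python sketch below illustrates that order only. The names `produce_stub`, `consume_stub`, and `broker` are hypothetical stand-ins, and nothing here uses or reproduces Mockafka's actual implementation.

```python
from functools import wraps

# Hypothetical stand-in for an in-memory topic; not part of Mockafka.
broker = []


def produce_stub(value):
    """Hypothetical decorator: append a value to the broker, then call the test."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            broker.append(value)
            return func(*args, **kwargs)
        return wrapper
    return decorator


def consume_stub():
    """Hypothetical decorator: pass the oldest value (or None) as `message`."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            message = broker.pop(0) if broker else None
            return func(*args, message=message, **kwargs)
        return wrapper
    return decorator


@produce_stub("first")    # outermost wrapper, runs first
@produce_stub("second")   # runs second
@consume_stub()           # innermost wrapper, runs last, just before the function body
def handler(message=None):
    return message


result = handler()
print(result)   # the value appended by the top decorator
print(broker)   # what is left after one value was consumed
```

Because the top decorator runs first, its value is the first one the innermost stand-in hands to the function.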
Example 1: Using @produce and @consume Decorators
Test Case: test_produce_and_consume_decorator
from mockafka import produce, consume
@produce(topic='test', key='test_key', value='test_value', partition=4)
@consume(topics=['test'])
def test_produce_and_consume_decorator(message):
    """
    This test showcases the usage of both @produce and @consume decorators in a single test case.
    It produces a message to the 'test' topic and then consumes it to perform further logic.
    Note: message may be None, so guard against it before using it.
    """
    if not message:
        return
    # Your test logic for processing the consumed message here
    pass
Example 2: Using Multiple @produce Decorators
Test Case: test_produce_twice
from mockafka import produce
@produce(topic='test', key='test_key', value='test_value', partition=4)
@produce(topic='test', key='test_key1', value='test_value1', partition=0)
def test_produce_twice():
    # Your test logic here
    pass
Example 3: Using @bulk_produce and @consume Decorators
Test Case: test_bulk_produce_and_consume_decorator
from mockafka import bulk_produce, consume
# sample_for_bulk_produce is a list of messages that must be defined before the decorator is applied
@bulk_produce(list_of_messages=sample_for_bulk_produce)
@consume(topics=['test'])
def test_bulk_produce_and_consume_decorator(message):
    """
    This test showcases the usage of both @bulk_produce and @consume decorators in a single test case.
    It bulk-produces messages to the 'test' topic and then consumes them to perform further logic.
    """
    # Your test logic for processing the consumed message here
    pass
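The snippet above uses `sample_for_bulk_produce` without defining it. One way to build such a list is sketched below, under the assumption that each entry is a dict carrying the same fields that @produce accepts (`topic`, `key`, `value`, `partition`). The helper name `build_bulk_messages` is hypothetical and not part of Mockafka; check the library source for the exact shape it expects.

```python
def build_bulk_messages(topic, count, partition=0):
    """Return `count` message dicts for one topic.

    Assumption: bulk_produce accepts dicts with these four keys,
    mirroring the keyword arguments of @produce.
    """
    return [
        {
            "topic": topic,
            "key": f"test_key{i}",
            "value": f"test_value{i}",
            "partition": partition,
        }
        for i in range(count)
    ]


# Three messages for the 'test' topic, all on partition 0.
sample_for_bulk_produce = build_bulk_messages(topic="test", count=3)
print(sample_for_bulk_produce[0])
```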
Example 4: Using @setup_kafka and @produce Decorators
Test Case: test_produce_with_kafka_setup_decorator
from mockafka import setup_kafka, produce
@setup_kafka(topics=[{"topic": "test_topic", "partition": 16}])
@produce(topic='test_topic', partition=5, key='test_', value='test_value1')
def test_produce_with_kafka_setup_decorator():
    # Your test logic here
    pass
Example 5: Using @setup_kafka, Multiple @produce, and @consume Decorators
Test Case: test_consumer_decorator
from mockafka import setup_kafka, produce, consume
from mockafka.message import Message  # needed for the type annotation below
@setup_kafka(topics=[{"topic": "test_topic", "partition": 16}])
@produce(topic='test_topic', partition=5, key='test_', value='test_value1')
@produce(topic='test_topic', partition=5, key='test_', value='test_value1')
@consume(topics=['test_topic'])
def test_consumer_decorator(message: Message = None):
    if message is None:
        return
    # Your test logic for processing the consumed message here
    pass
Using the fake classes like their confluent-kafka counterparts
from mockafka import FakeProducer, FakeConsumer, FakeAdminClientImpl
from mockafka.admin_client import NewTopic
from random import randint
# Create topic
admin = FakeAdminClientImpl()
admin.create_topics([
    NewTopic(topic='test', num_partitions=5)
])
# Produce messages
producer = FakeProducer()
for i in range(0, 10):
    producer.produce(
        topic='test',
        key=f'test_key{i}',
        value=f'test_value{i}',
        partition=randint(0, 4)
    )
# Subscribe consumer
consumer = FakeConsumer()
consumer.subscribe(topics=['test'])
# Consume messages
while True:
    message = consumer.poll()
    print(message)
    consumer.commit()
    if message is None:
        break
Output:
"""
<mockafka.message.Message object at 0x7fe84b4c3310>
<mockafka.message.Message object at 0x7fe84b4c3370>
<mockafka.message.Message object at 0x7fe84b4c33a0>
<mockafka.message.Message object at 0x7fe84b4c33d0>
<mockafka.message.Message object at 0x7fe84b4c3430>
<mockafka.message.Message object at 0x7fe84b4c32e0>
<mockafka.message.Message object at 0x7fe84b4c31f0>
<mockafka.message.Message object at 0x7fe84b4c32b0>
<mockafka.message.Message object at 0x7fe84b4c3400>
<mockafka.message.Message object at 0x7fe84b4c3340>
None
"""
Async support
Multi-Decorator Examples for aiokafka
Example 1: Using @aproduce, @aconsume, and @asetup_kafka Decorators
Test Case: test_produce_and_consume_with_decorator
import pytest
from mockafka import aproduce, aconsume, asetup_kafka
@pytest.mark.asyncio
@asetup_kafka(topics=[{'topic': 'test_topic', 'partition': 16}], clean=True)
@aproduce(topic='test_topic', value='test_value', key='test_key', partition=0)
@aconsume(topics=['test_topic'])
async def test_produce_and_consume_with_decorator(message=None):
    if not message:
        return
    assert message.key() == 'test_key'
    assert message.value() == 'test_value'
Example 2: Using @aproduce and @asetup_kafka Decorators
Test Case: test_produce_with_decorator
import pytest
from mockafka import aproduce, asetup_kafka
from mockafka.aiokafka import FakeAIOKafkaConsumer
@pytest.mark.asyncio
@asetup_kafka(topics=[{'topic': 'test_topic', 'partition': 16}], clean=True)
@aproduce(topic='test_topic', value='test_value', key='test_key', partition=0)
async def test_produce_with_decorator():
    consumer = FakeAIOKafkaConsumer()
    await consumer.start()
    consumer.subscribe(['test_topic'])
    message = await consumer.getone()
    assert message.key() == 'test_key'
    assert message.value() == 'test_value'
Download files
Source Distribution
mockafka_py-0.1.61.tar.gz (16.9 kB)
Built Distribution
File details
Details for the file mockafka_py-0.1.61.tar.gz.
File metadata
- Download URL: mockafka_py-0.1.61.tar.gz
- Upload date:
- Size: 16.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.12.4
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | cd3c8320f2e7552cbf1641a6da0bb1124f5388789e2408b2e99d978cfcc61fda |
MD5 | c308f8d72f65e4cb179b1edffd16c0c9 |
BLAKE2b-256 | f85d846e278d3c5e2ccc61ddf360f586b78fdca03c7341aa360ba2a714581511 |
File details
Details for the file mockafka_py-0.1.61-py3-none-any.whl.
File metadata
- Download URL: mockafka_py-0.1.61-py3-none-any.whl
- Upload date:
- Size: 23.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.12.4
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | a61bf1a8526fa9a6cd9a2240e182fb58111bfd6f0b40ff49b50a2a4cfa2b5b15 |
MD5 | 470d0d55e3f680a13ac3aa8c4ea42987 |
BLAKE2b-256 | c07dd61d6653fe3d6969b0e761a566c2b1d6242e8b1f9d71487266d1be90a277 |