A small example package
Project description
Yet another Kafka consumer!
Installation
To use the consumer you can add it as a dependency to your project.
PIP
python -m pip install --user cb-kafka-consumer
Pipfile
[packages]
cb-kafka-consumer = "~=0.0.4"
An example handler
Make sure to pass the right values (topic, group ID, broker address, and handler callback) when creating the CBKafkaConsumer.
from cb_kafka_consumer.src.consumer import CBKafkaConsumer, Message


class MyConsumer:
    def __init__(self):
        self.__consumer = CBKafkaConsumer('my-topic', 'my-group-id', '127.0.0.1:9092', self.handler, batch_size=20)
        self.__consumer.start()

    def handler(self, msg: Message):
        print(msg.get_offset(), msg.msg, msg)
        self.__consumer.commit(msg)
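A handler will usually want to commit only when processing succeeded. The sketch below illustrates that pattern without a running broker. FakeMessage, FakeConsumer, process and make_handler are hypothetical stand-ins invented for this illustration; they only mimic the get_offset(), msg and commit(msg) members used in the example above and are not part of the package.

```python
class FakeMessage:
    """Hypothetical stand-in exposing the members used in the example above."""

    def __init__(self, offset, msg):
        self._offset = offset
        self.msg = msg

    def get_offset(self):
        return self._offset


class FakeConsumer:
    """Hypothetical stand-in that records the offsets passed to commit()."""

    def __init__(self):
        self.committed_offsets = []

    def commit(self, msg):
        self.committed_offsets.append(msg.get_offset())


def process(payload):
    """Hypothetical business logic: reject empty payloads."""
    if not payload:
        raise ValueError("empty payload")


def make_handler(consumer):
    """Build a handler that commits a message only after it was processed."""

    def handler(msg):
        try:
            process(msg.msg)
        except ValueError as exc:
            # Processing failed: report it and skip the commit.
            print(f"offset {msg.get_offset()} failed: {exc}")
            return
        consumer.commit(msg)

    return handler


consumer = FakeConsumer()
handler = make_handler(consumer)
for message in (FakeMessage(1, "a"), FakeMessage(2, ""), FakeMessage(3, "c")):
    handler(message)
```

In this run only offsets 1 and 3 reach commit; offset 2 is left uncommitted because its payload was rejected.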
Commit policy
There are two approaches to commit policy.
- auto_commit=True
  Pass auto_commit=True when creating the consumer to have it automatically commit each received message right after it has been handed over to the handler callback. With auto_commit enabled, the handler callback is not expected to call the consumer's commit method explicitly.
- auto_commit=False (default behavior)
  If auto_commit is not specified or is set to False, the consumer commits only the messages that precede the first uncommitted message in the sequence (a message whose handling has probably failed).
  To illustrate, assume the consumer has received messages with the following offsets:
  1, 2, 3, 4, 5
  The consumer hands the received messages over to the handler callback. Suppose the callback processes messages #1 and #2 successfully and commits them, but fails to process #3. Next, messages #4 and #5 are processed and committed successfully. The consumer will commit only messages #1 and #2. It holds back the later successful messages until commit is called with message #3, and only then moves onward.
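The behavior with auto_commit=False can be modelled in a few lines. The following is a toy sketch of the "commit only the contiguous prefix" rule described above, assuming offsets are delivered in ascending order. OffsetTracker is a hypothetical name invented for this illustration and is not the package's actual implementation.

```python
class OffsetTracker:
    """Toy model of in-order committing (hypothetical, not the library's code)."""

    def __init__(self, offsets):
        self._pending = sorted(offsets)  # offsets handed to the handler, in order
        self._acked = set()              # offsets the handler called commit() for
        self.committed = []              # offsets actually committed, in order

    def commit(self, offset):
        self._acked.add(offset)
        # Advance only across the contiguous run of acknowledged offsets;
        # stop at the first offset that has not been acknowledged yet.
        while self._pending and self._pending[0] in self._acked:
            self.committed.append(self._pending.pop(0))


tracker = OffsetTracker([1, 2, 3, 4, 5])

# The handler succeeds for 1, 2, 4 and 5 but fails for 3.
for offset in (1, 2, 4, 5):
    tracker.commit(offset)
after_failure = list(tracker.committed)

# Once message 3 is finally committed, the held-back offsets follow.
tracker.commit(3)
after_retry = list(tracker.committed)

print(after_failure)  # [1, 2]
print(after_retry)    # [1, 2, 3, 4, 5]
```

Offsets 4 and 5 are acknowledged early but stay uncommitted until the gap at offset 3 is closed, which matches the scenario in the text.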
Hashes for cb_kafka_consumer-0.0.6-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 0893b23238bf29ce39594c511086ad013ed984811d9cac4acc96a42993105f96
MD5 | 816ebb8242ecc0ffe2bbf828ed551f10
BLAKE2b-256 | b056a259c905b024f4707da2d345f8b03b76e8a08502cbcddf20665b64734d64