
A small example package

Project description

Yet another kafka consumer!

Installation

To use the consumer you can add it as a dependency to your project.

pip

python -m pip install --user cb-kafka-consumer

Pipfile

[packages]
cb-kafka-consumer = "~=0.1.5"

An example decoder / handler

Make sure to use the right values when initializing CBKafkaConsumer. In particular, you must pass a decoder callback to decode the raw bytes of each received message. An example JSON decoder is provided below.

import json
import logging

from cb_kafka_consumer.src.consumer import CBKafkaConsumer, Message


class MyConsumer:
    def __init__(self):
        self.__consumer = CBKafkaConsumer(
            '127.0.0.1:9092',
            'my-group-id',
            'my-topic',
            self.__handler,
            self.__decoder,
            start_from='latest')
        self.__consumer.start()

    def __handler(self, msg: Message):
        print(msg.get_offset(), msg.msg)
        self.__consumer.commit(msg)

    def __decoder(self, raw_msg: bytes):
        try:
            return json.loads(raw_msg)
        except ValueError:
            logging.error(f'Message cannot be parsed\n{raw_msg}')
            return None

Commit policy

There are two approaches to handling the commit policy.

  • auto_commit=True

You can pass auto_commit=True when initializing the consumer to instruct it to commit each message automatically, right after it has been handed over to the handler callback. When auto_commit is enabled, the handler callback is not expected to call the consumer's commit method explicitly.

  • auto_commit=False (default behavior)

If auto_commit is omitted or set to False, the consumer only commits messages up to, but not including, the first uncommitted message in the sequence (typically one whose handling has failed). To demonstrate this, let's assume the consumer has received messages with the following offsets:

1, 2, 3, 4, 5

The consumer hands the received messages over to the handler callback. Say the callback processes messages #1 and #2 successfully and commits them, but fails to process #3. Messages #4 and #5 are then processed and committed successfully. The consumer will actually commit only #1 and #2; it holds back the later commits until commit is called for message #3, and only then moves onward.
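The manual-commit policy described above amounts to advancing a contiguous-offset watermark: an offset only becomes committable once every offset before it has been acknowledged. A minimal sketch of that bookkeeping (a hypothetical CommitTracker written for illustration, not part of the library's API):

```python
class CommitTracker:
    """Illustrative only: mimics the commit ordering described above.
    Offsets can be acknowledged in any order, but the committed
    watermark only advances through a contiguous run."""

    def __init__(self, start: int):
        self.__committed = start - 1  # highest contiguously committed offset
        self.__pending = set()        # acked offsets waiting on an earlier gap

    def ack(self, offset: int) -> int:
        """Mark an offset as handled; return the new committed watermark."""
        self.__pending.add(offset)
        # Advance the watermark only while the next offset has been acked.
        while self.__committed + 1 in self.__pending:
            self.__committed += 1
            self.__pending.remove(self.__committed)
        return self.__committed


tracker = CommitTracker(start=1)
for off in (1, 2, 4, 5):      # offset 3 failed, so it is never acked here
    watermark = tracker.ack(off)
print(watermark)              # prints 2: commits are stuck behind offset 3
print(tracker.ack(3))         # prints 5: acking 3 releases 4 and 5 as well
```

This mirrors the example: after handling offsets 1, 2, 4 and 5, only 1 and 2 are committed; committing 3 lets the watermark jump straight to 5.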

Where to start receiving messages from

To control the starting point, use the start_from argument of the constructor. It accepts either the string values 'earliest' and 'latest' or an integer offset. If you need to start from a specific message, pass its offset as start_from. For example, to start the consumer from the message with offset 3743313:

self.__consumer = CBKafkaConsumer(
    '127.0.0.1:9092',
    'my-group-id',
    'my-topic',
    handler_method,
    decoder_method,
    start_from=3743313)
