Skip to main content

A python library to easily create kafka producer and consumer

Project description

Heizer

A python library to easily create kafka producer and consumer

Install

pip install --pre heizer

Setup

Use docker-compose.yaml file to start kafka service

docker-compose up -d

Sample

Producer

from heizer import HeizerConfig, HeizerTopic, producer

producer_config = HeizerConfig(
    {
        "bootstrap.servers": "localhost:9092",
    }
)

my_topics = [
    HeizerTopic(name="my.topic1", partitions=[0]),
    HeizerTopic(name="my.topic2", partitions=[0, 1]),
]


@producer(
    topics=my_topics,
    config=producer_config,
)
def my_producer(my_name: str):
    return {
        "name": my_name
    }


if __name__ == "__main__":
    my_producer("Jack")
    my_producer("Alice")

Consumer

from heizer import HeizerConfig, HeizerTopic, consumer, producer, HeizerMessage
import json

producer_config = HeizerConfig(
    {
        "bootstrap.servers": "localhost:9092",
    }
)

consumer_config = HeizerConfig(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "default",
        "auto.offset.reset": "earliest",
    }
)

topics = [HeizerTopic(name="my.topic1")]


@producer(
    topics=topics,
    config=producer_config
)
def produce_data(status: str, result: str):
    return {
        "status": status,
        "result": result,
    }


# Heizer expects consumer stopper func return Bool type result
# For this example, consumer will stop and return value if 
# `status` is `success` in msg
# If there is no stopper func, consumer will keep running forever
def stopper(msg: HeizerMessage):
    data = json.loads(msg.value)
    if data["status"] == "success":
        return True

    return False


@consumer(
    topics=topics,
    config=consumer_config,
    stopper=stopper,
)
def consume_data(message: HeizerMessage):
    data = json.loads(message.value)
    print(data)
    return data["result"]


if __name__ == "__main__":
    produce_data("start", "1")
    produce_data("loading", "2")
    produce_data("success", "3")
    produce_data("postprocess", "4")

    result = consume_data()

    print("Expected Result:", result)

After you executed this code block, you will see those output on your terminal

{'status': 'start', 'result': '1'}
{'status': 'loading', 'result': '2'}
{'status': 'success', 'result': '3'}

Expected Result: 3

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

heizer-999999.dev1679776579.tar.gz (662.7 kB view details)

Uploaded Source

Built Distribution

heizer-999999.dev1679776579-py3-none-any.whl (33.2 kB view details)

Uploaded Python 3

File details

Details for the file heizer-999999.dev1679776579.tar.gz.

File metadata

  • Download URL: heizer-999999.dev1679776579.tar.gz
  • Upload date:
  • Size: 662.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.16

File hashes

Hashes for heizer-999999.dev1679776579.tar.gz
Algorithm Hash digest
SHA256 288ce52fcee9e36382021c509f85e15555c1d5ed81bc785d472e3f685a87119a
MD5 b82be0ce47f164af1b549295c652605b
BLAKE2b-256 17e3450c43e48eb607fffa14f79510a5c845e3603da814caa0da4e1319c7cf81

See more details on using hashes here.

File details

Details for the file heizer-999999.dev1679776579-py3-none-any.whl.

File metadata

File hashes

Hashes for heizer-999999.dev1679776579-py3-none-any.whl
Algorithm Hash digest
SHA256 72db0772f4df330cd12522a387c6c0e20065307099f40f68df5be97d139e38a3
MD5 a46c932ecb0b977a9732dbd82bd8a27d
BLAKE2b-256 db3dd1d905e46826eec504883cd4f3e660e6b18bc68c5eff6b268fb4482b56e4

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page