Heizer

A Python library for easily creating Kafka producers and consumers.

Install

pip install --pre heizer

Setup

Use the docker-compose.yaml file to start the Kafka service:

docker-compose up -d
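
Before running the samples, it can help to confirm that the broker port is accepting connections. The following is a minimal sketch using only the Python standard library. It assumes the compose file exposes Kafka on localhost:9092 (the address used in the samples), and `broker_reachable` is a hypothetical helper, not part of Heizer. A successful TCP connection only shows that the port is open, not that Kafka has finished starting.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout.

    Hypothetical helper for illustration; the default address is an assumption
    taken from the "bootstrap.servers" value used in the samples.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    print("Kafka port open:", broker_reachable())
```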

Sample

Producer

from heizer import HeizerConfig, HeizerTopic, producer

producer_config = HeizerConfig(
    {
        "bootstrap.servers": "localhost:9092",
    }
)

my_topics = [
    HeizerTopic(name="my.topic1", partitions=[0]),
    HeizerTopic(name="my.topic2", partitions=[0, 1]),
]


@producer(
    topics=my_topics,
    config=producer_config,
)
def my_producer(my_name: str):
    return {
        "name": my_name
    }


if __name__ == "__main__":
    my_producer("Jack")
    my_producer("Alice")

Consumer

from heizer import HeizerConfig, HeizerTopic, consumer, producer, HeizerMessage
import json

producer_config = HeizerConfig(
    {
        "bootstrap.servers": "localhost:9092",
    }
)

consumer_config = HeizerConfig(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "default",
        "auto.offset.reset": "earliest",
    }
)

topics = [HeizerTopic(name="my.topic1")]


@producer(
    topics=topics,
    config=producer_config
)
def produce_data(status: str, result: str):
    return {
        "status": status,
        "result": result,
    }


# Heizer expects the consumer's stopper function to return a bool.
# In this example, the consumer stops and returns its value when
# `status` is `success` in the message.
# Without a stopper function, the consumer keeps running forever.
def stopper(msg: HeizerMessage):
    data = json.loads(msg.value)
    if data["status"] == "success":
        return True

    return False


@consumer(
    topics=topics,
    config=consumer_config,
    stopper=stopper,
)
def consume_data(message: HeizerMessage):
    data = json.loads(message.value)
    print(data)
    return data["result"]


if __name__ == "__main__":
    produce_data("start", "1")
    produce_data("loading", "2")
    produce_data("success", "3")
    produce_data("postprocess", "4")

    result = consume_data()

    print("Expected Result:", result)

After you run this code, you will see the following output in your terminal:

{'status': 'start', 'result': '1'}
{'status': 'loading', 'result': '2'}
{'status': 'success', 'result': '3'}

Expected Result: 3
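
The output above stops at the `success` message, and the `postprocess` message is never printed. This behaviour can be reproduced without a broker using a small in-memory sketch. `FakeMessage` and `run_until_stopped` are hypothetical stand-ins, not part of Heizer. The order shown (handler first, then stopper) is an assumption inferred from the output above, not from the library's source.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Optional


@dataclass
class FakeMessage:
    """Hypothetical stand-in for HeizerMessage; only `value` is modelled."""
    value: str


def run_until_stopped(
    messages: Iterable[FakeMessage],
    handler: Callable[[FakeMessage], Any],
    stopper: Callable[[FakeMessage], bool],
) -> Optional[Any]:
    """Call handler on each message and return its result once stopper is True.

    Assumed order: the handler runs first, then the stopper is checked.
    """
    for message in messages:
        result = handler(message)
        if stopper(message):
            return result
    return None  # input exhausted without the stopper firing


def stopper(msg: FakeMessage) -> bool:
    return json.loads(msg.value)["status"] == "success"


def consume_data(message: FakeMessage) -> str:
    data = json.loads(message.value)
    print(data)
    return data["result"]


# Same four payloads as the produce_data calls in the sample.
queue = [
    FakeMessage(json.dumps({"status": status, "result": result}))
    for status, result in [
        ("start", "1"),
        ("loading", "2"),
        ("success", "3"),
        ("postprocess", "4"),
    ]
]

result = run_until_stopped(queue, consume_data, stopper)
print("Expected Result:", result)
```

In this sketch the fourth message is left unread because the loop returns as soon as the stopper fires. A real consumer would find it still in the topic on a later run, subject to its offset settings.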
