A Python library for easily creating Kafka producers and consumers
Project description
Heizer
A Python library for easily creating Kafka producers and consumers
Install
pip install --pre heizer
Setup
Use the docker-compose.yaml
file to start the Kafka service:
docker-compose up -d
Sample
Producer
from heizer import HeizerConfig, HeizerTopic, producer
# Connection settings handed to the @producer decorator.
producer_config = HeizerConfig(
    {
        "bootstrap.servers": "localhost:9092",
    }
)

# Topics (with the partitions listed for each) that the producer is given.
my_topics = [
    HeizerTopic(name="my.topic1", partitions=[0]),
    HeizerTopic(name="my.topic2", partitions=[0, 1]),
]
@producer(
    topics=my_topics,
    config=producer_config,
)
def my_producer(my_name: str):
    """Return the message payload for ``my_name``.

    The function is wrapped by ``@producer``, configured with ``my_topics``
    and ``producer_config``.
    """
    return {
        "name": my_name,
    }
if __name__ == "__main__":
    # Call the decorated producer once per name.
    my_producer("Jack")
    my_producer("Alice")
Consumer
from heizer import HeizerConfig, HeizerTopic, consumer, producer, HeizerMessage
import json
# Connection settings handed to the @producer decorator.
producer_config = HeizerConfig(
    {
        "bootstrap.servers": "localhost:9092",
    }
)

# Settings handed to the @consumer decorator.
consumer_config = HeizerConfig(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "default",
        "auto.offset.reset": "earliest",
    }
)

# The producer and the consumer below share this single topic.
topics = [HeizerTopic(name="my.topic1")]
@producer(
    topics=topics,
    config=producer_config,
)
def produce_data(status: str, result: str):
    """Return a payload carrying ``status`` and ``result``.

    The function is wrapped by ``@producer``, configured with ``topics``
    and ``producer_config``.
    """
    return {
        "status": status,
        "result": result,
    }
# Heizer expects the consumer's stopper function to return a bool.
# In this example, the consumer stops and returns its value as soon as
# a message whose `status` is `success` arrives.
# Without a stopper function, the consumer keeps running forever.
def stopper(msg: HeizerMessage):
    """Return True when the message's JSON payload has status "success"."""
    data = json.loads(msg.value)
    return data["status"] == "success"
@consumer(
    topics=topics,
    config=consumer_config,
    stopper=stopper,
)
def consume_data(message: HeizerMessage):
    """Print the decoded JSON payload of ``message`` and return its "result".

    The function is wrapped by ``@consumer``, configured with ``topics``,
    ``consumer_config`` and ``stopper``.
    """
    data = json.loads(message.value)
    print(data)
    return data["result"]
if __name__ == "__main__":
    produce_data("start", "1")
    produce_data("loading", "2")
    produce_data("success", "3")
    # Produced after the "success" message; the sample output below stops
    # at "success", so this message is never printed.
    produce_data("postprocess", "4")

    result = consume_data()

    print("Expected Result:", result)
After you execute this code block, you will see the following output in your terminal:
{'status': 'start', 'result': '1'}
{'status': 'loading', 'result': '2'}
{'status': 'success', 'result': '3'}
Expected Result: 3
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
heizer-999999.dev1679777007.tar.gz
(627.7 kB
view details)
Built Distribution
File details
Details for the file heizer-999999.dev1679777007.tar.gz
.
File metadata
- Download URL: heizer-999999.dev1679777007.tar.gz
- Upload date:
- Size: 627.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 955fc3166b64ab3af7f0a3e292faf505e1fa1d69b78795649576e7fdae58f9c9 |
|
MD5 | bed0ae72d3d4ab79c7fea8977a30f9b0 |
|
BLAKE2b-256 | 7a90ae5b2d7e210258f023cf9022765c4a19ebf5e05a18f31e505dd0a6df1f72 |
File details
Details for the file heizer-999999.dev1679777007-py3-none-any.whl
.
File metadata
- Download URL: heizer-999999.dev1679777007-py3-none-any.whl
- Upload date:
- Size: 9.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 445a4f0330759377fdd1e4f1753cd7cb996fd3a2fabd51a1580480978ad06be6 |
|
MD5 | 2a118e3f05d589b18200cb5bb5eb4c12 |
|
BLAKE2b-256 | a2a9f4e2a892d98fd3a1ad10d2a066e17d6b2b80d387f4a774868bbac52ed4e4 |