Mobio Kafka SDK
Project description
-
Thư viện Consumer của JB. Chạy consumer ở Process, phù hợp cho môi trường K8s :
-
NOTE: SDK sử dụng confluent-kafka depend requirements của module. SDK không có install_requires confluent-kafka :) ::cheers::
create topic
Note: truyền vào list tuple với:
- index 0: tên topic
- index 1: số partitions
- index 2: topic settings
from mobio.libs.kafka_lib.helpers.ensure_kafka_topic import create_kafka_topics_v2
create_kafka_topics_v2([("test", 8, {"compression.type": "zstd"})])
Producer
from mobio.libs.kafka_lib.helpers.kafka_producer_manager import KafkaProducerManager
KafkaProducerManager().flush_message(topic="test", key="uuid", value={"test":1})
Consumer
import os
from time import sleep
from pymongo import MongoClient
from mobio.libs.kafka_lib.helpers.kafka_consumer_manager import BaseKafkaConsumer
class TestConsumer(BaseKafkaConsumer):
def message_handle(self, data):
print("TestConsumer: data: {}".format(data))
if __name__ == "__main__":
url_connection = os.getenv('TEST_MONGO_URI')
client_mongo = MongoClient(url_connection, connect=False)
TestConsumer(topic_name="test", group_id="test", client_mongo=client_mongo, retryable=False)
sleep(1000)
change logs
-
0.1.4 (2022-08-23):
- Bổ sung thêm phần lưu mapping pod-name và client-id vào file
-
0.1.3:
- Mặc định compress messages ở đầu producer
- Function create kafka topic hỗ trợ truyền số partitions và settings của topic
-
0.1.1: fix bug init Config
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
m-kafka-sdk-v2-0.1.4.tar.gz
(6.9 kB
view hashes)