Mobio Kafka SDK
Project description
- Consumer library for Profiling:
- Automatically create Kafka topics
from mobio.libs.kafka_lib.helpers.ensure_kafka_topic import create_kafka_topics
create_kafka_topics(['test1'])
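The names passed to create_kafka_topics have to be legal Kafka topic names. The following is a minimal sketch of a pre-check, assuming the standard Kafka naming rules (ASCII letters, digits, `.`, `_` and `-`, at most 249 characters, and neither `.` nor `..`). The helper valid_topic_names is hypothetical and is not part of the SDK.

```python
import re

# Legal Kafka topic-name characters, 1 to 249 of them (assumed standard rules).
_TOPIC_RE = re.compile(r"[A-Za-z0-9._-]{1,249}")


def valid_topic_names(names):
    """Return the legal topic names from `names`, in order, without duplicates.

    Hypothetical helper for illustration; not part of mobio.libs.kafka_lib.
    """
    seen = set()
    result = []
    for name in names:
        # "." and ".." match the character set but are reserved names.
        if name in (".", ".."):
            continue
        if not _TOPIC_RE.fullmatch(name):
            continue
        if name in seen:
            continue
        seen.add(name)
        result.append(name)
    return result


if __name__ == "__main__":
    print(valid_topic_names(["test1", "bad topic", "test1", ".."]))
```

The filtered list could then be handed on, for example create_kafka_topics(valid_topic_names(['test1'])).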
- Confluent Kafka Consumer
import os

from pymongo import MongoClient

from mobio.libs.kafka_lib import ConsumerGroup
from mobio.libs.kafka_lib.helpers.confulent_consumer_manager import ConfluentConsumerManager, ConfluentMessageQueue


# Creates the MongoDB client
def create_db():
    print("create_db: ok")
    try:
        url_connection = os.getenv('MONGODB_URI')
        # connect=False defers the actual connection until first use
        client = MongoClient(url_connection, connect=False)
    except Exception as ex:
        print('ERROR:: create_db: {}'.format(ex))
        client = None
    return client


class ConfluentKafkaConsumer(ConfluentMessageQueue):
    def __init__(self, mongo_client, topic_name, num_worker, group_id):
        super().__init__(mongo_client, topic_name, num_worker, group_id)

    def process_msg(self, payload):
        print("payload: {}".format(payload))
        # Demo only: simulates a processing failure
        raise Exception("test")


def start_confluent_consumer():
    mongo_client = create_db()
    # Each entry: consumer class, Mongo client, topic, number of workers, group id, and a trailing boolean flag
    consumer_list = [(ConfluentKafkaConsumer, mongo_client, 'test1', 1, ConsumerGroup.DEFAULT_CONSUMER_GROUP_ID, True)]
    if consumer_list:
        manager = ConfluentConsumerManager(consumer_list)


if __name__ == "__main__":
    # test_create_topic()
    # start_kafka_python()
    start_confluent_consumer()
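In the example above process_msg always raises, which is a convenient way to observe how a consumer reacts to a failing handler. The SDK's internal handling is not shown in this README, so the following is only a stand-alone sketch of an enable/disable retry switch around a process_msg-style callable. The names handle_with_retry, enable_retry and max_retries are hypothetical and are not the SDK's API.

```python
def handle_with_retry(process_msg, payload, enable_retry=True, max_retries=3):
    """Call process_msg(payload), retrying on failure when retry is enabled.

    Returns a tuple (ok, attempts). With enable_retry=False exactly one
    attempt is made; otherwise up to 1 + max_retries attempts are made.
    Hypothetical helper for illustration; not part of mobio.libs.kafka_lib.
    """
    attempts_allowed = 1 + (max_retries if enable_retry else 0)
    attempts = 0
    while attempts < attempts_allowed:
        attempts += 1
        try:
            process_msg(payload)
            return True, attempts
        except Exception as ex:
            # Log and fall through to the next attempt, if any remain.
            print("ERROR:: process_msg attempt {}: {}".format(attempts, ex))
    return False, attempts


if __name__ == "__main__":
    def always_fails(payload):
        raise Exception("test")

    print(handle_with_retry(always_fails, {"id": 1}))
    print(handle_with_retry(always_fails, {"id": 1}, enable_retry=False))
```

Returning the attempt count alongside the success flag keeps the sketch easy to check: a handler that always fails uses every allowed attempt, while one that succeeds stops immediately.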
- Update version 0.1.1
Support enabling/disabling consumer retry
- Update version 0.1.2
Add ConfluentProducerManager
- Update version 0.1.3
Remove pandas
- Update version 0.1.4
Fix asynchronous consumer commit on Python 3.6+
- Update version 0.1.5
Upgrade confluent-kafka to v1.5.0; supports Python 3.5 through 3.8
- Update version 0.1.6
Upgrade confluent-kafka to v1.6.1; fixes the PY_SSIZE_T_CLEAN warning
Download files
Source Distribution
m-kafka-sdk-0.1.6.tar.gz (7.3 kB)
File details
Details for the file m-kafka-sdk-0.1.6.tar.gz.
File metadata
- Download URL: m-kafka-sdk-0.1.6.tar.gz
- Upload date:
- Size: 7.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.15.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/50.3.2 requests-toolbelt/0.9.1 tqdm/4.50.1 CPython/3.5.6
File hashes
Algorithm | Hash digest
---|---
SHA256 | 4bbd3c273d0f61064a3a336d9e8b134ae1b1f91ad794e91f350fbed347c85213
MD5 | 797078fda43aa542bd52eb6ecf7e87b4
BLAKE2b-256 | c762eeb23011cfa8a0e8ceeca4501b3dc328422b855751e42cf66c3e81ed8af1