Mobio Kafka SDK

Project description

  • Consumer library for Profiling:
  • Automatically create Kafka topics
from mobio.libs.kafka_lib.helpers.ensure_kafka_topic import create_kafka_topics
create_kafka_topics(['test1'])
  • Confluent Kafka Consumer
import os
from mobio.libs.kafka_lib import ConsumerGroup
from pymongo import MongoClient 
from mobio.libs.kafka_lib.helpers.confulent_consumer_manager import ConfluentConsumerManager, ConfluentMessageQueue

# This function initializes the MongoDB client
def create_db():
    print("create_db: ok")

    try:
        url_connection = os.getenv('MONGODB_URI')
        client = MongoClient(url_connection, connect=False)
    except Exception as ex:
        print('ERROR:: create_db: {}'.format(ex))
        client = None

    return client

class ConfluentKafkaConsumer(ConfluentMessageQueue):

    def __init__(self, mongo_client, topic_name, num_worker, group_id):
        super().__init__(mongo_client, topic_name, num_worker, group_id)

    def process_msg(self, payload):
        print("payload: {}".format(payload))
        raise Exception("test")

def start_confluent_consumer():
    mongo_client = create_db()
    consumer_list = [(ConfluentKafkaConsumer, mongo_client, 'test1', 1, ConsumerGroup.DEFAULT_CONSUMER_GROUP_ID, True)]

    if consumer_list:
        manager = ConfluentConsumerManager(consumer_list)


if __name__ == "__main__":
    # test_create_topic()
    # start_kafka_python()
    start_confluent_consumer()
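
Each entry of consumer_list above is a positional tuple, which is easy to get wrong. The sketch below gives the positions names so an entry can be checked before it is passed to ConfluentConsumerManager. ConsumerSpec and describe are hypothetical helpers, not part of the SDK, and reading the sixth element as the retry switch is an assumption based on the 0.1.1 changelog entry.

```python
from typing import Any, NamedTuple, Optional


class ConsumerSpec(NamedTuple):
    """Hypothetical named view of one consumer_list entry (not an SDK type)."""
    consumer_class: type
    mongo_client: Optional[Any]
    topic_name: str
    num_worker: int
    group_id: str
    retry_enabled: bool  # assumption: the sixth element toggles retry


def describe(entry: tuple) -> str:
    """Return a one-line summary of a consumer_list entry."""
    spec = ConsumerSpec(*entry)
    return "{}: topic={} workers={} group={} retry={}".format(
        spec.consumer_class.__name__,
        spec.topic_name,
        spec.num_worker,
        spec.group_id,
        spec.retry_enabled,
    )
```

Calling describe on each entry before starting the manager is a cheap way to spot a swapped topic name or worker count.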
  • Update version 0.1.1
Support enabling/disabling consumer retry
  • Update version 0.1.2
Add ConfluentProducerManager
  • Update version 0.1.3
Remove pandas
  • Update version 0.1.4
Fix asynchronous consumer commit on Python 3.6+
  • Update version 0.1.5
Upgrade confluent-kafka to v1.5.0, which supports Python 3.5 through 3.8
  • Update version 0.1.6
Upgrade confluent-kafka to v1.6.1; fix the PY_SSIZE_T_CLEAN warning
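
The 0.1.1 entry mentions a switch for consumer retry but does not describe how it behaves. As a rough illustration of what such a switch typically does, the sketch below wraps a message handler and retries it only when retry is enabled. The function handle_with_retry and its parameters (max_attempts, delay_seconds) are hypothetical and are not taken from the SDK; its real retry mechanism may differ.

```python
import time
from typing import Any, Callable


def handle_with_retry(
    handler: Callable[[Any], None],
    payload: Any,
    retry_enabled: bool = True,
    max_attempts: int = 3,
    delay_seconds: float = 0.0,
) -> bool:
    """Run handler(payload); retry on failure only if retry_enabled.

    Returns True if some attempt succeeded, False otherwise.
    """
    attempts = max_attempts if retry_enabled else 1
    for attempt in range(1, attempts + 1):
        try:
            handler(payload)
            return True
        except Exception as ex:
            print("attempt {} failed: {}".format(attempt, ex))
            # Wait between attempts, but not after the final one.
            if attempt < attempts and delay_seconds > 0:
                time.sleep(delay_seconds)
    return False
```

With retry disabled, a handler like the process_msg example above, which always raises, is attempted once and reported as failed.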

Download files


Source Distribution

m-kafka-sdk-0.1.6.tar.gz (7.3 kB view details)

Uploaded Source

File details

Details for the file m-kafka-sdk-0.1.6.tar.gz.

File metadata

  • Download URL: m-kafka-sdk-0.1.6.tar.gz
  • Upload date:
  • Size: 7.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.15.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/50.3.2 requests-toolbelt/0.9.1 tqdm/4.50.1 CPython/3.5.6

File hashes

Hashes for m-kafka-sdk-0.1.6.tar.gz
Algorithm    Hash digest
SHA256       4bbd3c273d0f61064a3a336d9e8b134ae1b1f91ad794e91f350fbed347c85213
MD5          797078fda43aa542bd52eb6ecf7e87b4
BLAKE2b-256  c762eeb23011cfa8a0e8ceeca4501b3dc328422b855751e42cf66c3e81ed8af1
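
The digests listed above can be checked against a downloaded copy of the archive. The following is a minimal sketch using only the standard library's hashlib; file_digests and sha256_matches are hypothetical helper names, and the local file path is whatever location the archive was saved to.

```python
import hashlib


def file_digests(path: str, chunk_size: int = 65536) -> dict:
    """Compute SHA256, MD5 and BLAKE2b-256 hex digests of a file."""
    sha256 = hashlib.sha256()
    md5 = hashlib.md5()
    blake2b = hashlib.blake2b(digest_size=32)  # 32 bytes = 256 bits
    with open(path, "rb") as f:
        # Read in chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            sha256.update(chunk)
            md5.update(chunk)
            blake2b.update(chunk)
    return {
        "sha256": sha256.hexdigest(),
        "md5": md5.hexdigest(),
        "blake2b_256": blake2b.hexdigest(),
    }


def sha256_matches(path: str, expected_sha256: str) -> bool:
    """Return True if the file's SHA256 equals the expected hex digest."""
    return file_digests(path)["sha256"] == expected_sha256.strip().lower()
```

For example, sha256_matches("m-kafka-sdk-0.1.6.tar.gz", "4bbd3c273d0f61064a3a336d9e8b134ae1b1f91ad794e91f350fbed347c85213") should return True for an unmodified download.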
