Skip to main content

Mobio Profiling Libs

Project description

  • Thư viện Consumer của Profiling :
  • Tự động tạo kafka topics
from mobio.libs.kafka_consumer_lib.helpers.ensure_kafka_topic import create_kafka_topics
create_kafka_topics(['test1'])
  • Kafka Python Consumer
from mobio.libs.kafka_consumer_lib.helpers.kafka_python_consumer_manager_v2 import KafkaPythonConsumerManagerV2, KafkaPythonMessageQueue
from mobio.libs.kafka_consumer_lib import ConsumerGroup
import os
from pymongo import MongoClient

# Đây là function khởi tạo client-mongo
def create_db():
    print("create_db: ok")

    try:
        url_connection = os.getenv('PROFILING_MONGODB_URI')
        client = MongoClient(url_connection, connect=False)
    except Exception as ex:
        print('ERROR BaseModel::create_db: %r', ex)
        client = None

    return client

# Đây là class xử lý message
class KafkaPythonConsumer(KafkaPythonMessageQueue):
    def __init__(self, mongo_client, topic_name, num_worker, group_id):
        super().__init__(mongo_client, topic_name, num_worker, group_id)

    def process_msg(self, payload):
        try:
            print('payload: {}'.format(payload))
        except Exception as e:
            print("ThreadMergeConsumer::ERR: {}".format(e))

# Đây là function khởi tạo consumers
def start_kafka_python():
    mongo_client = create_db()
    # FORMAT: CLASS, CLIENT-MONGO, TOPIC, NUMBER_CONSUMER, GROUP_NAME
    consumer_list = [(KafkaPythonConsumer, mongo_client, 'test1', 1, ConsumerGroup.DEFAULT_CONSUMER_GROUP_ID)]

    if consumer_list:
        manager = KafkaPythonConsumerManagerV2(consumer_list)
if __name__ == "__main__":
    start_kafka_python()
  • Confluent Kafka Consumer
import os
from mobio.libs.kafka_consumer_lib import ConsumerGroup
from pymongo import MongoClient 
from mobio.libs.kafka_consumer_lib.helpers.confulent_consumer_manager import ConfluentConsumerManager, ConfluentMessageQueue

# Đây là function khởi tạo client-mongo
def create_db():
    print("create_db: ok")

    try:
        url_connection = os.getenv('PROFILING_MONGODB_URI')
        client = MongoClient(url_connection, connect=False)
    except Exception as ex:
        print('ERROR BaseModel::create_db: %r', ex)
        client = None

    return client

class ConfluentKafkaConsumer(ConfluentMessageQueue):

    def __init__(self, mongo_client, topic_name, num_worker, group_id):
        super().__init__(mongo_client, topic_name, num_worker, group_id)

    def process_msg(self, payload):
        print('payload: {}'.format(payload))
        raise Exception('test')

def start_confluent_consumer():
    mongo_client = create_db()
    consumer_list = [(ConfluentKafkaConsumer, mongo_client, 'test1', 1, ConsumerGroup.DEFAULT_CONSUMER_GROUP_ID)]

    if consumer_list:
        manager = ConfluentConsumerManager(consumer_list)


if __name__ == "__main__":
    # test_create_topic()
    # start_kafka_python()
    start_confluent_consumer()

** Update version 0.1.1

  • support Enable/Disable retry consumer

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

m-profiling-sdk-0.1.1.tar.gz (7.1 kB view details)

Uploaded Source

File details

Details for the file m-profiling-sdk-0.1.1.tar.gz.

File metadata

  • Download URL: m-profiling-sdk-0.1.1.tar.gz
  • Upload date:
  • Size: 7.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.15.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/50.3.0 requests-toolbelt/0.9.1 tqdm/4.50.1 CPython/3.5.6

File hashes

Hashes for m-profiling-sdk-0.1.1.tar.gz
Algorithm Hash digest
SHA256 54639421fce9eb4b1fb7fb624696cde3c887d0d2ee44ed2d712ba52ba867be5d
MD5 34310fb1957fe7a9ff8fd0552d33c975
BLAKE2b-256 b99ad32459a9ab9ec73ecee190ebb62d757a08e1289678da56a6bcbaec883e6f

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page