Skip to main content

Mobio Profiling Libs

Project description

  • Thư viện Consumer của Profiling:
  • Tự động tạo kafka topics
from mobio.libs.kafka_consumer_lib.helpers.ensure_kafka_topic import create_kafka_topics
# Example call: request creation of the 'test1' topic via the library helper
# (the README describes this helper as creating Kafka topics automatically).
create_kafka_topics(['test1'])
  • Kafka Python Consumer
from mobio.libs.kafka_consumer_lib.helpers.kafka_python_consumer_manager_v2 import KafkaPythonConsumerManagerV2, KafkaPythonMessageQueue
from mobio.libs.kafka_consumer_lib import ConsumerGroup
import os
from pymongo import MongoClient

# This function initializes the MongoDB client
def create_db():
    """Build a MongoClient from the PROFILING_MONGODB_URI environment variable.

    Returns:
        The MongoClient instance, or None when constructing it raised.
    """
    print("create_db: ok")

    try:
        url_connection = os.getenv('PROFILING_MONGODB_URI')
        # NOTE(review): per the PyMongo docs, connect=False defers the actual
        # connection until the first operation, so connectivity problems are
        # presumably not reported here - confirm.
        client = MongoClient(url_connection, connect=False)
    except Exception as ex:
        # print() does not interpolate %-style arguments (unlike logging),
        # so the message has to be formatted explicitly.
        print('ERROR BaseModel::create_db: %r' % (ex,))
        client = None

    return client

# This class handles incoming messages
class KafkaPythonConsumer(KafkaPythonMessageQueue):
    """Example message handler built on top of KafkaPythonMessageQueue.

    The constructor forwards its four arguments unchanged to the base class.
    """

    def __init__(self, mongo_client, topic_name, num_worker, group_id):
        super(KafkaPythonConsumer, self).__init__(
            mongo_client, topic_name, num_worker, group_id
        )

    def process_msg(self, payload):
        """Print the payload; print (rather than propagate) any exception."""
        try:
            message = 'payload: {}'.format(payload)
            print(message)
        except Exception as err:
            error_text = "ThreadMergeConsumer::ERR: {}".format(err)
            print(error_text)
# This function starts the consumers
def start_kafka_python():
    """Create the Mongo client and hand one consumer spec to the manager."""
    client = create_db()

    # FORMAT: CLASS, CLIENT-MONGO, TOPIC, NUMBER_CONSUMER, GROUP_NAME
    consumer_spec = (
        KafkaPythonConsumer,
        client,
        'test1',
        1,
        ConsumerGroup.DEFAULT_CONSUMER_GROUP_ID,
    )
    consumer_list = [consumer_spec]

    # Nothing to start when no consumers are configured.
    if not consumer_list:
        return

    manager = KafkaPythonConsumerManagerV2(consumer_list)
# Script entry point: run the kafka-python consumer example when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    start_kafka_python()
  • Confluent Kafka Consumer
import os
from mobio.libs.kafka_consumer_lib import ConsumerGroup
from pymongo import MongoClient 
from mobio.libs.kafka_consumer_lib.helpers.confulent_consumer_manager import ConfluentConsumerManager, ConfluentMessageQueue

# This function initializes the MongoDB client
def create_db():
    """Build a MongoClient from the PROFILING_MONGODB_URI environment variable.

    Returns:
        The MongoClient instance, or None when constructing it raised.
    """
    print("create_db: ok")

    try:
        url_connection = os.getenv('PROFILING_MONGODB_URI')
        # NOTE(review): per the PyMongo docs, connect=False defers the actual
        # connection until the first operation, so connectivity problems are
        # presumably not reported here - confirm.
        client = MongoClient(url_connection, connect=False)
    except Exception as ex:
        # print() does not interpolate %-style arguments (unlike logging),
        # so the message has to be formatted explicitly.
        print('ERROR BaseModel::create_db: %r' % (ex,))
        client = None

    return client

class ConfluentKafkaConsumer(ConfluentMessageQueue):
    """Example message handler built on top of ConfluentMessageQueue.

    The constructor forwards its four arguments unchanged to the base class.
    """

    def __init__(self, mongo_client, topic_name, num_worker, group_id):
        super(ConfluentKafkaConsumer, self).__init__(
            mongo_client, topic_name, num_worker, group_id
        )

    def process_msg(self, payload):
        """Print the payload, then always raise Exception('test')."""
        text = 'payload: {}'.format(payload)
        print(text)
        # Deliberate failure kept from the example; how the base class reacts
        # to it is not visible here.
        raise Exception('test')

def start_confluent_consumer():
    """Create the Mongo client and hand one consumer spec to the manager."""
    client = create_db()

    # Same tuple layout as the kafka-python example:
    # class, Mongo client, topic, number of consumers, group id.
    consumer_spec = (
        ConfluentKafkaConsumer,
        client,
        'test1',
        1,
        ConsumerGroup.DEFAULT_CONSUMER_GROUP_ID,
    )
    consumer_list = [consumer_spec]

    # Nothing to start when no consumers are configured.
    if not consumer_list:
        return

    manager = ConfluentConsumerManager(consumer_list)


# Script entry point: run the Confluent consumer example when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    # Alternative entry points, left disabled; uncomment one to run it instead.
    # NOTE(review): test_create_topic is not defined in the visible example.
    # test_create_topic()
    # start_kafka_python()
    start_confluent_consumer()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

Profiling Libs-0.1.2.tar.gz (6.9 kB view details)

Uploaded Source

File details

Details for the file Profiling Libs-0.1.2.tar.gz.

File metadata

  • Download URL: Profiling Libs-0.1.2.tar.gz
  • Upload date:
  • Size: 6.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.15.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/50.3.0 requests-toolbelt/0.9.1 tqdm/4.50.1 CPython/3.5.6

File hashes

Hashes for Profiling Libs-0.1.2.tar.gz
Algorithm Hash digest
SHA256 5fb52227d64658c3caa6b67f76a4f40acc0544fc6713d8dad03ce1635504315b
MD5 c4781d20730f81169e123924fc376e67
BLAKE2b-256 93db07242544b1e25f313ccfb2dc3eec33d16dafeb9f20ae2252b9bc7ea621f0

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page