Skip to main content

A connect pulsar message queue package, support multi-threaded production and consumption

Project description

pulsar-thread

一、介绍

一个连接pulsar消息队列的包,优点是:支持多线程生产和消费。

A connect pulsar message queue package, support multi-threaded production and consumption.

  1. 本包是以pulsar-client为基础创建的
  2. pulsar-client使用链接:https://pulsar.apache.org/docs/en/client-libraries-python/
  3. 默认 schema=pulsar.schema.StringSchema()
  4. 若想使用其他的schema, 使用方法与pulsar-client相同, 详情可看上面pulsar-client使用链接
  5. 默认的多线程最大数thread_count为5个

二、使用说明

1. 连接 (client)

 import pulsar_thread as pt
 client = pt.client('pulsar://0.0.0.0:6655')
 
 #请将 0.0.0.0:6655 换成你的pulsar地址

2. 生产者(producer)

 import json
 import pulsar_thread as pt
 
 # 1. 连接client
 client = pt.client('pulsar://0.0.0.0:6655')
    
 # 模拟要发送的数据
 data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']}
 data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']}
 data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']}
 data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']}
 
 # 2. 将要发送的数据和topic组合成字典
 # {'topic_1': msg_1, ... , 'topic_n': msg_n}
 
 data_dict = {'test1':[json.dumps(data), json.dumps(data2)], 'test2':[json.dumps(data3), json.dumps(data4)]}
 
 # 3. 创建生产者
 producer = client.create_producer()
 
 
 # 4. 发送消息 
 # 可选 4.1 同步发送send 或 4.2 异步发送send_async
 
 # 4.1 同步发送
 # 可选 4.1.1 默认模式 或 4.1.2 自定义模式
 
 # 4.1.1 默认模式
 # 默认参数:thread_count=5, schema=pulsar.schema.StringSchema()
 # 默认多线程最大数thread_count为5个, schema是以StringSchema()字符串模式
 
 result = producer.send(data_dict)
 
 # 4.1.2 自定义模式
 # schema参数设置规范详见pulsar-client的使用
 
 result = producer.send_async(data_dict,
                              thread_count=10,
                              schema=pulsar.schema.StringSchema()) 
 
 # 4.2 异步发送
 # 可选 4.2.1 默认模式 或 4.2.2 自定义模式
 
 # 4.2.1 默认模式
 # 默认参数:callback=None, thread_count=5, schema=pulsar.schema.StringSchema()
 # 默认回调函数callback为None, 多线程最大数thread_count为5个, schema是以StringSchema()字符串模式
 
 result = producer.send_async(data_dict)
 
 # 4.2.2 自定义模式(callback, schema参数设置规范详见pulsar-client的使用)
 
 result = producer.send_async(data_dict, 
                              callback=None,
                              thread_count=10,
                              schema=pulsar.schema.StringSchema()) 

3.消费者(consumer)

import json
import pulsar_thread as pt

# 业务程序,处理消息队列发来的消息
# msg 是 接收的消息队列传来的消息
def deal_msg(msg):
    print(msg.value())
    import time
    time.sleep(5)
    print(json.loads(msg.data()))

# 1. 连接
client = pt.client('pulsar://0.0.0.0:6655')

# 2. 创建consumer
# 可从多个 topic 里接收数据
# 默认接收的 schema=pulsar.schema.StringSchema()
# 格式:consumer = client.create_consumer(['topic_1', ......, 'topic_n'], 消费者的名字, schema=pulsar.schema.StringSchema())


consumer = client.create_consumer(['test1', 'test2'], 'my-subscription')


# 3. 接收数据并用业务程序(例:deal_msg)处理
# 可选 3.1  单线程处理 consumer.receive() 或 
# 3.2 多线程处理 consumer.receive_thread()


# 3.1  单线程处理
# 可选 3.1.1 默认模式 或 3.1.2 自定义模式
# 阻塞模式,消费一个,业务程序处理一个,业务程序处理完成,再消费下一个

# 3.1.1 默认模式
# 默认参数:timeout_millis=None, logger=None
# 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms
# 默认 日志收集器logger 为 None


consumer.receive(deal_msg)


# 3.1.2 自定义模式

import logging,sys

def LogSet():
    # 获取logger实例,如果参数为空则返回root logger
    logger = logging.getLogger("test.log")
    # 指定logger输出格式
    formatter = logging.Formatter('%(asctime)s %(pathname)s %(lineno)d %(levelname)-8s: %(message)s')
    # 文件日志
    file_handler = logging.FileHandler("./test.log")
    file_handler.setFormatter(formatter)  # 可以通过setFormatter指定输出格式
    # 控制台日志
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.formatter = formatter  # 也可以直接给formatter赋值
    # 为logger添加的日志处理器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    # 指定日志的最低输出级别,默认为WARN级别
    logger.setLevel(logging.DEBUG)
    # 移除一些日志处理器
    return logger, file_handler

# 获取 logger
logger,_=LogSet()

consumer.receive(deal_msg, timeout_millis=5000, logger=logger)

# 3.2 多线程处理
# 可选 3.2.1 默认模式 或 3.2.2 自定义模式
# 可以使用多线程进行并发消费,处理业务数据,提高效率


# 3.2.1 默认模式
# 默认参数:thread_count=5, timeout_millis=None, logger=None
# 默认 最大线程数thread_count为5个
# 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms
# 默认 日志收集器logger 为 None

consumer.receive_thread(deal_msg)

# 3.1.2 自定义模式

# 例 1
consumer.receive_thread(deal_msg, 2)

# 例 2
consumer.receive_thread(deal_msg, 10, timeout_millis=5000, logger=logger)

三、使用 pulsar-client(pulsar.Client)连接时

pulsar_thread的create_producer和create_consumer的使用

1. 生产者(producer)
import json
import pulsar_thread as pt
import pulsar

# 使用 pulsar 连接
client = pulsar.Client('pulsar://0.0.0.0:6655')

data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']}
data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']}
data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']}
data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']}

data_dict = {'test1':[json.dumps(data), json.dumps(data2)], 'test2':[json.dumps(data3), json.dumps(data4)]}

# 使用 pulsar_thread 创建生产者
producer = pt.create_producer(client)

result = producer.send(data_dict)
2. 消费者(consumer)
import json
import pulsar_thread as pt
import pulsar


def deal_msg(msg):
    print(msg.value())
    import time
    time.sleep(5)
    print(json.loads(msg.data()))

# 使用 pulsar 连接
client = pulsar.Client('pulsar://0.0.0.0:6655')

# 使用 pulsar_thread 创建消费者
consumer = pt.create_consumer(client, ['test1', 'test2'], 'my-subscription')

consumer.receive_thread(deal_msg, 2)

四、关于schema模式

1. schema支持的模式
schema note
BytesSchema Get the raw payload as a bytes object. No serialization/deserialization are performed. This is the default schema mode
StringSchema Encode/decode payload as a UTF-8 string. Uses str objects
JsonSchema Require record definition. Serializes the record into standard JSON payload
AvroSchema Require record definition. Serializes in AVRO format
2. schema参数用法和pulsar-client相同

若想拓展使用schema,请移步至pulsar-client文档,阅读使用schema。 pulsar-client文档链接:https://pulsar.apache.org/docs/en/client-libraries-python/

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pulsar-thread-0.1.2.tar.gz (9.5 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page