A connect pulsar message queue package, support multi-threaded production and consumption
Project description
pulsar-thread
一、介绍
一个连接pulsar消息队列的包,优点是:支持多线程生产和消费。
A connect pulsar message queue package, support multi-threaded production and consumption.
- 本包是以pulsar-client为基础创建的
- pulsar-client使用链接:https://pulsar.apache.org/docs/en/client-libraries-python/
- 默认 schema=pulsar.schema.StringSchema()
- 若想使用其他的schema, 使用方法与pulsar-client相同, 详情可看上面pulsar-client使用链接
- 默认的多线程最大数thread_count为5个
二、使用说明
1. 连接 (client)
import pulsar_thread as pt
client = pt.client('pulsar://0.0.0.0:6655')
#请将 0.0.0.0:6655 换成你的pulsar地址
2. 生产者(producer)
import json
import pulsar_thread as pt
# 1. 连接client
client = pt.client('pulsar://0.0.0.0:6655')
# 模拟要发送的数据
data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']}
data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']}
data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']}
data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']}
# 2. 将要发送的数据和topic组合成字典
# {'topic_1': msg_1, ... , 'topic_n': msg_n}
data_dict = {'test1':[json.dumps(data), json.dumps(data2)], 'test2':[json.dumps(data3), json.dumps(data4)]}
# 3. 创建生产者
producer = client.create_producer()
# 4. 发送消息
# 可选 4.1 同步发送send 或 4.2 异步发送send_async
# 4.1 同步发送
# 可选 4.1.1 默认模式 或 4.1.2 自定义模式
# 4.1.1 默认模式
# 默认参数:thread_count=5, schema=pulsar.schema.StringSchema()
# 默认多线程最大数thread_count为5个, schema是以StringSchema()字符串模式
result = producer.send(data_dict)
# 4.1.2 自定义模式
# schema参数设置规范详见pulsar-client的使用
result = producer.send_async(data_dict,
thread_count=10,
schema=pulsar.schema.StringSchema())
# 4.2 异步发送
# 可选 4.2.1 默认模式 或 4.2.2 自定义模式
# 4.2.1 默认模式
# 默认参数:callback=None, thread_count=5, schema=pulsar.schema.StringSchema()
# 默认回调函数callback为None, 多线程最大数thread_count为5个, schema是以StringSchema()字符串模式
result = producer.send_async(data_dict)
# 4.2.2 自定义模式(callback, schema参数设置规范详见pulsar-client的使用)
result = producer.send_async(data_dict,
callback=None,
thread_count=10,
schema=pulsar.schema.StringSchema())
3.消费者(consumer)
import json
import pulsar_thread as pt
# 业务程序,处理消息队列发来的消息
# msg 是 接收的消息队列传来的消息
def deal_msg(msg):
print(msg.value())
import time
time.sleep(5)
print(json.loads(msg.data()))
# 1. 连接
client = pt.client('pulsar://0.0.0.0:6655')
# 2. 创建consumer
# 可从多个 topic 里接收数据
# 默认接收的 schema=pulsar.schema.StringSchema()
# 格式:consumer = client.create_consumer(['topic_1', ......, 'topic_n'], 消费者的名字, schema=pulsar.schema.StringSchema())
consumer = client.create_consumer(['test1', 'test2'], 'my-subscription')
# 3. 接收数据并用业务程序(例:deal_msg)处理
# 可选 3.1 单线程处理 consumer.receive() 或
# 3.2 多线程处理 consumer.receive_thread()
# 3.1 单线程处理
# 可选 3.1.1 默认模式 或 3.1.2 自定义模式
# 阻塞模式,消费一个,业务程序处理一个,业务程序处理完成,再消费下一个
# 3.1.1 默认模式
# 默认参数:timeout_millis=None, logger=None
# 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms
# 默认 日志收集器logger 为 None
consumer.receive(deal_msg)
# 3.1.2 自定义模式
import logging,sys
def LogSet():
# 获取logger实例,如果参数为空则返回root logger
logger = logging.getLogger("test.log")
# 指定logger输出格式
formatter = logging.Formatter('%(asctime)s %(pathname)s %(lineno)d %(levelname)-8s: %(message)s')
# 文件日志
file_handler = logging.FileHandler("./test.log")
file_handler.setFormatter(formatter) # 可以通过setFormatter指定输出格式
# 控制台日志
console_handler = logging.StreamHandler(sys.stdout)
console_handler.formatter = formatter # 也可以直接给formatter赋值
# 为logger添加的日志处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# 指定日志的最低输出级别,默认为WARN级别
logger.setLevel(logging.DEBUG)
# 移除一些日志处理器
return logger, file_handler
# 获取 logger
logger,_=LogSet()
consumer.receive(deal_msg, timeout_millis=5000, logger=logger)
# 3.2 多线程处理
# 可选 3.2.1 默认模式 或 3.2.2 自定义模式
# 可以使用多线程进行并发消费,处理业务数据,提高效率
# 3.2.1 默认模式
# 默认参数:thread_count=5, timeout_millis=None, logger=None
# 默认 最大线程数thread_count为5个
# 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms
# 默认 日志收集器logger 为 None
consumer.receive_thread(deal_msg)
# 3.1.2 自定义模式
# 例 1
consumer.receive_thread(deal_msg, 2)
# 例 2
consumer.receive_thread(deal_msg, 10, timeout_millis=5000, logger=logger)
三、使用 pulsar-client(pulsar.Client)连接时
pulsar_thread的create_producer和create_consumer的使用
1. 生产者(producer)
import json
import pulsar_thread as pt
import pulsar
# 使用 pulsar 连接
client = pulsar.Client('pulsar://0.0.0.0:6655')
data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']}
data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']}
data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']}
data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']}
data_dict = {'test1':[json.dumps(data), json.dumps(data2)], 'test2':[json.dumps(data3), json.dumps(data4)]}
# 使用 pulsar_thread 创建生产者
producer = pt.create_producer(client)
result = producer.send(data_dict)
2. 消费者(consumer)
import json
import pulsar_thread as pt
import pulsar
def deal_msg(msg):
print(msg.value())
import time
time.sleep(5)
print(json.loads(msg.data()))
# 使用 pulsar 连接
client = pulsar.Client('pulsar://0.0.0.0:6655')
# 使用 pulsar_thread 创建消费者
consumer = pt.create_consumer(client, ['test1', 'test2'], 'my-subscription')
consumer.receive_thread(deal_msg, 2)
四、关于schema模式
1. schema支持的模式
schema | note |
---|---|
BytesSchema | Get the raw payload as a bytes object. No serialization/deserialization are performed. This is the default schema mode |
StringSchema | Encode/decode payload as a UTF-8 string. Uses str objects |
JsonSchema | Require record definition. Serializes the record into standard JSON payload |
AvroSchema | Require record definition. Serializes in AVRO format |
2. schema参数用法和pulsar-client相同
若想拓展使用schema,请移步至pulsar-client文档,阅读使用schema。 pulsar-client文档链接:https://pulsar.apache.org/docs/en/client-libraries-python/
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pulsar-thread-0.1.3.tar.gz
(9.4 kB
view details)
File details
Details for the file pulsar-thread-0.1.3.tar.gz
.
File metadata
- Download URL: pulsar-thread-0.1.3.tar.gz
- Upload date:
- Size: 9.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.6.0 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.8.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | f9737169c0bd489ec911ef8a968eb75a20cc4c172cffca7f1c3b06759f958571 |
|
MD5 | 3bde27528282b91b83aea8988ccd40f7 |
|
BLAKE2b-256 | 29f21048f79e0484aeb03565c4dfeca3f223ae5b2d77ac5f89b5db3474a4f7b5 |