Doubao Aliyun RocketMQ Python SDK
Project description
DOUBAO-ALIYUN-MQ-SDK
豆包阿里云rocketMQ SDK
安装
pip3 install doubao-aliyun-mq-sdk
或者
python3 setup.py install
使用
from doubao_aliyun_mq import Client as AliyunMQClient
aliyun_mq_client = AliyunMQClient(<http_endpoint>, <access_key>, <secret_key>)
# 获取消费者实例
consumer = aliyun_mq_client.get_consumer(<instance_id>, <topic_name>, <group_id>)
# 获取生产者实例
producer = aliyun_mq_client.get_producer(<instance_id>, <topic_name>)
生产者(producer)
producer.send
发送消息
msg
: 消息内容, 类型strtag
: 消息标签, 默认 ''properties
: 属性, 类型dict, 默认 Nonestart_deliver_time
: 定时消息,毫秒级绝对时间,默认 None
producer.send_json
发送json格式的消息
msg
: 消息内容(json), 类型dicttag
: 消息标签, 默认 ''properties
: 属性, 类型dict, 默认 Nonestart_deliver_time
: 定时消息,毫秒级绝对时间,默认 None
消费者(consumer)
consumer.receive
接收消息
batch
: 一次最多接收条数(最多可设置为16条), 默认 1wait_seconds
: 长轮询时间(最多可设置为30秒), 默认 3
consumer.ack_message
确认消息
recv_msgs
: 接收到的消息列表, 类型 list
consumer.consume
使用with语句实现消费消息(接收并确认)
batch
: 一次最多接收条数(最多可设置为16条), 默认 1wait_seconds
: 长轮询时间(最多可设置为30秒), 默认 3
consumer.consume_decorator
消费装饰器,被装饰的函数第一个参数返回接收到的消息泪飙
batch
: 一次最多接收条数(最多可设置为16条), 默认 1wait_seconds
: 长轮询时间(最多可设置为30秒), 默认 3
示例
from doubao_config import Client as ConfigClient
import time
from doubao_aliyun_mq import Client as AliyunMQClient
# 配合doubao-config使用
config_client = ConfigClient(<config_host>, <config_username>, <config_password>)
config = config_client.get_config(<application>, <profile>)
aliyun_mq_client = AliyunMQClient(
config['base.comm.rocket-mq.onsAddr.digital.http'], config['base.rocket-mq.accessKeyId'],
config['base.rocket-mq.accessKeySecret'])
# 获取消费者实例
consumer = aliyun_mq_client.get_consumer(config['base.comm.rocket-mq.digital.id'], <topic_name>, <group_id>)
# 获取生产者实例
producer = aliyun_mq_client.get_producer(config['base.comm.rocket-mq.digital.id'], <topic_name>)
size = 10
# 发送消息
for i in range(size):
msg = 'test %d' % i
producer.send(msg)
print('send:', msg)
# 接收消息
msgs = consumer.receive(batch=size)
if msgs:
for msg in msgs:
print('receive:', msg.message_id, msg.message_body, msg.message_tag)
# 确认消息消费成功
consumer.ack_message(msgs)
print('ack message:', msgs)
# 发送消息(json)
for i in range(size):
msg = {'test': 1}
producer.send_json(msg)
print('send:', {'test': 1})
# 消费消息(接收消息并确认)
for i in range(size):
with consumer.consume() as msgs:
if msgs:
for msg in msgs:
print('consume:', msg.message_id, msg.message_body, msg.message_tag)
# 发送消息(json)带标签
for i in range(size):
msg = {'test': 1}
producer.send_json(msg, tag='ttt')
print('send:', {'test': 1})
# 消费消息(装饰器)
@consumer.consume_decorator(batch=10)
def test_func(msgs, *args, **kwargs):
print('decorator consume:', msgs)
for msg in msgs:
print('--- message_id', msg.message_id)
print('--- message_tag', msg.message_tag)
print('--- message_body', msg.message_body)
print('--- publish_time', msg.publish_time)
print('--- consumed_times', msg.consumed_times)
print('--- next_consume_time', msg.next_consume_time)
for i in range(size):
test_func()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Close
Hashes for doubao-aliyun-mq-sdk-1.0.0.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 70268da175a9f30409c61c7d51576e17ed69f2efcf4800124aa9611f1e075a23 |
|
MD5 | 956befec9bd365d67b500f57f0d1512c |
|
BLAKE2b-256 | 464d62bbda0fbcc3dfc0201cade4596d2ce58a1d91f64d6ae2623f876d256456 |