Doubao Aliyun RocketMQ Python SDK
Project description
DOUBAO-ALIYUN-MQ-SDK
豆包阿里云rocketMQ SDK
安装
pip3 install doubao-aliyun-mq-sdk
或者
python3 setup.py install
使用
from doubao_aliyun_mq import Client as AliyunMQClient
aliyun_mq_client = AliyunMQClient(<http_endpoint>, <access_key>, <secret_key>)
# 获取消费者实例
consumer = aliyun_mq_client.get_consumer(<instance_id>, <topic_name>, <group_id>)
# 获取生产者实例
producer = aliyun_mq_client.get_producer(<instance_id>, <topic_name>)
生产者(producer)
producer.send
发送消息
msg
: 消息内容, 类型strtag
: 消息标签, 默认 ''properties
: 属性, 类型dict, 默认 Nonestart_deliver_time
: 定时消息,毫秒级绝对时间,默认 None
producer.send_json
发送json格式的消息
msg
: 消息内容(json), 类型dicttag
: 消息标签, 默认 ''properties
: 属性, 类型dict, 默认 Nonestart_deliver_time
: 定时消息,毫秒级绝对时间,默认 None
消费者(consumer)
consumer.receive
接收消息
batch
: 一次最多接收条数(最多可设置为16条), 默认 1wait_seconds
: 长轮询时间(最多可设置为30秒), 默认 3
consumer.ack_message
确认消息
recv_msgs
: 接收到的消息列表, 类型 list
consumer.consume
使用with语句实现消费消息(接收并确认)
batch
: 一次最多接收条数(最多可设置为16条), 默认 1wait_seconds
: 长轮询时间(最多可设置为30秒), 默认 3
consumer.consume_decorator
消费装饰器,被装饰的函数第一个参数返回接收到的消息泪飙
batch
: 一次最多接收条数(最多可设置为16条), 默认 1wait_seconds
: 长轮询时间(最多可设置为30秒), 默认 3
示例
from doubao_config import Client as ConfigClient
import time
from doubao_aliyun_mq import Client as AliyunMQClient
# 配合doubao-config使用
config_client = ConfigClient(<config_host>, <config_username>, <config_password>)
config = config_client.get_config(<application>, <profile>)
aliyun_mq_client = AliyunMQClient(
config['base.comm.rocket-mq.onsAddr.digital.http'], config['base.rocket-mq.accessKeyId'],
config['base.rocket-mq.accessKeySecret'])
# 获取消费者实例
consumer = aliyun_mq_client.get_consumer(config['base.comm.rocket-mq.digital.id'], <topic_name>, <group_id>)
# 获取生产者实例
producer = aliyun_mq_client.get_producer(config['base.comm.rocket-mq.digital.id'], <topic_name>)
size = 10
# 发送消息
for i in range(size):
msg = 'test %d' % i
producer.send(msg)
print('send:', msg)
# 接收消息
msgs = consumer.receive(batch=size)
if msgs:
for msg in msgs:
print('receive:', msg.message_id, msg.message_body, msg.message_tag)
# 确认消息消费成功
consumer.ack_message(msgs)
print('ack message:', msgs)
# 发送消息(json)
for i in range(size):
msg = {'test': 1}
producer.send_json(msg)
print('send:', {'test': 1})
# 消费消息(接收消息并确认)
for i in range(size):
with consumer.consume() as msgs:
if msgs:
for msg in msgs:
print('consume:', msg.message_id, msg.message_body, msg.message_tag)
# 发送消息(json)带标签
for i in range(size):
msg = {'test': 1}
producer.send_json(msg, tag='ttt')
print('send:', {'test': 1})
# 消费消息(装饰器)
@consumer.consume_decorator(batch=10)
def test_func(msgs, *args, **kwargs):
print('decorator consume:', msgs)
for msg in msgs:
print('--- message_id', msg.message_id)
print('--- message_tag', msg.message_tag)
print('--- message_body', msg.message_body)
print('--- publish_time', msg.publish_time)
print('--- consumed_times', msg.consumed_times)
print('--- next_consume_time', msg.next_consume_time)
for i in range(size):
test_func()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file doubao-aliyun-mq-sdk-1.0.0.tar.gz
.
File metadata
- Download URL: doubao-aliyun-mq-sdk-1.0.0.tar.gz
- Upload date:
- Size: 3.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/45.2.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.7.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 70268da175a9f30409c61c7d51576e17ed69f2efcf4800124aa9611f1e075a23 |
|
MD5 | 956befec9bd365d67b500f57f0d1512c |
|
BLAKE2b-256 | 464d62bbda0fbcc3dfc0201cade4596d2ce58a1d91f64d6ae2623f876d256456 |