Skip to main content

Doubao Aliyun RocketMQ Python SDK

Project description

DOUBAO-ALIYUN-MQ-SDK

豆包阿里云rocketMQ SDK

安装

pip3 install doubao-aliyun-mq-sdk

或者

python3 setup.py install

使用

from doubao_aliyun_mq import Client as AliyunMQClient


aliyun_mq_client = AliyunMQClient(<http_endpoint>, <access_key>, <secret_key>)
# 获取消费者实例
consumer = aliyun_mq_client.get_consumer(<instance_id>, <topic_name>, <group_id>)
# 获取生产者实例
producer = aliyun_mq_client.get_producer(<instance_id>, <topic_name>)

生产者(producer)

producer.send

发送消息

  • msg: 消息内容, 类型str
  • tag: 消息标签, 默认 ''
  • properties: 属性, 类型dict, 默认 None
  • start_deliver_time: 定时消息,毫秒级绝对时间,默认 None

producer.send_json

发送json格式的消息

  • msg: 消息内容(json), 类型dict
  • tag: 消息标签, 默认 ''
  • properties: 属性, 类型dict, 默认 None
  • start_deliver_time: 定时消息,毫秒级绝对时间,默认 None

消费者(consumer)

consumer.receive

接收消息

  • batch: 一次最多接收条数(最多可设置为16条), 默认 1
  • wait_seconds: 长轮询时间(最多可设置为30秒), 默认 3

consumer.ack_message

确认消息

  • recv_msgs: 接收到的消息列表, 类型 list

consumer.consume

使用with语句实现消费消息(接收并确认)

  • batch: 一次最多接收条数(最多可设置为16条), 默认 1
  • wait_seconds: 长轮询时间(最多可设置为30秒), 默认 3

consumer.consume_decorator

消费装饰器,被装饰的函数第一个参数返回接收到的消息泪飙

  • batch: 一次最多接收条数(最多可设置为16条), 默认 1
  • wait_seconds: 长轮询时间(最多可设置为30秒), 默认 3

示例

from doubao_config import Client as ConfigClient
import time
from doubao_aliyun_mq import Client as AliyunMQClient


# 配合doubao-config使用
config_client = ConfigClient(<config_host>, <config_username>, <config_password>)
config = config_client.get_config(<application>, <profile>)

aliyun_mq_client = AliyunMQClient(
    config['base.comm.rocket-mq.onsAddr.digital.http'], config['base.rocket-mq.accessKeyId'],
    config['base.rocket-mq.accessKeySecret'])

# 获取消费者实例
consumer = aliyun_mq_client.get_consumer(config['base.comm.rocket-mq.digital.id'], <topic_name>, <group_id>)
# 获取生产者实例
producer = aliyun_mq_client.get_producer(config['base.comm.rocket-mq.digital.id'], <topic_name>)

size = 10

# 发送消息
for i in range(size):
    msg = 'test %d' % i
    producer.send(msg)
    print('send:', msg)

# 接收消息
msgs = consumer.receive(batch=size)
if msgs:
    for msg in msgs:
        print('receive:', msg.message_id, msg.message_body, msg.message_tag)

# 确认消息消费成功
consumer.ack_message(msgs)
print('ack message:', msgs)

# 发送消息(json)
for i in range(size):
    msg = {'test': 1}
    producer.send_json(msg)
    print('send:', {'test': 1})

# 消费消息(接收消息并确认)
for i in range(size):
    with consumer.consume() as msgs:
        if msgs:
            for msg in msgs:
                print('consume:', msg.message_id, msg.message_body, msg.message_tag)

# 发送消息(json)带标签
for i in range(size):
    msg = {'test': 1}
    producer.send_json(msg, tag='ttt')
    print('send:', {'test': 1})


# 消费消息(装饰器)
@consumer.consume_decorator(batch=10)
def test_func(msgs, *args, **kwargs):
    print('decorator consume:', msgs)
    for msg in msgs:
        print('--- message_id', msg.message_id)
        print('--- message_tag', msg.message_tag)
        print('--- message_body', msg.message_body)
        print('--- publish_time', msg.publish_time)
        print('--- consumed_times', msg.consumed_times)
        print('--- next_consume_time', msg.next_consume_time)

for i in range(size):
    test_func()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

doubao-aliyun-mq-sdk-1.0.0.tar.gz (3.9 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page