Skip to main content

Doubao Aliyun RocketMQ Python SDK

Project description

DOUBAO-ALIYUN-MQ-SDK

豆包阿里云rocketMQ SDK

安装

pip3 install doubao-aliyun-mq-sdk

或者

python3 setup.py install

使用

from doubao_aliyun_mq import Client as AliyunMQClient


aliyun_mq_client = AliyunMQClient(<http_endpoint>, <access_key>, <secret_key>)
# 获取消费者实例
consumer = aliyun_mq_client.get_consumer(<instance_id>, <topic_name>, <group_id>)
# 获取生产者实例
producer = aliyun_mq_client.get_producer(<instance_id>, <topic_name>)

生产者(producer)

producer.send

发送消息

  • msg: 消息内容, 类型str
  • tag: 消息标签, 默认 ''
  • properties: 属性, 类型dict, 默认 None
  • start_deliver_time: 定时消息,毫秒级绝对时间,默认 None

producer.send_json

发送json格式的消息

  • msg: 消息内容(json), 类型dict
  • tag: 消息标签, 默认 ''
  • properties: 属性, 类型dict, 默认 None
  • start_deliver_time: 定时消息,毫秒级绝对时间,默认 None

消费者(consumer)

consumer.receive

接收消息

  • batch: 一次最多接收条数(最多可设置为16条), 默认 1
  • wait_seconds: 长轮询时间(最多可设置为30秒), 默认 3

consumer.ack_message

确认消息

  • recv_msgs: 接收到的消息列表, 类型 list

consumer.consume

使用with语句实现消费消息(接收并确认)

  • batch: 一次最多接收条数(最多可设置为16条), 默认 1
  • wait_seconds: 长轮询时间(最多可设置为30秒), 默认 3

consumer.consume_decorator

消费装饰器,被装饰的函数第一个参数返回接收到的消息泪飙

  • batch: 一次最多接收条数(最多可设置为16条), 默认 1
  • wait_seconds: 长轮询时间(最多可设置为30秒), 默认 3

示例

from doubao_config import Client as ConfigClient
import time
from doubao_aliyun_mq import Client as AliyunMQClient


# 配合doubao-config使用
config_client = ConfigClient(<config_host>, <config_username>, <config_password>)
config = config_client.get_config(<application>, <profile>)

aliyun_mq_client = AliyunMQClient(
    config['base.comm.rocket-mq.onsAddr.digital.http'], config['base.rocket-mq.accessKeyId'],
    config['base.rocket-mq.accessKeySecret'])

# 获取消费者实例
consumer = aliyun_mq_client.get_consumer(config['base.comm.rocket-mq.digital.id'], <topic_name>, <group_id>)
# 获取生产者实例
producer = aliyun_mq_client.get_producer(config['base.comm.rocket-mq.digital.id'], <topic_name>)

size = 10

# 发送消息
for i in range(size):
    msg = 'test %d' % i
    producer.send(msg)
    print('send:', msg)

# 接收消息
msgs = consumer.receive(batch=size)
if msgs:
    for msg in msgs:
        print('receive:', msg.message_id, msg.message_body, msg.message_tag)

# 确认消息消费成功
consumer.ack_message(msgs)
print('ack message:', msgs)

# 发送消息(json)
for i in range(size):
    msg = {'test': 1}
    producer.send_json(msg)
    print('send:', {'test': 1})

# 消费消息(接收消息并确认)
for i in range(size):
    with consumer.consume() as msgs:
        if msgs:
            for msg in msgs:
                print('consume:', msg.message_id, msg.message_body, msg.message_tag)

# 发送消息(json)带标签
for i in range(size):
    msg = {'test': 1}
    producer.send_json(msg, tag='ttt')
    print('send:', {'test': 1})


# 消费消息(装饰器)
@consumer.consume_decorator(batch=10)
def test_func(msgs, *args, **kwargs):
    print('decorator consume:', msgs)
    for msg in msgs:
        print('--- message_id', msg.message_id)
        print('--- message_tag', msg.message_tag)
        print('--- message_body', msg.message_body)
        print('--- publish_time', msg.publish_time)
        print('--- consumed_times', msg.consumed_times)
        print('--- next_consume_time', msg.next_consume_time)

for i in range(size):
    test_func()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

doubao-aliyun-mq-sdk-1.0.0.tar.gz (3.9 kB view details)

Uploaded Source

File details

Details for the file doubao-aliyun-mq-sdk-1.0.0.tar.gz.

File metadata

  • Download URL: doubao-aliyun-mq-sdk-1.0.0.tar.gz
  • Upload date:
  • Size: 3.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/45.2.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.7.6

File hashes

Hashes for doubao-aliyun-mq-sdk-1.0.0.tar.gz
Algorithm Hash digest
SHA256 70268da175a9f30409c61c7d51576e17ed69f2efcf4800124aa9611f1e075a23
MD5 956befec9bd365d67b500f57f0d1512c
BLAKE2b-256 464d62bbda0fbcc3dfc0201cade4596d2ce58a1d91f64d6ae2623f876d256456

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page