Python ali rocketMQ api
Project description
ali rocketMQ for python
说明
官方so路径要加到LD_LIBRARY_PATH中
调用示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : HaiFeng
# @Email : 24918700@qq.com
# @Time : 2019/1/16
# @desc : demo ali rocketmq
import sys
import time
from py_rmq.rmq_consumer import Consumer
from py_rmq.rmq_producer import Producer
from py_rmq.rmq_consumer import OrderConsumer
from py_rmq.rmq_producer import OrderProducer
def OnConsumer(topic: str, tag: str, key: str, id: str, deliver_time: int, body: str, reconsume_times: int, store_time: int, offset: int):
print(body)
if __name__ == '__main__':
t:str = ''
id:str = ''
access:str = ''
secret:str = ''
topic:str = ''
if len(sys.argv) > 1:
t = sys.argv[1].lower()
if len(sys.argv) > 2:
id = sys.argv[2]
if len(sys.argv) > 3:
access = sys.argv[3]
if len(sys.argv) > 4:
secret = sys.argv[4]
if len(sys.argv) > 5:
topic = sys.argv[5]
if t == '':
t = input('select consumer or producer:').lower()
if id == '':
id = input('input cid or pid:')
if access == '':
access = input('input acces key:')
if secret == '':
secret = input('input secret key:')
if topic == '':
topic = input('input topic:')
if t == 'consumer':
print('test consumer...')
c = Consumer(id, access, secret)
c.OnConsumer = OnConsumer
c.subscribe(topic, '*')
input('press enter to continue test order consumer')
c.close()
print('test order consumer...')
c = OrderConsumer(id, access, secret)
c.OnConsumer = OnConsumer
c.subscribe(topic, '*')
input('press enter exit')
c.close()
else:
tag = 'tag test'
key = time.strftime('%Y%m%d%H%M%S', time.localtime())
print('test producer...')
p = Producer(id, access, secret)
msg = input('input content: ')
rst = p.send_msg(topic, tag, key, msg)
print(f'result: {rst}')
print('test delive msg, delive 30s')
rst = p.send_msg(topic, tag, key, msg + 'delive', int(time.time() * 1000) + 30000)
print(f'delive msg result: {rst}')
p.close()
input('press enter continue test order producer')
p = OrderProducer(id, access, secret)
sharding_key = input('input sharding_key: ')
rst = p.send_msg(topic, tag, key, msg, sharding_key)
print(f'order msg result: {rst}')
p.close()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
py_rmq-0.0.5.tar.gz
(26.9 kB
view details)
File details
Details for the file py_rmq-0.0.5.tar.gz
.
File metadata
- Download URL: py_rmq-0.0.5.tar.gz
- Upload date:
- Size: 26.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.12.1 pkginfo/1.4.2 requests/2.19.1 setuptools/40.2.0 requests-toolbelt/0.8.0 tqdm/4.26.0 CPython/3.7.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d3a54eebb3b3c99eaf6ba3c379c8606638289a5d97fa9e2b33ef6d3f5c5f4771 |
|
MD5 | 3b113804f4aecf788efea46e27b38bb8 |
|
BLAKE2b-256 | 837f5e10b63451a36de67dd9b3e12cb3bdb25a1ece87afa9bfd43b68ad0367cf |