Skip to main content

Python ali rocketMQ api

Project description

ali rocketMQ for python

说明

官方so路径要加到LD_LIBRARY_PATH中

调用示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : HaiFeng
# @Email   : 24918700@qq.com
# @Time    : 2019/1/16
# @desc    : demo ali rocketmq

import sys
import time
from rmq_consumer import Consumer
from rmq_producer import Producer
from rmq_consumer import OrderConsumer
from rmq_producer import OrderProducer

def OnConsumer(topic: str, tag: str, key: str, id: str, deliver_time: int, body: str, reconsume_times: int, store_time: int, offset: int):
    print(body)


if __name__ == '__main__':

    t:str = ''
    id:str = ''
    access:str = ''
    secret:str = ''
    topic:str = ''

    if len(sys.argv) > 1:
        t = sys.argv[1].lower()
        if len(sys.argv) > 2:
            id = sys.argv[2]
            if len(sys.argv) > 3:
                access = sys.argv[3]
                if len(sys.argv) > 4:
                    secret = sys.argv[4]
                    if len(sys.argv) > 5:
                        topic = sys.argv[5]
    if t == '':
        t = input('select consumer or producer:').lower()
    if id == '':
        id = input('input cid or pid:')
    if access == '':
        access = input('input acces key:')
    if secret == '':
        secret = input('input secret key:')
    if topic == '':
        topic = input('input topic:')
    if t == 'consumer':
        print('test consumer...')
        c = Consumer(id, access, secret)
        c.OnConsumer = OnConsumer
        c.subscribe(topic, '*')
        input('press enter to continue test order consumer')
        c.close()

        print('test order consumer...')
        c = OrderConsumer(id, access, secret)
        c.OnConsumer = OnConsumer
        c.subscribe(topic, '*')
        input('press enter exit')
        c.close()
    else:
        tag = 'tag test'

        key = time.strftime('%Y%m%d%H%M%S', time.localtime())

        print('test producer...')
        p = Producer(id, access, secret)
        msg = input('input content: ')
        rst = p.send_msg(topic, tag, key, msg)
        print(f'result: {rst}')
        print('test delive msg, delive 30s')
        rst = p.send_msg(topic, tag, key, msg + 'delive', int(time.time() * 1000) + 30000)
        print(f'delive msg result: {rst}')
        p.close()

        input('press enter continue test order producer')
        p = OrderProducer(id, access, secret)
        sharding_key = input('input sharding_key: ')
        rst = p.send_msg(topic, tag, key, msg, sharding_key)
        print(f'order msg result: {rst}')
        p.close()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

py_rmq-0.0.3.tar.gz (26.9 kB view details)

Uploaded Source

File details

Details for the file py_rmq-0.0.3.tar.gz.

File metadata

  • Download URL: py_rmq-0.0.3.tar.gz
  • Upload date:
  • Size: 26.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.12.1 pkginfo/1.4.2 requests/2.19.1 setuptools/40.2.0 requests-toolbelt/0.8.0 tqdm/4.26.0 CPython/3.7.0

File hashes

Hashes for py_rmq-0.0.3.tar.gz
Algorithm Hash digest
SHA256 ceae6a9455cb9c422d106684bf18dcf446894559aed2c41707831904a0226552
MD5 4cc36f2a18f4a1520fcd3bb7a82eef98
BLAKE2b-256 4c1aab52261d208000ce10251682699035c0d0bb55cd4d914503aab6cb053863

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page