A Python RocketMQ 4.0+ API based on JPype

1. Introduction

1.1 Why another Python RocketMQ API?

The existing Python APIs are outdated. For example:

  • The official Python API, which you can install via pip install rocketmq-client-python, targets RocketMQ 2.0 and below and does not support PullConsumer.

  • An unofficial Python API written by messense, which you can install via pip install rocketmq, wraps the same C++ dynamic libraries as the official API and therefore does not support RocketMQ 4.0+ either.

  • The official C++ API now has a v5.0-rc1 release but does not provide pre-compiled binaries, so you have to compile the project yourself. If you are not familiar with the Bazel build tool and with exporting C++ classes, this can be difficult.

1.2 What are the features of this API?

  • It is developed and tested on RocketMQ 4.0+, so it supports many recent features, such as a custom queue selector when producing messages and custom message selection by tag or SQL when consuming messages.

  • It exposes the Java API as directly as possible, via JPype. In most cases you can refer directly to the Java documentation.

  • The getter methods of each Java class are wrapped as properties of the corresponding Python class. For example, calling msg.getTopic() in Java is equivalent to reading msg.topic in Python.
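
The getter-to-property mapping can be illustrated without a JVM. The following is a minimal sketch, not pyrocketmq's actual implementation: FakeJavaMessage is a hypothetical stand-in for a JPype proxy object, and MessageView is a hypothetical wrapper name chosen for this example.

```python
class FakeJavaMessage:
    """Hypothetical stand-in for a Java Message proxy (no JVM needed)."""

    def __init__(self, topic, tags):
        self._topic = topic
        self._tags = tags

    def getTopic(self):
        return self._topic

    def getTags(self):
        return self._tags


class MessageView:
    """Hypothetical wrapper exposing Java-style getters as properties."""

    def __init__(self, java_obj):
        self._java_obj = java_obj

    @property
    def topic(self):
        # Delegates to the Java-style getter on every access.
        return self._java_obj.getTopic()

    @property
    def tags(self):
        return self._java_obj.getTags()


msg = MessageView(FakeJavaMessage("test_topic", "girl"))
print(msg.topic, msg.tags)
```

Because each property delegates to the getter on every read, the Python attribute always reflects the current state of the underlying object.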

2. Installation

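import jpype
import jpype.imports
# Start the JVM with the RocketMQ jars on the classpath; pyrocketmq is imported afterwards.
jpype.startJVM(classpath=['/path/to/rocketmq-all-4.3.2-bin-release/lib/*'])
from pyrocketmq import *
# do something
# Shut the JVM down when you are finished.
jpype.shutdownJVM()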
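
If several modules may trigger the JVM start, a small guard can help. The sketch below is an assumption-laden illustration rather than part of pyrocketmq: build_classpath and ensure_jvm are hypothetical helper names, and the release directory layout (a lib folder holding the jars) is taken from the path used above. Only jpype.isJVMStarted() and jpype.startJVM(classpath=...) are real JPype calls.

```python
from pathlib import Path


def build_classpath(release_dir):
    """Return the wildcard classpath entry for a RocketMQ binary release."""
    lib_dir = Path(release_dir) / "lib"
    if not lib_dir.is_dir():
        raise FileNotFoundError(f"RocketMQ lib directory not found: {lib_dir}")
    return [str(lib_dir / "*")]


def ensure_jvm(release_dir):
    """Start the JVM once; do nothing if it is already running."""
    import jpype  # imported lazily so build_classpath works without JPype

    if not jpype.isJVMStarted():
        jpype.startJVM(classpath=build_classpath(release_dir))
```

Note that JPype cannot restart a JVM within the same process after jpype.shutdownJVM() has been called, so shutdown is best left to the very end of the program.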

3. QuickStart

3.1 Producer

import json
from pyrocketmq.common.message import Message
from pyrocketmq.client.producer import Producer, SendStatus
pr = Producer('test_producer')
pr.setNamesrvAddr('localhost:9876')
pr.start()
body = json.dumps({'name':'Alice', 'age':1}).encode('utf-8')
msg = Message(topic='test_topic', body=body, tags='girl')
# synchronous (acknowledged, TCP-like) send; the result carries a sendStatus
sr = pr.send(msg)
assert sr.sendStatus == SendStatus.SEND_OK
pr.shutdown()
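
In the example above, shutdown() is skipped if send() raises. One way to guarantee cleanup is a context manager around start() and shutdown(). This is a minimal sketch: running is a hypothetical helper that is not provided by pyrocketmq, and DummyClient is a stand-in so the snippet runs without a broker.

```python
from contextlib import contextmanager


@contextmanager
def running(client):
    """Start a client and always shut it down, even if the body raises."""
    client.start()
    try:
        yield client
    finally:
        client.shutdown()


class DummyClient:
    """Hypothetical stand-in that records calls instead of talking to a broker."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("start")

    def shutdown(self):
        self.calls.append("shutdown")


dummy = DummyClient()
try:
    with running(dummy):
        raise RuntimeError("send failed")
except RuntimeError:
    pass
print(dummy.calls)  # ['start', 'shutdown']
```

The helper relies only on the start() and shutdown() methods shown in this QuickStart, so the same pattern should apply to the consumers below as well.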

3.2 PullConsumer

import json
from pyrocketmq.client.consumer.consumer import PullConsumer, PullStatus
cs = PullConsumer('test_pull_consumer')
cs.setNamesrvAddr('localhost:9876')
topic = 'test_topic'
cs.start()
# pull messages from each queue
mqs = cs.fetchSubscribeMessageQueues(topic)
for mq in mqs:
    ofs = cs.minOffset(mq)
    pr = cs.pull(mq, subExpression='girl', offset=ofs, maxNums=1)
    if pr.pullStatus == PullStatus.FOUND:
        # iterate msg in pull result
        for msg in pr:
            print(json.loads(msg.body))
cs.shutdown()
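
The example above pulls a single batch per queue, starting at the minimum offset. To read a queue further, each pull normally resumes where the previous one stopped. The Java PullResult exposes getNextBeginOffset(), which under the property rule in 1.2 would presumably appear as nextBeginOffset in Python; that name is an assumption and has not been checked against pyrocketmq. The sketch below shows only the loop shape, using a hypothetical FakePullResult and an in-memory queue instead of a broker.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterator, List


@dataclass
class FakePullResult:
    """Hypothetical stand-in for a pull result."""
    found: bool
    nextBeginOffset: int
    msgs: List[bytes] = field(default_factory=list)


def drain(pull: Callable[[int], FakePullResult], offset: int) -> Iterator[bytes]:
    """Yield messages batch by batch until a pull finds nothing."""
    while True:
        result = pull(offset)
        if not result.found:
            return
        yield from result.msgs
        offset = result.nextBeginOffset  # resume where this batch ended


# In-memory "queue" so the sketch runs without a broker.
QUEUE = [b"m0", b"m1", b"m2", b"m3", b"m4"]


def fake_pull(offset: int, max_nums: int = 2) -> FakePullResult:
    """Return up to max_nums messages starting at offset."""
    batch = QUEUE[offset:offset + max_nums]
    return FakePullResult(bool(batch), offset + len(batch), batch)


print(list(drain(fake_pull, 0)))
```

With a real consumer, the pull callable would wrap cs.pull(mq, ...) for one message queue and the found flag would correspond to pullStatus == PullStatus.FOUND.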

3.3 PushConsumer

import json
import time
from typing import List
from pyrocketmq.client.consumer.listener import ConsumeConcurrentlyContext, ConsumeConcurrentlyStatus, MessageListenerConcurrently
from pyrocketmq.client.consumer.consumer import MessageSelector, PushConsumer
from pyrocketmq.common.common import ConsumeFromWhere
from pyrocketmq.common.message import MessageExt

# subclass MessageListenerConcurrently to write your own consume action
class MyMessageListenerConcurrently(MessageListenerConcurrently):
    def _consumeMessage(self, msgs:List[MessageExt], context:ConsumeConcurrentlyContext) -> ConsumeConcurrentlyStatus:
        print('Concurrently', context.ackIndex)
        for msg in msgs:
            print(json.loads(msg.body))
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS

cs = PushConsumer('test_push_consumer')
cs.setNamesrvAddr('localhost:9876')
selector = MessageSelector.byTag('girl')
ml = MyMessageListenerConcurrently()
cs.registerMessageListener(ml)
cs.subscribe('test_topic', selector)
cs.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET)
cs.start()
# keep the process alive for a few seconds so the listener can receive messages
time.sleep(5)
cs.shutdown()
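
The listener above assumes every message body is valid JSON; a malformed body would raise inside _consumeMessage. A defensive parse is one option. The helper below is a minimal sketch with a hypothetical name (try_parse_json) and is not part of pyrocketmq.

```python
import json
from typing import Any, Optional


def try_parse_json(body: bytes) -> Optional[Any]:
    """Return the decoded JSON value, or None if the body is not valid JSON.

    Caveat: a body containing the JSON literal null also yields None.
    """
    try:
        return json.loads(body)
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


print(try_parse_json(b'{"name": "Alice", "age": 1}'))
print(try_parse_json(b"not json"))
```

Inside a listener, a None result could be logged and skipped, or the batch could be retried. The Java ConsumeConcurrentlyStatus enum defines RECONSUME_LATER for the latter case; whether retrying is appropriate for a permanently malformed message depends on the application.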
