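Project overview

  • Project codename: EchoWall (回音壁)
  • Positioning: a general-purpose compatibility layer for message middleware
  • Features: message middleware configuration, plus a unified wrapper for messages, consumers, and producers
  • Details: currently only RocketMQ is wrapped.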

rocket_mq

  • Version: Python 3.6

Build and install (whl)

  • Build: python setup.py sdist bdist_wheel
  • Install: pip install echowall-2.6.0-py3-none-any.whl

Build and install (egg)

  • Build: python setup.py bdist_egg
  • Install the egg:
    • Unpack the pf_mq_sdk-0.1-py3.6.egg source package
    • cd pf_mq_sdk-0.1-py3.6.egg
    • python setup.py install

Dependencies

  • librocketmq
    • centos 7
        wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.2.0/rocketmq-client-cpp-2.2.0-centos7.x86_64.rpm
        sudo rpm -ivh rocketmq-client-cpp-2.2.0-centos7.x86_64.rpm
    
    • debian
        wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.2.0/rocketmq-client-cpp-2.2.0.amd64.deb
        sudo dpkg -i rocketmq-client-cpp-2.2.0.amd64.deb
    
    • macOS
        wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.2.0/rocketmq-client-cpp-2.2.0-bin-release-darwin.tar.gz
        tar -xzf rocketmq-client-cpp-2.2.0-bin-release-darwin.tar.gz
        cd rocketmq-client-cpp
        mkdir /usr/local/include/rocketmq
        cp include/* /usr/local/include/rocketmq
        cp lib/* /usr/local/lib
        install_name_tool -id "@rpath/librocketmq.dylib" /usr/local/lib/librocketmq.dylib
    
  • Detailed documentation: https://github.com/apache/rocketmq-client-python
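
Once the native library is installed, a quick check from Python can confirm that the system linker can locate it. The following is a minimal sketch using only the standard library; the helper name find_librocketmq is hypothetical and not part of EchoWall, and a successful lookup does not guarantee that the installed version is compatible.

```python
import ctypes.util
from typing import Optional


def find_librocketmq() -> Optional[str]:
    """Return the name or path of librocketmq as seen by the linker, or None."""
    # find_library takes the name without the "lib" prefix or file suffix.
    return ctypes.util.find_library("rocketmq")


if __name__ == "__main__":
    location = find_librocketmq()
    if location is None:
        print("librocketmq not found; install rocketmq-client-cpp first")
    else:
        print("librocketmq found: {}".format(location))
```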

rocket_mq service

  • Local deployment
        docker run -itd \
        --name=rocketmq \
        --hostname rocketmq \
        --restart=always \
        -p 8080:8080 \
        -p 9876:9876 \
        -p 10909:10909 \
        -p 10911:10911 \
        -p 10912:10912 \
        -v rocketmq_data:/home/app/data \
        -v /etc/localtime:/etc/localtime \
        -v /var/run/docker.sock:/var/run/docker.sock \
        xuchengen/rocketmq:latest
    
  • Common commands
    # Enter the container
    docker exec -it rocketmq /bin/bash
    
    # Configuration reference: /home/app/rocketmq/conf/broker.conf
    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    autoCreateTopicEnable = True
    namesrvAddr = 127.0.0.1:9876
    # brokerIP1 is required
    brokerIP1 = 127.0.0.1
    
    # Start commands
    nohup /bin/sh /home/app/rocketmq/bin/mqnamesrv &
    nohup /bin/sh /home/app/rocketmq/bin/mqbroker -c /home/app/rocketmq/conf/broker.conf -n 127.0.0.1:9876 &
    
    # Stop the services
    /bin/sh /home/app/rocketmq/bin/mqshutdown broker
    /bin/sh /home/app/rocketmq/bin/mqshutdown namesrv
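
After starting the services, it can help to confirm that the ports are actually accepting connections before pointing a client at them. The sketch below is an illustration using only the standard library; is_port_open is a hypothetical helper, and the two ports checked are 9876 (the namesrvAddr port in broker.conf above) and 10911 (published in the docker command above, and RocketMQ's default broker listen port).

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    for port in (9876, 10911):
        state = "reachable" if is_port_open("127.0.0.1", port) else "not reachable"
        print("127.0.0.1:{} is {}".format(port, state))
```

An open port only shows that something is listening; it does not prove that the broker has registered with the name server.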

Usage

  1. Integrate EchoWall into your project

    1. Add the dependency echowall to requirements.txt, or install it directly:
      pip install echowall
      
    2. Add the message queue settings (RocketMQ server configuration)
      # Message queue settings
      MQ_HOST = 172.18.0.76
      MQ_PORT = 9876
      MQ_TIMEOUT = 6000
      MQ_GROUP_NAME = SDB-GROUP-MAIN    
      
    3. In the Flask project, read the corresponding variables in the configuration file (config.py)
      import os

      # Message queue settings
      MQ_HOST = os.getenv("MQ_HOST")
      MQ_PORT = os.getenv("MQ_PORT")
      MQ_TIMEOUT = os.getenv("MQ_TIMEOUT")
      MQ_GROUP_NAME = os.getenv("MQ_GROUP_NAME")
      
    4. In the Flask project, add the EchoWall initialization to extensions.py
      from echowall import (
          EWConfig,
          EWClient
      )
      from base.app_context import auto_app_contexts
      
      def init_main_mq():
          """
              Initialize EchoWall and start the producer.
          :return: the started client
          """
          from flask import current_app as app
          from apps.commons.message_queue import set_message_record

          @auto_app_contexts()
          def _fun(_app):
              conf = EWConfig()
              conf.host = _app.config['MQ_HOST']
              conf.port = int(_app.config['MQ_PORT'])
              conf.timeout = int(_app.config.get('MQ_TIMEOUT', '6000'))
              conf.compress_level = 5

              # Producer send retries
              conf.send_retry_count = 2
              conf.send_retry_backoff = 1

              # Persist messages sent by the producer to the database
              conf.record_func = set_message_record
              conf.record_only_success = False

              # Consumer settings
              conf.consumer_thread_count = 1

              rmq_client = EWClient(conf)

              group_name = _app.config['MQ_GROUP_NAME']

              rmq_client.start_producer(group_name)

              return rmq_client

          return _fun(app)
      
    5. Configure topics (see the Topic naming conventions)
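
Because os.getenv returns strings (or None when a variable is unset), the settings in steps 2 and 3 are easier to use if they are validated and converted in one place. The following is a minimal sketch of that idea, not part of EchoWall: MQSettings and load_mq_settings are hypothetical names, and the 6000 default for MQ_TIMEOUT mirrors the fallback used in init_main_mq above.

```python
import os
from typing import Mapping, NamedTuple


class MQSettings(NamedTuple):
    """Typed view of the MQ_* variables (hypothetical helper)."""
    host: str
    port: int
    timeout: int
    group_name: str


def load_mq_settings(env: Mapping[str, str] = os.environ) -> MQSettings:
    """Read the MQ_* variables, failing early if a required one is missing."""
    required = ("MQ_HOST", "MQ_PORT", "MQ_GROUP_NAME")
    missing = [key for key in required if not env.get(key)]
    if missing:
        raise ValueError("missing settings: {}".format(", ".join(missing)))
    return MQSettings(
        host=env["MQ_HOST"],
        port=int(env["MQ_PORT"]),
        # MQ_TIMEOUT is optional; fall back to 6000 as in init_main_mq.
        timeout=int(env.get("MQ_TIMEOUT", "6000")),
        group_name=env["MQ_GROUP_NAME"],
    )
```

Passing a plain dict instead of os.environ makes the loader easy to exercise in tests.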
  2. Send a message

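    import datetime
    import traceback

    # mq_client, MessageTopic, and send_message are assumed to be
    # defined elsewhere in the application.
    try:
        message = mq_client.new_message(MessageTopic.sdb_bu_main.value)
        message.keys = 'send-1'
        message.tags = 'send-1'
        message.body = 'send-%s' % (str(datetime.datetime.now()))
        send_message(message)
    except Exception:
        print(traceback.format_exc())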
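
The send example refers to MessageTopic.sdb_bu_main.value, which suggests that topics are kept in an enum on the application side. A minimal sketch of such an enum might look like the following; the topic string "SDB_BU_MAIN" is a made-up placeholder, and the real value should follow your Topic naming conventions.

```python
from enum import Enum, unique


@unique
class MessageTopic(Enum):
    """Application-side catalogue of topic names (hypothetical values)."""
    # Placeholder topic string; replace with the name agreed for your project.
    sdb_bu_main = "SDB_BU_MAIN"


if __name__ == "__main__":
    # The .value attribute is the string passed to new_message().
    print(MessageTopic.sdb_bu_main.value)
```

Keeping topic names in one enum avoids typos in string literals scattered across producers and consumers, and @unique rejects two members that share the same topic string.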
    
  3. Receive messages
    1. Define the callback that handles messages
      import traceback
      
      from echowall import EWConsumeStatus
      
      from apps.extensions import ew
      
      @ew.pc_callback
      def sdb_callback(msg):
          try:
              print(type(msg))
              print(str(msg))
              body = msg.body.decode('utf-8')
              print('Message body is: {}'.format(body))
              print(msg.latest_send_timestamp)
              print(msg.version)
              return EWConsumeStatus.CONSUME_SUCCESS
          except Exception as e:
              print(e)
              print(traceback.format_exc())
              return EWConsumeStatus.RECONSUME_LATER
      
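    2. Register the callback and start the consumer
      from time import sleep

      # start push consumers example
      settings = [
          ('GID_1', 'TopicTest', '*', 'sdb_callback')
      ]
      # echo_wall is assumed to be the object whose pc_callback
      # decorator registered sdb_callback
      ew = echo_wall
      rmq_client.bind_echo_wall(ew)
      rmq_client.set_push_consumers_with_echo_wall(settings)
      rmq_client.start_push_consumers()
      # Keep the main thread alive while the consumers run
      while True:
          sleep(3)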
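
The settings row above names the callback as a string ('sdb_callback') while the function itself is decorated with @ew.pc_callback, which suggests a name-to-function registry. The sketch below illustrates that general pattern only; it is not EchoWall's implementation. CallbackRegistry and resolve are hypothetical names, and reading the tuple as (group id, topic, tag expression, callback name) is an assumption.

```python
from typing import Callable, Dict, List, Tuple


class CallbackRegistry:
    """Hypothetical stand-in showing a decorator-based callback registry."""

    def __init__(self) -> None:
        self._callbacks: Dict[str, Callable] = {}

    def pc_callback(self, func: Callable) -> Callable:
        """Register func under its own name and return it unchanged."""
        self._callbacks[func.__name__] = func
        return func

    def resolve(
        self, settings: List[Tuple[str, str, str, str]]
    ) -> List[Tuple[str, str, str, Callable]]:
        """Swap the callback name in each row for the registered callable."""
        resolved = []
        for group_id, topic, tags, callback_name in settings:
            if callback_name not in self._callbacks:
                raise KeyError("no callback registered as {!r}".format(callback_name))
            resolved.append((group_id, topic, tags, self._callbacks[callback_name]))
        return resolved


registry = CallbackRegistry()


@registry.pc_callback
def sdb_callback(msg):
    # A real callback would return an EWConsumeStatus value.
    return "CONSUME_SUCCESS"


rows = registry.resolve([("GID_1", "TopicTest", "*", "sdb_callback")])
```

Resolving names up front means a misspelled callback fails at startup rather than when the first message arrives.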
      
