Aqara Message Push SDK

aqara_msg_push

SDK for Aqara Message Push

The package provides:

  • Pydantic models for the Message Push format
  • A function for mapping custom JSON from the message queue to a Pydantic model
  • A simple consumer for RocketMQ with Aqara configuration
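
This description does not name the mapping function or show the raw queue payload, so the following is not the package's API. It is a minimal sketch of the general idea, using stdlib dataclasses in place of Pydantic. The class names, the parse_envelope helper, and the JSON keys (msgType, subjectId, resourceId) are all hypothetical; only the snake_case field names are taken from the sample output in the usage example.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class StateItem:
    # Field names mirror those visible in the sample output of the usage example.
    subject_id: str
    resource_id: str
    value: str


@dataclass
class Envelope:
    # Hypothetical top-level wrapper; the real models live in aqara_msg_push.
    msg_type: str
    data: List[StateItem]


def parse_envelope(raw: str) -> Envelope:
    """Map a JSON string to typed objects (assumed payload shape)."""
    doc = json.loads(raw)
    items = [
        StateItem(
            subject_id=item["subjectId"],
            resource_id=item["resourceId"],
            value=item["value"],
        )
        for item in doc["data"]
    ]
    return Envelope(msg_type=doc["msgType"], data=items)


# Made-up payload used only to exercise the sketch.
sample = (
    '{"msgType": "resource_report", "data": '
    '[{"subjectId": "lumi.9999999", "resourceId": "4.1.85", "value": "1"}]}'
)
envelope = parse_envelope(sample)
print(envelope.data[0].resource_id)
```

With the real package, the Pydantic models additionally validate and coerce field types, which plain dataclasses do not do.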

Install

Install the library with:

pip install aqara-msg-push

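For consuming messages you will also need a RocketMQ Python client that provides the rocketmq.client module imported in the example below (for example, the rocketmq-client-python package).

Example of usage: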

from aqara_msg_push import MessageConsumer, TopLevelMessage
from rocketmq.client import ReceivedMessage, ConsumeStatus

# Your params from https://developer.aqara.com/console/overview
ADDRESS = 'some-broker1.aqara.com:9876'
TOPIC = '99...99' 
GROUP_ID = '99..ce'
ACCESS_KEY = 'K....'
SECRET_KEY = 'nt..jg'

KEY_ID = ACCESS_KEY


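class MyMessageConsumer(MessageConsumer):

    def callback(self, obj: TopLevelMessage, raw_msg: ReceivedMessage) -> ConsumeStatus:
        # obj is the parsed Pydantic model; raw_msg is the original RocketMQ message.
        print(type(obj), obj.data)
        # Report success so the broker does not redeliver the message.
        return ConsumeStatus.CONSUME_SUCCESS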


MyMessageConsumer(
    group_id=GROUP_ID,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    topic=TOPIC,
    address=ADDRESS,
).consume()


"""
Output:

<class 'aqara_msg_push.schema.toplevel.DeviceAttributeMessage'> [DeviceStateMessage(raw_time='1659704748378', subject_id='lumi.9999999', resource_id='4.1.85', value='1', status_code=<TriggerResult.SUCCESS: 0>, trigger_source=TriggerSource(raw_time='1659704748', type=<TriggerType.CLOUD_REMOTE_COMMAND_CONTROL: 4>, id=None), attach=None)]
<class 'aqara_msg_push.schema.toplevel.DeviceAttributeMessage'> [DeviceStateMessage(raw_time='1659704750046', subject_id='lumi.9999999', resource_id='4.1.85', value='0', status_code=<TriggerResult.SUCCESS: 0>, trigger_source=TriggerSource(raw_time='1659704749', type=<TriggerType.CLOUD_REMOTE_COMMAND_CONTROL: 4>, id=None), attach=None)]

"""
