python client for elbus (sync)
Project description
Python sync client for ELBUS
The module contains Python sync client for ELBUS
Installation
pip3 install elbus
Usage examples
Listener
import elbus
import time
# frame handler
def on_frame(frame):
print('Frame:', hex(frame.type), frame.sender, frame.topic, frame.payload)
name = 'test.client.python.sync'
# create new ELBUS client and connect
bus = elbus.client.Client('/tmp/elbus.sock', name)
bus.on_frame = on_frame
bus.connect()
# subscribe to all topics
bus.subscribe('#').wait_completed()
# wait for frames
print(f'listening for messages to {name}...')
while bus.is_connected():
time.sleep(0.1)
Sender
import elbus
name = 'test.client.python.sender'
bus = elbus.client.Client('/tmp/elbus.sock', name)
bus.connect()
# send a regular message
print(
hex(
bus.send('test.client.python.sync',
elbus.client.Frame('hello')).wait_completed()))
# send a broadcast message
print(
hex(
bus.send(
'test.*',
elbus.client.Frame('hello everyone',
tp=elbus.client.OP_BROADCAST)).wait_completed()))
# publish to a topic with zero QoS (no confirmation required)
bus.send('test/topic',
elbus.client.Frame('something', tp=elbus.client.OP_PUBLISH, qos=0))
RPC layer
Handler
import elbus
import time
import msgpack
# frame handler (topics/broadcasts)
def on_frame(frame):
print('Frame:', hex(frame.type), frame.sender, frame.topic, frame.payload)
# RPC notification handler
def on_notification(event):
print('Notification:', event.frame.sender, event.get_payload())
# RPC call handler
def on_call(event):
# consider payload is encoded in msgpack
print('Call:', event.frame.sender, event.method,
msgpack.loads(event.get_payload(), raw=False))
# msgpack reply
return msgpack.dumps({'ok': True})
name = 'test.client.python.sync.rpc'
# create new ELBUS client and connect
bus = elbus.client.Client('/tmp/elbus.sock', name)
bus.connect()
# subscribe to all topics
bus.subscribe('#').wait_completed()
# init rpc
rpc = elbus.rpc.Rpc(bus)
rpc.on_frame = on_frame
rpc.on_notification = on_notification
rpc.on_call = on_call
# wait for frames
print(f'listening for messages/calls to {name}...')
while rpc.is_connected():
time.sleep(0.1)
Caller
import elbus
import msgpack
name = 'test.client.python.sync.rpc.caller'
# create new ELBUS client and connect
bus = elbus.client.Client('/tmp/elbus.sock', name)
bus.connect()
# init rpc
rpc = elbus.rpc.Rpc(bus)
params = {'hello': 123}
# call a method, no reply required
rpc.call0('test.client.python.sync.rpc',
elbus.rpc.Request('test', msgpack.dumps(params))).wait_completed()
# call a method and wait for the reply
result = rpc.call('test.client.python.sync.rpc',
elbus.rpc.Request('test',
msgpack.dumps(params))).wait_completed()
print(msgpack.loads(result.get_payload(), raw=False))
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
elbus-0.0.13.tar.gz
(6.7 kB
view hashes)