Python wrapper for coinesAPI
Project description
coinespy
allows users to access the Bosch Sensortec Application Board using Python
- Control VDD and VDDIO of sensor
- Configure SPI and I2C bus parameters
- Read and write into registers of sensors from Bosch Sensortec via SPI and I2C
- Read and write digital pins of the Application Board.
Example for read data Requirements
HW: https://www.bosch-sensortec.com/software-tools/tools/application-board-3-0/
SW: Windows/Linux/MacOS
Recommended using system 64bit
This is a simple example of BMI085 Interrupt streaming using coinespy:
from enum import Enum
import coinespy as cpy
import bmi08x_common as bmi08x
from bmi08x_common import BMI08X as BMI08X_CLS
import helper_functions as hfunc
class BMI085(BMI08X_CLS):
""" Child class of BMI08X_CLS with methods for interrupt streaming """
def __init__(self, **kwargs):
BMI08X_CLS.__init__(self, kwargs["bus"], kwargs["interface"])
self.accel_stream_settings = dict(
I2C_ADDR_PRIMARY=0x18,
NO_OF_BLOCKS=2,
REG_X_LSB=[0x12, 0x00],
NO_OF_DATA_BYTES=[6, 1],
CHANNEL_ID=1,
CS_PIN=cpy.MultiIOPin.SHUTTLE_PIN_8.value,
INT_PIN=cpy.MultiIOPin.SHUTTLE_PIN_21.value,
INT_TIME_STAMP=1,
HW_PIN_STATE=1,
# SPI_TYPE = 0,
# CLEAR_ON_WRITE = 0,
# INTLINE_COUNT =0
)
self.gyro_stream_settings = dict(
I2C_ADDR_PRIMARY=0x68,
NO_OF_BLOCKS=2,
REG_X_LSB=[0x02, 0x00],
NO_OF_DATA_BYTES=[6, 1],
CHANNEL_ID=2,
CS_PIN=cpy.MultiIOPin.SHUTTLE_PIN_14.value,
INT_PIN=cpy.MultiIOPin.SHUTTLE_PIN_22.value,
INT_TIME_STAMP=1,
HW_PIN_STATE=0,
# SPI_TYPE = 0,
# CLEAR_ON_WRITE = 0,
# INTLINE_COUNT =0
)
# ACCEL Range set to 16G
self.accel_full_range = 16
# GYRO Range set to 250 dps
self.gyro_full_range = 250
self.accel_stream_config, self.accel_data_blocks = self.set_stream_settings(
self.accel_stream_settings, bmi08x.SensorType.ACCEL)
self.gyro_stream_config, self.gyro_data_blocks = self.set_stream_settings(
self.gyro_stream_settings, bmi08x.SensorType.GYRO)
self.number_of_samples = 10
self.accel_int_config = self.set_accel_interrupt_cfg()
self.gyro_int_config = self.set_gyro_interrupt_cfg()
self.packet_counter = {"ACCEL": 1, "GYRO": 1}
self.lost_packets = []
self.lost_packet_count = 0
self.accel_stream_data_packets = []
self.gyro_stream_data_packets = []
def set_stream_settings(self, sensor: dict, sensor_type: bmi08x.SensorType):
""" API to configure Stream and Data blocks """
stream_config = cpy.StreamingConfig()
data_blocks = cpy.StreamingBlocks()
if self.interface == cpy.SensorInterface.I2C:
stream_config.Intf = cpy.SensorInterface.I2C.value
stream_config.I2CBus = cpy.I2CBus.BUS_I2C_0.value
stream_config.DevAddr = sensor["I2C_ADDR_PRIMARY"]
elif self.interface == cpy.SensorInterface.SPI:
stream_config.Intf = cpy.SensorInterface.SPI.value
stream_config.SPIBus = cpy.SPIBus.BUS_SPI_0.value
stream_config.CSPin = sensor["CS_PIN"]
if sensor_type == bmi08x.SensorType.ACCEL and self.interface == cpy.SensorInterface.SPI:
# extra dummy byte for SPI
dummy_byte_offset = 1
else:
dummy_byte_offset = 0
data_blocks.NoOfBlocks = sensor["NO_OF_BLOCKS"]
for i in range(0, data_blocks.NoOfBlocks):
data_blocks.RegStartAddr[i] = sensor["REG_X_LSB"][i]
data_blocks.NoOfDataBytes[i] = sensor["NO_OF_DATA_BYTES"][i] + \
dummy_byte_offset
stream_config.IntTimeStamp = sensor["INT_TIME_STAMP"]
stream_config.IntPin = sensor["INT_PIN"]
stream_config.HwPinState = sensor["HW_PIN_STATE"]
# stream_config.SPIType = sensor["SPI_TYPE"]
# stream_config.ClearOnWrite = sensor["CLEAR_ON_WRITE"]
# if stream_config.ClearOnWrite:
# stream_config.ClearOnWriteConfig.DummyByte = 0
# stream_config.ClearOnWriteConfig.StartAddress = 0
# stream_config.ClearOnWriteConfig.NumBytesToClear = 0
# stream_config.IntlineCount = sensor["INTLINE_COUNT"]
# for i in range(0, stream_config.IntlineCount):
# stream_config.IntlineInfo[i] = sensor["INTLINE_INFO"][i]
return (stream_config, data_blocks)
def send_stream_settings(self, sensor: dict, sensor_type: Enum):
""" API to send streaming settings to board based on Sensor type"""
if sensor_type == bmi08x.SensorType.ACCEL:
ret = self.board.config_streaming(
sensor["CHANNEL_ID"], self.accel_stream_config, self.accel_data_blocks)
else:
ret = self.board.config_streaming(
sensor["CHANNEL_ID"], self.gyro_stream_config, self.gyro_data_blocks)
return ret
def print_accel_gyro_data(self, sensor: dict, sensor_type, sensor_data):
chip_id = 0
""" To Display Accel and Gyro data after unit conversion"""
if sensor_data:
stream_buffer, valid_sample_count = sensor_data
data = stream_buffer
if data:
print(
f'\n{"ACCEL" if sensor_type == bmi08x.SensorType.ACCEL else "GYRO"} DATA \n')
buffer_index = 0
for idx in range(0, valid_sample_count):
if (idx + 5) < len(data):
# First 4 bytes contain packet count info
packet_count = 0
shift_value = 24
for j in range(4):
packet_count |= data[buffer_index] << shift_value
buffer_index += 1
shift_value -= 8
if self.interface == cpy.SensorInterface.SPI and sensor_type == bmi08x.SensorType.ACCEL:
# dummy byte
buffer_index += 1
# Next 6 bytes contain sensor info
x_data = hfunc.twos_comp(
(data[1 + buffer_index] << 8) | data[0 + buffer_index], 16)
y_data = hfunc.twos_comp(
(data[3 + buffer_index] << 8) | data[2 + buffer_index], 16)
z_data = hfunc.twos_comp(
(data[5 + buffer_index] << 8) | data[4 + buffer_index], 16)
buffer_index += 6
if sensor_type == bmi08x.SensorType.ACCEL:
unit_converted_x_data = bmi08x.lsb_to_mps2(
x_data, self.accel_full_range, 16)
unit_converted_y_data = bmi08x.lsb_to_mps2(
y_data, self.accel_full_range, 16)
unit_converted_z_data = bmi08x.lsb_to_mps2(
z_data, self.accel_full_range, 16)
elif sensor_type == bmi08x.SensorType.GYRO:
unit_converted_x_data = bmi08x.lsb_to_dps(
x_data, self.gyro_full_range, 16)
unit_converted_y_data = bmi08x.lsb_to_dps(
y_data, self.gyro_full_range, 16)
unit_converted_z_data = bmi08x.lsb_to_dps(
z_data, self.gyro_full_range, 16)
# Next 1 byte chip_id
chip_id = data[buffer_index]
buffer_index += 1
# Next 6 bytes contain Time stamp
if sensor["INT_TIME_STAMP"] == 1:
time_stamp = 0
shift_value = 40
for j in range(6):
time_stamp |= data[buffer_index] << shift_value
buffer_index += 1
shift_value -= 8
if sensor_type == bmi08x.SensorType.ACCEL:
print(f"Accel[{packet_count}] Acc_ms2_X : {unit_converted_x_data:+.3f}"
f"\tAcc_ms2_Y : {unit_converted_y_data:+.3f}"
f"\tAcc_ms2_Z : {unit_converted_z_data:+.3f}"
f"\tChip_id : {chip_id} T(us): {time_stamp}")
elif sensor_type == bmi08x.SensorType.GYRO:
print(f"Gyro[{packet_count}] Gyr_DPS_X : {unit_converted_x_data:+3.2f}"
f"\tGyr_DPS_Y : {unit_converted_y_data:+3.2f}"
f"\tGyr_DPS_Z : {unit_converted_z_data:+3.2f}"
f"\tChip_id : {chip_id} T(us): {time_stamp}")
def set_accel_interrupt_cfg(self):
""" Set accel interrupt configurations """
accel_int_config = dict()
# Set accel interrupt pin configuration
int_types = bmi08x.Bmi08xAccelIntTypes
int_channels = bmi08x.Bmi08xAccelIntChannel
accel_int_config['int_channel'] = int_channels.BMI08X_INT_CHANNEL_1
accel_int_config['int_type'] = int_types.BMI08X_ACCEL_INT_DATA_RDY
accel_int_config['int_pin_cfg'] = dict()
accel_int_config['int_pin_cfg']['output_mode'] = bmi08x.BMI08X_INT_MODE_PUSH_PULL
accel_int_config['int_pin_cfg']['lvl'] = bmi08x.BMI08X_INT_ACTIVE_HIGH
return accel_int_config
def set_gyro_interrupt_cfg(self):
""" Set gyro interrupt configurations """
gyro_int_config = dict()
# Set gyro interrupt pin
int_types = bmi08x.Bmi08xGyroIntTypes
int_channels = bmi08x.Bmi08xGyroIntChannel
gyro_int_config['int_channel'] = int_channels.BMI08X_INT_CHANNEL_3
gyro_int_config['int_type'] = int_types.BMI08X_GYRO_INT_DATA_RDY
gyro_int_config['int_pin_cfg'] = dict()
gyro_int_config['int_pin_cfg']['output_mode'] = bmi08x.BMI08X_INT_MODE_PUSH_PULL
gyro_int_config['int_pin_cfg']['lvl'] = bmi08x.BMI08X_INT_ACTIVE_HIGH
return gyro_int_config
def enable_disable_bmi08x_interrupt(self, enable_interrupt: bool):
"""API to enable interrupt if enable_interrupt is True and disable otherwise
Args:
enable_interrupt (bool):
"""
self.accel_int_config['int_pin_cfg']['enable_int_pin'] = bmi08x.BMI08X_ENABLE \
if enable_interrupt else bmi08x.BMI08X_DISABLE
# Enable accel data ready interrupt channel
self.api_error_code = bmi08x.bmi08a_set_int_config(
self, self.accel_int_config)
self.verify_api_error("Accel Interrupt Config")
self.gyro_int_config['int_pin_cfg']['enable_int_pin'] = bmi08x.BMI08X_ENABLE \
if enable_interrupt else bmi08x.BMI08X_DISABLE
# Enable gyro data ready interrupt channel
self.api_error_code = bmi08x.bmi08g_set_int_config(
self, self.gyro_int_config)
self.verify_api_error("Gyro Interrupt Config")
def interrupt_streaming(self):
""" API to execute interrupt streaming sequence"""
accel_read_status = cpy.ErrorCodes.COINES_E_COMM_IO_ERROR
gyro_read_status = cpy.ErrorCodes.COINES_E_COMM_IO_ERROR
# Send streaming settings
self.board.error_code = self.send_stream_settings(
self.accel_stream_settings, bmi08x.SensorType.ACCEL)
self.verify_error("Accel Stream settings")
self.board.error_code = self.send_stream_settings(
self.gyro_stream_settings, bmi08x.SensorType.GYRO)
self.verify_error("Gyro Stream settings")
# Enables 48-bit system timer
self.board.error_code = self.board.trigger_timer(
cpy.TimerConfig.TIMER_START.value, cpy.TimerStampConfig.TIMESTAMP_ENABLE.value)
self.verify_error("Start timer")
# Wait for 10 ms
self.board.delay_milli_sec(10)
# Enable data ready interrupts
self.enable_disable_bmi08x_interrupt(enable_interrupt=True)
# Start interrupt streaming
self.board.error_code = self.board.start_stop_streaming(
cpy.StreamingMode.STREAMING_MODE_INTERRUPT.value, cpy.StreamingState.STREAMING_START.value)
self.verify_error("Start Interrupt streaming")
# Read sensor data via interrupt streaming
(self.board.error_code, accel_stream_buffer, accel_valid_sample_count) = self.board.read_stream_sensor_data(
self.accel_stream_settings["CHANNEL_ID"], self.number_of_samples)
accel_read_status = self.board.error_code
self.verify_error("Read Accel stream data")
if self.board.error_code == cpy.ErrorCodes.COINES_SUCCESS:
(self.board.error_code, gyro_stream_buffer, gyro_valid_sample_count) = self.board.read_stream_sensor_data(
self.gyro_stream_settings["CHANNEL_ID"], self.number_of_samples)
gyro_read_status = self.board.error_code
self.verify_error("Read Gyro stream data")
# Stop interrupt streaming
self.board.error_code = self.board.start_stop_streaming(
cpy.StreamingMode.STREAMING_MODE_INTERRUPT.value, cpy.StreamingState.STREAMING_STOP.value)
self.verify_error("Stop Interrupt streaming")
# Stop Timer
self.board.error_code = self.board.trigger_timer(
cpy.TimerConfig.TIMER_STOP.value, cpy.TimerStampConfig.TIMESTAMP_DISABLE.value)
self.verify_error("Stop timer")
# print the streamed accel & gyro data
if accel_read_status == cpy.ErrorCodes.COINES_SUCCESS:
accel_data = [accel_stream_buffer,
accel_valid_sample_count]
self.print_accel_gyro_data(
self.accel_stream_settings, bmi08x.SensorType.ACCEL, accel_data)
if gyro_read_status == cpy.ErrorCodes.COINES_SUCCESS:
gyro_data = [gyro_stream_buffer, gyro_valid_sample_count]
self.print_accel_gyro_data(
self.gyro_stream_settings, bmi08x.SensorType.GYRO, gyro_data)
# flush the buffer
self.board.flush_interface()
# Wait for 100 ms
self.board.delay_milli_sec(100)
# Disable data ready interrupts
self.enable_disable_bmi08x_interrupt(enable_interrupt=False)
# Parse sensor stream data
self.accel_stream_data_packets = self.parse_read_data(
accel_stream_buffer, accel_valid_sample_count, bmi08x.SensorType.ACCEL)
self.gyro_stream_data_packets = self.parse_read_data(
gyro_stream_buffer, gyro_valid_sample_count, bmi08x.SensorType.GYRO)
# Analyze data packets loss
self.analyze_data_loss(
self.accel_stream_data_packets, bmi08x.SensorType.ACCEL)
self.analyze_data_loss(
self.gyro_stream_data_packets, bmi08x.SensorType.GYRO)
def parse_read_data(self, read_data, samples_count, sensor_type):
"""
This function is used to parse the read data to packets
"""
index = 0
streamed_data = []
if self.interface == cpy.SensorInterface.SPI and sensor_type == bmi08x.SensorType.ACCEL:
packet_len = 19
else:
packet_len = 17
for sample in range(samples_count):
# packet_len computation
streamed_data.append(read_data[index:index + packet_len])
index += packet_len
return streamed_data
def check_for_packet_loss(self, sensor_name, packet_cnt):
"""
This function is to count and track lost packets
"""
if packet_cnt == self.packet_counter[sensor_name]:
self.packet_counter[sensor_name] += 1
else:
self.lost_packet_count = packet_cnt - \
self.packet_counter[sensor_name]
for i in range(self.lost_packet_count):
self.lost_packets.append(self.packet_counter[sensor_name] + i)
# Data loss increment the counter to check next packet
self.packet_counter[sensor_name] = packet_cnt + 1
def print_data_loss(self, sensor_type):
"""
Print data loss in streamed packets
"""
print(
f"\n<-----------------{sensor_type.name} Streaming Benchmark Result------------------>")
if self.packet_counter[sensor_type.name] > 1:
loss_percent = (self.lost_packet_count /
self.packet_counter[sensor_type.name]) * 100
print(
f'{sensor_type.name} data loss - {loss_percent:.5f}%, '
f'total packets received - {self.packet_counter[sensor_type.name]}, '
f'packets lost - {self.lost_packet_count}'
)
else:
print(f"{sensor_type.name} streaming not configured")
def analyze_data_loss(self, streamed_data, sensor_type):
"""
Analyze data loss for a specific sensor type
"""
self.lost_packets = []
self.lost_packet_count = 0
for data in streamed_data:
packet_cnt = int.from_bytes(bytes(data[0:4]), "big", signed=False)
self.check_for_packet_loss(sensor_type.name, packet_cnt)
self.print_data_loss(sensor_type)
if __name__ == "__main__":
bmi085 = BMI085(bus=cpy.I2CBus.BUS_I2C_0,
interface=cpy.SensorInterface.I2C)
# bmi085 = BMI085(bus=cpy.SPIBus.BUS_SPI_0,interface=cpy.SensorInterface.SPI)
bmi085.init_board()
# Change sensor config settings as shown below
# bmi085.accel_stream_config.I2CBus = cpy.I2CBus.BUS_I2C_0.value
# bmi085.number_of_samples = 30
# # Change sensor interrupt config settings as shown below
# bmi085.accel_int_config['int_channel'] = bmi08x.Bmi08xAccelIntChannel.BMI08X_INT_CHANNEL_1
# Set Accel power, ODR and range settings
bmi085.set_accel_power_mode()
acc_range = bmi085.set_accel_meas_conf()
# Set Gyro power, ODR and range settings
bmi085.set_gyro_power_mode()
gyr_range = bmi085.set_gyro_meas_conf()
print(f"Accel range: {acc_range} Gyro range: {gyr_range}")
if bmi085.accel_cfg['POWER_MODE'] == bmi08x.BMI08X_ACCEL_PM_SUSPEND and \
(bmi085.gyro_cfg['POWER_MODE'] == bmi08x.BMI08X_GYRO_PM_SUSPEND or
bmi085.gyro_cfg['POWER_MODE'] == bmi08x.BMI08X_GYRO_PM_DEEP_SUSPEND):
print("Warning: Accel and gyro sensors are in suspend mode. Use them in active/normal mode !!")
try:
bmi085.interrupt_streaming()
except Exception as e:
print(f"Exception:{e}")
bmi085.close_comm()
bmi085.close_comm()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file coinespy-test-only-1.0.9.tar.gz
.
File metadata
- Download URL: coinespy-test-only-1.0.9.tar.gz
- Upload date:
- Size: 1.3 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | a8f3f6d693c92304eeb91d984d5dd56ac364ff75b924fd63d0db8f0d9697f6a9 |
|
MD5 | e165a60243d4659c5589362b99fdad12 |
|
BLAKE2b-256 | 3ea28cfe038ee0778bd7dd1a85df39a5cba49d0bb0f05de187086bbcb5ed8f32 |
File details
Details for the file coinespy_test_only-1.0.9-py3-none-any.whl
.
File metadata
- Download URL: coinespy_test_only-1.0.9-py3-none-any.whl
- Upload date:
- Size: 1.3 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | f28b1fbfd52737abb25c86ba07591a8930aa904b86ef14322876d4902178df8e |
|
MD5 | a021202ac46d93c7f941719182768919 |
|
BLAKE2b-256 | 627071c5138cbf107448e2451006227f08a09f64281d1159d0ab3bb4015770db |