BLE device scanner and data collector for Flowtime
Enter BLE SDK For PC
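Introduction
Enter BLE SDK For PC is a PC-side SDK provided by Entertech (回车科技) for Entertech Bluetooth chips. The SDK is written in Python and runs on macOS, Linux, and Windows.
Installation
pip install enterble
Features
- Scan for Bluetooth devices
- Connect to Bluetooth devices
- Communicate with Bluetooth devices
- Data exchange support for Entertech Flowtime series chips
Usage
See examples
Guide
- Enable the device scan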
loop.run_until_complete(device_discover())
- Run simple.py
- Check the scan output
- Change the unique device ID to the one reported for your own device (macOS uses a UUID; Windows uses the MAC address)
device_identify = ("d2:ab:3f:c9:37:ad" if platform.system() != "Darwin" else "D5D4362A-1690-4204-B797-3015EEDB510E")
- Enable the data collection code
loop.run_until_complete(data_collector())
- Run simple.py
That should be all that is needed.
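The steps above switch between scanning and collecting by editing simple.py. As a minimal sketch of an alternative, the mode and device ID could be taken from the command line instead. Everything here is illustrative: `default_identifier`, `parse_args`, and `main` are hypothetical names, the two coroutines are stand-ins for the ones in simple.py (where `data_collector()` takes no arguments), and the IDs are the placeholder values from the guide.

```python
import argparse
import asyncio
import platform


def default_identifier(system: str) -> str:
    """Return the placeholder device ID for a platform.system() value."""
    # Values copied from the guide; replace them with your own device's ID.
    if system == "Darwin":
        return "D5D4362A-1690-4204-B797-3015EEDB510E"  # macOS: UUID
    return "d2:ab:3f:c9:37:ad"  # other systems: MAC address


def parse_args(argv=None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Scan or collect (hypothetical wrapper)")
    parser.add_argument("mode", choices=["scan", "collect"])
    parser.add_argument("--device", default=default_identifier(platform.system()))
    return parser.parse_args(argv)


async def device_discover() -> str:
    # Stand-in for device_discover() in simple.py
    return "scan"


async def data_collector(device_identify: str) -> str:
    # Stand-in for data_collector() in simple.py; it only echoes the ID here
    return f"collect:{device_identify}"


def main(argv=None) -> str:
    args = parse_args(argv)
    if args.mode == "scan":
        return asyncio.run(device_discover())
    return asyncio.run(data_collector(args.device))
```

Calling `main(["scan"])` would run the scan step, and `main(["collect", "--device", "<your ID>"])` the collection step, without touching the source file.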
Simple:
import asyncio
import sys
import logging
import platform
from typing import List

from bleak.backends.client import BaseBleakClient
from enterble import DeviceScanner, FlowtimeCollector

# Python < 3.7 has no public asyncio.get_running_loop; fall back to the private helper
if sys.version_info < (3, 7):
    asyncio.get_running_loop = asyncio._get_running_loop

logging.basicConfig(
    level=logging.INFO,
    format='%(levelname)s - %(asctime)s - %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)


def bleak_log(level=logging.INFO):
    import bleak
    logger.info(f'Bleak version: {bleak.__version__}')
    logging.getLogger('bleak').setLevel(level=level)


async def device_discover():
    """Device scanner: scan for devices and read details such as the MAC address.

    Raises:
        Exception: No device was found.
    """
    devices = await DeviceScanner.discover(
        name=None,
        model_nbr_uuid='0000ff10-1212-abcd-1523-785feabcd123',
    )
    if not devices:
        raise Exception('No device found, please try again later.')
    for device in devices:
        try:
            services = await device.get_services()
            for _id, service in services.characteristics.items():
                logger.info(f'{device} - {_id} - {service}')
            MAC = await device.get_mac_address()
            logger.info(
                f'{device} - {MAC}'
            )
        except Exception as e:
            logger.error(f'{device} - {device.identify} - {e}')


async def data_collector():
    """Data collector: collect data and write it to the log."""

    async def device_disconnected(device: BaseBleakClient) -> None:
        """Callback invoked when the device disconnects.

        Args:
            device (BaseBleakClient): Device instance
        """
        logger.info(f'Device disconnected: {device}')

    async def soc_callback(soc: float) -> None:
        """Callback for the battery state of charge.

        Args:
            soc (float): Battery state of charge
        """
        logger.info(f'Battery SOC: {soc}')

    async def wear_status_callback(wear_status: bool) -> None:
        """Callback for the wear status.

        Args:
            wear_status (bool): Wear status
        """
        logger.info(f'Wear status: {wear_status}')

    async def eeg_data_collector(data: List[int]) -> None:
        """Callback for collected EEG data.

        Args:
            data (List[int]): EEG data
        """
        logger.info(f'EEG: {data}')

    async def hr_data_collector(data: int) -> None:
        """Callback for collected HR data.

        Args:
            data (int): HR data
        """
        logger.info(f'HR: {data}')

    # UUID advertised by the device
    model_nbr_uuid = '0000ff10-1212-abcd-1523-785feabcd123'

    # Unique device ID (can be confirmed by scanning for devices)
    device_identify = (
        "d2:ab:3f:c9:37:ad"
        if platform.system() != "Darwin"
        else "D5D4362A-1690-4204-B797-3015EEDB510E"
    )

    # Initialize the collector
    collector = FlowtimeCollector(
        name='Flowtime',
        model_nbr_uuid=model_nbr_uuid,
        device_identify=device_identify,
        device_disconnected_callback=device_disconnected,
        soc_data_callback=soc_callback,
        wear_status_callback=wear_status_callback,
        eeg_data_callback=eeg_data_collector,
        hr_data_callback=hr_data_collector,
    )

    # Start the collector
    await collector.start()

    # Wait until it stops
    await collector.wait_for_stop()


if __name__ == '__main__':
    bleak_log(logging.INFO)
    loop = asyncio.get_event_loop()
    # Scan for devices
    # loop.run_until_complete(device_discover())
    # Collect data
    loop.run_until_complete(data_collector())
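The example only logs each EEG packet. If the samples need to be kept, one option is to buffer them in the callback and export them afterwards. The sketch below is an assumption-laden illustration, not part of enterble: `EEGBuffer` is a hypothetical helper, the CSV layout is arbitrary, and the packets are simulated so the block runs without a device. Its `on_eeg` method has the same signature as `eeg_data_collector` in the example, so it could presumably be passed as `eeg_data_callback`.

```python
import asyncio
import csv
import io
from typing import List


class EEGBuffer:
    """Accumulates EEG packets delivered to an async callback (hypothetical helper)."""

    def __init__(self) -> None:
        self.packets: List[List[int]] = []

    async def on_eeg(self, data: List[int]) -> None:
        # Same signature as eeg_data_collector in the example
        self.packets.append(list(data))

    def to_csv(self) -> str:
        """Render the buffered packets as CSV, one row per sample."""
        out = io.StringIO()
        writer = csv.writer(out, lineterminator="\n")
        writer.writerow(["packet", "sample", "value"])
        for p, packet in enumerate(self.packets):
            for s, value in enumerate(packet):
                writer.writerow([p, s, value])
        return out.getvalue()


async def demo() -> str:
    buf = EEGBuffer()
    # Simulated packets; a real run would register buf.on_eeg as the EEG callback
    await buf.on_eeg([1, 2])
    await buf.on_eeg([3])
    return buf.to_csv()
```

Keeping the callback limited to an append keeps it cheap; formatting and file output happen later, outside the data path.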
Source Distribution
enterble-1.1.2.tar.gz (11.4 kB)
File details
Details for the file enterble-1.1.2.tar.gz.
File metadata
- Download URL: enterble-1.1.2.tar.gz
- Upload date:
- Size: 11.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.7
File hashes
Algorithm | Hash digest
---|---
SHA256 | 19042a7cd5d349272f0189ffa42914b55987da23a274a918ade9d6258f2d1ab8
MD5 | ae48cbe4b6611283ae411e6cdac115e2
BLAKE2b-256 | 98b7c2ea6603829ed448a21e78fdbfbc7e25f737fab0c857b6dde3d4b675793c
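A downloaded archive can be checked against the SHA256 digest listed above. The following is a small sketch using only the standard library; `sha256_of_file` and `matches_published_digest` are hypothetical helper names, and the file path is whatever location the archive was saved to.

```python
import hashlib

# SHA256 digest published for enterble-1.1.2.tar.gz in the table above
EXPECTED_SHA256 = "19042a7cd5d349272f0189ffa42914b55987da23a274a918ade9d6258f2d1ab8"


def sha256_of_file(path: str, chunk_size: int = 65536) -> str:
    """Hash a file in chunks so large archives are not loaded into memory at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_published_digest(path: str, expected: str = EXPECTED_SHA256) -> bool:
    """Return True when the file's SHA256 equals the expected hex digest."""
    return sha256_of_file(path) == expected.lower()
```

For example, `matches_published_digest("enterble-1.1.2.tar.gz")` should return True for an intact download saved under that name in the current directory.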