# -*- coding: utf-8 -*-
from explorepy._exceptions import *
import bluetooth
import time
import sys
[docs]class BtClient:
""" Responsible for Connecting and reconnecting explore devices via bluetooth"""
def __init__(self):
self.is_connected = False
self.lastUsedAddress = None
self.socket = None
self.host = None
self.port = None
self.name = None
[docs] def init_bt(self, device_name=None, device_addr=None):
"""
Initialize Bluetooth connection
Args:
device_name(str): Name of the device (either device_name or device address should be given)
device_addr(str): Devices MAC address
"""
if (device_addr is None) and (device_name is None):
raise InputError("Missing name or address")
if device_name is not None:
if device_addr is None:
if not self.find_mac_addr(device_name):
raise DeviceNotFoundError("Error: Couldn't find the device! Restart your device and run the code "
"again and check if MAC address/name is entered correctly.")
if not self._check_mac_address(device_name=device_name, mac_address=self.lastUsedAddress):
raise ValueError("MAC address does not match the expected value!")
else:
self.lastUsedAddress = device_addr
if not self._check_mac_address(device_name=device_name, mac_address=self.lastUsedAddress):
raise ValueError("MAC address does not match the expected value!")
else:
# No need to scan if we have the address
self.lastUsedAddress = device_addr
device_name = "Explore_"+str(device_addr[-5:-3])+str(device_addr[-2:])
address_known = True
service_matches = self.find_explore_service()
if service_matches is None:
raise DeviceNotFoundError("SSP service for the device %s, with MAC address %s could not be found. Please "
"restart the device and try again" %(device_name, self.lastUsedAddress))
for services in service_matches:
self.port = services["port"]
self.name = services["name"]
self.host = services["host"]
# Checking if "Explore_ABCD" matches "XX:XX:XX:XX:AB:CD"
if self._check_mac_address(device_name=device_name, mac_address=self.host):
break
if not self._check_mac_address(device_name=device_name, mac_address=self.host):
raise ValueError("MAC address does not match the expected value on the SSP service!!")
[docs] def bt_connect(self):
"""Creates the socket
"""
timeout = 0
while True:
try:
self.socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
print("Connecting to %s with address %s" % (self.name, self.host))
self.socket.connect((self.host, self.port))
break
except bluetooth.BluetoothError as error:
self.socket.close()
print("Could not connect; Retrying in 2s...")
time.sleep(2)
timeout += 1
if timeout > 5:
raise DeviceNotFoundError("Could not find the device! Please make sure the device is on and in "
"advertising mode.")
return self.socket
[docs] def reconnect(self):
"""
tries to open the last bt socket, uses the last port and host. if after 1 minute the connection doesnt succeed,
program will end
"""
timeout = 1
while timeout < 5:
try:
self.socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
self.socket.connect((self.host, self.port))
break
except bluetooth.BluetoothError as error:
print("Bluetooth Error: Probably timeout, attempting reconnect. Error: ", error)
time.sleep(5)
timeout += 1
if timeout > 5:
self.socket.close()
raise DeviceNotFoundError("Could not find the device! Please make sure the device is on and in "
"advertising mode.")
[docs] def find_mac_addr(self, device_name):
i = 0
while i < 5:
nearby_devices = bluetooth.discover_devices(lookup_names=True, flush_cache=True )
for address, name in nearby_devices:
if name == device_name:
self.lastUsedAddress = address
return True
i += 1
print("No device found with the name: %s, searching again..." % device_name)
time.sleep(0.1)
return False
[docs] def find_explore_service(self):
uuid = "1101" # Serial Port Profile (SPP) service
i = 0
while i < 5:
service_matches = bluetooth.find_service(uuid=uuid, address=self.lastUsedAddress)
if len(service_matches) > 0:
return service_matches
i += 1
return None
[docs] @staticmethod
def _check_mac_address(device_name, mac_address):
return (device_name[-4:-2] == mac_address[-5:-3]) and (device_name[-2:] == mac_address[-2:])