# -*- coding: utf-8 -*-
"""A module for bluetooth connection"""
import time
import bluetooth
from explorepy._exceptions import DeviceNotFoundError, InputError
SPP_UUID = "1101" # Serial Port Profile (SPP) service
[docs]class BtClient:
""" Responsible for Connecting and reconnecting explore devices via bluetooth"""
def __init__(self, device_name=None, mac_address=None):
"""Initialize Bluetooth connection
Args:
device_name(str): Name of the device (either device_name or device address should be given)
mac_address(str): Devices MAC address
"""
if (mac_address is None) and (device_name is None):
raise InputError("Either name or address options must be provided!")
self.is_connected = False
self.mac_address = mac_address
self.device_name = device_name
self.socket = None
self.host = None
self.port = None
[docs] def connect(self):
"""Connect to the device and return the socket
Returns:
socket (bluetooth.socket)
"""
if self.mac_address is None:
self._find_mac_address()
else:
self.device_name = "Explore_" + str(self.mac_address[-5:-3]) + str(self.mac_address[-2:])
self._find_service()
for _ in range(5):
try:
self.socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
print("Connecting to {}".format(self.device_name))
self.socket.connect((self.host, self.port))
self.is_connected = True
return self.socket
except bluetooth.BluetoothError as error:
self.socket.close()
self.is_connected = False
print(error, "\nCould not connect; Retrying in 2s...")
time.sleep(2)
self.is_connected = False
raise DeviceNotFoundError("Could not find the device! Please make sure"
" the device is on and in advertising mode.")
[docs] def reconnect(self):
"""Reconnect to the last used bluetooth socket.
This function reconnects to the the last bluetooth socket. If after 1 minute the connection doesn't succeed,
program will end.
"""
for _ in range(5):
try:
self.socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
self.socket.connect((self.host, self.port))
self.is_connected = True
print('Connected to the device')
return self.socket
except bluetooth.BluetoothError as error:
print("Bluetooth Error: {}, attempting to reconnect...".format(error))
self.socket.close()
self.is_connected = False
time.sleep(2)
self.socket.close()
self.is_connected = False
return None
[docs] def disconnect(self):
"""Disconnect from the device"""
self.socket.close()
self.is_connected = False
[docs] def _find_mac_address(self):
for _ in range(5):
nearby_devices = bluetooth.discover_devices(lookup_names=True, flush_cache=True)
for address, name in nearby_devices:
if name == self.device_name:
self.mac_address = address
return
print("No device found with the name: {}, searching again...".format(self.device_name))
time.sleep(0.1)
raise DeviceNotFoundError("No device found with the name: {}".format(self.device_name))
[docs] def _find_service(self):
for _ in range(5):
service = bluetooth.find_service(uuid=SPP_UUID, address=self.mac_address)
if service:
self.port = service[0]["port"]
self.host = service[0]["host"]
return 1
raise DeviceNotFoundError("SSP service for the device {}. Please restart the device and try "
"again".format(self.device_name))
[docs] def read(self, n_bytes):
"""Read n_bytes from the socket
Args:
n_bytes (int): number of bytes to be read
Returns:
list of bytes
"""
if self.socket is None:
raise ConnectionAbortedError('No bluetooth connection!')
try:
byte_data = self.socket.recv(n_bytes)
if len(byte_data) < n_bytes:
raise ConnectionAbortedError('No data in bluetooth buffer.')
except ValueError as error:
raise ConnectionAbortedError(error.__str__())
return byte_data
[docs] def send(self, data):
"""Send data to the device
Args:
data (bytearray): Data to be sent
"""
self.socket.send(data)
[docs] @staticmethod
def _check_mac_address(device_name, mac_address):
return (device_name[-4:-2] == mac_address[-5:-3]) and (device_name[-2:] == mac_address[-2:])