# -*- coding: utf-8 -*-
"""This module contains all packet classes of Mentalab Explore device"""
import abc
from enum import IntEnum
import time
import numpy as np
from explorepy._exceptions import FletcherError
[docs]class PACKET_ID(IntEnum):
"""Packet ID enum"""
ORN = 13
ENV = 19
TS = 27
DISCONNECT = 111
INFO = 99
EEG94 = 144
EEG98 = 146
EEG99S = 30
EEG99 = 62
EEG94R = 208
EEG98R = 210
CMDRCV = 192
CMDSTAT = 193
MARKER = 194
CALIBINFO = 195
EXG_UNIT = 1e-6
[docs]class Packet:
"""An abstract base class for Explore packet"""
__metadata__ = abc.ABCMeta
def __init__(self, timestamp, payload):
"""Gets the timestamp and payload and initializes the packet object
Args:
payload (bytearray): a byte array including binary data and fletcher
"""
self.timestamp = timestamp
[docs] @abc.abstractmethod
def _convert(self, bin_data):
"""Read the binary data and convert it to real values"""
[docs] @abc.abstractmethod
def _check_fletcher(self, fletcher):
"""Checks if the fletcher is valid"""
[docs] @abc.abstractmethod
def __str__(self):
"""Print the data/info"""
[docs] @staticmethod
def int24to32(bin_data):
"""Converts binary data to int32
Args:
bin_data (list): list of bytes with the structure of int24
Returns:
np.ndarray of int values
"""
assert len(bin_data) % 3 == 0, "Packet length error!"
return np.asarray([int.from_bytes(bin_data[x:x + 3],
byteorder='little',
signed=True) for x in range(0, len(bin_data), 3)])
[docs]class EEG(Packet):
"""EEG packet class"""
__metadata__ = abc.ABCMeta
[docs] def calculate_impedance(self, imp_calib_info):
"""calculate impedance with the help of impedance calibration info
Args:
imp_calib_info (dict): dictionary of impedance calibration info including slope, offset and noise level
"""
self.imp_data = np.round((self.get_ptp() - imp_calib_info['noise_level']) * imp_calib_info['slope']/1.e6 -
imp_calib_info['offset'], decimals=0)
[docs] def get_data(self, exg_fs=None):
"""get time vector and data
If exg_fs is given, it returns time vector and data. If exg_fs is not given, it returns the timestamp of the
packet alongside with the data
"""
if exg_fs:
n_sample = self.data.shape[1]
time_vector = np.linspace(self.timestamp, self.timestamp + (n_sample - 1) / exg_fs, n_sample)
return time_vector, self.data
return self.timestamp, self.data
[docs] def get_impedances(self):
"""get electrode impedances"""
return self.imp_data
[docs] def get_ptp(self):
"""Get peak to peak value"""
return np.ptp(self.data, axis=1)
[docs]class EEG94(EEG):
"""EEG packet for 4 channel device"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
data = Packet.int24to32(bin_data)
n_chan = -1
v_ref = 2.4
n_packet = 33
data = data.reshape((n_packet, n_chan)).astype(np.float).T
gain = EXG_UNIT * ((2 ** 23) - 1) * 6.
self.data = np.round(data[1:, :] * v_ref / gain, 2)
self.data_status = data[0, :]
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "EEG: " + str(self.data[:, -1]) + "\tEEG STATUS: " + str(self.data_status[-1])
[docs]class EEG98(EEG):
"""EEG packet for 8 channel device"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
data = Packet.int24to32(bin_data)
n_chan = -1
v_ref = 2.4
n_packet = 16
data = data.reshape((n_packet, n_chan)).astype(np.float).T
gain = EXG_UNIT * ((2 ** 23) - 1) * 6.
self.data = np.round(data[1:, :] * v_ref / gain, 2)
self.status = (hex(bin_data[0]), hex(bin_data[1]), hex(bin_data[2]))
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "EEG: " + str(self.data[:, -1]) + "\tEEG STATUS: " + str(self.status)
[docs]class EEG99s(EEG):
"""EEG packet for 8 channel device"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
data = Packet.int24to32(bin_data)
n_chan = -1
v_ref = 4.5
n_packet = 16
data = data.reshape((n_packet, n_chan)).astype(np.float).T
gain = EXG_UNIT * ((2 ** 23) - 1) * 6.
self.data = np.round(data * v_ref / gain, 2)
self.status = data[0, :]
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "EEG: " + str(self.data[:, -1]) + "\tEEG STATUS: " + str(self.status)
[docs]class EEG99(EEG):
"""EEG packet for 8 channel device"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
data = Packet.int24to32(bin_data)
n_chan = -1
v_ref = 4.5
n_packet = 16
data = data.reshape((n_packet, n_chan)).astype(np.float).T
gain = EXG_UNIT * ((2 ** 23) - 1) * 6.
self.data = np.round(data * v_ref / gain, 2)
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "EEG: " + str(self.data[:, -1])
[docs]class Orientation(Packet):
"""Orientation data packet"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
data = np.copy(np.frombuffer(bin_data, dtype=np.dtype(np.int16).newbyteorder('<'))).astype(np.float)
self.acc = 0.061 * data[0:3] # Unit [mg/LSB]
self.gyro = 8.750 * data[3:6] # Unit [mdps/LSB]
self.mag = 1.52 * np.multiply(data[6:], np.array([-1, 1, 1])) # Unit [mgauss/LSB]
self.theta = None
self.rot_axis = None
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "Acc: " + str(self.acc) + "\tGyro: " + str(self.gyro) + "\tMag: " + str(self.mag)
[docs] def get_data(self, srate=None):
"""Get orientation timestamp and data"""
return [self.timestamp], self.acc.tolist() + self.gyro.tolist() + self.mag.tolist()
[docs] def compute_angle(self, matrix=None):
"""Compute physical angle"""
trace = matrix[0][0]+matrix[1][1]+matrix[2][2]
theta = np.arccos((trace-1)/2)*57.2958
nx = matrix[2][1] - matrix[1][2]
ny = matrix[0][2] - matrix[2][0]
nz = matrix[1][0] - matrix[0][1]
rot_axis = 1/np.sqrt((3-trace)*(1+trace))*np.array([nx, ny, nz])
self.theta = theta
self.rot_axis = rot_axis
return [theta, rot_axis]
[docs]class Environment(Packet):
"""Environment data packet"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
self.temperature = bin_data[0]
self.light = (1000 / 4095) * np.frombuffer(bin_data[1:3],
dtype=np.dtype(np.uint16).newbyteorder('<')) # Unit Lux
self.battery = (16.8 / 6.8) * (1.8 / 2457) * np.frombuffer(bin_data[3:5],
dtype=np.dtype(np.uint16).newbyteorder(
'<')) # Unit Volt
self.battery_percentage = self._volt_to_percent(self.battery)
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "Temperature: " + str(self.temperature) + "\tLight: " + str(self.light) + "\tBattery: " + str(
self.battery)
[docs] def get_data(self):
"""Get environment data"""
return {'battery': [self.battery_percentage],
'temperature': [self.temperature],
'light': [self.light]}
[docs] @staticmethod
def _volt_to_percent(voltage):
if voltage < 3.1:
percentage = 1
elif voltage < 3.5:
percentage = 1 + (voltage - 3.1) / .4 * 10
elif voltage < 3.8:
percentage = 10 + (voltage - 3.5) / .3 * 40
elif voltage < 3.9:
percentage = 40 + (voltage - 3.8) / .1 * 20
elif voltage < 4.:
percentage = 60 + (voltage - 3.9) / .1 * 15
elif voltage < 4.1:
percentage = 75 + (voltage - 4.) / .1 * 15
elif voltage < 4.2:
percentage = 90 + (voltage - 4.1) / .1 * 10
elif voltage > 4.2:
percentage = 100
percentage = int(percentage)
return percentage
[docs]class TimeStamp(Packet):
"""Time stamp data packet"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
self.raw_data = None
[docs] def _convert(self, bin_data):
self.host_timestamp = np.frombuffer(bin_data, dtype=np.dtype(np.uint64).newbyteorder('<'))
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xff\xff\xff\xff':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "Host timestamp: " + str(self.host_timestamp)
[docs]class EventMarker(Packet):
"""Marker packet"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
self.marker_code = np.frombuffer(bin_data, dtype=np.dtype(np.uint16).newbyteorder('<'))[0]
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "Event marker: " + str(self.marker_code)
[docs] def get_data(self, srate=None):
"""Get marker data
Args:
srate: NOT USED. Only for compatibility purpose"""
return [self.timestamp], [self.marker_code]
[docs]class Disconnect(Packet):
"""Disconnect packet"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._check_fletcher(payload)
[docs] def _convert(self, bin_data):
"""Disconnect packet has no data"""
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "Device has been disconnected!"
[docs]class DeviceInfo(Packet):
"""Device information packet"""
def __init__(self, timestamp, payload):
super(DeviceInfo, self).__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
fw_num = np.frombuffer(bin_data, dtype=np.dtype(np.uint16).newbyteorder('<'), count=1, offset=0)
self.firmware_version = '.'.join([char for char in str(fw_num)[1:-1]])
self.sampling_rate = 16000 / (2 ** bin_data[2])
self.adc_mask = [int(bit) for bit in format(bin_data[3], '#010b')[2:]]
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
[docs] def get_info(self):
"""Get device information as a dictionary"""
return dict(firmware_version=self.firmware_version,
adc_mask=self.adc_mask,
sampling_rate=self.sampling_rate)
def __str__(self):
return "Firmware version: " + self.firmware_version + " - sampling rate: " + str(self.sampling_rate)\
+ " Hz" + " - ADC mask: " + str(self.adc_mask)
[docs] def get_data(self):
"""Get firmware version"""
return {'firmware_version': [self.firmware_version]}
[docs]class CommandRCV(Packet):
"""Command Status packet"""
def __init__(self, timestamp, payload):
super(CommandRCV, self).__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
self.opcode = bin_data[0]
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "an acknowledge message for command with this opcode has been received: " + str(self.opcode)
[docs]class CommandStatus(Packet):
"""Command Status packet"""
def __init__(self, timestamp, payload):
super(CommandStatus, self).__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
self.opcode = bin_data[0]
self.status = bin_data[5]
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "Command status: " + str(self.status) + "\tfor command with opcode: " + str(self.opcode)
[docs]class CalibrationInfo(Packet):
"""Calibration Info packet"""
def __init__(self, timestamp, payload):
super(CalibrationInfo, self).__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
slope = np.frombuffer(bin_data, dtype=np.dtype(np.uint16).newbyteorder('<'), count=1, offset=0)
self.slope = slope * 10.0
offset = np.frombuffer(bin_data, dtype=np.dtype(np.uint16).newbyteorder('<'), count=1, offset=2)
self.offset = offset * 0.001
[docs] def get_info(self):
"""Get calibration info"""
return {'slope': self.slope,
'offset': self.offset}
[docs] def _check_fletcher(self, fletcher):
if not fletcher == b'\xaf\xbe\xad\xde':
raise FletcherError('Fletcher value is incorrect!')
def __str__(self):
return "calibration info: slope = " + str(self.slope) + "\toffset = " + str(self.offset)
PACKET_CLASS_DICT = {
PACKET_ID.ORN: Orientation,
PACKET_ID.ENV: Environment,
PACKET_ID.TS: TimeStamp,
PACKET_ID.DISCONNECT: Disconnect,
PACKET_ID.INFO: DeviceInfo,
PACKET_ID.EEG94: EEG94,
PACKET_ID.EEG98: EEG98,
PACKET_ID.EEG99S: EEG99s,
PACKET_ID.EEG99: EEG99s,
PACKET_ID.EEG94R: EEG94,
PACKET_ID.EEG98R: EEG98,
PACKET_ID.CMDRCV: CommandRCV,
PACKET_ID.CMDSTAT: CommandStatus,
PACKET_ID.CALIBINFO: CalibrationInfo,
PACKET_ID.MARKER: EventMarker
}