# -*- coding: utf-8 -*-
import numpy as np
import abc
import struct
from functools import partial
from enum import IntEnum
from datetime import datetime
[docs]class PACKET_ID(IntEnum):
ORN = 13
ENV = 19
TS = 27
DISCONNECT = 111
INFO = 99
EEG94 = 144
EEG98 = 146
EEG99S = 30
EEG99 = 62
EEG94R = 208
EEG98R = 210
CMDRCV = 192
CMDSTAT = 193
MARKER = 194
CALIBINFO = 195
[docs]class Packet:
"""An abstract base class for Explore packet"""
__metadata__ = abc.ABCMeta
def __init__(self, timestamp, payload):
"""
Gets the timestamp and payload and initializes the packet object
Args:
payload (bytearray): a byte array including binary data and fletcher
"""
self.timestamp = timestamp
[docs] @abc.abstractmethod
def _convert(self, bin_data):
"""Read the binary data and convert it to real values"""
pass
[docs] @abc.abstractmethod
def _check_fletcher(self, fletcher):
"""Checks if the fletcher is valid"""
pass
[docs] @abc.abstractmethod
def __str__(self):
"""Print the data/info"""
pass
[docs] @staticmethod
def int24to32(bin_data):
"""
converts binary data to int32
Args:
bin_data (list): list of bytes with the structure of int24
Returns:
np.ndarray of int values
"""
assert len(bin_data) % 3 == 0, "Packet length error!"
return np.asarray([int.from_bytes(bin_data[x:x + 3],
byteorder='little',
signed=True) for x in range(0, len(bin_data), 3)])
[docs] @abc.abstractmethod
def push_to_dashboard(self, dashboard):
pass
[docs]class EEG(Packet):
@abc.abstractmethod
def write_to_file(self, recorder):
"""
Write EEG data to csv file
Args:
recorder(explorepy.tools.FileRecorder): File recorder object
"""
pass
[docs] def apply_bp_filter(self, exg_filter):
"""Bandpass filtering of ExG data
Args:
exg_filter: Filter object
"""
self.data = exg_filter.apply_bp_filter(self.data)
[docs] def apply_bp_filter_noise(self, exg_filter):
"""Bandpass filtering of ExG data
Args:
exg_filter: Filter object
"""
self.data = exg_filter.apply_bp_filter_noise(self.data)
[docs] def apply_notch_filter(self, exg_filter):
"""Band_stop filtering of ExG data
Args:
exg_filter: Filter object
"""
self.data = exg_filter.apply_notch_filter(self.data)
[docs] def push_to_lsl(self, outlet):
"""Push data to lsl socket
Args:
outlet (lsl.StreamOutlet): lsl stream outlet
"""
for sample in self.data.T:
outlet.push_sample(sample.tolist())
[docs] def calculate_impedance(self, imp_calib_info):
"""
calculate impedance with the help of impedance calibration info
Args:
imp_calib_info (dict): dictionary of impedance calibration info including slope, offset and noise level
"""
mag = np.ptp(self.data, axis=1)
self.imp_data = np.round(
(mag - imp_calib_info['noise_level']) * imp_calib_info['slope'] - imp_calib_info['offset'], decimals=0)
[docs] def push_to_dashboard(self, dashboard):
n_sample = self.data.shape[1]
time_vector = np.linspace(self.timestamp, self.timestamp + (n_sample - 1) / dashboard.exg_fs, n_sample)
dashboard.doc.add_next_tick_callback(partial(dashboard.update_exg, time_vector=time_vector, ExG=self.data))
[docs] def push_to_imp_dashboard(self, dashboard, imp_calib_info):
self.calculate_impedance(imp_calib_info)
dashboard.doc.add_next_tick_callback(partial(dashboard.update_imp, imp=self.imp_data))
[docs] def write_to_file(self, recorder):
tmpstmp = np.linspace(self.timestamp, self.timestamp + (self.data.shape[1]-1)/recorder.fs,
self.data.shape[1])
recorder.write_data(np.concatenate((tmpstmp[:, np.newaxis], self.data.T), axis=1).T)
[docs]class EEG94(EEG):
"""EEG packet for 4 channel device"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
data = Packet.int24to32(bin_data)
n_chan = -1
v_ref = 2.4
n_packet = 33
data = data.reshape((n_packet, n_chan)).astype(np.float).T
self.data = data[1:, :] * v_ref / ((2 ** 23) - 1) / 6.
self.dataStatus = data[0, :]
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "EEG: " + str(self.data[:, -1]) + "\tEEG STATUS: " + str(self.dataStatus[-1])
[docs]class EEG98(EEG):
"""EEG packet for 8 channel device"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
data = Packet.int24to32(bin_data)
n_chan = -1
v_ref = 2.4
n_packet = 16
data = data.reshape((n_packet, n_chan)).astype(np.float).T
self.data = data[1:, :] * v_ref / ((2 ** 23) - 1) / 6.
self.status = (hex(bin_data[0]), hex(bin_data[1]), hex(bin_data[2]))
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "EEG: " + str(self.data[:, -1]) + "\tEEG STATUS: " + str(self.status)
[docs]class EEG99s(EEG):
"""EEG packet for 8 channel device"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
data = Packet.int24to32(bin_data)
n_chan = -1
v_ref = 4.5
n_packet = 16
data = data.reshape((n_packet, n_chan)).astype(np.float).T
self.data = data[1:, :] * v_ref / ((2 ** 23) - 1) / 6.
self.status = data[0, :]
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "EEG: " + str(self.data[:, -1]) + "\tEEG STATUS: " + str(self.status)
[docs]class EEG99(EEG):
"""EEG packet for 8 channel device"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
data = Packet.int24to32(bin_data)
n_chan = -1
v_ref = 4.5
n_packet = 16
data = data.reshape((n_packet, n_chan)).astype(np.float).T
self.data = data * v_ref / ((2 ** 23) - 1) / 6.
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "EEG: " + str(self.data[:, -1])
[docs]class Orientation(Packet):
"""Orientation data packet"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
data = np.copy(np.frombuffer(bin_data, dtype=np.dtype(np.int16).newbyteorder('<'))).astype(np.float)
self.acc = 0.061 * data[0:3] # Unit [mg/LSB]
self.gyro = 8.750 * data[3:6] # Unit [mdps/LSB]
self.mag = 1.52 * np.multiply (data[6:], np.array([-1, 1, 1])) # Unit [mgauss/LSB]
self.theta = None
self.rot_axis = None
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "Acc: " + str(self.acc) + "\tGyro: " + str(self.gyro) + "\tMag: " + str(self.mag)
[docs] def write_to_file(self, recorder):
recorder.write_data(np.array([self.timestamp] + self.acc.tolist() +
self.gyro.tolist() + self.mag.tolist())[:, np.newaxis])
[docs] def push_to_lsl(self, outlet):
outlet.push_sample(self.acc.tolist() + self.gyro.tolist() + self.mag.tolist())
[docs] def push_to_dashboard(self, dashboard):
data = self.acc.tolist() + self.gyro.tolist() + self.mag.tolist()
dashboard.doc.add_next_tick_callback(partial(dashboard.update_orn, timestamp=self.timestamp, orn_data=data))
[docs] def compute_angle(self, matrix=None):
trace = matrix[0][0]+matrix[1][1]+matrix[2][2]
theta = np.arccos((trace-1)/2)*57.2958
nx = matrix[2][1] - matrix[1][2]
ny = matrix[0][2] - matrix[2][0]
nz = matrix[1][0] - matrix[0][1]
rot_axis = 1/np.sqrt((3-trace)*(1+trace))*np.array([nx, ny, nz])
self.theta = theta
self.rot_axis = rot_axis
return [theta, rot_axis]
[docs]class Environment(Packet):
"""Environment data packet"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
self.temperature = bin_data[0]
self.light = (1000 / 4095) * np.frombuffer(bin_data[1:3],
dtype=np.dtype(np.uint16).newbyteorder('<')) # Unit Lux
self.battery = (16.8 / 6.8) * (1.8 / 2457) * np.frombuffer(bin_data[3:5],
dtype=np.dtype(np.uint16).newbyteorder(
'<')) # Unit Volt
self.battery_percentage = self._volt_to_percent(self.battery)
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "Temperature: " + str(self.temperature) + "\tLight: " + str(self.light) + "\tBattery: " + str(
self.battery)
[docs] def push_to_dashboard(self, dashboard):
data = {'battery': [self.battery_percentage],
'temperature': [self.temperature],
'light': [self.light]}
dashboard.doc.add_next_tick_callback(partial(dashboard.update_info, new=data))
[docs] @staticmethod
def _volt_to_percent(voltage):
if voltage < 3.1:
percentage = 1
elif voltage < 3.5:
percentage = 1 + (voltage - 3.1) / .4 * 10
elif voltage < 3.8:
percentage = 10 + (voltage - 3.5) / .3 * 40
elif voltage < 3.9:
percentage = 40 + (voltage - 3.8) / .1 * 20
elif voltage < 4.:
percentage = 60 + (voltage - 3.9) / .1 * 15
elif voltage < 4.1:
percentage = 75 + (voltage - 4.) / .1 * 15
elif voltage < 4.2:
percentage = 90 + (voltage - 4.1) / .1 * 10
elif voltage > 4.2:
percentage = 100
percentage = int(percentage)
return percentage
[docs]class TimeStamp(Packet):
"""Time stamp data packet"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
self.raw_data = None
[docs] def _convert(self, bin_data):
self.hostTimeStamp = np.frombuffer(bin_data, dtype=np.dtype(np.uint64).newbyteorder('<'))
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xff\xff\xff\xff', "Fletcher error!"
[docs] def translate(self):
now = datetime.now()
timestamp = int(1000000000 * datetime.timestamp(now)) # time stamp in nanosecond
ts_str = hex(timestamp)
ts_str = ts_str[2:18]
host_ts = bytes.fromhex(ts_str)
ID = b'\x1B'
CNT = b'\x01'
payload_len = b'\x10\x00' # i.e. 0x0010
device_ts = b'\x00\x00\x00\x00'
fletcher = b'\xFF\xFF\xFF\xFF'
self.raw_data = ID + CNT + payload_len + device_ts + host_ts + fletcher
def __str__(self):
return "Host timestamp: " + str(self.hostTimeStamp)
[docs] def push_to_lsl(self, outlet):
outlet.push_sample([1])
[docs]class MarkerEvent(Packet):
"""Marker packet"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
self.marker_code = np.frombuffer(bin_data, dtype=np.dtype(np.uint16).newbyteorder('<'))[0]
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "Event marker: " + str(self.marker_code)
[docs] def write_to_file(self, recorder):
recorder.set_marker(np.array([self.timestamp, self.marker_code])[:, np.newaxis])
[docs] def push_to_lsl(self, outlet):
outlet.push_sample([self.marker_code])
[docs] def push_to_dashboard(self, dashboard):
dashboard.doc.add_next_tick_callback(partial(dashboard.update_marker,
timestamp=self.timestamp,
code=self.marker_code))
[docs]class Disconnect(Packet):
"""Disconnect packet"""
def __init__(self, timestamp, payload):
super().__init__(timestamp, payload)
self._check_fletcher(payload)
[docs] def _convert(self, bin_data):
"""Disconnect packet has no data"""
pass
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "Device has been disconnected!"
[docs]class DeviceInfo(Packet):
"""Device information packet"""
def __init__(self, timestamp, payload):
super(DeviceInfo, self).__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
fw_num = np.frombuffer(bin_data, dtype=np.dtype(np.uint16).newbyteorder('<'), count=1, offset=0)
self.firmware_version = '.'.join([char for char in str(fw_num)[1:-1]])
self.data_rate_info = 16000/(2**bin_data[2])
self.adc_mask = bin(bin_data[3])
print(self)
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "Firmware version: " + self.firmware_version + " - sampling rate: " + str(self.data_rate_info)\
+ " Hz" + " - ADC mask: " + str(self.adc_mask)
[docs] def write_to_file(self, recorder):
recorder.write_data([self.timestamp, self.firmware_version, self.data_rate_info, self.adc_mask])
[docs] def push_to_dashboard(self, dashboard):
data = {'firmware_version': [self.firmware_version]}
dashboard.doc.add_next_tick_callback(partial(dashboard.update_info, new=data))
[docs]class CommandRCV(Packet):
"""Command Status packet"""
def __init__(self, timestamp, payload):
super(CommandRCV, self).__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
self.opcode = bin_data[0]
pass
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "an acknowledge message for command with this opcode has been received: " + str(self.opcode)
[docs]class CommandStatus(Packet):
"""Command Status packet"""
def __init__(self, timestamp, payload):
super(CommandStatus, self).__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
self.opcode = bin_data[0]
self.status = bin_data[5]
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "Command status: " + str(self.status) + "\tfor command with opcode: " + str(self.opcode)
[docs]class CalibrationInfo(Packet):
"""Calibration Info packet"""
def __init__(self, timestamp, payload):
super(CalibrationInfo, self).__init__(timestamp, payload)
self._convert(payload[:-4])
self._check_fletcher(payload[-4:])
[docs] def _convert(self, bin_data):
slope = np.frombuffer(bin_data, dtype=np.dtype(np.uint16).newbyteorder('<'), count=1, offset=0)
self.slope = slope * 10.0
offset = np.frombuffer(bin_data, dtype=np.dtype(np.uint16).newbyteorder('<'), count=1, offset=2)
self.offset = offset * 0.001
[docs] def _check_fletcher(self, fletcher):
assert fletcher == b'\xaf\xbe\xad\xde', "Fletcher error!"
def __str__(self):
return "calibration info: slope = " + str(self.slope) + "\toffset = " + str(self.offset)
PACKET_CLASS_DICT = {
PACKET_ID.ORN: Orientation,
PACKET_ID.ENV: Environment,
PACKET_ID.TS: TimeStamp,
PACKET_ID.DISCONNECT: Disconnect,
PACKET_ID.INFO: DeviceInfo,
PACKET_ID.EEG94: EEG94,
PACKET_ID.EEG98: EEG98,
PACKET_ID.EEG99S: EEG99s,
PACKET_ID.EEG99: EEG99s,
PACKET_ID.EEG94R: EEG94,
PACKET_ID.EEG98R: EEG98,
PACKET_ID.CMDRCV: CommandRCV,
PACKET_ID.CMDSTAT: CommandStatus,
PACKET_ID.CALIBINFO: CalibrationInfo,
PACKET_ID.MARKER: MarkerEvent
}