Source code for parser

# -*- coding: utf-8 -*-
import numpy as np
import struct
from explorepy.packet import PACKET_ID, PACKET_CLASS_DICT, TimeStamp, EEG, Environment, CommandRCV, CommandStatus,\
                                Orientation, DeviceInfo, Disconnect, MarkerEvent, CalibrationInfo
from explorepy.filters import Filter
import copy
import time


[docs]def generate_packet(pid, timestamp, bin_data): """Generates the packets according to the pid Args: pid (int): Packet ID timestamp (int): Timestamp bin_data: Binary dat Returns: Packet """ if pid in PACKET_CLASS_DICT: packet = PACKET_CLASS_DICT[pid](timestamp, bin_data) else: print("Unknown Packet ID:" + str(pid)) print("Length of the binary data:", len(bin_data)) packet = None return packet
[docs]class Parser: def __init__(self, bp_freq=None, notch_freq=None, socket=None, fid=None): """Parser class for explore device Args: socket (BluetoothSocket): Bluetooth Socket (Should be None if fid is provided) fid (file object): File object for reading data (Should be None if socket is provided) bp_freq (tuple): Tuple of cut-off frequencies of bandpass filter (low cut-off frequency, high cut-off frequency) notch_freq (int): Notch filter frequency (50 or 60 Hz) """ self.socket = socket self.fid = fid self.dt_int16 = np.dtype(np.int16).newbyteorder('<') self.dt_uint16 = np.dtype(np.uint16).newbyteorder('<') self.time_offset = None self.events = [] self.start_timer = None if bp_freq is not None: assert bp_freq[0] < bp_freq[1], "High cut-off frequency must be larger than low cut-off frequency" self.bp_freq = bp_freq self.apply_bp_filter = True else: self.apply_bp_filter = False self.bp_freq = (0, 100) # dummy values self.notch_freq = notch_freq self._filter = None self.firmware_version = None self.fs = None self.adc_mask = None self.n_chan = None self.imp_calib_info = {} self.calibre_set = None self.init_set = None self.ED_prv = None self.theta = 0. self.axis = np.array([0, 0, -1]) self.matrix = np.identity(4) packet = None while not isinstance(packet, DeviceInfo): packet = self.parse_packet(mode=None) self.signal_dc = np.zeros((self.n_chan,), dtype=np.float) @property def filter(self): if self._filter is None: self._init_filters() return self._filter
[docs] def parse_packet(self, mode="print", recorders=None, outlets=None, dashboard=None): """Reads and parses a package from a file or socket Args: mode (str): Parsing mode {'print', 'record', 'lsl', 'visualize', 'impedance', None} recorders (tuple): Tuple of recorder objects (ExG_recorder, ORN_recorder, Marker_recorder) outlets (tuple): Tuple of lsl StreamOutlet (orientation_outlet, EEG_outlet, marker_outlet) dashboard (Dashboard): Dashboard object for visualization Returns: packet object """ pid = struct.unpack('B', self.read(1))[0] cnt = self.read(1)[0] payload = struct.unpack('<H', self.read(2))[0] timestamp = struct.unpack('<I', self.read(4))[0] # Timestamp conversion if self.time_offset is None: self.time_offset = timestamp * .0001 timestamp = 0 self.start_timer = time.time() else: timestamp = timestamp * .0001 - self.time_offset # Timestamp unit is .1 ms payload_data = self.read(payload - 4) packet = generate_packet(pid, timestamp, payload_data) if isinstance(packet, DeviceInfo): self.firmware_version = packet.firmware_version self.fs = int(packet.data_rate_info) self.adc_mask = packet.adc_mask self.n_chan = self.adc_mask.count('1') if mode == "print": print(packet) elif mode == "record": assert isinstance(recorders, tuple), "Invalid writer objects!" if isinstance(packet, Orientation): packet.write_to_file(recorders[1]) elif isinstance(packet, EEG): packet.write_to_file(recorders[0]) elif isinstance(packet, MarkerEvent): packet.write_to_file(recorders[2]) for event in self.events: event.write_to_file(recorders[2]) self.events = [] elif mode == "lsl": if isinstance(packet, Orientation): packet.push_to_lsl(outlets[0]) elif isinstance(packet, EEG): packet.push_to_lsl(outlets[1]) elif isinstance(packet, MarkerEvent): packet.push_to_lsl(outlets[2]) elif mode == "visualize": if isinstance(packet, EEG): if self.notch_freq: packet.apply_notch_filter(exg_filter=self.filter) if self.apply_bp_filter: packet.apply_bp_filter(exg_filter=self.filter) # remove DC # n_samples = packet.data.shape[1] # for column in range(n_samples): # self.signal_dc = (self.bp_freq[0] / (self.fs*0.5)) * packet.data[:, column] + ( # 1 - (self.bp_freq[0] / (self.fs*0.5))) * self.signal_dc # packet.data[:, column] = packet.data[:, column] - self.signal_dc if isinstance(packet, Orientation) and self.calibre_set is not None: packet = self._compute_NED(packet) packet.push_to_dashboard(dashboard) elif mode == "listen": if isinstance(packet, CommandRCV): print(packet) elif isinstance(packet, CommandStatus): print(packet) elif mode == "debug": if isinstance(packet, EEG): print(packet) elif mode == "impedance": if isinstance(packet, EEG): if self.notch_freq: packet.apply_notch_filter(exg_filter=self.filter) if self.apply_bp_filter: temp_packet = copy.deepcopy(packet) temp_packet.apply_bp_filter_noise(exg_filter=self.filter) mag = np.ptp(temp_packet.data, axis=1) self.imp_calib_info['noise_level'] = mag packet.apply_bp_filter(exg_filter=self.filter) packet.push_to_imp_dashboard(dashboard, self.imp_calib_info) elif isinstance(packet, Environment) | isinstance(packet, DeviceInfo): packet.push_to_dashboard(dashboard) elif mode == "initialize": if isinstance(packet, Orientation): D = packet.acc / (np.dot(packet.acc, packet.acc) ** 0.5) [kx, ky, kz, mx_offset, my_offset, mz_offset] = self.calibre_set packet.mag[0] = kx * (packet.mag[0] - mx_offset) packet.mag[1] = ky * (packet.mag[1] - my_offset) packet.mag[2] = kz * (packet.mag[2] - mz_offset) E = -1*np.cross(D, packet.mag) E = E / (np.dot(E, E) ** 0.5) # here you can find an estimation of actual north from packet.mag, it is perpendicular to D and still # co-planar with D and mag, somehow reducing error N = -1*np.cross(E, D) N = N / (np.dot(N, N) ** 0.5) T_init = np.column_stack((E, N, D)) N_init = np.matmul(np.transpose(T_init), N) E_init = np.matmul(np.transpose(T_init), E) D_init = np.matmul(np.transpose(T_init), D) self.init_set = [T_init, N_init, E_init, D_init] self.ED_prv = [E, D] return packet
[docs] def read(self, n_bytes): """Read n_bytes from socket or file Args: n_bytes (int): number of bytes to be read Returns: list of bytes """ if self.socket is not None: try: byte_data = self.socket.recv(n_bytes) except ValueError as e: raise ConnectionAbortedError(e.__str__()) elif not self.fid.closed: byte_data = self.fid.read(n_bytes) else: raise ValueError("File has been closed unexpectedly!") if len(byte_data) != n_bytes: raise ValueError("Number of received bytes is less than expected") # TODO: Create a specific exception for this case return byte_data
[docs] def set_marker(self, marker_code): if type(marker_code) is not int: raise TypeError('Marker code must be an integer!') if 0 <= marker_code <= 7: raise ValueError('Marker code value is not valid') self.events.append(MarkerEvent(timestamp=time.time()-self.start_timer, payload=bytearray(struct.pack('<H', marker_code) + b'\xaf\xbe\xad\xde')))
[docs] def _init_filters(self): if self.apply_bp_filter or self.notch_freq: self._filter = Filter(l_freq=self.bp_freq[0], h_freq=self.bp_freq[1], line_freq=self.notch_freq, sampling_freq=self.fs)
[docs] def _compute_NED(self, packet): [kx, ky, kz, mx_offset, my_offset, mz_offset] = self.calibre_set D_prv = self.ED_prv[1] E_prv = self.ED_prv[0] acc = packet.acc acc = acc / (np.dot(acc, acc) ** 0.5) gyro = packet.gyro * 1.745329e-5 # radian per second packet.mag[0] = kx * (packet.mag[0] - mx_offset) packet.mag[1] = ky * (packet.mag[1] - my_offset) packet.mag[2] = kz * (packet.mag[2] - mz_offset) mag = packet.mag D = acc dD = D-D_prv da = np.cross(D_prv, dD) E = -1*np.cross(D, mag) E = E / (np.dot(E, E) ** 0.5) dE = E-E_prv dm = np.cross(E_prv, dE) dg = 0.05 * gyro dth = -0.95 * dg + 0.025 * da + 0.025 * dm D = D_prv + np.cross(dth, D_prv) D = D / (np.dot(D, D) ** 0.5) Err = np.dot(D, E) D_tmp = D - 0.5*Err*E E_tmp = E - 0.5*Err*D D = D_tmp / (np.dot(D_tmp, D_tmp) ** 0.5) E = E_tmp / (np.dot(E_tmp, E_tmp) ** 0.5) N = -1*np.cross(E, D) N = N / (np.dot(N, N) ** 0.5) ''' If you comment this block it will give you the absolute orientation based on {East,North,Up} coordinate system. If you keep this block of code it will give you the relative orientation based on itial state of the device. so It is important to keep the device steady, so that the device can capture the initial direction properly. ''' ########################## T = np.zeros((3,3)) T_init = self.init_set[0] N_init = self.init_set[1] E_init = self.init_set[2] D_init = self.init_set[3] T = np.column_stack((E, N, D)) T_test = np.matmul(T, T_init.transpose()) N = np.matmul(T_test.transpose(), N_init) E = np.matmul(T_test.transpose(), E_init) D = np.matmul(T_test.transpose(), D_init) ########################## matrix = np.identity(4) matrix[0][0] = E[0] matrix[0][1] = N[0] matrix[0][2] = D[0] matrix[1][0] = E[1] matrix[1][1] = N[1] matrix[1][2] = D[1] matrix[2][0] = E[2] matrix[2][1] = N[2] matrix[2][2] = D[2] N = N / (np.dot(N, N) ** 0.5) E = E / (np.dot(E, E) ** 0.5) D = D / (np.dot(D, D) ** 0.5) self.ED_prv = [E, D] self.matrix = self.matrix*0.9+0.1*matrix [theta, rot_axis] = packet.compute_angle(matrix=self.matrix) self.theta = self.theta*0.9+0.1*theta packet.theta = self.theta self.axis = self.axis*0.9+0.1*rot_axis packet.rot_axis = self.axis return packet