Source code for explore

# -*- coding: utf-8 -*-
from explorepy.bt_client import BtClient
from explorepy.parser import Parser
from explorepy.dashboard.dashboard import Dashboard
from explorepy._exceptions import *
from explorepy.packet import CommandRCV, CommandStatus, CalibrationInfo, DeviceInfo
from import FileRecorder
import csv
import os
import time
import signal
import sys
from pylsl import StreamInfo, StreamOutlet
from threading import Thread, Timer

[docs]class Explore: r"""Mentalab Explore device""" def __init__(self, n_device=1): r""" Args: n_device (int): Number of devices to be connected """ self.device = [] self.socket = None self.parser = None self.m_dashboard = None for i in range(n_device): self.device.append(BtClient()) self.is_connected = False self.is_acquiring = None
[docs] def connect(self, device_name=None, device_addr=None, device_id=0): r""" Connects to the nearby device. If there are more than one device, the user is asked to choose one of them. Args: device_name (str): Device name in the format of "Explore_XXXX" device_addr (str): The MAC address in format "XX:XX:XX:XX:XX:XX" Either Address or name should be in the input device_id (int): device id (not needed in the current version) """ self.device[device_id].init_bt(device_name=device_name, device_addr=device_addr) if self.socket is None: self.socket = self.device[device_id].bt_connect() if self.parser is None: self.parser = Parser(socket=self.socket) self.is_connected = True packet = None
[docs] def disconnect(self, device_id=None): r"""Disconnects from the device Args: device_id (int): device id (not needed in the current version) """ self.device[device_id].socket.close() self.is_connected = False
[docs] def acquire(self, device_id=0, duration=None): r"""Start getting data from the device Args: device_id (int): device id (not needed in the current version) duration (float): duration of acquiring data (if None it streams data endlessly) """ assert self.is_connected, "Explore device is not connected. Please connect the device first." is_acquiring = [True] def stop_acquiring(flag): flag[0] = False if duration is not None: Timer(duration, stop_acquiring, [is_acquiring]).start() print("Start acquisition for ", duration, " seconds...") while is_acquiring[0]: try: self.parser.parse_packet(mode="print") except ConnectionAbortedError: print("Device has been disconnected! Scanning for last connected device...") try: self.parser.socket = self.device[device_id].bt_connect() except DeviceNotFoundError as e: print(e) return 0 print("Data acquisition stopped after ", duration, " seconds.")
[docs] def record_data(self, file_name, do_overwrite=False, device_id=0, duration=None, file_type='csv'): r"""Records the data in real-time Args: file_name (str): Output file name device_id (int): Device id (not needed in the current version) do_overwrite (bool): Overwrite if files exist already duration (float): Duration of recording in seconds (if None records endlessly). file_type (str): File type of the recorded file. Supported file types: 'csv', 'edf' """ assert self.is_connected, "Explore device is not connected. Please connect the device first." # Check invalid characters if set(r'<>{}[]~`*%').intersection(file_name): raise ValueError("Invalid character in file name") n_chan = self.parser.n_chan if file_type not in ['edf', 'csv']: raise ValueError('{} is not a supported file extension!'.format(file_type)) time_offset = None exg_out_file = file_name + "_ExG" orn_out_file = file_name + "_ORN" marker_out_file = file_name + "_Marker" exg_ch = ['TimeStamp', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7', 'ch8'][0:n_chan+1] exg_unit = ['s', 'V', 'V', 'V', 'V', 'V', 'V', 'V', 'V'][0:n_chan+1] exg_max = [86400, .4, .4, .4, .4, .4, .4, .4, .4][0:n_chan + 1] exg_min = [0, -.4, -.4, -.4, -.4, -.4, -.4, -.4, -.4][0:n_chan + 1] exg_recorder = FileRecorder(file_name=exg_out_file, ch_label=exg_ch, fs=self.parser.fs, ch_unit=exg_unit, file_type=file_type, do_overwrite=do_overwrite, ch_min=exg_min, ch_max=exg_max) orn_ch = ['TimeStamp', 'ax', 'ay', 'az', 'gx', 'gy', 'gz', 'mx', 'my', 'mz'] orn_unit = ['s', 'mg', 'mg', 'mg', 'mdps', 'mdps', 'mdps', 'mgauss', 'mgauss', 'mgauss'] orn_max = [86400, 2000, 2000, 2000, 250000, 250000, 250000, 50000, 50000, 50000] orn_min = [0, -2000, -2000, -2000, -250000, -250000, -250000, -50000, -50000, -50000] orn_recorder = FileRecorder(file_name=orn_out_file, ch_label=orn_ch, fs=20, ch_unit=orn_unit, file_type=file_type, do_overwrite=do_overwrite, ch_min=orn_min, ch_max=orn_max) if file_type == 'csv': marker_ch = ['TimeStamp', 'Code'] marker_unit = ['s', '-'] marker_recorder = FileRecorder(file_name=marker_out_file, ch_label=marker_ch, fs=0, ch_unit=marker_unit, file_type=file_type, do_overwrite=do_overwrite) elif file_type == 'edf': marker_recorder = exg_recorder is_acquiring = [True] def stop_acquiring(flag): flag[0] = False if duration is not None: if duration <= 0: raise ValueError("Recording time must be a positive number!") rec_timer = Timer(duration, stop_acquiring, [is_acquiring]) rec_timer.start() print("Start recording for ", duration, " seconds...") else: print("Recording...") is_disconnect_occurred = False while is_acquiring[0]: try: packet = self.parser.parse_packet(mode="record", recorders=(exg_recorder, orn_recorder, marker_recorder)) if time_offset is not None: packet.timestamp = packet.timestamp-time_offset else: time_offset = packet.timestamp except ConnectionAbortedError: print("Device has been disconnected! Scanning for last connected device...") try: self.parser.socket = self.device[device_id].bt_connect() except DeviceNotFoundError as e: print(e) rec_timer.cancel() return 0 if is_disconnect_occurred: print("Error: Recording finished before ", duration, "seconds.") rec_timer.cancel() else: print("Recording finished after ", duration, " seconds.") exg_recorder.stop() orn_recorder.stop() if file_type == 'csv': marker_recorder.stop()
[docs] def push2lsl(self, device_id=0, duration=None): r"""Push samples to two lsl streams Args: device_id (int): device id (not needed in the current version) duration (float): duration of data acquiring (if None it streams endlessly). """ assert self.is_connected, "Explore device is not connected. Please connect the device first." info_orn = StreamInfo('Explore', 'Orientation', 9, 20, 'float32', 'ORN') info_exg = StreamInfo('Explore', 'ExG', self.parser.n_chan, self.parser.fs, 'float32', 'ExG') info_marker = StreamInfo('Explore', 'Markers', 1, 0, 'int32', 'Marker') orn_outlet = StreamOutlet(info_orn) exg_outlet = StreamOutlet(info_exg) marker_outlet = StreamOutlet(info_marker) is_acquiring = [True] def stop_acquiring(flag): flag[0] = False if duration is not None: Timer(duration, stop_acquiring, [is_acquiring]).start() print("Start pushing to lsl for ", duration, " seconds...") else: print("Pushing to lsl...") while is_acquiring[0]: try: self.parser.parse_packet(mode="lsl", outlets=(orn_outlet, exg_outlet, marker_outlet)) except ConnectionAbortedError: print("Device has been disconnected! Scanning for last connected device...") try: self.parser.socket = self.device[device_id].bt_connect() except DeviceNotFoundError as e: print(e) return 0 print("Data acquisition finished after ", duration, " seconds.")
[docs] def visualize(self, device_id=0, bp_freq=(1, 30), notch_freq=50, calibre_file=None): r"""Visualization of the signal in the dashboard Args: device_id (int): Device ID (not needed in the current version) bp_freq (tuple): Bandpass filter cut-off frequencies (low_cutoff_freq, high_cutoff_freq), No bandpass filter if it is None. notch_freq (int): Line frequency for notch filter (50 or 60 Hz), No notch filter if it is None calibre_file (str): Calibration data file name """ import numpy as np assert self.is_connected, "Explore device is not connected. Please connect the device first." if calibre_file is not None: with open(calibre_file, "r") as f_calibre: csv_reader_calibre = csv.reader(f_calibre, delimiter=",") calibre_set = list(csv_reader_calibre) self.parser.calibre_set = np.asarray(calibre_set[1], dtype=np.float64) self.parser.notch_freq = notch_freq if bp_freq is not None: self.parser.apply_bp_filter = True self.parser.bp_freq = bp_freq self.m_dashboard = Dashboard(n_chan=self.parser.n_chan, exg_fs=self.parser.fs, firmware_version=self.parser.firmware_version) self.m_dashboard.start_server() thread = Thread(target=self._io_loop) thread.setDaemon(True) thread.start() self.m_dashboard.start_loop()
[docs] def _io_loop(self, device_id=0, mode="visualize"): self.is_acquiring = [True] if self.parser.calibre_set is not None: is_initialized = False else: is_initialized = True # flag as True since it doesn't matter and we skip orientation calculation process # Wait until dashboard is initialized. while not hasattr(self.m_dashboard, 'doc'): print('wait...') time.sleep(.5) while self.is_acquiring[0]: if is_initialized: try: packet = self.parser.parse_packet(mode=mode, dashboard=self.m_dashboard) except ConnectionAbortedError: print("Device has been disconnected! Scanning for last connected device...") try: self.parser.socket = self.device[device_id].bt_connect() except DeviceNotFoundError as e: print(e) self.is_acquiring[0] = False if mode == "visualize": os._exit(0) else: try: packet = self.parser.parse_packet(mode="initialize", dashboard=self.m_dashboard) if hasattr(packet, 'acc'): if self.parser.init_set is not None: is_initialized = True except ConnectionAbortedError: print("Device has been disconnected! Scanning for last connected device...") try: self.parser.socket = self.device[device_id].bt_connect() except DeviceNotFoundError as e: print(e) self.is_acquiring[0] = False if mode == "visualize": os._exit(0) os.exit(0)
[docs] def signal_handler(self, signal, frame): # Safe handler of keyboardInterrupt self.is_acquiring = [False] print("Program is exiting...") sys.exit(0)
[docs] def measure_imp(self, device_id=0, notch_freq=50): """ Visualization of the electrode impedances Args: device_id (int): Device ID notch_freq (int): Notch frequency for filtering the line noise (50 or 60 Hz) """ assert self.is_connected, "Explore device is not connected. Please connect the device first." assert self.parser.fs == 250, "Impedance mode only works in 250 Hz sampling rate!" self.is_acquiring = [True] signal.signal(signal.SIGINT, self.signal_handler) try: thread = Thread(target=self._io_loop, args=(device_id, "impedance",)) thread.setDaemon(True) self.parser.apply_bp_filter = True self.parser.bp_freq = (61, 64) self.parser.notch_freq = notch_freq thread.start() # Activate impedance measurement mode in the device from explorepy import command imp_activate_cmd = command.ZmeasurementEnable() if self.change_settings(imp_activate_cmd): self.m_dashboard = Dashboard(n_chan=self.parser.n_chan, mode="impedance", exg_fs=self.parser.fs, firmware_version=self.parser.firmware_version) self.m_dashboard.start_server() self.m_dashboard.start_loop() else: os._exit(0) finally: print("Disabling impedance mode...") from explorepy import command imp_deactivate_cmd = command.ZmeasurementDisable() self.change_settings(imp_deactivate_cmd) sys.exit(0)
[docs] def set_marker(self, code): """Sets an event marker during the recording Args: code (int): Marker code. It must be an integer larger than 7 (codes from 0 to 7 are reserved for hardware markers). """ assert self.is_connected, "Explore device is not connected. Please connect the device first." self.parser.set_marker(marker_code=code)
[docs] def change_settings(self, command, device_id=0): """ sends a message to the device Args: device_id (int): Device ID command (explorepy.command.Command): Command object Returns: """ from explorepy.command import send_command assert self.is_connected, "Explore device is not connected. Please connect the device first." sending_attempt = 5 while sending_attempt: try: sending_attempt = sending_attempt-1 time.sleep(0.1) send_command(command, self.socket) sending_attempt = 0 except ConnectionAbortedError: print("Device has been disconnected! Scanning for last connected device...") try: self.parser.socket = self.device[device_id].bt_connect() except DeviceNotFoundError as e: print(e) return 0 is_listening = [True] command_processed = False def stop_listening(flag): flag[0] = False waiting_time = 10 command_timer = Timer(waiting_time, stop_listening, [is_listening]) command_timer.start() print("waiting for ack and status messages...") while is_listening[0]: try: packet = self.parser.parse_packet(mode="listen") if isinstance(packet, CommandRCV): temp = command.int2bytearray(packet.opcode, 1) if command.int2bytearray(packet.opcode, 1) == command.opcode.value: print("The opcode matches the sent command, Explore has received the command") if isinstance(packet, CalibrationInfo): self.parser.imp_calib_info['slope'] = packet.slope self.parser.imp_calib_info['offset'] = packet.offset if isinstance(packet, CommandStatus): if command.int2bytearray(packet.opcode, 1) == command.opcode.value: command_processed = True is_listening = [False] command_timer.cancel() print("The opcode matches the sent command, Explore has processed the command") return True except ConnectionAbortedError: print("Device has been disconnected! Scanning for last connected device...") try: self.parser.socket = self.device[device_id].bt_connect() except DeviceNotFoundError as e: print(e) return 0 if not command_processed: print("No status message has been received after ", waiting_time, " seconds. Please restart the device and " "send the command again.") return False
[docs] def calibrate_orn(self, file_name, device_id=0, do_overwrite=False): r"""Calibrate the orientation module of the specified device Args: device_id (int): device id file_name (str): filename to be used for calibration. If you pass this parameter, ORN module should be ACTIVE! do_overwrite (bool): Overwrite if files exist already """ print("Start recording for 100 seconds, please move the device around during this time, in all directions") self.record_data(file_name, do_overwrite=do_overwrite, device_id=device_id, duration=100, file_type='csv') calibre_out_file = file_name + "_calibre_coef.csv" assert not (os.path.isfile(calibre_out_file) and do_overwrite), calibre_out_file + " already exists!" import numpy as np with open((file_name + "_ORN.csv"), "r") as f_set, open(calibre_out_file, "w") as f_coef: f_coef.write("kx, ky, kz, mx_offset, my_offset, mz_offset\n") csv_reader = csv.reader(f_set, delimiter=",") csv_coef = csv.writer(f_coef, delimiter=",") np_set = list(csv_reader) np_set = np.array(np_set[1:], dtype=np.float) mag_set_x = np.sort(np_set[:, -3]) mag_set_y = np.sort(np_set[:, -2]) mag_set_z = np.sort(np_set[:, -1]) mx_offset = 0.5 * (mag_set_x[0] + mag_set_x[-1]) my_offset = 0.5 * (mag_set_y[0] + mag_set_y[-1]) mz_offset = 0.5 * (mag_set_z[0] + mag_set_z[-1]) kx = 0.5 * (mag_set_x[-1] - mag_set_x[0]) ky = 0.5 * (mag_set_y[-1] - mag_set_y[0]) kz = 0.5 * (mag_set_z[-1] - mag_set_z[0]) k = np.sort(np.array([kx, ky, kz])) kx = 1 / kx ky = 1 / ky kz = 1 / kz calibre_set = np.array([kx, ky, kz, mx_offset, my_offset, mz_offset]) csv_coef.writerow(calibre_set) f_set.close() f_coef.close() os.remove((file_name + "_ORN.csv")) os.remove((file_name + "_ExG.csv")) os.remove((file_name + "_Marker.csv"))
if __name__ == '__main__': pass