# Source code for explore

# -*- coding: utf-8 -*-
from explorepy.bt_client import BtClient
from explorepy.parser import Parser
from explorepy.dashboard.dashboard import Dashboard
from explorepy._exceptions import *
from explorepy.packet import CommandRCV, CommandStatus, CalibrationInfo, DeviceInfo
from explorepy.tools import FileRecorder
import csv
import os
import time
import signal
import sys
from pylsl import StreamInfo, StreamOutlet
from threading import Thread, Timer


class Explore:
    r"""Mentalab Explore device"""

    def __init__(self, n_device=1):
        r"""
        Args:
            n_device (int): Number of devices to be connected
        """
        self.device = [BtClient() for _ in range(n_device)]
        self.socket = None
        self.parser = None
        self.m_dashboard = None
        self.is_connected = False
        # Becomes a one-element list ([True]/[False]) once an I/O loop is started.
        self.is_acquiring = None

    def _reconnect(self, device_id):
        """Re-open the Bluetooth connection after it has been aborted.

        On success the new socket is stored on both this object and the parser, so that
        code sending commands through ``self.socket`` does not keep using the dead socket.

        Args:
            device_id (int): Index of the client in ``self.device``

        Returns:
            bool: True if the connection was re-established, False if the device was not found
            (the error is printed in that case).
        """
        print("Device has been disconnected! Scanning for last connected device...")
        try:
            self.socket = self.device[device_id].bt_connect()
        except DeviceNotFoundError as e:
            print(e)
            return False
        self.parser.socket = self.socket
        return True

    def connect(self, device_name=None, device_addr=None, device_id=0):
        r"""
        Connects to the nearby device. If there are more than one device, the user is asked to choose one of them.

        Args:
            device_name (str): Device name in the format of "Explore_XXXX"
            device_addr (str): The MAC address in format "XX:XX:XX:XX:XX:XX" Either Address or name should be in
                the input
            device_id (int): device id (not needed in the current version)
        """
        self.device[device_id].init_bt(device_name=device_name, device_addr=device_addr)
        # An existing socket/parser is reused; both are cleared again by disconnect().
        if self.socket is None:
            self.socket = self.device[device_id].bt_connect()
        if self.parser is None:
            self.parser = Parser(socket=self.socket)
        self.is_connected = True

    def disconnect(self, device_id=None):
        r"""Disconnects from the device

        Args:
            device_id (int): device id (not needed in the current version). ``None`` selects the first device.
        """
        if device_id is None:
            # The default used to be passed straight to the list index and raised TypeError.
            device_id = 0
        self.device[device_id].socket.close()
        # Drop the closed socket and the parser bound to it, otherwise a later connect()
        # would skip reconnecting and report a connection that does not exist.
        self.socket = None
        self.parser = None
        self.is_connected = False

    def acquire(self, device_id=0, duration=None):
        r"""Start getting data from the device

        Args:
            device_id (int): device id (not needed in the current version)
            duration (float): duration of acquiring data (if None it streams data endlessly)

        Returns:
            0 if the connection was lost and the device could not be found again, otherwise None.
        """
        assert self.is_connected, "Explore device is not connected. Please connect the device first."
        is_acquiring = [True]

        def stop_acquiring(flag):
            flag[0] = False

        timer = None
        if duration is not None:
            timer = Timer(duration, stop_acquiring, [is_acquiring])
            timer.start()
            print("Start acquisition for ", duration, " seconds...")

        try:
            while is_acquiring[0]:
                try:
                    self.parser.parse_packet(mode="print")
                except ConnectionAbortedError:
                    if not self._reconnect(device_id):
                        return 0
        finally:
            # Do not leave a pending timer thread behind on early return or interruption.
            if timer is not None:
                timer.cancel()
        print("Data acquisition stopped after ", duration, " seconds.")

    def record_data(self, file_name, do_overwrite=False, device_id=0, duration=None, file_type='csv'):
        r"""Records the data in real-time

        Three recorders are created with the suffixes ``_ExG``, ``_ORN`` and ``_Marker`` appended to
        ``file_name``; for 'edf' the markers are handed to the ExG recorder instead of a separate one.

        Args:
            file_name (str): Output file name
            device_id (int): Device id (not needed in the current version)
            do_overwrite (bool): Overwrite if files exist already
            duration (float): Duration of recording in seconds (if None records endlessly).
            file_type (str): File type of the recorded file. Supported file types: 'csv', 'edf'

        Returns:
            0 if the connection was lost and the device could not be found again, otherwise None.

        Raises:
            ValueError: If the file name contains an invalid character, the file type is not supported or
                the duration is not positive.
        """
        assert self.is_connected, "Explore device is not connected. Please connect the device first."

        # Validate every argument before any output file is created.
        if set(r'<>{}[]~`*%').intersection(file_name):
            raise ValueError("Invalid character in file name")
        if file_type not in ['edf', 'csv']:
            raise ValueError('{} is not a supported file extension!'.format(file_type))
        if duration is not None and duration <= 0:
            raise ValueError("Recording time must be a positive number!")

        n_chan = self.parser.n_chan
        exg_out_file = file_name + "_ExG"
        orn_out_file = file_name + "_ORN"
        marker_out_file = file_name + "_Marker"

        # The first entry describes the timestamp column, hence n_chan + 1 entries are kept.
        exg_ch = ['TimeStamp', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7', 'ch8'][0:n_chan + 1]
        exg_unit = ['s', 'V', 'V', 'V', 'V', 'V', 'V', 'V', 'V'][0:n_chan + 1]
        exg_max = [86400, .4, .4, .4, .4, .4, .4, .4, .4][0:n_chan + 1]
        exg_min = [0, -.4, -.4, -.4, -.4, -.4, -.4, -.4, -.4][0:n_chan + 1]

        orn_ch = ['TimeStamp', 'ax', 'ay', 'az', 'gx', 'gy', 'gz', 'mx', 'my', 'mz']
        orn_unit = ['s', 'mg', 'mg', 'mg', 'mdps', 'mdps', 'mdps', 'mgauss', 'mgauss', 'mgauss']
        orn_max = [86400, 2000, 2000, 2000, 250000, 250000, 250000, 50000, 50000, 50000]
        orn_min = [0, -2000, -2000, -2000, -250000, -250000, -250000, -50000, -50000, -50000]

        is_acquiring = [True]

        def stop_acquiring(flag):
            flag[0] = False

        opened_recorders = []  # recorders that must be stopped on every exit path
        rec_timer = None
        time_offset = None
        try:
            exg_recorder = FileRecorder(file_name=exg_out_file, ch_label=exg_ch, fs=self.parser.fs,
                                        ch_unit=exg_unit, file_type=file_type, do_overwrite=do_overwrite,
                                        ch_min=exg_min, ch_max=exg_max)
            opened_recorders.append(exg_recorder)
            orn_recorder = FileRecorder(file_name=orn_out_file, ch_label=orn_ch, fs=20, ch_unit=orn_unit,
                                        file_type=file_type, do_overwrite=do_overwrite,
                                        ch_min=orn_min, ch_max=orn_max)
            opened_recorders.append(orn_recorder)
            if file_type == 'csv':
                marker_ch = ['TimeStamp', 'Code']
                marker_unit = ['s', '-']
                marker_recorder = FileRecorder(file_name=marker_out_file, ch_label=marker_ch, fs=0,
                                               ch_unit=marker_unit, file_type=file_type,
                                               do_overwrite=do_overwrite)
                opened_recorders.append(marker_recorder)
            else:
                # 'edf': markers go to the ExG recorder, which is already tracked above.
                marker_recorder = exg_recorder

            if duration is not None:
                rec_timer = Timer(duration, stop_acquiring, [is_acquiring])
                rec_timer.start()
                print("Start recording for ", duration, " seconds...")
            else:
                print("Recording...")

            while is_acquiring[0]:
                try:
                    packet = self.parser.parse_packet(mode="record",
                                                      recorders=(exg_recorder, orn_recorder, marker_recorder))
                    if time_offset is not None:
                        packet.timestamp = packet.timestamp - time_offset
                    else:
                        # The first packet's timestamp is taken as the offset for the following ones.
                        time_offset = packet.timestamp
                except ConnectionAbortedError:
                    if not self._reconnect(device_id):
                        if duration is not None:
                            print("Error: Recording finished before ", duration, "seconds.")
                        return 0
            print("Recording finished after ", duration, " seconds.")
        finally:
            # Runs for normal completion, failed reconnects and exceptions alike.
            if rec_timer is not None:
                rec_timer.cancel()
            for recorder in opened_recorders:
                recorder.stop()

    def push2lsl(self, device_id=0, duration=None):
        r"""Push samples to two lsl streams

        Args:
            device_id (int): device id (not needed in the current version)
            duration (float): duration of data acquiring (if None it streams endlessly).

        Returns:
            0 if the connection was lost and the device could not be found again, otherwise None.
        """
        assert self.is_connected, "Explore device is not connected. Please connect the device first."

        info_orn = StreamInfo('Explore', 'Orientation', 9, 20, 'float32', 'ORN')
        info_exg = StreamInfo('Explore', 'ExG', self.parser.n_chan, self.parser.fs, 'float32', 'ExG')
        info_marker = StreamInfo('Explore', 'Markers', 1, 0, 'int32', 'Marker')

        orn_outlet = StreamOutlet(info_orn)
        exg_outlet = StreamOutlet(info_exg)
        marker_outlet = StreamOutlet(info_marker)

        is_acquiring = [True]

        def stop_acquiring(flag):
            flag[0] = False

        timer = None
        if duration is not None:
            timer = Timer(duration, stop_acquiring, [is_acquiring])
            timer.start()
            print("Start pushing to lsl for ", duration, " seconds...")
        else:
            print("Pushing to lsl...")

        try:
            while is_acquiring[0]:
                try:
                    self.parser.parse_packet(mode="lsl", outlets=(orn_outlet, exg_outlet, marker_outlet))
                except ConnectionAbortedError:
                    if not self._reconnect(device_id):
                        return 0
        finally:
            # Do not leave a pending timer thread behind on early return or interruption.
            if timer is not None:
                timer.cancel()
        print("Data acquisition finished after ", duration, " seconds.")

    def visualize(self, device_id=0, bp_freq=(1, 30), notch_freq=50, calibre_file=None):
        r"""Visualization of the signal in the dashboard

        Args:
            device_id (int): Device ID (not needed in the current version)
            bp_freq (tuple): Bandpass filter cut-off frequencies (low_cutoff_freq, high_cutoff_freq), No bandpass
                filter if it is None.
            notch_freq (int): Line frequency for notch filter (50 or 60 Hz), No notch filter if it is None
            calibre_file (str): Calibration data file name
        """
        import numpy as np
        assert self.is_connected, "Explore device is not connected. Please connect the device first."

        if calibre_file is not None:
            with open(calibre_file, "r") as f_calibre:
                csv_reader_calibre = csv.reader(f_calibre, delimiter=",")
                calibre_set = list(csv_reader_calibre)
                # Row 0 is the header line, row 1 holds the coefficients.
                self.parser.calibre_set = np.asarray(calibre_set[1], dtype=np.float64)

        self.parser.notch_freq = notch_freq
        if bp_freq is not None:
            self.parser.apply_bp_filter = True
            self.parser.bp_freq = bp_freq

        self.m_dashboard = Dashboard(n_chan=self.parser.n_chan,
                                     exg_fs=self.parser.fs,
                                     firmware_version=self.parser.firmware_version)
        self.m_dashboard.start_server()

        # device_id is forwarded so that a reconnect uses the device the caller selected.
        thread = Thread(target=self._io_loop, args=(device_id,))
        thread.daemon = True
        thread.start()
        self.m_dashboard.start_loop()

    def _io_loop(self, device_id=0, mode="visualize"):
        """Feed parsed packets to the dashboard until ``self.is_acquiring[0]`` becomes False.

        When the parser holds a calibration set, packets are first parsed in "initialize" mode until a
        packet with an ``acc`` attribute arrives and ``parser.init_set`` is set; afterwards ``mode`` is used.

        If the connection is lost and the device cannot be found again, the loop flag is cleared. The
        whole process is then terminated with ``os._exit(0)`` in "visualize" mode or while still in the
        initialisation phase; otherwise the loop simply ends.

        Args:
            device_id (int): Index of the client in ``self.device`` used for reconnecting
            mode (str): Parser mode used once initialised ("visualize" by default)
        """
        self.is_acquiring = [True]
        # Without calibration data there is nothing to initialise, so that phase is skipped.
        is_initialized = self.parser.calibre_set is None

        # Wait until dashboard is initialized.
        while not hasattr(self.m_dashboard, 'doc'):
            print('wait...')
            time.sleep(.5)

        while self.is_acquiring[0]:
            try:
                if is_initialized:
                    self.parser.parse_packet(mode=mode, dashboard=self.m_dashboard)
                else:
                    packet = self.parser.parse_packet(mode="initialize", dashboard=self.m_dashboard)
                    if hasattr(packet, 'acc') and self.parser.init_set is not None:
                        is_initialized = True
            except ConnectionAbortedError:
                if not self._reconnect(device_id):
                    self.is_acquiring[0] = False
                    # The initialisation branch used to call the non-existent os.exit() here,
                    # which raised AttributeError instead of terminating.
                    if mode == "visualize" or not is_initialized:
                        os._exit(0)

    def signal_handler(self, signal, frame):
        """Handler for SIGINT: stop the I/O loop and exit the interpreter.

        Args:
            signal: Signal number passed in by the ``signal`` module (unused)
            frame: Current stack frame passed in by the ``signal`` module (unused)
        """
        # Safe handler of keyboardInterrupt
        self.is_acquiring = [False]
        print("Program is exiting...")
        sys.exit(0)

    def measure_imp(self, device_id=0, notch_freq=50):
        """
        Visualization of the electrode impedances

        Impedance measurement is enabled on the device for the lifetime of the dashboard and disabled
        again in a ``finally`` clause, after which the interpreter exits.

        Args:
            device_id (int): Device ID
            notch_freq (int): Notch frequency for filtering the line noise (50 or 60 Hz)
        """
        assert self.is_connected, "Explore device is not connected. Please connect the device first."
        assert self.parser.fs == 250, "Impedance mode only works in 250 Hz sampling rate!"
        from explorepy import command

        self.is_acquiring = [True]
        signal.signal(signal.SIGINT, self.signal_handler)
        try:
            thread = Thread(target=self._io_loop, args=(device_id, "impedance",))
            thread.daemon = True
            self.parser.apply_bp_filter = True
            self.parser.bp_freq = (61, 64)
            self.parser.notch_freq = notch_freq
            thread.start()

            # Activate impedance measurement mode in the device
            imp_activate_cmd = command.ZmeasurementEnable()
            if self.change_settings(imp_activate_cmd, device_id=device_id):
                self.m_dashboard = Dashboard(n_chan=self.parser.n_chan, mode="impedance", exg_fs=self.parser.fs,
                                             firmware_version=self.parser.firmware_version)
                self.m_dashboard.start_server()
                self.m_dashboard.start_loop()
            else:
                # Hard exit: the finally clause below is not executed on this path.
                os._exit(0)
        finally:
            print("Disabling impedance mode...")
            imp_deactivate_cmd = command.ZmeasurementDisable()
            self.change_settings(imp_deactivate_cmd, device_id=device_id)
            sys.exit(0)

    def set_marker(self, code):
        """Sets an event marker during the recording

        Args:
            code (int): Marker code. It must be an integer larger than 7 (codes from 0 to 7 are reserved for
                hardware markers).
        """
        assert self.is_connected, "Explore device is not connected. Please connect the device first."
        self.parser.set_marker(marker_code=code)

    def change_settings(self, command, device_id=0):
        """
        sends a message to the device

        The command is sent (up to five attempts, reconnecting after an aborted connection) and the
        incoming packets are then watched for up to 10 seconds for a status packet with the same opcode.

        Args:
            device_id (int): Device ID
            command (explorepy.command.Command): Command object

        Returns:
            True if a matching status packet was received, False if the command could not be sent or no
            status arrived in time, 0 if the connection was lost and the device could not be found again.
        """
        from explorepy.command import send_command

        assert self.is_connected, "Explore device is not connected. Please connect the device first."

        is_sent = False
        for _ in range(5):
            time.sleep(0.1)
            try:
                send_command(command, self.socket)
            except ConnectionAbortedError:
                # _reconnect refreshes self.socket, so the next attempt uses the new connection.
                if not self._reconnect(device_id):
                    return 0
            else:
                is_sent = True
                break
        if not is_sent:
            # Waiting for an acknowledgement of a command that never left is pointless.
            print("The command could not be sent to the device.")
            return False

        is_listening = [True]

        def stop_listening(flag):
            flag[0] = False

        waiting_time = 10
        command_timer = Timer(waiting_time, stop_listening, [is_listening])
        command_timer.start()
        print("waiting for ack and status messages...")
        try:
            while is_listening[0]:
                try:
                    packet = self.parser.parse_packet(mode="listen")
                except ConnectionAbortedError:
                    if not self._reconnect(device_id):
                        return 0
                    continue

                if isinstance(packet, CommandRCV):
                    if command.int2bytearray(packet.opcode, 1) == command.opcode.value:
                        print("The opcode matches the sent command, Explore has received the command")
                if isinstance(packet, CalibrationInfo):
                    self.parser.imp_calib_info['slope'] = packet.slope
                    self.parser.imp_calib_info['offset'] = packet.offset
                if isinstance(packet, CommandStatus):
                    if command.int2bytearray(packet.opcode, 1) == command.opcode.value:
                        print("The opcode matches the sent command, Explore has processed the command")
                        return True
        finally:
            # Cancel on every exit path so no timer thread lingers after returning.
            command_timer.cancel()

        print("No status message has been received after ", waiting_time, " seconds. Please restart the device and "
              "send the command again.")
        return False

    def calibrate_orn(self, file_name, device_id=0, do_overwrite=False):
        r"""Calibrate the orientation module of the specified device

        Records for 100 seconds, derives a scale factor and an offset per axis from the minimum and
        maximum of the last three columns of the recorded ``_ORN.csv`` file, writes them to
        ``<file_name>_calibre_coef.csv`` and removes the intermediate recording files.

        Args:
            device_id (int): device id
            file_name (str): filename to be used for calibration. If you pass this parameter, ORN module should be
                ACTIVE!
            do_overwrite (bool): Overwrite if files exist already

        Raises:
            AssertionError: If the coefficient file already exists and ``do_overwrite`` is False.
            ValueError: If the recording contains no orientation samples.
        """
        import numpy as np

        calibre_out_file = file_name + "_calibre_coef.csv"
        # Checked before recording so that 100 seconds are not spent on a result that cannot be saved.
        # (The condition used to be inverted and fired only when overwriting was allowed.)
        assert do_overwrite or not os.path.isfile(calibre_out_file), calibre_out_file + " already exists!"

        print("Start recording for 100 seconds, please move the device around during this time, in all directions")
        self.record_data(file_name, do_overwrite=do_overwrite, device_id=device_id, duration=100, file_type='csv')

        orn_file = file_name + "_ORN.csv"
        with open(orn_file, "r", newline="") as f_set:
            rows = list(csv.reader(f_set, delimiter=","))
        # rows[0] is the header line; np.float64 replaces the removed np.float alias.
        np_set = np.array(rows[1:], dtype=np.float64)
        if np_set.size == 0:
            raise ValueError("No orientation data has been recorded in " + orn_file)

        # The last three columns are the mx, my, mz channels.
        mag_set = np_set[:, -3:]
        mag_min = mag_set.min(axis=0)
        mag_max = mag_set.max(axis=0)
        offsets = 0.5 * (mag_min + mag_max)
        scales = 1 / (0.5 * (mag_max - mag_min))
        calibre_set = np.concatenate((scales, offsets))

        # newline="" lets the csv module control line endings (avoids blank rows on Windows).
        with open(calibre_out_file, "w", newline="") as f_coef:
            f_coef.write("kx, ky, kz, mx_offset, my_offset, mz_offset\n")
            csv.writer(f_coef, delimiter=",").writerow(calibre_set)

        os.remove(orn_file)
        os.remove((file_name + "_ExG.csv"))
        os.remove((file_name + "_Marker.csv"))
# Script entry point: deliberately a no-op, nothing is executed when this module is run directly.
if __name__ == '__main__': pass