# -*- coding: utf-8 -*-
"""Dashboard module"""
import os
from functools import partial
import numpy as np
from bokeh.layouts import widgetbox, row, column, Spacer
from bokeh.models import ColumnDataSource, ResetTool, PrintfTickFormatter, Panel, Tabs, SingleIntervalTicker, widgets, \
Toggle, TextInput, RadioGroup, Div, CustomJS, Button
from bokeh.plotting import figure
from bokeh.server.server import Server
from bokeh.palettes import PRGn
from bokeh.core.property.validation import validate, without_property_validation
from bokeh.transform import dodge
from bokeh.themes import Theme
from tornado import gen
from jinja2 import Template
from datetime import datetime
from explorepy.tools import HeartRateEstimator
from explorepy.stream_processor import TOPICS
# Orientation sampling rate; used to size the rollover of the orientation data source
ORN_SRATE = 20 # Hz
# Rate (Hz) the ExG signal is decimated to before it is streamed to the plot
EXG_VIS_SRATE = 125
# Default length of the visible time window
WIN_LENGTH = 10 # Seconds
# Options offered by the "Signal" selector
MODE_LIST = ['EEG', 'ECG']
# Channel labels; the enabled subset is selected with the device ADC mask
CHAN_LIST = ['Ch1', 'Ch2', 'Ch3', 'Ch4', 'Ch5', 'Ch6', 'Ch7', 'Ch8']
# Initial y-axis divisor: the value produced by the "1 mV" entry of SCALE_MENU (10 ** 3)
DEFAULT_SCALE = 10 ** 3 # signal units per channel row (presumably microvolts - confirm)
# Number of battery readings kept for the moving average shown in the dashboard
BATTERY_N_MOVING_AVERAGE = 60
# [min, max] peak-to-peak amplitude accepted for heart rate estimation
V_TH = [10, 5 * 10 ** 3] # Noise threshold for ECG (microVolt)
# Column names of the orientation data source: accelerometer, gyroscope and magnetometer axes
ORN_LIST = ['accX', 'accY', 'accZ', 'gyroX', 'gyroY', 'gyroZ', 'magX', 'magY', 'magZ']
# Y-scale menu: label -> exponent e; the applied divisor is 10 ** (-e)
SCALE_MENU = {"1 uV": 0, "5 uV": -0.66667, "10 uV": -1, "100 uV": -2, "200 uV": -2.33333, "500 uV": -2.66667,
"1 mV": -3, "5 mV": -3.66667, "10 mV": -4, "100 mV": -5}
# Time window menu: label -> window length in seconds
TIME_RANGE_MENU = {"10 s": 10., "5 s": 5., "20 s": 20.}
# Line colors of the three axes in each orientation plot
LINE_COLORS = ['green', '#42C4F7', 'red']
# Eight-color palette; one color per channel in the FFT plot
FFT_COLORS = PRGn[8]
class Dashboard:
    """Explorepy dashboard class

    Serves a bokeh document that visualizes the packets published by a stream processor. In
    'signal' mode the document contains ExG, orientation and spectral analysis tabs together with
    recording and marker controls; in 'impedance' mode it contains a single impedance tab.
    """

    def __init__(self, explore=None, mode='signal'):
        """
        Args:
            explore: Object providing ``stream_processor``, ``is_connected``, ``record_data`` and
                ``stop_recording`` (presumably an ``explorepy.Explore`` instance - confirm). The default
                of ``None`` is not usable: ``explore.stream_processor`` is read immediately.
            mode (str): Dashboard mode, either 'signal' or 'impedance'
        """
        self.explore = explore
        self.stream_processor = self.explore.stream_processor
        device_info = self.stream_processor.device_info
        adc_mask = device_info['adc_mask']
        self.n_chan = adc_mask.count(1)
        self.y_unit = DEFAULT_SCALE
        # Each enabled channel is drawn around its own integer row (1..n_chan) of the ExG plot
        self.offsets = np.arange(1, self.n_chan + 1)[:, np.newaxis].astype(float)
        self.chan_key_list = [CHAN_LIST[i] for i, mask in enumerate(reversed(adc_mask)) if mask == 1]
        self.exg_mode = 'EEG'
        self.rr_estimator = None
        self.win_length = WIN_LENGTH
        self.mode = mode
        self.exg_fs = device_info['sampling_rate']

        # Init ExG data source
        exg_temp = np.zeros((self.n_chan, 2))
        exg_temp[:, 0] = self.offsets[:, 0]
        exg_temp[:, 1] = np.nan
        init_data = dict(zip(self.chan_key_list, exg_temp))
        self._exg_source_orig = ColumnDataSource(data=init_data)
        init_data['t'] = np.array([0., 0.])
        self._exg_source_ds = ColumnDataSource(data=init_data)  # Downsampled ExG data for visualization purposes

        # Init ECG R-peak source
        init_data = dict(zip(['r_peak', 't'], [np.array([None], dtype=np.double), np.array([None], dtype=np.double)]))
        self._r_peak_source = ColumnDataSource(data=init_data)

        # Init marker source
        init_data = dict(zip(['marker', 't'], [np.array([None], dtype=np.double), np.array([None], dtype=np.double)]))
        self._marker_source = ColumnDataSource(data=init_data)

        # Init ORN data source
        init_data = dict(zip(ORN_LIST, np.zeros((9, 1))))
        init_data['t'] = [0.]
        self._orn_source = ColumnDataSource(data=init_data)

        # Init table sources
        self._heart_rate_source = ColumnDataSource(data={'heart_rate': ['NA']})
        self._firmware_source = ColumnDataSource(
            data={'firmware_version': [device_info['firmware_version']]}
        )
        self._battery_source = ColumnDataSource(data={'battery': ['NA']})
        self.temperature_source = ColumnDataSource(data={'temperature': ['NA']})
        self.light_source = ColumnDataSource(data={'light': ['NA']})
        self.battery_percent_list = []
        self.server = None

        # Init fft data source
        init_data = dict(zip(self.chan_key_list, np.zeros((self.n_chan, 1))))
        init_data['f'] = np.array([0.])
        self.fft_source = ColumnDataSource(data=init_data)

        # Init impedance measurement source
        init_data = {'channel': self.chan_key_list,
                     'impedance': ['NA'] * self.n_chan,
                     'row': ['1'] * self.n_chan,
                     'color': ['black'] * self.n_chan}
        self.imp_source = ColumnDataSource(data=init_data)

        # Init timer source
        self._timer_source = ColumnDataSource(data={'timer': ['00:00:00']})

        # Recording state; both stay None while no recording started from the dashboard is running
        self.rec_start_time = None
        self.rec_timer_id = None

    def start_server(self):
        """Start bokeh server"""
        validate(False)
        self.server = Server({'/': self._init_doc}, num_procs=1)
        self.server.start()

    def start_loop(self):
        """Start io loop and show the dashboard"""
        self.server.io_loop.add_callback(self.server.show, "/")
        self.server.io_loop.start()

    def exg_callback(self, packet):
        """
        Update ExG data in the visualization

        Args:
            packet (explorepy.packet.EEG): Received ExG packet
        """
        time_vector, exg = packet.get_data(self.exg_fs)
        self._exg_source_orig.stream(dict(zip(self.chan_key_list, exg)), rollover=int(self.exg_fs * self.win_length))

        # Downsampling; the step is kept >= 1 so a sampling rate below EXG_VIS_SRATE cannot yield a zero slice step
        step = max(1, int(self.exg_fs / EXG_VIS_SRATE))
        exg = exg[:, ::step]
        time_vector = time_vector[::step]

        # Update ExG unit
        exg = self.offsets + exg / self.y_unit
        new_data = dict(zip(self.chan_key_list, exg))
        new_data['t'] = time_vector
        self.doc.add_next_tick_callback(partial(self._update_exg, new_data=new_data))

    def orn_callback(self, packet):
        """Update orientation data

        Packets are ignored unless the orientation tab (index 1) is active.

        Args:
            packet (explorepy.packet.Orientation): Orientation packet
        """
        if self.tabs.active != 1:
            return
        timestamp, orn_data = packet.get_data()
        new_data = dict(zip(ORN_LIST, np.array(orn_data)[:, np.newaxis]))
        new_data['t'] = timestamp
        self.doc.add_next_tick_callback(partial(self._update_orn, new_data=new_data))

    def info_callback(self, packet):
        """Update device information in the dashboard

        Args:
            packet (explorepy.packet.Environment): Environment/DeviceInfo packet
        """
        for key, value in packet.get_data().items():
            data = {key: value}
            if key == 'firmware_version':
                self.doc.add_next_tick_callback(partial(self._update_fw_version, new_data=data))
            elif key == 'battery':
                self.battery_percent_list.append(value[0])
                if len(self.battery_percent_list) > BATTERY_N_MOVING_AVERAGE:
                    del self.battery_percent_list[0]
                # Moving average rounded down to a multiple of 5, with 1 as the lowest displayed value
                percent = int(np.mean(self.battery_percent_list) / 5) * 5
                if percent < 1:
                    percent = 1
                self.doc.add_next_tick_callback(partial(self._update_battery, new_data={key: [percent]}))
            elif key == 'temperature':
                self.doc.add_next_tick_callback(partial(self._update_temperature, new_data=data))
            elif key == 'light':
                data[key] = [int(data[key][0])]
                self.doc.add_next_tick_callback(partial(self._update_light, new_data=data))
            else:
                print("Warning: There is no field named: " + key)

    def marker_callback(self, packet):
        """Update markers

        Each marker is drawn as a vertical line spanning all channel rows; the trailing NaN sample
        separates it from the next marker in the same line glyph. Ignored in impedance mode.

        Args:
            packet (explorepy.packet.EventMarker): Event marker packet
        """
        if self.mode == "impedance":
            return
        timestamp, _ = packet.get_data()
        new_data = {'marker': np.array([0.01, self.n_chan + 0.99, None], dtype=np.double),
                    't': np.array([timestamp[0], timestamp[0], None], dtype=np.double)}
        self.doc.add_next_tick_callback(partial(self._update_marker, new_data=new_data))

    def impedance_callback(self, packet):
        """Update impedances

        Args:
            packet (explorepy.packet.EEG): ExG packet

        Raises:
            RuntimeError: If the dashboard is not in impedance mode
        """
        if self.mode != "impedance":
            raise RuntimeError("Trying to compute impedances while the dashboard is not in Impedance mode!")
        statuses = [self._impedance_status(value) for value in packet.get_impedances()]
        data = {"impedance": [label for _, label in statuses],
                'channel': self.chan_key_list,
                'row': ['1'] * self.n_chan,
                'color': [color for color, _ in statuses]}
        self.doc.add_next_tick_callback(partial(self._update_imp, new_data=data))

    @staticmethod
    def _impedance_status(value):
        """Map one impedance value to the (color, label) pair shown in the impedance plot"""
        if value > 500:
            return "black", "Open"
        for threshold, color in ((100, "red"), (50, "orange"), (10, "yellow"), (5, "green")):
            if value > threshold:
                return color, str(round(value, 0)) + " K\u03A9"
        return "green", "<5K\u03A9"  # As the ADS is not precise in low values.

    @gen.coroutine
    @without_property_validation
    def _update_exg(self, new_data):
        """Stream downsampled ExG samples to the plot source"""
        self._exg_source_ds.stream(new_data, rollover=int(2 * EXG_VIS_SRATE * WIN_LENGTH))

    @gen.coroutine
    @without_property_validation
    def _update_orn(self, new_data):
        """Stream orientation samples to the plot source"""
        self._orn_source.stream(new_data, rollover=int(2 * WIN_LENGTH * ORN_SRATE))

    @gen.coroutine
    @without_property_validation
    def _update_fw_version(self, new_data):
        """Replace the displayed firmware version"""
        self._firmware_source.stream(new_data, rollover=1)

    @gen.coroutine
    @without_property_validation
    def _update_battery(self, new_data):
        """Replace the displayed battery percentage"""
        self._battery_source.stream(new_data, rollover=1)

    @gen.coroutine
    @without_property_validation
    def _update_temperature(self, new_data):
        """Replace the displayed temperature"""
        self.temperature_source.stream(new_data, rollover=1)

    @gen.coroutine
    @without_property_validation
    def _update_light(self, new_data):
        """Replace the displayed light value"""
        self.light_source.stream(new_data, rollover=1)

    @gen.coroutine
    @without_property_validation
    def _update_marker(self, new_data):
        """Stream a new marker line to the marker source"""
        self._marker_source.stream(new_data=new_data, rollover=100)

    @gen.coroutine
    @without_property_validation
    def _update_imp(self, new_data):
        """Replace the displayed impedance values"""
        self.imp_source.stream(new_data, rollover=self.n_chan)

    @gen.coroutine
    @without_property_validation
    def _update_fft(self):
        """ Update spectral frequency analysis plot"""
        # Check if the tab is active and if EEG mode is active
        if (self.tabs.active != 2) or (self.exg_mode != 'EEG'):
            return
        exg_data = np.array([self._exg_source_orig.data[key] for key in self.chan_key_list])
        # Wait until at least five seconds of data are buffered
        if exg_data.shape[1] < self.exg_fs * 5:
            return
        fft_content, freq = get_fft(exg_data, self.exg_fs)
        data = dict(zip(self.chan_key_list, fft_content))
        data['f'] = freq
        self.fft_source.data = data

    @gen.coroutine
    @without_property_validation
    def _update_heart_rate(self):
        """Detect R-peaks and update the plot and heart rate"""
        if self.exg_mode == 'EEG':
            self._heart_rate_source.stream({'heart_rate': ['NA']}, rollover=1)
            return
        if CHAN_LIST[0] not in self.chan_key_list:
            print('WARNING: Heart rate estimation works only when channel 1 is enabled.')
            return
        if self.rr_estimator is None:
            self.rr_estimator = HeartRateEstimator(fs=self.exg_fs)
            # Init R-peaks plot
            self.exg_plot.circle(x='t', y='r_peak', source=self._r_peak_source,
                                 fill_color="red", size=8)

        # Undo the plot offset and scaling on the most recent 2 * EXG_VIS_SRATE samples of channel 1
        ecg_data = (np.array(self._exg_source_ds.data['Ch1'])[-2 * EXG_VIS_SRATE:] - self.offsets[0]) * self.y_unit
        time_vector = np.array(self._exg_source_ds.data['t'])[-2 * EXG_VIS_SRATE:]

        # Check if the peak2peak value is bigger than threshold
        peak_to_peak = np.ptp(ecg_data)
        if (peak_to_peak < V_TH[0]) or (peak_to_peak > V_TH[1]):
            print("WARNING: P2P value larger or less than threshold. Cannot compute heart rate!")
            return

        peaks_time, peaks_val = self.rr_estimator.estimate(ecg_data, time_vector)
        peaks_val = (np.array(peaks_val) / self.y_unit) + self.offsets[0]
        if peaks_time:
            data = dict(zip(['r_peak', 't'], [peaks_val, peaks_time]))
            self._r_peak_source.stream(data, rollover=50)

        # Update heart rate cell
        estimated_heart_rate = self.rr_estimator.heart_rate
        data = {'heart_rate': [estimated_heart_rate]}
        self._heart_rate_source.stream(data, rollover=1)

    @gen.coroutine
    @without_property_validation
    def _change_scale(self, attr, old, new):
        """Change y-scale of ExG plot"""
        new, old = SCALE_MENU[new], SCALE_MENU[old]
        old_unit = 10 ** (-old)
        self.y_unit = 10 ** (-new)
        ratio = old_unit / self.y_unit

        # Rescale the already plotted samples around each channel's row
        for idx, chan in enumerate(self.chan_key_list):
            offset = self.offsets[idx]
            self._exg_source_ds.data[chan] = (self._exg_source_ds.data[chan] - offset) * ratio + offset
        self._r_peak_source.data['r_peak'] = \
            (np.array(self._r_peak_source.data['r_peak']) - self.offsets[0]) * ratio + self.offsets[0]

    @gen.coroutine
    @without_property_validation
    def _change_t_range(self, attr, old, new):
        """Change time range"""
        self._set_t_range(TIME_RANGE_MENU[new])

    @gen.coroutine
    def _change_mode(self, attr, old, new):
        """Set EEG or ECG mode"""
        self.exg_mode = new

    def _init_doc(self, doc):
        """Build the bokeh document: plots, controls, layout, periodic callbacks and subscriptions

        Args:
            doc: Bokeh document handed over by the bokeh server

        Raises:
            ValueError: If the dashboard mode is neither 'signal' nor 'impedance'
        """
        if self.mode not in ('signal', 'impedance'):
            raise ValueError("Dashboard mode must be 'signal' or 'impedance'")
        self.doc = doc
        self.doc.title = "Explore Dashboard"
        with open(os.path.join(os.path.dirname(__file__), 'templates', 'index.html')) as f:
            index_template = Template(f.read())
        doc.template = index_template
        self.doc.theme = Theme(os.path.join(os.path.dirname(__file__), 'theme.yaml'))
        self._init_plots()
        m_widgetbox = self._init_controls()

        # Create tabs
        if self.mode == "signal":
            exg_tab = Panel(child=self.exg_plot, title="ExG Signal")
            orn_tab = Panel(child=column([self.acc_plot, self.gyro_plot, self.mag_plot], sizing_mode='scale_width'),
                            title="Orientation")
            fft_tab = Panel(child=self.fft_plot, title="Spectral analysis")
            self.tabs = Tabs(tabs=[exg_tab, orn_tab, fft_tab], width=400, sizing_mode='scale_width')
            self.recorder_widget = self._init_recorder()
            self.set_marker_widget = self._init_set_marker()
        else:
            imp_tab = Panel(child=self.imp_plot, title="Impedance")
            self.tabs = Tabs(tabs=[imp_tab], width=500, sizing_mode='scale_width')

        banner = Div(text=""" <a href="https://www.mentalab.com"><img src=
"https://images.squarespace-cdn.com/content/5428308ae4b0701411ea8aaf/1505653866447-R24N86G5X1HFZCD7KBWS/
Mentalab%2C+Name+copy.png?format=1500w&content-type=image%2Fpng" alt="Mentalab" width="225" height="39">""",
                     width=1500, height=50, css_classes=["banner"], align='center', sizing_mode="stretch_width")
        heading = Div(text=""" """, height=2, sizing_mode="stretch_width")
        if self.mode == 'signal':
            layout = column([heading,
                             banner,
                             row(m_widgetbox,
                                 Spacer(width=10, height=200),
                                 self.tabs,
                                 Spacer(width=10, height=300),
                                 column(Spacer(width=170, height=35), self.recorder_widget, self.set_marker_widget),
                                 Spacer(width=50, height=300)),
                             ],
                            sizing_mode="stretch_both")
        else:
            layout = column(banner,
                            Spacer(width=600, height=20),
                            row([m_widgetbox, Spacer(width=25, height=500), self.tabs])
                            )
        self.doc.add_root(layout)
        self.doc.add_periodic_callback(self._update_fft, 2000)
        self.doc.add_periodic_callback(self._update_heart_rate, 2000)
        if self.stream_processor:
            self.stream_processor.subscribe(topic=TOPICS.filtered_ExG, callback=self.exg_callback)
            self.stream_processor.subscribe(topic=TOPICS.raw_orn, callback=self.orn_callback)
            self.stream_processor.subscribe(topic=TOPICS.device_info, callback=self.info_callback)
            self.stream_processor.subscribe(topic=TOPICS.marker, callback=self.marker_callback)
            self.stream_processor.subscribe(topic=TOPICS.env, callback=self.info_callback)
            self.stream_processor.subscribe(topic=TOPICS.imp, callback=self.impedance_callback)

    def _init_plots(self):
        """Initialize all plots in the dashboard"""
        self.exg_plot = figure(y_range=(0.01, self.n_chan + 1 - 0.01), y_axis_label='Voltage', x_axis_label='Time (s)',
                               title="ExG signal",
                               plot_height=250, plot_width=500,
                               y_minor_ticks=10,
                               tools=[ResetTool()], active_scroll=None, active_drag=None,
                               active_inspect=None, active_tap=None, sizing_mode="scale_width")

        self.mag_plot = figure(y_axis_label='Mag [mgauss/LSB]', x_axis_label='Time (s)',
                               plot_height=100, plot_width=500,
                               tools=[ResetTool()], active_scroll=None, active_drag=None,
                               active_inspect=None, active_tap=None, sizing_mode="scale_width")
        self.acc_plot = figure(y_axis_label='Acc [mg/LSB]',
                               plot_height=75, plot_width=500,
                               tools=[ResetTool()], active_scroll=None, active_drag=None,
                               active_inspect=None, active_tap=None, sizing_mode="scale_width")
        self.acc_plot.xaxis.visible = False
        self.gyro_plot = figure(y_axis_label='Gyro [mdps/LSB]',
                                plot_height=75, plot_width=500,
                                tools=[ResetTool()], active_scroll=None, active_drag=None,
                                active_inspect=None, active_tap=None, sizing_mode="scale_width")
        self.gyro_plot.xaxis.visible = False

        self.fft_plot = figure(y_axis_label='Amplitude (uV)', x_axis_label='Frequency (Hz)', title="FFT",
                               x_range=(0, 70), plot_height=250, plot_width=500, y_axis_type="log",
                               sizing_mode="scale_width")

        self.imp_plot = self._init_imp_plot()

        # Set yaxis properties
        self.exg_plot.yaxis.ticker = SingleIntervalTicker(interval=1, num_minor_ticks=0)

        # Initial plot line
        for chan, fft_color in zip(self.chan_key_list, FFT_COLORS):
            self.exg_plot.line(x='t', y=chan, source=self._exg_source_ds,
                               line_width=1.0, alpha=.9, line_color="#42C4F7")
            self.fft_plot.line(x='f', y=chan, source=self.fft_source,
                               legend_label=chan + " ",
                               line_width=1.0, alpha=.9, line_color=fft_color)
        self.fft_plot.yaxis.axis_label_text_font_style = 'normal'
        self.exg_plot.line(x='t', y='marker', source=self._marker_source,
                           line_width=1, alpha=.8, line_color='#7AB904', line_dash="4 4")

        for i in range(3):
            self.acc_plot.line(x='t', y=ORN_LIST[i], source=self._orn_source, legend_label=ORN_LIST[i] + " ",
                               line_width=1.5, line_color=LINE_COLORS[i], alpha=.9)
            self.gyro_plot.line(x='t', y=ORN_LIST[i + 3], source=self._orn_source, legend_label=ORN_LIST[i + 3] + " ",
                                line_width=1.5, line_color=LINE_COLORS[i], alpha=.9)
            self.mag_plot.line(x='t', y=ORN_LIST[i + 6], source=self._orn_source, legend_label=ORN_LIST[i + 6] + " ",
                               line_width=1.5, line_color=LINE_COLORS[i], alpha=.9)

        # Set x_range
        self.plot_list = [self.exg_plot, self.acc_plot, self.gyro_plot, self.mag_plot]
        self._set_t_range(WIN_LENGTH)

        # Set the formatting of yaxis ticks' labels
        self.exg_plot.yaxis.major_label_overrides = dict(zip(range(1, self.n_chan + 1), self.chan_key_list))
        for plot in self.plot_list:
            plot.toolbar.autohide = True
            plot.yaxis.axis_label_text_font_style = 'normal'
            if len(plot.legend) != 0:
                plot.legend.location = "bottom_left"
                plot.legend.orientation = "horizontal"
                plot.legend.padding = 2

    def _init_imp_plot(self):
        """Create the impedance plot: one colored circle per channel with its label and impedance text"""
        plot = figure(plot_width=600, plot_height=200, x_range=self.chan_key_list[0:self.n_chan],
                      y_range=[str(1)], toolbar_location=None, sizing_mode="scale_width")

        plot.circle(x='channel', y="row", size=50, source=self.imp_source, fill_alpha=0.6, color="color",
                    line_color='color', line_width=2)

        text_props = {"source": self.imp_source, "text_align": "center",
                      "text_color": "white", "text_baseline": "middle", "text_font": "helvetica",
                      "text_font_style": "bold"}

        x = dodge("channel", -0.1, range=plot.x_range)

        plot.text(x=x, y=dodge('row', -.35, range=plot.y_range),
                  text="impedance", **text_props).glyph.text_font_size = "10pt"
        plot.text(x=x, y=dodge('row', -.25, range=plot.y_range), text="channel",
                  **text_props).glyph.text_font_size = "12pt"

        plot.outline_line_color = None
        plot.grid.grid_line_color = None
        plot.axis.axis_line_color = None
        plot.axis.major_tick_line_color = None
        plot.axis.major_label_standoff = 0
        plot.axis.visible = False
        return plot

    def _init_controls(self):
        """Initialize all controls in the dashboard"""
        # EEG/ECG Radio button
        self.mode_control = widgets.Select(title="Signal", value='EEG', options=MODE_LIST, width=170, height=50)
        self.mode_control.on_change('value', self._change_mode)

        self.t_range = widgets.Select(title="Time window", value="10 s", options=list(TIME_RANGE_MENU.keys()),
                                      width=170, height=50)
        self.t_range.on_change('value', self._change_t_range)
        self.y_scale = widgets.Select(title="Y-axis Scale", value="1 mV", options=list(SCALE_MENU.keys()),
                                      width=170, height=50)
        self.y_scale.on_change('value', self._change_scale)

        # Create device info tables
        columns = [widgets.TableColumn(field='heart_rate', title="Heart Rate (bpm)")]
        self.heart_rate = widgets.DataTable(source=self._heart_rate_source, index_position=None, sortable=False,
                                            reorderable=False,
                                            columns=columns, width=170, height=50)

        columns = [widgets.TableColumn(field='firmware_version', title="Firmware Version")]
        self.firmware = widgets.DataTable(source=self._firmware_source, index_position=None, sortable=False,
                                          reorderable=False,
                                          columns=columns, width=170, height=50)

        columns = [widgets.TableColumn(field='battery', title="Battery (%)")]
        self.battery = widgets.DataTable(source=self._battery_source, index_position=None, sortable=False,
                                         reorderable=False,
                                         columns=columns, width=170, height=50)

        columns = [widgets.TableColumn(field='temperature', title="Device temperature (C)")]
        self.temperature = widgets.DataTable(source=self.temperature_source, index_position=None, sortable=False,
                                             reorderable=False, columns=columns, width=170, height=50)

        columns = [widgets.TableColumn(field='light', title="Light (Lux)")]
        self.light = widgets.DataTable(source=self.light_source, index_position=None, sortable=False, reorderable=False,
                                       columns=columns, width=170, height=50)

        widget_box = widgetbox(
            [Spacer(width=170, height=30), self.mode_control, self.y_scale, self.t_range, self.heart_rate,
             self.battery, self.temperature, self.firmware], width=175, height=450, sizing_mode='fixed')
        return widget_box

    def _init_recorder(self):
        """Create the recording widgets (file name, file type, record toggle and timer)"""
        self.rec_button = Toggle(label=u"\u25CF Record", button_type="default", active=False,
                                 width=170, height=35)
        self.file_name_widget = TextInput(value="test_file", title="File name:", width=170, height=50)
        self.file_type_widget = RadioGroup(labels=["EDF (BDF+)", "CSV"], active=0, width=170, height=50)

        columns = [widgets.TableColumn(field='timer', title="Record time",
                                       formatter=widgets.StringFormatter(text_align='center'))]
        self.timer = widgets.DataTable(source=self._timer_source, index_position=None, sortable=False,
                                       reorderable=False,
                                       header_row=False, columns=columns,
                                       width=170, height=50, css_classes=["timer_widget"])

        self.rec_button.on_click(self._toggle_rec)
        return column([Spacer(width=170, height=5), self.file_name_widget, self.file_type_widget, self.rec_button,
                       self.timer], width=170, height=200, sizing_mode='fixed')

    def _toggle_rec(self, active):
        """Start or stop recording when the record toggle changes

        Args:
            active (bool): New state of the record toggle
        """
        if active:
            if self.explore.is_connected:
                self.event_code_input.disabled = False
                self.marker_button.disabled = False
                self.explore.record_data(file_name=self.file_name_widget.value,
                                         file_type=['edf', 'csv'][self.file_type_widget.active],
                                         do_overwrite=True)
                self.rec_button.label = u"\u25A0 Stop"
                self.rec_start_time = datetime.now()
                self.rec_timer_id = self.doc.add_periodic_callback(self._timer_callback, 1000)
            else:
                # Nothing was started. Resetting the toggle is expected to call this handler again with
                # active=False, which is safe because the clean-up below is guarded.
                self.rec_button.active = False
            return

        # Only stop what this dashboard started: the timer id is set exactly while a recording is running
        if self.rec_timer_id is not None:
            self.explore.stop_recording()
            self.doc.remove_periodic_callback(self.rec_timer_id)
            self.rec_timer_id = None
        self.rec_start_time = None
        self.rec_button.label = u"\u25CF Record"
        self.doc.add_next_tick_callback(partial(self._update_rec_timer, new_data={'timer': ['00:00:00']}))
        self.event_code_input.disabled = True
        self.marker_button.disabled = True

    def _timer_callback(self):
        """Refresh the recording timer with the elapsed time formatted as HH:MM:SS"""
        if self.rec_start_time is None:
            return
        # total_seconds() keeps counting past 24 hours, whereas timedelta.seconds wraps around
        elapsed = int((datetime.now() - self.rec_start_time).total_seconds())
        minutes, seconds = divmod(elapsed, 60)
        hours, minutes = divmod(minutes, 60)
        timer_text = ':'.join(str(part).zfill(2) for part in (hours, minutes, seconds))
        self.doc.add_next_tick_callback(partial(self._update_rec_timer, new_data={'timer': [timer_text]}))

    def _init_set_marker(self):
        """Create the event code input and the button that sets a marker"""
        self.marker_button = Button(label=u"Set", button_type="default", width=80, height=31, disabled=True)
        self.event_code_input = TextInput(value="8", title="Event code:", width=80, disabled=True)
        self.event_code_input.on_change('value', self._check_marker_value)
        self.marker_button.on_click(self._set_marker)
        return column([Spacer(width=170, height=5),
                       row([self.event_code_input,
                            column(Spacer(width=50, height=19), self.marker_button)], height=50, width=170)],
                      width=170, height=50, sizing_mode='fixed'
                      )

    def _set_marker(self):
        """Send the event code from the input field to the stream processor"""
        try:
            code = int(self.event_code_input.value)
        except ValueError:
            # The field currently holds the range hint (or other non-numeric text); nothing to send
            return
        self.stream_processor.set_marker(code)

    def _check_marker_value(self, attr, old, new):
        """Replace the event code input with a range hint unless it is an integer in 8..65535"""
        try:
            code = int(self.event_code_input.value)
            if code < 8 or code > 65535:
                raise ValueError('Value must be an integer between 8 and 65535')
        except ValueError:
            self.event_code_input.value = "7<val<65535"

    @gen.coroutine
    @without_property_validation
    def _update_rec_timer(self, new_data):
        """Replace the displayed recording time"""
        self._timer_source.stream(new_data, rollover=1)

    def _set_t_range(self, t_length):
        """Change time range of ExG and orientation plots

        Args:
            t_length (float): Length of the visible time window in seconds
        """
        self.win_length = int(t_length)
        for plot in self.plot_list:
            plot.x_range.follow = "end"
            plot.x_range.follow_interval = t_length
            plot.x_range.range_padding = 0.
            plot.x_range.min_interval = t_length
def get_fft(exg, s_rate, n_point=1024):
    """Compute the single-sided FFT amplitude of each row of a 2-D array

    Args:
        exg (np.ndarray): 2-D array; the FFT is taken along the last axis, so each row is
            transformed independently (rows are channels at the call site in this file)
        s_rate (float): Sampling rate, used to build the frequency axis
        n_point (int): FFT length. ``np.fft.fft`` crops or zero-pads each row to this length.

    Returns:
        tuple: ``(amplitudes, frequencies)``. ``amplitudes`` has shape
        ``(exg.shape[0], n_point // 2 - 1)`` and ``frequencies`` has ``n_point // 2 - 1`` entries;
        the zero-frequency bin is dropped from both.
    """
    half = n_point // 2
    freq = s_rate * np.arange(half) / n_point
    # Keep the first half of the spectrum, normalised by the FFT length
    fft_content = np.abs(np.fft.fft(exg, n=n_point)[:, :half] / n_point)
    return fft_content[:, 1:], freq[1:]
if __name__ == '__main__':
    from explorepy import Explore
    from explorepy.stream_processor import StreamProcessor

    # Demo start-up: a stream processor with a hand-written device description (all eight channels
    # enabled, 250 Hz) is attached to the Explore object before the dashboard is served.
    explore = Explore()
    explore.stream_processor = StreamProcessor()
    explore.stream_processor.device_info = {
        'firmware_version': '0.0.0',
        'adc_mask': [1] * 8,
        'sampling_rate': 250,
    }

    dashboard = Dashboard(explore=explore)
    dashboard.start_server()
    dashboard.start_loop()