# -*- coding: utf-8 -*-
import numpy as np
import time
from functools import partial
from threading import Thread
from explorepy.tools import HeartRateEstimator
from bokeh.layouts import widgetbox, row, column
from bokeh.models import ColumnDataSource, ResetTool, PrintfTickFormatter, Panel, Tabs
from bokeh.plotting import figure
from bokeh.server.server import Server
from bokeh.palettes import Colorblind
from bokeh.models.widgets import Select, DataTable, TableColumn, RadioButtonGroup
from bokeh.models import SingleIntervalTicker
from bokeh.core.property.validation import validate, without_property_validation
from tornado import gen
from bokeh.transform import dodge
# --- Module-level configuration -------------------------------------------
ORN_SRATE = 20  # Hz
WIN_LENGTH = 10  # Seconds
MODE_LIST = ['EEG', 'ECG']
CHAN_LIST = ['Ch1', 'Ch2', 'Ch3', 'Ch4', 'Ch5', 'Ch6', 'Ch7', 'Ch8']
DEFAULT_SCALE = 10 ** -3  # Volt
N_MOVING_AVERAGE = 60
V_TH = [10**-5, 5 * 10 ** -3]  # Noise threshold for ECG (Volt)
ORN_LIST = ['accX', 'accY', 'accZ', 'gyroX', 'gyroY', 'gyroZ', 'magX', 'magY', 'magZ']

# Maps a y-scale menu label to an exponent ``e``; the dashboard turns it into
# a unit of ``10 ** -e`` volts.  Each exponent is therefore -log10(value in
# volts), e.g. 5 uV -> -log10(5e-6) = 5.30103.  (Rounded thirds such as
# 5.3333 would yield 4.64 uV instead of the labelled 5 uV.)
SCALE_MENU = {
    "1 uV": 6.,
    "5 uV": 5.30103,
    "10 uV": 5.,
    "100 uV": 4.,
    "200 uV": 3.69897,
    "500 uV": 3.30103,
    "1 mV": 3.,
    "5 mV": 2.30103,
    "10 mV": 2.,
    "100 mV": 1.,
}
# Maps a time-window menu label to the window length in seconds.
TIME_RANGE_MENU = {"10 s": 10., "5 s": 5., "20 s": 20.}
LINE_COLORS = ['green', '#42C4F7', 'red']
# Line colours for the FFT plot, taken from the 8-entry Colorblind palette of
# bokeh.palettes; indexed by channel position when the FFT lines are created.
FFT_COLORS = Colorblind[8]
class Dashboard:
    """Explorepy dashboard class

    Holds the bokeh data sources, plots and widgets of the dashboard and
    exposes ``update_*`` coroutines that push new samples into those sources.
    """

    def __init__(self, n_chan, exg_fs, mode="signal", firmware_version="NA"):
        """
        Args:
            n_chan (int): Number of channels
            exg_fs (int): Sampling rate of ExG signal
            mode (str): Visualization mode {'signal', 'impedance'}
            firmware_version: Initial content of the "Firmware Version" cell
        """
        self.n_chan = n_chan
        self.y_unit = DEFAULT_SCALE
        # Channel i is drawn around the vertical offset i + 1 (column vector).
        self.offsets = np.arange(1, self.n_chan + 1)[:, np.newaxis].astype(float)
        self.chan_key_list = ['Ch' + str(i + 1) for i in range(self.n_chan)]
        self.exg_mode = 'EEG'
        self.rr_estimator = None
        self.win_length = WIN_LENGTH
        self.mode = mode
        self.exg_fs = exg_fs
        # Both are assigned in _init_doc once a browser session is opened.
        self.doc = None
        self.tabs = None

        # Init ExG data source
        exg_temp = np.zeros((n_chan, 2))
        exg_temp[:, 0] = self.offsets[:, 0]
        exg_temp[:, 1] = np.nan
        init_data = dict(zip(self.chan_key_list, exg_temp))
        init_data['t'] = np.array([0., 0.])
        self.exg_source = ColumnDataSource(data=init_data)

        # Init ECG R-peak source
        init_data = dict(zip(['r_peak', 't'], [np.array([None], dtype=np.double), np.array([None], dtype=np.double)]))
        self.r_peak_source = ColumnDataSource(data=init_data)

        # Init marker source
        init_data = dict(zip(['marker', 't'], [np.array([None], dtype=np.double), np.array([None], dtype=np.double)]))
        self.marker_source = ColumnDataSource(data=init_data)

        # Init ORN data source
        init_data = dict(zip(ORN_LIST, np.zeros((9, 1))))
        init_data['t'] = [0.]
        self.orn_source = ColumnDataSource(data=init_data)

        # Init table sources
        self.heart_rate_source = ColumnDataSource(data={'heart_rate': ['NA']})
        self.firmware_source = ColumnDataSource(data={'firmware_version': [firmware_version]})
        self.battery_source = ColumnDataSource(data={'battery': ['NA']})
        self.temperature_source = ColumnDataSource(data={'temperature': ['NA']})
        self.light_source = ColumnDataSource(data={'light': ['NA']})
        self.battery_percent_list = []
        self.server = None

        # Init fft data source
        init_data = dict(zip(self.chan_key_list, np.zeros((self.n_chan, 1))))
        init_data['f'] = np.array([0.])
        self.fft_source = ColumnDataSource(data=init_data)

        # Init impedance measurement source
        init_data = {'channel': [CHAN_LIST[i] for i in range(0, self.n_chan)],
                     'impedance': ['NA' for i in range(self.n_chan)],
                     'row': ['1' for i in range(self.n_chan)],
                     'color': ['black' for i in range(self.n_chan)]}
        self.imp_source = ColumnDataSource(data=init_data)

    def start_server(self):
        """Start bokeh server"""
        # Called only for its side effect; presumably this switches bokeh's
        # property validation off globally -- TODO confirm against bokeh docs.
        validate(False)
        self.server = Server({'/': self._init_doc}, num_procs=1)
        self.server.start()

    def start_loop(self):
        """Start io loop and show the dashboard"""
        self.server.io_loop.add_callback(self.server.show, "/")
        self.server.io_loop.start()

    def _init_doc(self, doc):
        """Populate a new bokeh document with the plots, tabs and controls.

        Args:
            doc: bokeh document handed over by the server for route '/'

        Raises:
            ValueError: If ``self.mode`` is neither 'signal' nor 'impedance'.
        """
        self.doc = doc
        self.doc.title = "Explore Dashboard"

        # Create plots
        self._init_plots()

        # Create controls
        m_widgetbox = self._init_controls()

        # Create tabs
        exg_tab = Panel(child=self.exg_plot, title="ExG Signal")
        orn_tab = Panel(child=column([self.acc_plot, self.gyro_plot, self.mag_plot], sizing_mode='fixed'),
                        title="Orientation")
        fft_tab = Panel(child=self.fft_plot, title="Spectral analysis")
        imp_tab = Panel(child=self.imp_plot, title="Impedance")
        if self.mode == "signal":
            self.tabs = Tabs(tabs=[exg_tab, orn_tab, fft_tab], width=1200)
        elif self.mode == "impedance":
            self.tabs = Tabs(tabs=[imp_tab], width=1200)
        else:
            # Without this branch self.tabs would stay unset and the failure
            # would surface as an unrelated AttributeError further down.
            raise ValueError("Unknown dashboard mode: " + repr(self.mode))
        self.doc.add_root(row([m_widgetbox, self.tabs]))
        self.doc.add_periodic_callback(self._update_fft, 2000)
        self.doc.add_periodic_callback(self._update_heart_rate, 2000)

    @gen.coroutine
    @without_property_validation
    def update_exg(self, time_vector, ExG):
        """update_exg()
        Update ExG data in the visualization

        Each channel is divided by the current y unit and shifted to its own
        vertical offset before being streamed to the ExG source.

        Args:
            time_vector (list): time vector
            ExG (np.ndarray): array of new data
        """
        ExG = self.offsets + ExG / self.y_unit
        new_data = dict(zip(self.chan_key_list, ExG))
        new_data['t'] = time_vector
        self.exg_source.stream(new_data, rollover=2 * self.exg_fs * WIN_LENGTH)

    @gen.coroutine
    @without_property_validation
    def update_orn(self, timestamp, orn_data):
        """Update orientation data

        The sample is dropped unless the tab with index 1 is the active one.

        Args:
            timestamp (float): timestamp of the sample
            orn_data (float vector): Vector of orientation data with shape of (9,)
        """
        if self.tabs.active != 1:
            return
        new_data = dict(zip(ORN_LIST, np.array(orn_data)[:, np.newaxis]))
        new_data['t'] = [timestamp]
        self.orn_source.stream(new_data, rollover=2 * WIN_LENGTH * ORN_SRATE)

    @gen.coroutine
    @without_property_validation
    def update_info(self, new):
        """Update device information in the dashboard

        Recognised keys are 'firmware_version', 'battery', 'temperature' and
        'light'; any other key only produces a printed warning.

        Args:
            new(dict): Dictionary of new values
        """
        for key, field_value in new.items():
            data = {key: field_value}
            if key == 'firmware_version':
                self.firmware_source.stream(data, rollover=1)
            elif key == 'battery':
                # Moving average over the last N_MOVING_AVERAGE readings,
                # floored to a multiple of 5 and never displayed below 1.
                self.battery_percent_list.append(field_value[0])
                if len(self.battery_percent_list) > N_MOVING_AVERAGE:
                    del self.battery_percent_list[0]
                percent = max(int(np.mean(self.battery_percent_list) / 5) * 5, 1)
                self.battery_source.stream({key: [percent]}, rollover=1)
            elif key == 'temperature':
                self.temperature_source.stream(data, rollover=1)
            elif key == 'light':
                data[key] = [int(data[key][0])]
                self.light_source.stream(data, rollover=1)
            else:
                print("Warning: There is no field named: " + key)

    @gen.coroutine
    @without_property_validation
    def _update_fft(self):
        """ Update spectral frequency analysis plot

        Runs only while the tab with index 2 is active in EEG mode and at
        least 4.5 seconds of samples are buffered.
        """
        # Check if the tab is active and if EEG mode is active
        if (self.tabs.active != 2) or (self.exg_mode != 'EEG'):
            return
        exg_data = np.array([self.exg_source.data[key] for key in self.chan_key_list])
        if exg_data.shape[1] < self.exg_fs * 4.5:
            return
        # The source stores display values (offset + volts / y_unit). Undo
        # that and express the signal in microvolts so the spectrum matches
        # the 'Amplitude (uV)' axis and no longer depends on the chosen scale.
        exg_data = (exg_data - self.offsets) * self.y_unit * 1e6
        fft_content, freq = get_fft(exg_data, self.exg_fs)
        data = dict(zip(self.chan_key_list, fft_content))
        data['f'] = freq
        self.fft_source.data = data

    @gen.coroutine
    @without_property_validation
    def _update_heart_rate(self):
        """Detect R-peaks and update the plot and heart rate"""
        if self.exg_mode == 'EEG':
            self.heart_rate_source.stream({'heart_rate': ['NA']}, rollover=1)
            return
        if self.rr_estimator is None:
            self.rr_estimator = HeartRateEstimator(fs=self.exg_fs)
            # Init R-peaks plot
            self.exg_plot.circle(x='t', y='r_peak', source=self.r_peak_source,
                                 fill_color="red", size=8)

        # Last 500 samples of the first channel, converted back to volts.
        ecg_data = (np.array(self.exg_source.data['Ch1'])[-500:] - self.offsets[0]) * self.y_unit
        time_vector = np.array(self.exg_source.data['t'])[-500:]

        # Check if the peak2peak value is bigger than threshold
        peak_to_peak = np.ptp(ecg_data)
        if (peak_to_peak < V_TH[0]) or (peak_to_peak > V_TH[1]):
            print("P2P value larger or less than threshold!")
            return

        peaks_time, peaks_val = self.rr_estimator.estimate(ecg_data, time_vector)
        peaks_val = (np.array(peaks_val) / self.y_unit) + self.offsets[0]
        # Length check instead of truthiness: the latter raises for numpy
        # arrays holding more than one element.
        if peaks_time is not None and len(peaks_time) > 0:
            data = dict(zip(['r_peak', 't'], [peaks_val, peaks_time]))
            self.r_peak_source.stream(data, rollover=50)

        # Update heart rate cell
        estimated_heart_rate = self.rr_estimator.heart_rate
        data = {'heart_rate': [estimated_heart_rate]}
        self.heart_rate_source.stream(data, rollover=1)

    @gen.coroutine
    @without_property_validation
    def update_marker(self, timestamp, code):
        """Add a vertical marker line to the ExG plot

        Does nothing in impedance mode.

        Args:
            timestamp (float): time at which the marker is drawn
            code: marker code; accepted but currently not displayed
        """
        if self.mode == "impedance":
            return
        # Two points spanning the plot height, followed by NaN (from None) so
        # that consecutive markers are not joined by the line glyph.
        new_data = {'marker': np.array([0.01, self.n_chan + 0.99, None], dtype=np.double),
                    't': np.array([timestamp, timestamp, None], dtype=np.double)}
        self.marker_source.stream(new_data=new_data, rollover=100)

    @gen.coroutine
    @without_property_validation
    def update_imp(self, imp):
        """Update the impedance plot

        Args:
            imp (iterable of float): one impedance value per channel; the
                labels present it in kilo-ohm

        Raises:
            RuntimeError: If the dashboard is not in impedance mode.
        """
        if self.mode != "impedance":
            raise RuntimeError("Trying to compute impedances while the dashboard is not in Impedance mode!")

        colors = []
        imp_str = []
        for x in imp:
            if x > 500:
                colors.append("black")
                imp_str.append("Open")
            elif x > 5:
                imp_str.append(str(round(x, 0)) + " K\u03A9")
                if x > 100:
                    colors.append("red")
                elif x > 50:
                    colors.append("orange")
                elif x > 10:
                    colors.append("yellow")
                else:
                    colors.append("green")
            else:
                colors.append("green")
                imp_str.append("<5K\u03A9")  # As the ADS is not precise in low values.

        data = {"impedance": imp_str,
                'channel': [CHAN_LIST[i] for i in range(0, self.n_chan)],
                'row': ['1' for i in range(self.n_chan)],
                'color': colors
                }
        self.imp_source.stream(data, rollover=self.n_chan)

    @gen.coroutine
    @without_property_validation
    def _change_scale(self, attr, old, new):
        """Change y-scale of ExG plot

        Rescales the buffered ExG samples and R-peak markers around their
        channel offsets so that already plotted data follows the new unit.

        Args:
            attr: name of the changed widget property (unused)
            old (str): previously selected key of SCALE_MENU
            new (str): newly selected key of SCALE_MENU
        """
        new, old = SCALE_MENU[new], SCALE_MENU[old]
        old_unit = 10 ** (-old)
        self.y_unit = 10 ** (-new)
        ratio = old_unit / self.y_unit

        # Iterate over the channel keys rather than over the data dict that is
        # being written to.
        for idx, ch in enumerate(self.chan_key_list):
            temp_offset = self.offsets[idx]
            self.exg_source.data[ch] = (self.exg_source.data[ch] - temp_offset) * ratio + temp_offset
        self.r_peak_source.data['r_peak'] = \
            (np.array(self.r_peak_source.data['r_peak']) - self.offsets[0]) * ratio + self.offsets[0]

    @gen.coroutine
    @without_property_validation
    def _change_t_range(self, attr, old, new):
        """Change time range

        Args:
            attr: name of the changed widget property (unused)
            old (str): previously selected key of TIME_RANGE_MENU (unused)
            new (str): newly selected key of TIME_RANGE_MENU
        """
        self._set_t_range(TIME_RANGE_MENU[new])

    @gen.coroutine
    def _change_mode(self, new):
        """Set EEG or ECG mode

        Args:
            new (int): index into MODE_LIST
        """
        self.exg_mode = MODE_LIST[new]

    def _init_plots(self):
        """Initialize all plots in the dashboard"""
        self.exg_plot = figure(y_range=(0.01, self.n_chan + 1 - 0.01), y_axis_label='Voltage', x_axis_label='Time (s)',
                               title="ExG signal",
                               plot_height=600, plot_width=1270,
                               y_minor_ticks=10,
                               tools=[ResetTool()], active_scroll=None, active_drag=None,
                               active_inspect=None, active_tap=None)

        self.mag_plot = figure(y_axis_label='Magnetometer [mgauss/LSB]', x_axis_label='Time (s)',
                               plot_height=230, plot_width=1270,
                               tools=[ResetTool()], active_scroll=None, active_drag=None,
                               active_inspect=None, active_tap=None)
        self.acc_plot = figure(y_axis_label='Accelerometer [mg/LSB]',
                               plot_height=190, plot_width=1270,
                               tools=[ResetTool()], active_scroll=None, active_drag=None,
                               active_inspect=None, active_tap=None)
        self.acc_plot.xaxis.visible = False
        self.gyro_plot = figure(y_axis_label='Gyroscope [mdps/LSB]',
                                plot_height=190, plot_width=1270,
                                tools=[ResetTool()], active_scroll=None, active_drag=None,
                                active_inspect=None, active_tap=None)
        self.gyro_plot.xaxis.visible = False

        self.fft_plot = figure(y_axis_label='Amplitude (uV)', x_axis_label='Frequency (Hz)', title="FFT",
                               x_range=(0, 70), plot_height=600, plot_width=1270, y_axis_type="log")

        self.imp_plot = self._init_imp_plot()

        # Set yaxis properties
        self.exg_plot.yaxis.ticker = SingleIntervalTicker(interval=1, num_minor_ticks=10)

        # Initial plot line
        for i in range(self.n_chan):
            self.exg_plot.line(x='t', y=CHAN_LIST[i], source=self.exg_source,
                               line_width=1.5, alpha=.9, line_color="#42C4F7")
            self.fft_plot.line(x='f', y=CHAN_LIST[i], source=self.fft_source, legend_label=CHAN_LIST[i] + " ",
                               line_width=2, alpha=.9, line_color=FFT_COLORS[i])
        self.exg_plot.line(x='t', y='marker', source=self.marker_source,
                           line_width=1, alpha=.8, line_color='#7AB904', line_dash="4 4")

        # ORN_LIST is laid out as three accelerometer, three gyroscope and
        # three magnetometer entries, hence the +3 / +6 strides.
        for i in range(3):
            self.acc_plot.line(x='t', y=ORN_LIST[i], source=self.orn_source, legend_label=ORN_LIST[i] + " ",
                               line_width=1.5, line_color=LINE_COLORS[i], alpha=.9)
            self.gyro_plot.line(x='t', y=ORN_LIST[i + 3], source=self.orn_source, legend_label=ORN_LIST[i + 3] + " ",
                                line_width=1.5, line_color=LINE_COLORS[i], alpha=.9)
            self.mag_plot.line(x='t', y=ORN_LIST[i + 6], source=self.orn_source, legend_label=ORN_LIST[i + 6] + " ",
                               line_width=1.5, line_color=LINE_COLORS[i], alpha=.9)

        # Set x_range
        self.plot_list = [self.exg_plot, self.acc_plot, self.gyro_plot, self.mag_plot]
        self._set_t_range(WIN_LENGTH)

        self.exg_plot.ygrid.minor_grid_line_color = 'navy'
        self.exg_plot.ygrid.minor_grid_line_alpha = 0.05

        # Set the formatting of yaxis ticks' labels
        self.exg_plot.yaxis[0].formatter = PrintfTickFormatter(format="Ch %i")

        # Autohide toolbar/ Legend location
        for plot in self.plot_list:
            plot.toolbar.autohide = True
            plot.background_fill_color = "#fafafa"
            if len(plot.legend) != 0:
                plot.legend.location = "bottom_left"
                plot.legend.orientation = "horizontal"
                plot.legend.padding = 2

    def _init_imp_plot(self):
        """Create the impedance figure: one coloured circle per channel with
        the impedance text and channel name drawn next to it.

        Returns:
            The configured bokeh figure.
        """
        p = figure(plot_width=600, plot_height=200, x_range=CHAN_LIST[0:self.n_chan],
                   y_range=[str(1)], toolbar_location=None)

        p.circle(x='channel', y="row", radius=.3, source=self.imp_source, fill_alpha=0.6, color="color",
                 line_color='color', line_width=2)

        text_props = {"source": self.imp_source, "text_align": "center",
                      "text_color": "black", "text_baseline": "middle", "text_font": "helvetica",
                      "text_font_style": "bold"}

        x = dodge("channel", -0.1, range=p.x_range)

        r = p.text(x=x, y=dodge('row', -.4, range=p.y_range), text="impedance", **text_props)
        r.glyph.text_font_size = "10pt"

        r = p.text(x=x, y=dodge('row', -.3, range=p.y_range), text="channel", **text_props)
        r.glyph.text_font_size = "12pt"

        # Hide every axis and grid decoration.
        p.outline_line_color = None
        p.grid.grid_line_color = None
        p.axis.axis_line_color = None
        p.axis.major_tick_line_color = None
        p.axis.major_label_standoff = 0
        p.axis.visible = False
        return p

    @staticmethod
    def _info_table(source, field, title):
        """Build a one-column table without index, sorting or reordering."""
        return DataTable(source=source, index_position=None, sortable=False, reorderable=False,
                         columns=[TableColumn(field=field, title=title)], width=200, height=50)

    def _init_controls(self):
        """Initialize all controls in the dashboard

        Returns:
            The widget box holding the mode buttons, the scale and time-window
            selectors and the device information tables.
        """
        # EEG/ECG Radio button
        self.mode_control = RadioButtonGroup(labels=MODE_LIST, active=0)
        self.mode_control.on_click(self._change_mode)

        self.t_range = Select(title="Time window", value="10 s", options=list(TIME_RANGE_MENU.keys()), width=210)
        self.t_range.on_change('value', self._change_t_range)

        self.y_scale = Select(title="Y-axis Scale", value="1 mV", options=list(SCALE_MENU.keys()), width=210)
        self.y_scale.on_change('value', self._change_scale)

        # Create device info tables
        self.heart_rate = self._info_table(self.heart_rate_source, 'heart_rate', "Heart Rate (bpm)")
        self.firmware = self._info_table(self.firmware_source, 'firmware_version', "Firmware Version")
        self.battery = self._info_table(self.battery_source, 'battery', "Battery (%)")
        self.temperature = self._info_table(self.temperature_source, 'temperature', "Temperature (C)")
        self.light = self._info_table(self.light_source, 'light', "Light (Lux)")

        # Add widgets to the doc
        m_widgetbox = widgetbox([self.mode_control, self.y_scale, self.t_range, self.heart_rate,
                                 self.battery, self.temperature, self.light, self.firmware], width=220)
        return m_widgetbox

    def _set_t_range(self, t_length):
        """Change time range of ExG and orientation plots

        Args:
            t_length (float): length of the visible time window in seconds
        """
        self.win_length = int(t_length)
        for plot in self.plot_list:
            plot.x_range.follow = "end"
            plot.x_range.follow_interval = t_length
            plot.x_range.range_padding = 0.
            plot.x_range.min_interval = t_length
def get_fft(exg, EEG_SRATE, n=1024):
    """Compute the single-sided amplitude spectrum of each channel.

    Only the most recent ``n`` samples (the last ``n`` columns) are analysed,
    so that a long, time-ordered buffer yields the spectrum of its newest
    data; shorter inputs are zero-padded to ``n`` points by ``np.fft.fft``.

    Args:
        exg (np.ndarray): signal with shape (n_chan, n_sample)
        EEG_SRATE (float): sampling rate in Hz
        n (int): number of FFT points

    Returns:
        tuple: ``(fft_content, freq)`` where ``fft_content`` has shape
        (n_chan, n // 2 - 1) and holds ``|FFT| / n`` and ``freq`` holds the
        matching frequencies in Hz; the DC bin is excluded from both.
    """
    exg = np.asarray(exg)
    # np.fft.fft would crop a longer input to its *first* n samples, i.e. the
    # oldest ones, so select the tail explicitly.
    recent = exg[:, -n:]
    half = n // 2
    freq = EEG_SRATE * np.arange(half) / n
    fft_content = np.abs(np.fft.fft(recent, n=n)[:, :half]) / n
    return fft_content[:, 1:], freq[1:]
if __name__ == '__main__':
    # Demo: feed the dashboard with random data from a background thread.
    print('Opening Bokeh application on http://localhost:5006/')
    # exg_fs is a required argument; 250 Hz matches the 50 samples generated
    # per 0.2 s chunk below.
    m_dashboard = Dashboard(n_chan=8, exg_fs=250)
    m_dashboard.start_server()

    def my_loop():
        """Push random ExG, device-info and orientation data every 0.2 s."""
        T = 0
        # The document only exists once a browser session has been opened, so
        # wait for it instead of relying on a fixed delay.
        while getattr(m_dashboard, 'doc', None) is None:
            time.sleep(0.1)
        while True:
            # endpoint=False keeps consecutive chunks from repeating the
            # boundary timestamp.
            time_vector = np.linspace(T, T + .2, 50, endpoint=False)
            T += .2
            EEG = (np.random.randint(0, 2, (8, 50)) - .5) * .0002  # (np.random.rand(8, 50)-.5) * .0005
            m_dashboard.doc.add_next_tick_callback(partial(m_dashboard.update_exg, time_vector=time_vector, ExG=EEG))

            device_info_attr = ['firmware_version', 'battery', 'temperature', 'light']
            device_info_val = [['2.0.4'], [95], [21], [13]]
            new_data = dict(zip(device_info_attr, device_info_val))
            m_dashboard.doc.add_next_tick_callback(partial(m_dashboard.update_info, new=new_data))

            m_dashboard.doc.add_next_tick_callback(
                partial(m_dashboard.update_orn, timestamp=T, orn_data=np.random.rand(9)))

            time.sleep(0.2)

    # Daemon thread: the endless feeder must not keep the process alive once
    # the io loop has stopped.
    thread = Thread(target=my_loop, daemon=True)
    thread.start()
    m_dashboard.start_loop()