# -*- coding: utf-8 -*-
from explorepy.parser import Parser
import os.path
import csv
import bluetooth
import numpy as np
from explorepy.filters import Filter
from scipy import signal
import pyedflib
import datetime
[docs]def bt_scan():
""""Scan for bluetooth devices
Scans for available explore devices.
Prints out MAC address and name of each found device
Args:
Returns:
"""
print("Searching for nearby devices...")
nearby_devices = bluetooth.discover_devices(lookup_names=True)
explore_devices = []
for address, name in nearby_devices:
if "Explore" in name:
print("Device found: %s - %s" % (name, address))
explore_devices.append((address, name))
if not nearby_devices:
print("No Devices found")
return explore_devices
[docs]def bin2csv(bin_file, do_overwrite=False, out_dir=''):
"""Binary to CSV file converter.
This function converts the given binary file to ExG and ORN csv files.
Args:
bin_file (str): Binary file full address
out_dir (str): Relative output directory (if not given, it uses the current working directory.)
do_overwrite (bool): Overwrite if files exist already
"""
head_path, full_filename = os.path.split(bin_file)
filename, extension = os.path.splitext(full_filename)
assert os.path.isfile(bin_file), "Error: File does not exist!"
assert extension == '.BIN', "File type error! File extension must be BIN."
out_dir = os.getcwd() + out_dir + '/'
exg_out_file = out_dir + filename + '_exg'
orn_out_file = out_dir + filename + '_orn'
marker_out_file = out_dir + filename + '_marker'
exg_ch = ['TimeStamp', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7', 'ch8']
exg_unit = ['s', 'V', 'V', 'V', 'V', 'V', 'V', 'V', 'V']
exg_recorder = FileRecorder(file_name=exg_out_file, ch_label=exg_ch, fs=250, ch_unit=exg_unit,
file_type='csv', do_overwrite=do_overwrite)
orn_ch = ['TimeStamp', 'ax', 'ay', 'az', 'gx', 'gy', 'gz', 'mx', 'my', 'mz']
orn_unit = ['s', 'mg', 'mg', 'mg', 'mdps', 'mdps', 'mdps', 'mgauss', 'mgauss', 'mgauss']
orn_recorder = FileRecorder(file_name=orn_out_file, ch_label=orn_ch, fs=20,
ch_unit=orn_unit, file_type='csv', do_overwrite=do_overwrite)
marker_ch = ['TimeStamp', 'Code']
marker_unit = ['s', '-']
marker_recorder = FileRecorder(file_name=marker_out_file, ch_label=marker_ch, fs=0, ch_unit=marker_unit,
file_type='csv', do_overwrite=do_overwrite)
with open(bin_file, "rb") as f_bin:
parser = Parser(fid=f_bin)
print("Converting...")
while True:
try:
parser.parse_packet(mode='record', recorders=(exg_recorder, orn_recorder, marker_recorder))
except ValueError:
print("Binary file ended! Conversion finished!")
break
[docs]def bin2edf(bin_file, do_overwrite=False, out_dir=''):
"""Binary to EDF file converter.
This function converts the given binary file to ExG and ORN csv files.
Args:
bin_file (str): Binary file full address
out_dir (str): Output directory (if None, uses the same directory as binary file)
do_overwrite (bool): Overwrite if files exist already
"""
head_path, full_filename = os.path.split(bin_file)
filename, extension = os.path.splitext(full_filename)
assert os.path.isfile(bin_file), "Error: File does not exist!"
assert extension == '.BIN', "File type error! File extension must be BIN."
out_dir = os.getcwd() + out_dir + '/'
exg_out_file = out_dir + filename + '_exg'
orn_out_file = out_dir + filename + '_orn'
with open(bin_file, "rb") as f_bin:
parser = Parser(fid=f_bin)
exg_ch = ['TimeStamp', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7', 'ch8'][0:parser.n_chan + 1]
exg_unit = ['s', 'V', 'V', 'V', 'V', 'V', 'V', 'V', 'V'][0:parser.n_chan + 1]
exg_max = [86400, .4, .4, .4, .4, .4, .4, .4, .4][0:parser.n_chan + 1]
exg_min = [0, -.4, -.4, -.4, -.4, -.4, -.4, -.4, -.4][0:parser.n_chan + 1]
exg_recorder = FileRecorder(file_name=exg_out_file, ch_label=exg_ch, fs=parser.fs, ch_unit=exg_unit,
file_type='edf', do_overwrite=do_overwrite, ch_min=exg_min, ch_max=exg_max)
orn_ch = ['TimeStamp', 'ax', 'ay', 'az', 'gx', 'gy', 'gz', 'mx', 'my', 'mz']
orn_unit = ['s', 'mg', 'mg', 'mg', 'mdps', 'mdps', 'mdps', 'mgauss', 'mgauss', 'mgauss']
orn_max = [86400, 2000, 2000, 2000, 250000, 250000, 250000, 50000, 50000, 50000]
orn_min = [0, -2000, -2000, -2000, -250000, -250000, -250000, -50000, -50000, -50000]
orn_recorder = FileRecorder(file_name=orn_out_file, ch_label=orn_ch, fs=20, ch_unit=orn_unit, file_type='edf',
do_overwrite=do_overwrite, ch_max=orn_max, ch_min=orn_min)
print("Converting...")
while True:
try:
parser.parse_packet(mode='record', recorders=(exg_recorder, orn_recorder, exg_recorder))
except ValueError:
print("Binary file ended! Conversion finished!")
break
[docs]class HeartRateEstimator:
def __init__(self, fs=250, smoothing_win=20):
"""Real-time heart Rate Estimator class This class provides the tools for heart rate estimation. It basically detects
R-peaks in ECG signal using the method explained in Hamilton 2002 [2].
Args:
fs (int): Sampling frequency
smoothing_win (int): Length of smoothing window
References:
[1] Hamilton, P. S. (2002). Open source ECG analysis software documentation. Computers in cardiology, 2002.
[2] Hamilton, P. S., & Tompkins, W. J. (1986). Quantitative investigation of QRS detection rules using the
MIT/BIH arrhythmia database. IEEE transactions on biomedical engineering.
"""
self.fs = fs
self.threshold = .35 # Generally between 0.3125 and 0.475
self.ns200ms = int(self.fs * .2)
self.r_peaks_buffer = [(0., 0.)]
self.noise_peaks_buffer = [(0., 0., 0.)]
self.prev_samples = np.zeros(smoothing_win)
self.prev_diff_samples = np.zeros(smoothing_win)
self.prev_times = np.zeros(smoothing_win)
self.prev_max_slope = 0
self.bp_filter = Filter(l_freq=1, h_freq=30, order=3, sampling_freq=fs)
self.hamming_window = signal.windows.hamming(smoothing_win, sym=True)
self.hamming_window /= self.hamming_window.sum()
@property
def average_noise_peak(self):
return np.mean([item[0] for item in self.noise_peaks_buffer])
@property
def average_qrs_peak(self):
return np.mean([item[0] for item in self.r_peaks_buffer])
@property
def decision_threshold(self):
return self.average_noise_peak + self.threshold * (self.average_qrs_peak - self.average_noise_peak)
@property
def average_rr_interval(self):
if len(self.r_peaks_buffer) < 7:
return 1.
return np.mean(np.diff([item[1] for item in self.r_peaks_buffer]))
@property
def heart_rate(self):
if len(self.r_peaks_buffer) < 7:
print('Few peaks to get heart rate!')
return 'NA'
else:
r_times = [item[1] for item in self.r_peaks_buffer]
rr_intervals = np.diff(r_times, 1)
if True in (rr_intervals > 3.):
print('Missing peaks!')
return 'NA'
else:
estimated_heart_rate = int(1. / np.mean(rr_intervals) * 60)
if estimated_heart_rate > 140 or estimated_heart_rate < 40:
print('Estimated heart rate <40 or >140!')
estimated_heart_rate = 'NA'
return estimated_heart_rate
[docs] def push_r_peak(self, val, time):
self.r_peaks_buffer.append((val, time))
if len(self.r_peaks_buffer) > 8:
self.r_peaks_buffer.pop(0)
[docs] def push_noise_peak(self, val, peak_idx, peak_time):
self.noise_peaks_buffer.append((val, peak_idx, peak_time))
if len(self.noise_peaks_buffer) > 8:
self.noise_peaks_buffer.pop(0)
[docs] def estimate(self, ecg_sig, time_vector):
""" Detection of R-peaks
Args:
time_vector (np.array): One-dimensional time vector
ecg_sig (np.array): One-dimensional ECG signal
Returns:
List of detected peaks indices
"""
assert len(ecg_sig.shape) == 1, "Signal must be a vector"
# Preprocessing
ecg_filtered = self.bp_filter.apply_bp_filter(ecg_sig).squeeze()
ecg_sig = np.concatenate((self.prev_samples, ecg_sig))
sig_diff = np.diff(ecg_filtered, 1)
sig_abs_diff = np.abs(sig_diff)
sig_smoothed = signal.convolve(np.concatenate((self.prev_diff_samples, sig_abs_diff)),
self.hamming_window, mode='same', method='auto')[:len(ecg_filtered)]
time_vector = np.concatenate((self.prev_times, time_vector))
self.prev_samples = ecg_sig[-len(self.hamming_window):]
self.prev_diff_samples = sig_abs_diff[-len(self.hamming_window):]
self.prev_times = time_vector[-len(self.hamming_window):]
peaks_idx_list, _ = signal.find_peaks(sig_smoothed)
peaks_val_list = sig_smoothed[peaks_idx_list]
peaks_time_list = time_vector[peaks_idx_list]
detected_peaks_idx = []
detected_peaks_time = []
detected_peaks_val = []
# Decision rules by Hamilton 2002 [1]
for peak_idx, peak_val, peak_time in zip(peaks_idx_list, peaks_val_list, peaks_time_list):
# 1- Ignore all peaks that precede or follow larger peaks by less than 200 ms.
peaks_in_lim = [a and b and c for a, b, c in
zip(((peak_idx - self.ns200ms) < peaks_idx_list),
((peak_idx + self.ns200ms) > peaks_idx_list),
(peak_idx != peaks_idx_list)
)
]
if True in (peak_val < peaks_val_list[peaks_in_lim]):
continue
# 2- If a peak occurs, check to see whether the ECG signal contained both positive and negative slopes.
# TODO: Find a better way of checking this.
# if peak_idx == 0:
# continue
# elif peak_idx < 10:
# n_sample = peak_idx
# else:
# n_sample = 10
# The current n_sample leads to missing some R-peaks as it may have wider/thinner width.
# slopes = np.diff(ecg_sig[peak_idx-n_sample:peak_idx])
# if slopes[0] * slopes[-1] >= 0:
# continue
# check missing peak
self.check_missing_peak(peak_time, peak_idx, detected_peaks_idx, ecg_sig, time_vector)
# 3- If the peak occurred within 360 ms of a previous detection and had a maximum slope less than half the
# maximum slope of the previous detection assume it is a T-wave
if (peak_time - self.r_peaks_buffer[-1][1]) < .36:
if peak_idx < 15:
st_idx = 0
else:
st_idx = peak_idx - 15
if (peak_idx + 15) > (len(ecg_sig) - 1):
end_idx = len(ecg_sig) - 1
else:
end_idx = peak_idx + 15
curr_max_slope = np.abs(np.diff(ecg_sig[st_idx:end_idx])).max()
if curr_max_slope < (.5 * self.prev_max_slope):
continue
# 4- If the peak is larger than the detection threshold call it a QRS complex, otherwise call it noise.
if peak_idx < 25:
st_idx = 0
else:
st_idx = peak_idx - 25
pval = peak_val # ecg_sig[st_idx:peak_idx].max()
if pval > self.decision_threshold:
temp_idx = st_idx + np.argmax(ecg_sig[st_idx:peak_idx + 1])
temp_time = time_vector[temp_idx]
detected_peaks_idx.append(temp_idx)
detected_peaks_val.append(ecg_sig[st_idx:peak_idx + 1].max())
detected_peaks_time.append(temp_time)
self.push_r_peak(pval, temp_time)
if peak_idx < 25:
st_idx = 0
else:
st_idx = peak_idx - 25
self.prev_max_slope = np.abs(np.diff(ecg_sig[st_idx:peak_idx + 25])).max()
else:
self.push_noise_peak(pval, peak_idx, peak_time)
# TODO: Check lead inversion!
# Check for two close peaks
occurrence_time = [item[1] for item in self.r_peaks_buffer]
close_idx = (np.diff(np.array(occurrence_time), 1) < .05)
if (True in close_idx) and len(detected_peaks_idx) > 0:
del detected_peaks_time[0]
del detected_peaks_val[0]
return detected_peaks_time, detected_peaks_val
[docs] def check_missing_peak(self, peak_time, peak_idx, detected_peaks_idx, ecg_sig, time_vector):
# 5- If an interval equal to 1.5 times the average R-to-R interval has elapsed since the most recent
# detection, within that interval there was a peak that was larger than half the detection threshold and
# the peak followed the preceding detection by at least 360 ms, classify that peak as a QRS complex.
if (peak_time - self.r_peaks_buffer[-1][1]) > (1.4 * self.average_rr_interval):
last_noise_val, last_noise_idx, last_noise_time = self.noise_peaks_buffer[-1]
if last_noise_val > (.5 * self.decision_threshold):
if (last_noise_time - self.r_peaks_buffer[-1][1]) > .36:
self.noise_peaks_buffer.pop(-1)
if peak_idx > last_noise_idx:
if last_noise_idx < 20:
st_idx = 0
else:
st_idx = last_noise_idx - 20
detected_peaks_idx.append(st_idx + np.argmax(ecg_sig[st_idx:peak_idx]))
self.push_r_peak(last_noise_val, time_vector[detected_peaks_idx[-1]])
if peak_idx < 25:
st_idx = 0
else:
st_idx = peak_idx - 25
self.prev_max_slope = np.abs(np.diff(ecg_sig[st_idx:peak_idx + 25])).max()
else:
# The peak is in the previous chunk
# TODO: return a negative index for it!
pass
[docs]class FileRecorder:
"""Explorepy file recorder class.
This class can write ExG, orientation and environment data into (separated) EDF+ files. It can write data while
streaming from Explore device. The incoming data will be stored in a buffer and after it reached fs samples, it
writes the buffer in EDF file.
Attributes:
"""
def __init__(self, file_name, ch_label, fs, ch_unit, ch_min=None, ch_max=None,
device_name='Explore', file_type='edf', do_overwrite=False):
"""
Args:
file_name (str): File name
ch_label (list): List of channel labels.
fs (int): Sampling rate (must be identical for all channels)
ch_unit (list): List of channels unit (e.g. 'V', 'mG', 's', etc.)
ch_min (list): List of minimum value of each channel. Only needed in edf mode (can be None in csv mode)
ch_max (list): List of maximum value of each channel. Only needed in edf mode (can be None in csv mode)
device_name (str): Recording device name
file_type (str): File type. current options: 'edf' and 'csv'.
do_overwrite (bool): Overwrite file if a file with the same name exists already.
"""
# Check invalid characters
if set(r'<>{}[]~`*%').intersection(file_name):
raise ValueError("Invalid character in file name")
self._file_obj = None
self._file_type = file_type
self._ch_label = ch_label
self._ch_unit = ch_unit
self._ch_max = ch_max
self._ch_min = ch_min
self._n_chan = len(ch_label)
self._device_name = device_name
self._fs = int(fs)
if file_type == 'edf':
if (len(ch_unit) != len(ch_label)) or (len(ch_label) != len(ch_min)) or (len(ch_label) != len(ch_max)):
raise ValueError('ch_label, ch_unit, ch_min and ch_max must have the same length!')
self._file_name = file_name + '.edf'
self._create_edf(do_overwrite=do_overwrite)
self._init_edf_channels()
self._data = np.zeros((self._n_chan, 0))
elif file_type == 'csv':
self._file_name = file_name + '.csv'
self._create_csv(do_overwrite=do_overwrite)
@property
def fs(self):
return self._fs
[docs] def _create_edf(self, do_overwrite):
if (not do_overwrite) and os.path.isfile(self._file_name):
raise FileExistsError(self._file_name + ' already exists!')
assert self._file_obj is None, "Usage Error: File object has been created already."
self._file_obj = pyedflib.EdfWriter(self._file_name, self._n_chan, file_type=pyedflib.FILETYPE_BDFPLUS)
[docs] def _create_csv(self, do_overwrite):
if (not do_overwrite) and os.path.isfile(self._file_name):
raise FileExistsError(self._file_name + ' already exists!')
assert self._file_obj is None, "Usage Error: File object has been created already."
self._file_obj = open(self._file_name, 'w', newline='\n')
self._csv_obj = csv.writer(self._file_obj, delimiter=",")
self._csv_obj.writerow(self._ch_label)
[docs] def stop(self):
"""Stop recording"""
assert self._file_obj is not None, "Usage Error: File object has not been created yet."
if self._file_type == 'edf':
if self._data.shape[1] > 0:
self._file_obj.writeSamples(list(self._data))
self._file_obj.close()
self._file_obj = None
elif self._file_type == 'csv':
self._file_obj.close()
[docs] def _init_edf_channels(self):
self._file_obj.setEquipment(self._device_name)
self._file_obj.setStartdatetime(datetime.datetime.now())
ch_info_list = []
for ch in zip(self._ch_label, self._ch_unit, self._ch_max, self._ch_min):
ch_info_list.append({'label': ch[0],
'dimension': ch[1],
'sample_rate': self._fs,
'physical_max': ch[2],
'physical_min': ch[3],
'digital_max': 8388607,
'digital_min': -8388608,
'prefilter': '',
'transducer': ''
})
for i, ch_info in enumerate(ch_info_list):
self._file_obj.setSignalHeader(i, ch_info)
[docs] def write_data(self, data):
"""writes data to the file
Notes:
If file type is set to EDF, this function writes each 1 seconds of data. If the input is less than 1 second,
it will be buffered in the memory and it will be written in the file when enough data is in the buffer.
Args:
data (np.array): Array of data to be written in the file with dimension of n_chan x n_sample
"""
if self._file_type == 'edf':
if data.shape[0] != self._n_chan:
raise ValueError('Input first dimension must be {}'.format(self._n_chan))
self._data = np.concatenate((self._data, data), axis=1)
if self._data.shape[1] > self._fs:
self._file_obj.writeSamples(list(self._data[:, :self._fs]))
self._data = self._data[:, self._fs:]
elif self._file_type == 'csv':
self._csv_obj.writerows(data.T.tolist())
[docs] def set_marker(self, data):
"""Writes a marker event in the file
Args:
data (np.array): Array of marker data with size 2x1 ([[timestamp],[code]])
"""
if self._file_type == 'csv':
self.write_data(data)
elif self._file_type == 'edf':
self._file_obj.writeAnnotation(data[0, 0], 0.001, str(int(data[1, 0])))
if __name__ == '__main__':
file_name = 'test_rec'
labels = ['timestamp', 'ch01', 'ch02', 'ch_03', 'ch04']
units = ['V', 'V', 'V', 'V', 's']
mins = [-1, -1, -1, -1, 0]
maxs = [1, 1, 1, 1, 86400]
recorder = FileRecorder(file_name=file_name, fs=250, ch_label=labels, file_type='csv',
ch_unit=units, ch_max=maxs, ch_min=mins, do_overwrite=True)
for i in range(1002):
chunk = np.random.normal(0, 1, (5, 33))
recorder.write_data(chunk)
recorder.stop()