diff --git a/biosppy/signals/ecg.py b/biosppy/signals/ecg.py index dabbdb55..7c1a0591 100644 --- a/biosppy/signals/ecg.py +++ b/biosppy/signals/ecg.py @@ -15,6 +15,7 @@ # compat from __future__ import absolute_import, division, print_function from six.moves import range, zip +import inspect # 3rd party import math @@ -32,7 +33,50 @@ from scipy.signal import argrelextrema -def ecg(signal=None, sampling_rate=1000.0, units=None, path=None, show=True, interactive=False): +def call_segmenter(segmenter, filtered_signal, sampling_rate, verbose=False, **kwargs): + """Checks **kwargs against valid optional parameters for the selected segmenter + and calls it with the valid arguments. + + Parameters + ---------- + segmenter : Callable + The segmentation function to call. + filtered_signal : array_like + Filtered ECG signal. + sampling_rate : int or float + Sampling frequency in Hz. + verbose : bool, optional + If True, prints which valid parameters were passed to the segmenter. + **kwargs : dict + Additional optional keyword arguments to pass to the segmenter. + + Returns + ------- + rpeaks : array + Indices of detected R-peaks. + """ + sig = inspect.signature(segmenter) + allowed_args = sig.parameters.keys() + valid_args = {kwarg: val for kwarg, val in kwargs.items() if kwarg in allowed_args} + if verbose: + print(f"Passed valid parameters for segmenter: {segmenter}: {valid_args}") + (rpeaks,) = segmenter( + signal=filtered_signal, sampling_rate=sampling_rate, **valid_args + ) + return (rpeaks,) + + +def ecg( + signal=None, + sampling_rate=1000.0, + units=None, + path=None, + show=True, + interactive=False, + segmenter="hamilton", + verbose_segmenting=False, + **kwargs, +): """Process a raw ECG signal and extract relevant signal features using default parameters. @@ -51,6 +95,24 @@ def ecg(signal=None, sampling_rate=1000.0, units=None, path=None, show=True, int If True, show a summary plot. interactive : bool, optional If True, shows an interactive plot. + segmenter : string, optional + Name of the segmenting algorithm. + Available options: + + - `"hamilton"` → `hamilton_segmenter` + - `"ASI"` → `ASI_segmenter` + - `"gamboa"` → `gamboa_segmenter` + - `"engzee"` → `engzee_segmenter` + - `"ssf"` → `ssf_segmenter` + - `"pan-tompkins"` → `Pan_Tompkins_Plus_Plus_segmenter` + If a different name is passed hamilton will be used. + verbose_segmenting : bool, optional + If True, prints which valid parameters (for the chosen segmenter) + were passed. + **kwargs : dict + Additional optional keyword arguments (currently only viable for + segmenter parameters). + Returns ------- @@ -71,10 +133,16 @@ def ecg(signal=None, sampling_rate=1000.0, units=None, path=None, show=True, int """ + ALLOWED_KWARGS = ["Pth", "threshold", "tol", "before", "after"] # check inputs if signal is None: raise TypeError("Please specify an input signal.") + # validate kwargs + invalid_args = [k for k in kwargs if k not in ALLOWED_KWARGS] + if invalid_args: + raise ValueError(f"Unknown argument in kwargs: {invalid_args}") + # ensure numpy signal = np.array(signal) @@ -94,7 +162,22 @@ def ecg(signal=None, sampling_rate=1000.0, units=None, path=None, show=True, int filtered = filtered - np.mean(filtered) # remove DC offset # segment - (rpeaks,) = hamilton_segmenter(signal=filtered, sampling_rate=sampling_rate) + segmenters = { + "hamilton": hamilton_segmenter, + "ASI": ASI_segmenter, + "gamboa": gamboa_segmenter, + "engzee": engzee_segmenter, + "ssf": ssf_segmenter, + "pan-tompkins": Pan_Tompkins_Plus_Plus_segmenter, + } + try: + chosen_segmenter = segmenters[segmenter] + (rpeaks,) = call_segmenter( + chosen_segmenter, filtered, sampling_rate, verbose_segmenting, **kwargs + ) + except KeyError: + print("Unrecognized segmenter chosen, will use default (hamilton)") + (rpeaks,) = hamilton_segmenter(signal=filtered, sampling_rate=sampling_rate) # correct R-peak locations (rpeaks,) = correct_rpeaks( @@ -126,6 +209,7 @@ def ecg(signal=None, sampling_rate=1000.0, units=None, path=None, show=True, int if show: if interactive: from biosppy.inter_plotting import ecg as inter_plotting + inter_plotting.plot_ecg( ts=ts, raw=signal, @@ -1385,13 +1469,13 @@ def ASI_segmenter(signal=None, sampling_rate=1000.0, Pth=5.0): def Pan_Tompkins_Plus_Plus_segmenter(signal=None, sampling_rate=1000.0): - """ ECG QRS-Peak Detection Algorithm + """ECG QRS-Peak Detection Algorithm Follows the approach by Md Niaz and Naimul [MdNiNai22]_. Parameters ---------- - + signal : array Input raw ECG signal. sampling_rate: int, float, optional @@ -1404,7 +1488,7 @@ def Pan_Tompkins_Plus_Plus_segmenter(signal=None, sampling_rate=1000.0): References ---------- - .. [MdNiNai22] Khan, Naimul and Imtiaz, Md Niaz, "Pan-Tompkins++: A Robust Approach to Detect R-peaks in ECG Signals", + .. [MdNiNai22] Khan, Naimul and Imtiaz, Md Niaz, "Pan-Tompkins++: A Robust Approach to Detect R-peaks in ECG Signals", arXiv preprint arXiv:2211.03171, 2022 """ @@ -1412,93 +1496,105 @@ def Pan_Tompkins_Plus_Plus_segmenter(signal=None, sampling_rate=1000.0): # check inputs if signal is None: raise TypeError("Please specify an input signal.") - - ''' Initialize ''' + + """ Initialize """ delay = 0 - skip = 0 # Becomes one when a T wave is detected + skip = 0 # Becomes one when a T wave is detected m_selected_RR = 0 mean_RR = 0 ser_back = 0 - - ''' Noise Cancelation (Filtering) (5-18 Hz) ''' + """ Noise Cancelation (Filtering) (5-18 Hz) """ if sampling_rate == 200: - ''' Remove the mean of Signal ''' - #If fs=200 keep frequency 5-12 Hz otherwise 5-18 Hz - signal = signal - np.mean(signal) + """Remove the mean of Signal""" + # If fs=200 keep frequency 5-12 Hz otherwise 5-18 Hz + signal = signal - np.mean(signal) - Wn = 12*2/sampling_rate + Wn = 12 * 2 / sampling_rate N = 3 - a, b = ss.butter(N, Wn, btype='lowpass') + a, b = ss.butter(N, Wn, btype="lowpass") ecg_l = ss.filtfilt(a, b, signal) - - ecg_l = ecg_l/np.max(np.abs(ecg_l)) #Normalize by dividing high value. This reduces time of calculation - Wn = 5*2/sampling_rate - N = 3 # Order of 3 less processing - a, b = signal.butter(N, Wn, btype='highpass') # Bandpass filtering - ecg_h = signal.filtfilt(a, b, ecg_l, padlen=3*(max(len(a), len(b))-1)) - ecg_h = ecg_h/np.max(np.abs(ecg_h)) #Normalize by dividing high value. This reduces time of calculation + ecg_l = ecg_l / np.max( + np.abs(ecg_l) + ) # Normalize by dividing high value. This reduces time of calculation + + Wn = 5 * 2 / sampling_rate + N = 3 # Order of 3 less processing + a, b = signal.butter(N, Wn, btype="highpass") # Bandpass filtering + ecg_h = signal.filtfilt(a, b, ecg_l, padlen=3 * (max(len(a), len(b)) - 1)) + ecg_h = ecg_h / np.max( + np.abs(ecg_h) + ) # Normalize by dividing high value. This reduces time of calculation else: - ''' Band Pass Filter for noise cancelation of other sampling frequencies (Filtering)''' - f1 = 5 #3 #5 # cutoff low frequency to get rid of baseline wander - f2 = 18 #25 #15 # cutoff frequency to discard high frequency noise - Wn = [f1*2/sampling_rate, f2*2/sampling_rate] # cutoff based on fs - N = 3 - a, b = ss.butter(N=N, Wn=Wn, btype='bandpass') # Bandpass filtering - ecg_h = ss.filtfilt(a, b, signal, padlen=3*(max(len(a), len(b)) - 1)) - - ecg_h = ecg_h/np.max(np.abs(ecg_h)) + """Band Pass Filter for noise cancelation of other sampling frequencies (Filtering)""" + f1 = 5 # 3 #5 # cutoff low frequency to get rid of baseline wander + f2 = 18 # 25 #15 # cutoff frequency to discard high frequency noise + Wn = [f1 * 2 / sampling_rate, f2 * 2 / sampling_rate] # cutoff based on fs + N = 3 + a, b = ss.butter(N=N, Wn=Wn, btype="bandpass") # Bandpass filtering + ecg_h = ss.filtfilt(a, b, signal, padlen=3 * (max(len(a), len(b)) - 1)) + + ecg_h = ecg_h / np.max(np.abs(ecg_h)) vector = [1, 2, 0, -2, -1] if sampling_rate != 200: - int_c = 160/sampling_rate - b = interpolate.interp1d(range(1, 6), [i*sampling_rate/8 for i in vector])(np.arange(1, 5.1, int_c)) - - else: - b = [i*sampling_rate/8 for i in vector] + int_c = 160 / sampling_rate + b = interpolate.interp1d(range(1, 6), [i * sampling_rate / 8 for i in vector])( + np.arange(1, 5.1, int_c) + ) - ecg_d = ss.filtfilt(b, 1, ecg_h, padlen=3*(max(len(a), len(b)) - 1)) + else: + b = [i * sampling_rate / 8 for i in vector] - ecg_d = ecg_d/np.max(ecg_d) + ecg_d = ss.filtfilt(b, 1, ecg_h, padlen=3 * (max(len(a), len(b)) - 1)) + ecg_d = ecg_d / np.max(ecg_d) - ''' Squaring nonlinearly enhance the dominant peaks ''' + """ Squaring nonlinearly enhance the dominant peaks """ ecg_s = ecg_d**2 - - #Smoothing - sm_size = int(0.06 * sampling_rate) - ecg_s = st.smoother(signal=ecg_s, kernel='flattop', size=sm_size, mirror=True) + # Smoothing + sm_size = int(0.06 * sampling_rate) + ecg_s = st.smoother(signal=ecg_s, kernel="flattop", size=sm_size, mirror=True) - temp_vector = np.ones((1, round(0.150*sampling_rate)))/round(0.150*sampling_rate) # 150ms moving window, widest possible QRS width + temp_vector = np.ones((1, round(0.150 * sampling_rate))) / round( + 0.150 * sampling_rate + ) # 150ms moving window, widest possible QRS width temp_vector = temp_vector.flatten() - ecg_m = np.convolve(ecg_s["signal"], temp_vector) #Convolution signal and moving window sample - - delay = delay + round(0.150*sampling_rate)/2 + ecg_m = np.convolve( + ecg_s["signal"], temp_vector + ) # Convolution signal and moving window sample + delay = delay + round(0.150 * sampling_rate) / 2 pks = [] - locs = peakutils.indexes(y=ecg_m, thres=0, min_dist=round(0.231*sampling_rate)) #Find all the peaks apart from previous peak 231ms, peak indices + locs = peakutils.indexes( + y=ecg_m, thres=0, min_dist=round(0.231 * sampling_rate) + ) # Find all the peaks apart from previous peak 231ms, peak indices for val in locs: - pks.append(ecg_m[val]) #Peak magnitudes + pks.append(ecg_m[val]) # Peak magnitudes - ''' Initialize Some Other Parameters ''' + """ Initialize Some Other Parameters """ LLp = len(pks) - ''' Stores QRS with respect to Signal and Filtered Signal ''' - qrs_c = np.zeros(LLp) # Amplitude of R peak in convoluted (after moving window) signal - qrs_i = np.zeros(LLp) # Index of R peak in convoluted (after moving window) signal - qrs_i_raw = np.zeros(LLp) # Index of R peak in filtered (before derivative and moving windoe) signal - qrs_amp_raw = np.zeros(LLp) # Amplitude of R in filtered signal - ''' Noise Buffers ''' + """ Stores QRS with respect to Signal and Filtered Signal """ + qrs_c = np.zeros( + LLp + ) # Amplitude of R peak in convoluted (after moving window) signal + qrs_i = np.zeros(LLp) # Index of R peak in convoluted (after moving window) signal + qrs_i_raw = np.zeros( + LLp + ) # Index of R peak in filtered (before derivative and moving windoe) signal + qrs_amp_raw = np.zeros(LLp) # Amplitude of R in filtered signal + """ Noise Buffers """ nois_c = np.zeros(LLp) nois_i = np.zeros(LLp) - ''' Buffers for signal and noise ''' + """ Buffers for signal and noise """ SIGL_buf = np.zeros(LLp) NOISL_buf = np.zeros(LLp) @@ -1507,297 +1603,364 @@ def Pan_Tompkins_Plus_Plus_segmenter(signal=None, sampling_rate=1000.0): NOISL_buf1 = np.zeros(LLp) THRS_buf1 = np.zeros(LLp) - ''' Initialize the training phase (2 seconds of the signal) to determine the THR_SIG and THR_NOISE ''' - #Threshold of signal after moving average operation; Take first 2s window max peak to set initial Threshold - THR_SIG = np.max(ecg_m[:2*int(sampling_rate)+1])*1/3 # Threshold-1 (paper) #0.33 of the max amplitude - THR_NOISE = np.mean(ecg_m[:2*int(sampling_rate)+1])*1/2 #Threshold-2 (paper) # 0.5 of the mean signal is considered to be noise - SIG_LEV = THR_SIG #SPK for convoluted (after moving window) signal - NOISE_LEV = THR_NOISE #NPK for convoluted (after moving window) signal - - - ''' Initialize bandpath filter threshold (2 seconds of the bandpass signal) ''' - #Threshold of signal before derivative and moving average operation, just after 5-18 Hz filtering - THR_SIG1 = np.max(ecg_h[:2*int(sampling_rate)+1])*1/3 #Threshold-1 - THR_NOISE1 = np.mean(ecg_h[:2*int(sampling_rate)+1])*1/2 #Threshold-2 - SIG_LEV1 = THR_SIG1 # Signal level in Bandpassed filter; SPK for filtered signal - NOISE_LEV1 = THR_NOISE1 # Noise level in Bandpassed filter; NPK for filtered signal - - - - ''' Thresholding and decision rule ''' - - Beat_C = 0 #Beat count for convoluted signal - Beat_C1 = 0 #Beat count for filtred signal + """ Initialize the training phase (2 seconds of the signal) to determine the THR_SIG and THR_NOISE """ + # Threshold of signal after moving average operation; Take first 2s window max peak to set initial Threshold + THR_SIG = ( + np.max(ecg_m[: 2 * int(sampling_rate) + 1]) * 1 / 3 + ) # Threshold-1 (paper) #0.33 of the max amplitude + THR_NOISE = ( + np.mean(ecg_m[: 2 * int(sampling_rate) + 1]) * 1 / 2 + ) # Threshold-2 (paper) # 0.5 of the mean signal is considered to be noise + SIG_LEV = THR_SIG # SPK for convoluted (after moving window) signal + NOISE_LEV = THR_NOISE # NPK for convoluted (after moving window) signal + + """ Initialize bandpath filter threshold (2 seconds of the bandpass signal) """ + # Threshold of signal before derivative and moving average operation, just after 5-18 Hz filtering + THR_SIG1 = np.max(ecg_h[: 2 * int(sampling_rate) + 1]) * 1 / 3 # Threshold-1 + THR_NOISE1 = np.mean(ecg_h[: 2 * int(sampling_rate) + 1]) * 1 / 2 # Threshold-2 + SIG_LEV1 = THR_SIG1 # Signal level in Bandpassed filter; SPK for filtered signal + NOISE_LEV1 = THR_NOISE1 # Noise level in Bandpassed filter; NPK for filtered signal + + """ Thresholding and decision rule """ + + Beat_C = 0 # Beat count for convoluted signal + Beat_C1 = 0 # Beat count for filtred signal Noise_Count = 0 - Check_Flag=0 + Check_Flag = 0 for i in range(LLp): - ''' Locate the corresponding peak in the filtered signal ''' - - if locs[i] - round(0.150*sampling_rate) >= 1 and locs[i] <= len(ecg_h): - temp_vec = ecg_h[locs[i] - round(0.150*sampling_rate):locs[i]+1] # Find the values from the preceding 150ms of the peak - y_i = np.max(temp_vec) #Find the max magnitude in that 150ms window - x_i = list(temp_vec).index(y_i) #Find the index of the max value with respect to (peak-150ms) starts as 0 index + """Locate the corresponding peak in the filtered signal""" + + if locs[i] - round(0.150 * sampling_rate) >= 1 and locs[i] <= len(ecg_h): + temp_vec = ecg_h[ + locs[i] - round(0.150 * sampling_rate) : locs[i] + 1 + ] # Find the values from the preceding 150ms of the peak + y_i = np.max(temp_vec) # Find the max magnitude in that 150ms window + x_i = list(temp_vec).index( + y_i + ) # Find the index of the max value with respect to (peak-150ms) starts as 0 index else: if i == 0: - temp_vec = ecg_h[:locs[i]+1] + temp_vec = ecg_h[: locs[i] + 1] y_i = np.max(temp_vec) x_i = list(temp_vec).index(y_i) ser_back = 1 elif locs[i] >= len(ecg_h): - temp_vec = ecg_h[int(locs[i] - round(0.150*sampling_rate)):] #c + temp_vec = ecg_h[int(locs[i] - round(0.150 * sampling_rate)) :] # c y_i = np.max(temp_vec) x_i = list(temp_vec).index(y_i) - - ''' Update the Hearth Rate ''' + """ Update the Hearth Rate """ if Beat_C >= 9: - diffRR = np.diff(qrs_i[Beat_C-9:Beat_C]) # Calculate RR interval of recent 8 heart beats (taken from R peaks) - mean_RR = np.mean(diffRR) # Calculate the mean of 8 previous R waves interval - comp = qrs_i[Beat_C-1] - qrs_i[Beat_C-2] # Latest RR - - m_selected_RR = mean_RR #The latest regular beats mean - - ''' Calculate the mean last 8 R waves ''' + diffRR = np.diff( + qrs_i[Beat_C - 9 : Beat_C] + ) # Calculate RR interval of recent 8 heart beats (taken from R peaks) + mean_RR = np.mean( + diffRR + ) # Calculate the mean of 8 previous R waves interval + comp = qrs_i[Beat_C - 1] - qrs_i[Beat_C - 2] # Latest RR + + m_selected_RR = mean_RR # The latest regular beats mean + + """ Calculate the mean last 8 R waves """ if bool(m_selected_RR): - test_m = m_selected_RR #if the regular RR available use it + test_m = m_selected_RR # if the regular RR available use it elif bool(mean_RR) and m_selected_RR == 0: test_m = mean_RR else: test_m = 0 - #If no R peaks in 1.4s then check with the reduced Threshold - if (locs[i] - qrs_i[Beat_C-1]) >= round(1.4*sampling_rate): - - temp_vec = ecg_m[int(qrs_i[Beat_C-1] + round(0.360*sampling_rate)):int(locs[i])+1] #Search after 360ms of previous QRS to current peak - if temp_vec.size: - pks_temp = np.max(temp_vec) #search back and locate the max in the interval - locs_temp = list(temp_vec).index(pks_temp) - locs_temp = qrs_i[Beat_C-1] + round(0.360*sampling_rate) + locs_temp - + # If no R peaks in 1.4s then check with the reduced Threshold + if (locs[i] - qrs_i[Beat_C - 1]) >= round(1.4 * sampling_rate): - if pks_temp > THR_NOISE*0.2: #Check with 20% of the noise threshold - - Beat_C = Beat_C + 1 - if (Beat_C-1)>=LLp: + temp_vec = ecg_m[ + int(qrs_i[Beat_C - 1] + round(0.360 * sampling_rate)) : int(locs[i]) + 1 + ] # Search after 360ms of previous QRS to current peak + if temp_vec.size: + pks_temp = np.max( + temp_vec + ) # search back and locate the max in the interval + locs_temp = list(temp_vec).index(pks_temp) + locs_temp = qrs_i[Beat_C - 1] + round(0.360 * sampling_rate) + locs_temp + + if pks_temp > THR_NOISE * 0.2: # Check with 20% of the noise threshold + + Beat_C = Beat_C + 1 + if (Beat_C - 1) >= LLp: + break + qrs_c[Beat_C - 1] = pks_temp + qrs_i[Beat_C - 1] = locs_temp + + """ Locate in Filtered Signal """ + # Once we find the peak in convoluted signal, we will search in the filtered signal for max peak with a 150 ms window before that location + if locs_temp <= len(ecg_h): + + temp_vec = ecg_h[ + int(locs_temp - round(0.150 * sampling_rate)) + + 1 : int(locs_temp) + + 2 + ] + y_i_t = np.max(temp_vec) + x_i_t = list(temp_vec).index(y_i_t) + else: + temp_vec = ecg_h[ + int(locs_temp - round(0.150 * sampling_rate)) : + ] + y_i_t = np.max(temp_vec) + x_i_t = list(temp_vec).index(y_i_t) + + if y_i_t > THR_NOISE1 * 0.2: + Beat_C1 = Beat_C1 + 1 + if (Beat_C1 - 1) >= LLp: break - qrs_c[Beat_C-1] = pks_temp - qrs_i[Beat_C-1] = locs_temp + temp_value = locs_temp - round(0.150 * sampling_rate) + x_i_t + qrs_i_raw[Beat_C1 - 1] = temp_value + qrs_amp_raw[Beat_C1 - 1] = y_i_t + SIG_LEV1 = 0.75 * y_i_t + 0.25 * SIG_LEV1 - ''' Locate in Filtered Signal ''' - #Once we find the peak in convoluted signal, we will search in the filtered signal for max peak with a 150 ms window before that location - if locs_temp <= len(ecg_h): - - temp_vec = ecg_h[int(locs_temp-round(0.150*sampling_rate))+1:int(locs_temp)+2] - y_i_t = np.max(temp_vec) - x_i_t = list(temp_vec).index(y_i_t) - else: - temp_vec = ecg_h[int(locs_temp-round(0.150*sampling_rate)):] - y_i_t = np.max(temp_vec) - x_i_t = list(temp_vec).index(y_i_t) - + not_nois = 1 - if y_i_t > THR_NOISE1*0.2: - Beat_C1 = Beat_C1 + 1 - if (Beat_C1-1)>=LLp: - break - temp_value = locs_temp - round(0.150*sampling_rate) + x_i_t - qrs_i_raw[Beat_C1-1] = temp_value - qrs_amp_raw[Beat_C1-1] = y_i_t - - SIG_LEV1 = 0.75 * y_i_t + 0.25 *SIG_LEV1 - + SIG_LEV = 0.75 * pks_temp + 0.25 * SIG_LEV + else: + not_nois = 0 - not_nois = 1 - - SIG_LEV = 0.75 * pks_temp + 0.25 *SIG_LEV - else: - not_nois = 0 - - - elif bool(test_m): #Check for missed QRS if no QRS is detected in 166 percent of - #the current average RR interval or 1s after the last detected QRS. the maximal peak detected in - #that time interval that lies Threshold1 and Threshold-3 (paper) is considered to be a possible QRS complex - - if ((locs[i] - qrs_i[Beat_C-1]) >= round(1.66*test_m)) or ((locs[i] - qrs_i[Beat_C-1]) > round(1*sampling_rate)): #it shows a QRS is missed - - temp_vec = ecg_m[int(qrs_i[Beat_C-1] + round(0.360*sampling_rate)):int(locs[i])+1] #Search after 360ms of previous QRS to current peak + elif bool( + test_m + ): # Check for missed QRS if no QRS is detected in 166 percent of + # the current average RR interval or 1s after the last detected QRS. the maximal peak detected in + # that time interval that lies Threshold1 and Threshold-3 (paper) is considered to be a possible QRS complex + + if ((locs[i] - qrs_i[Beat_C - 1]) >= round(1.66 * test_m)) or ( + (locs[i] - qrs_i[Beat_C - 1]) > round(1 * sampling_rate) + ): # it shows a QRS is missed + + temp_vec = ecg_m[ + int(qrs_i[Beat_C - 1] + round(0.360 * sampling_rate)) : int(locs[i]) + + 1 + ] # Search after 360ms of previous QRS to current peak if temp_vec.size: - pks_temp = np.max(temp_vec) #search back and locate the max in the interval + pks_temp = np.max( + temp_vec + ) # search back and locate the max in the interval locs_temp = list(temp_vec).index(pks_temp) - locs_temp = qrs_i[Beat_C-1] + round(0.360*sampling_rate) + locs_temp - - #Consider signal between the preceding 3 QRS complexes and the following 3 peaks to calculate Threshold-3 (paper) - - THR_NOISE_TMP=THR_NOISE - if i<(len(locs)-3): - temp_vec_tmp=ecg_m[int(qrs_i[Beat_C-3] + round(0.360*sampling_rate)):int(locs[i+3])+1] #values between the preceding 3 QRS complexes and the following 3 peaks - THR_NOISE_TMP =0.5*THR_NOISE+0.5*( np.mean(temp_vec_tmp)*1/2) #Calculate Threshold3 - - if pks_temp > THR_NOISE_TMP: #If max peak in that range greater than Threshold3 mark that as a heart beat - + locs_temp = ( + qrs_i[Beat_C - 1] + round(0.360 * sampling_rate) + locs_temp + ) + + # Consider signal between the preceding 3 QRS complexes and the following 3 peaks to calculate Threshold-3 (paper) + + THR_NOISE_TMP = THR_NOISE + if i < (len(locs) - 3): + temp_vec_tmp = ecg_m[ + int(qrs_i[Beat_C - 3] + round(0.360 * sampling_rate)) : int( + locs[i + 3] + ) + + 1 + ] # values between the preceding 3 QRS complexes and the following 3 peaks + THR_NOISE_TMP = 0.5 * THR_NOISE + 0.5 * ( + np.mean(temp_vec_tmp) * 1 / 2 + ) # Calculate Threshold3 + + if ( + pks_temp > THR_NOISE_TMP + ): # If max peak in that range greater than Threshold3 mark that as a heart beat + Beat_C = Beat_C + 1 - if (Beat_C-1)>=LLp: + if (Beat_C - 1) >= LLp: break - qrs_c[Beat_C-1] = pks_temp #Mark R peak in the convoluted signal - qrs_i[Beat_C-1] = locs_temp - + qrs_c[Beat_C - 1] = ( + pks_temp # Mark R peak in the convoluted signal + ) + qrs_i[Beat_C - 1] = locs_temp - ''' Locate in Filtered Signal ''' - #Once we find the peak in convoluted signal, we will search in the filtered signal for max peak with a 150 ms window before that location + """ Locate in Filtered Signal """ + # Once we find the peak in convoluted signal, we will search in the filtered signal for max peak with a 150 ms window before that location if locs_temp <= len(ecg_h): - - temp_vec = ecg_h[int(locs_temp-round(0.150*sampling_rate))+1:int(locs_temp)+2] + + temp_vec = ecg_h[ + int(locs_temp - round(0.150 * sampling_rate)) + + 1 : int(locs_temp) + + 2 + ] y_i_t = np.max(temp_vec) x_i_t = list(temp_vec).index(y_i_t) else: - temp_vec = ecg_h[int(locs_temp-round(0.150*sampling_rate)):] + temp_vec = ecg_h[ + int(locs_temp - round(0.150 * sampling_rate)) : + ] y_i_t = np.max(temp_vec) x_i_t = list(temp_vec).index(y_i_t) - - - ''' Band Pass Signal Threshold ''' - THR_NOISE1_TMP=THR_NOISE1 - if i<(len(locs)-3): - temp_vec_tmp=ecg_h[int(qrs_i[Beat_C-3] + round(0.360*sampling_rate)-round(0.150*sampling_rate)+1):int(locs[i+3])+1] - THR_NOISE1_TMP =0.5*THR_NOISE1+0.5*( np.mean(temp_vec_tmp)*1/2) + + """ Band Pass Signal Threshold """ + THR_NOISE1_TMP = THR_NOISE1 + if i < (len(locs) - 3): + temp_vec_tmp = ecg_h[ + int( + qrs_i[Beat_C - 3] + + round(0.360 * sampling_rate) + - round(0.150 * sampling_rate) + + 1 + ) : int(locs[i + 3]) + + 1 + ] + THR_NOISE1_TMP = 0.5 * THR_NOISE1 + 0.5 * ( + np.mean(temp_vec_tmp) * 1 / 2 + ) if y_i_t > THR_NOISE1_TMP: Beat_C1 = Beat_C1 + 1 - if (Beat_C1-1)>=LLp: + if (Beat_C1 - 1) >= LLp: break - temp_value = locs_temp - round(0.150*sampling_rate) + x_i_t - qrs_i_raw[Beat_C1-1] = temp_value # R peak marked with index in filtered signal - qrs_amp_raw[Beat_C1-1] = y_i_t # Amplitude of that R peak - - SIG_LEV1 = 0.75 * y_i_t + 0.25 *SIG_LEV1 - + temp_value = ( + locs_temp - round(0.150 * sampling_rate) + x_i_t + ) + qrs_i_raw[Beat_C1 - 1] = ( + temp_value # R peak marked with index in filtered signal + ) + qrs_amp_raw[Beat_C1 - 1] = y_i_t # Amplitude of that R peak + + SIG_LEV1 = 0.75 * y_i_t + 0.25 * SIG_LEV1 not_nois = 1 - #Changed- For missed R peaks- Update THR - SIG_LEV = 0.75 * pks_temp + 0.25 *SIG_LEV + # Changed- For missed R peaks- Update THR + SIG_LEV = 0.75 * pks_temp + 0.25 * SIG_LEV else: not_nois = 0 else: not_nois = 0 - - - ''' Find noise and QRS Peaks ''' + """ Find noise and QRS Peaks """ if pks[i] >= THR_SIG: - ''' if NO QRS in 360 ms of the previous QRS or in 50 percent of - the current average RR interval, See if T wave ''' - + """if NO QRS in 360 ms of the previous QRS or in 50 percent of + the current average RR interval, See if T wave""" + if Beat_C >= 3: if bool(test_m): - if (locs[i] - qrs_i[Beat_C-1]) <= round(0.5*test_m): #Check 50 percent of the current average RR interval - - Check_Flag=1 - if (locs[i] - qrs_i[Beat_C-1] <= round(0.36*sampling_rate)) or Check_Flag==1: - - temp_vec = ecg_m[locs[i]-round(0.07*sampling_rate):locs[i]+1] - Slope1 = np.mean(np.diff(temp_vec)) # mean slope of the waveform at that position - temp_vec = ecg_m[int(qrs_i[Beat_C-1] - round(0.07*sampling_rate)) - 1 : int(qrs_i[Beat_C-1])+1] - Slope2 = np.mean(np.diff(temp_vec)) # mean slope of previous R wave - - if np.abs(Slope1) <= np.abs(0.6*Slope2): # slope less then 0.6 of previous R; checking if it is noise + if (locs[i] - qrs_i[Beat_C - 1]) <= round( + 0.5 * test_m + ): # Check 50 percent of the current average RR interval + + Check_Flag = 1 + if ( + locs[i] - qrs_i[Beat_C - 1] <= round(0.36 * sampling_rate) + ) or Check_Flag == 1: + + temp_vec = ecg_m[ + locs[i] - round(0.07 * sampling_rate) : locs[i] + 1 + ] + Slope1 = np.mean( + np.diff(temp_vec) + ) # mean slope of the waveform at that position + temp_vec = ecg_m[ + int(qrs_i[Beat_C - 1] - round(0.07 * sampling_rate)) + - 1 : int(qrs_i[Beat_C - 1]) + + 1 + ] + Slope2 = np.mean(np.diff(temp_vec)) # mean slope of previous R wave + + if np.abs(Slope1) <= np.abs( + 0.6 * Slope2 + ): # slope less then 0.6 of previous R; checking if it is noise Noise_Count = Noise_Count + 1 nois_c[Noise_Count] = pks[i] nois_i[Noise_Count] = locs[i] - skip = 1 # T wave identification + skip = 1 # T wave identification else: skip = 0 - ''' Skip is 1 when a T wave is detected ''' + """ Skip is 1 when a T wave is detected """ if skip == 0: Beat_C = Beat_C + 1 - if (Beat_C-1)>=LLp: + if (Beat_C - 1) >= LLp: break - qrs_c[Beat_C-1] = pks[i] #Mark as R peak in the convoluted signal - qrs_i[Beat_C-1] = locs[i] + qrs_c[Beat_C - 1] = pks[i] # Mark as R peak in the convoluted signal + qrs_i[Beat_C - 1] = locs[i] - - ''' Band pass Filter check threshold ''' + """ Band pass Filter check threshold """ if y_i >= THR_SIG1: - Beat_C1 = Beat_C1 + 1 #Mark as R peak in the filtered signal - if (Beat_C1-1)>=LLp: + Beat_C1 = Beat_C1 + 1 # Mark as R peak in the filtered signal + if (Beat_C1 - 1) >= LLp: break if bool(ser_back): # +1 to agree with Matlab implementation temp_value = x_i + 1 - qrs_i_raw[Beat_C1-1] = temp_value + qrs_i_raw[Beat_C1 - 1] = temp_value else: - temp_value = locs[i] - round(0.150*sampling_rate) + x_i - qrs_i_raw[Beat_C1-1] = temp_value - - qrs_amp_raw[Beat_C1-1] = y_i + temp_value = locs[i] - round(0.150 * sampling_rate) + x_i + qrs_i_raw[Beat_C1 - 1] = temp_value - SIG_LEV1 = 0.125*y_i + 0.875*SIG_LEV1 + qrs_amp_raw[Beat_C1 - 1] = y_i + SIG_LEV1 = 0.125 * y_i + 0.875 * SIG_LEV1 - SIG_LEV = 0.125*pks[i] + 0.875*SIG_LEV - + SIG_LEV = 0.125 * pks[i] + 0.875 * SIG_LEV elif THR_NOISE <= pks[i] and pks[i] < THR_SIG: NOISE_LEV1 = 0.125 * y_i + 0.875 * NOISE_LEV1 - NOISE_LEV = 0.125*pks[i] + 0.875 * NOISE_LEV + NOISE_LEV = 0.125 * pks[i] + 0.875 * NOISE_LEV - elif pks[i] < THR_NOISE: #If less than noise threshold (Threshold-2) mark as noise + elif ( + pks[i] < THR_NOISE + ): # If less than noise threshold (Threshold-2) mark as noise nois_c[Noise_Count] = pks[i] nois_i[Noise_Count] = locs[i] Noise_Count = Noise_Count + 1 + NOISE_LEV1 = 0.125 * y_i + 0.875 * NOISE_LEV1 + NOISE_LEV = 0.125 * pks[i] + 0.875 * NOISE_LEV - NOISE_LEV1 = 0.125*y_i +0.875 *NOISE_LEV1 - NOISE_LEV = 0.125*pks[i] + 0.875*NOISE_LEV - - ''' Adjust the threshold with SNR ''' + """ Adjust the threshold with SNR """ if NOISE_LEV != 0 or SIG_LEV != 0: - THR_SIG = NOISE_LEV + 0.25 * (np.abs(SIG_LEV - NOISE_LEV)) #Calculate Threshold-1 for convoluted signal; above this R peak - THR_NOISE = 0.4* THR_SIG #Calculate Threshold-2 for convoluted signal; below this Noise - - ''' Adjust the threshold with SNR for bandpassed signal ''' + THR_SIG = NOISE_LEV + 0.25 * ( + np.abs(SIG_LEV - NOISE_LEV) + ) # Calculate Threshold-1 for convoluted signal; above this R peak + THR_NOISE = ( + 0.4 * THR_SIG + ) # Calculate Threshold-2 for convoluted signal; below this Noise - if NOISE_LEV1 != 0 or SIG_LEV1 != 0: - THR_SIG1 = NOISE_LEV1 + 0.25*(np.abs(SIG_LEV1 - NOISE_LEV1)) #Calculate Threshold-1 for filtered signal; above this R peak - THR_NOISE1 = 0.4* THR_SIG1 #Calculate Threshold-2 for filtered signal; below this Noise + """ Adjust the threshold with SNR for bandpassed signal """ + if NOISE_LEV1 != 0 or SIG_LEV1 != 0: + THR_SIG1 = NOISE_LEV1 + 0.25 * ( + np.abs(SIG_LEV1 - NOISE_LEV1) + ) # Calculate Threshold-1 for filtered signal; above this R peak + THR_NOISE1 = ( + 0.4 * THR_SIG1 + ) # Calculate Threshold-2 for filtered signal; below this Noise - ''' take a track of thresholds of smoothed signal ''' + """ take a track of thresholds of smoothed signal """ SIGL_buf[i] = SIG_LEV NOISL_buf[i] = NOISE_LEV THRS_buf[i] = THR_SIG - ''' take a track of thresholds of filtered signal ''' + """ take a track of thresholds of filtered signal """ SIGL_buf1[i] = SIG_LEV1 NOISL_buf1[i] = NOISE_LEV1 THRS_buf1[i] = THR_SIG1 - ''' reset parameters ''' + """ reset parameters """ skip = 0 not_nois = 0 ser_back = 0 - Check_Flag=0 + Check_Flag = 0 - - - ''' Adjust lengths ''' + """ Adjust lengths """ qrs_i_raw = qrs_i_raw[:Beat_C1] qrs_amp_raw = qrs_amp_raw[:Beat_C1] - qrs_c = qrs_c[:Beat_C+1] - qrs_i = qrs_i[:Beat_C+1] - + qrs_c = qrs_c[: Beat_C + 1] + qrs_i = qrs_i[: Beat_C + 1] + return utils.ReturnTuple((qrs_i_raw.astype(int),), ("rpeaks",)) def find_artifacts(peaks, sampling_rate): - '''find_artifacts: find and classify artifacts + """find_artifacts: find and classify artifacts Parameters ---------- @@ -1812,7 +1975,7 @@ def find_artifacts(peaks, sampling_rate): Struct containing indices of detected artifacts. subspaces: dictionary Subspaces containing rr, drrs, mrrs, s12, s22, c1, c2 used to classify artifacts. - ''' + """ c1 = 0.13 c2 = 0.17 alpha = 5.2 @@ -1825,16 +1988,16 @@ def find_artifacts(peaks, sampling_rate): # Artifact identification drrs = np.diff(rr) drrs = np.insert(drrs, 0, np.mean(drrs)) - th1 = estimate_th(drrs, alpha, ww) + th1 = estimate_th(drrs, alpha, ww) drrs = drrs / th1 padding = 2 drrs_pad = np.pad(drrs, padding, "reflect") - - '''drrs_pad = np.pad(drrs, (padding, padding), 'symmetric') + + """drrs_pad = np.pad(drrs, (padding, padding), 'symmetric') drrs_pad[:padding] += 1 - drrs_pad[-padding:] -= 1''' + drrs_pad[-padding:] -= 1""" s12 = np.zeros(len(drrs)) for d in range(padding, padding + len(drrs)): @@ -1853,7 +2016,7 @@ def find_artifacts(peaks, sampling_rate): medrr = ss.medfilt(rr, medfilt_order) mrrs = rr - medrr mrrs[mrrs < 0] *= 2 - th2 = estimate_th(mrrs, alpha, ww) + th2 = estimate_th(mrrs, alpha, ww) mrrs = mrrs / th2 # Artifacts classification @@ -1911,25 +2074,26 @@ def find_artifacts(peaks, sampling_rate): i += 1 artifacts = { - 'ectopic': ectopic_indices, - 'missed': missed_indices, - 'extra': extra_indices, - 'longshort': longshort_indices + "ectopic": ectopic_indices, + "missed": missed_indices, + "extra": extra_indices, + "longshort": longshort_indices, } subspaces = { - 'rr': rr, - 'drrs': drrs, - 'mrrs': mrrs, - 's12': s12, - 's22': s22, - 'c1': c1, - 'c2': c2 + "rr": rr, + "drrs": drrs, + "mrrs": mrrs, + "s12": s12, + "s22": s22, + "c1": c1, + "c2": c2, } return artifacts, subspaces + def estimate_th(x, alpha, ww): - '''estimate_th: estimate threshold + """estimate_th: estimate threshold Parameters ---------- @@ -1944,19 +2108,20 @@ def estimate_th(x, alpha, ww): ------- th: float Threshold. - ''' + """ x_abs = np.abs(x) - q1 = filters.percentile_filter(x_abs, 25, size=ww, mode='reflect') - q3 = filters.percentile_filter(x_abs, 75, size=ww, mode='reflect') + q1 = filters.percentile_filter(x_abs, 25, size=ww, mode="reflect") + q3 = filters.percentile_filter(x_abs, 75, size=ww, mode="reflect") th = alpha * ((q3 - q1) / 2) return th + def correct_extra(extra_indices, peaks): - '''correct_extra: correct extra beat by deleting it. + """correct_extra: correct extra beat by deleting it. Parameters ---------- @@ -1969,13 +2134,14 @@ def correct_extra(extra_indices, peaks): ------- corrected_peaks: array Vector containing indices of corrected peaks. - ''' + """ corrected_peaks = peaks.copy() corrected_peaks = np.delete(corrected_peaks, extra_indices) return corrected_peaks + def correct_misaligned(misaligned_indices, peaks): - '''correct_misaligned: correct misaligned beat (long or short) by interpolating new values to the RR time series. + """correct_misaligned: correct misaligned beat (long or short) by interpolating new values to the RR time series. Parameters ---------- @@ -1988,13 +2154,12 @@ def correct_misaligned(misaligned_indices, peaks): ------- corrected_peaks: array Vector containing indices of corrected peaks. - ''' + """ corrected_peaks = np.array(peaks.copy()) misaligned_indices = np.array(misaligned_indices) valid_indices = np.logical_and( - misaligned_indices > 1, - misaligned_indices < len(corrected_peaks) - 1 + misaligned_indices > 1, misaligned_indices < len(corrected_peaks) - 1 ) misaligned_indices = misaligned_indices[valid_indices] prev_peaks = corrected_peaks[misaligned_indices - 1] @@ -2008,8 +2173,9 @@ def correct_misaligned(misaligned_indices, peaks): return corrected_peaks + def correct_missed(missed_indices, peaks): - '''correct_missed: correct missed beat by adding new R-wave occurrence time so that + """correct_missed: correct missed beat by adding new R-wave occurrence time so that it divides the detected long RR interval into two equal halves and RR interval series is then recalculated. @@ -2024,24 +2190,27 @@ def correct_missed(missed_indices, peaks): ------- corrected_peaks: array Vector containing indices of corrected peaks. - ''' + """ corrected_peaks = peaks.copy() missed_indices = np.array(missed_indices) valid_indices = np.logical_and( missed_indices > 1, missed_indices < len(corrected_peaks) - ) + ) missed_indices = missed_indices[valid_indices] prev_peaks = corrected_peaks[[i - 1 for i in missed_indices]] next_peaks = corrected_peaks[missed_indices] - added_peaks = [round(prev_peaks[i] + (next_peaks[i] - prev_peaks[i]) / 2) for i in range(len(valid_indices))] + added_peaks = [ + round(prev_peaks[i] + (next_peaks[i] - prev_peaks[i]) / 2) + for i in range(len(valid_indices)) + ] corrected_peaks = np.insert(corrected_peaks, missed_indices, added_peaks) - return corrected_peaks + def update_indices(source_indices, update_indices, update): - '''update_indices: updates the indices in update_indices based on the values in source_indices and update. + """update_indices: updates the indices in update_indices based on the values in source_indices and update. Parameters ---------- @@ -2050,38 +2219,39 @@ def update_indices(source_indices, update_indices, update): update_indices : array Vector containing update_indices. update: int - Update index + Update index Returns ------- list(np.unique(update_indices)): array Vector containing unique updated indices. - ''' + """ if not update_indices: return update_indices for s in source_indices: update_indices = [u + update if u > s else u for u in update_indices] return list(np.unique(update_indices)) + def correct_artifacts(artifacts, peaks): - '''correct_artifacts: correct artifacts according to its type. + """correct_artifacts: correct artifacts according to its type. Parameters ---------- artifacts: dictionary Struct containing indices of detected artifacts. peaks: array - Vector containing indices of detected peaks (R waves locations) + Vector containing indices of detected peaks (R waves locations) Returns ------- peaks: array Vector containing indices of corrected R peaks. - ''' - extra_indices = artifacts['extra'] - missed_indices = artifacts['missed'] - ectopic_indices = artifacts['ectopic'] - longshort_indices = artifacts['longshort'] + """ + extra_indices = artifacts["extra"] + missed_indices = artifacts["missed"] + ectopic_indices = artifacts["ectopic"] + longshort_indices = artifacts["longshort"] if extra_indices: peaks = correct_extra(extra_indices, peaks) @@ -2102,8 +2272,9 @@ def correct_artifacts(artifacts, peaks): return peaks + def plot_artifacts(artifacts, subspaces): - '''plot_artifacts: plot artifacts according to its type. + """plot_artifacts: plot artifacts according to its type. Parameters ---------- @@ -2115,67 +2286,87 @@ def plot_artifacts(artifacts, subspaces): Returns ------- None - ''' - ectopic_indices = artifacts['ectopic'] - missed_indices = artifacts['missed'] - extra_indices = artifacts['extra'] - longshort_indices = artifacts['longshort'] - - rr = subspaces['rr'] - drrs = subspaces['drrs'] - mrrs = subspaces['mrrs'] - s12 = subspaces['s12'] - s22 = subspaces['s22'] - c1 = subspaces['c1'] - c2 = subspaces['c2'] + """ + ectopic_indices = artifacts["ectopic"] + missed_indices = artifacts["missed"] + extra_indices = artifacts["extra"] + longshort_indices = artifacts["longshort"] + + rr = subspaces["rr"] + drrs = subspaces["drrs"] + mrrs = subspaces["mrrs"] + s12 = subspaces["s12"] + s22 = subspaces["s22"] + c1 = subspaces["c1"] + c2 = subspaces["c2"] fig, axs = plt.subplots(3, 2, figsize=(10, 15)) # Artifact types - axs[0, 0].plot(rr, 'k') - axs[0, 0].scatter(longshort_indices, rr[longshort_indices], 15, 'c') - axs[0, 0].scatter(ectopic_indices, rr[ectopic_indices], 15, 'r') - axs[0, 0].scatter(extra_indices, rr[extra_indices], 15, 'm') - axs[0, 0].scatter(missed_indices, rr[missed_indices], 15, 'g') - axs[0, 0].legend(['', 'Long/Short', 'Ectopic', 'False positive', 'False negative'], loc='upper right') - axs[0, 0].set_title('Artifact types') + axs[0, 0].plot(rr, "k") + axs[0, 0].scatter(longshort_indices, rr[longshort_indices], 15, "c") + axs[0, 0].scatter(ectopic_indices, rr[ectopic_indices], 15, "r") + axs[0, 0].scatter(extra_indices, rr[extra_indices], 15, "m") + axs[0, 0].scatter(missed_indices, rr[missed_indices], 15, "g") + axs[0, 0].legend( + ["", "Long/Short", "Ectopic", "False positive", "False negative"], + loc="upper right", + ) + axs[0, 0].set_title("Artifact types") # Th1 axs[1, 0].plot(abs(drrs)) - axs[1, 0].axhline(y=1, color='r', linestyle='--') - axs[1, 0].set_title('Consecutive-difference criterion') + axs[1, 0].axhline(y=1, color="r", linestyle="--") + axs[1, 0].set_title("Consecutive-difference criterion") # Th2 axs[1, 1].plot(abs(mrrs)) - axs[1, 1].axhline(y=3, color='r', linestyle='--') - axs[1, 1].set_title('Difference-from-median criterion') + axs[1, 1].axhline(y=3, color="r", linestyle="--") + axs[1, 1].set_title("Difference-from-median criterion") # Subspace S12 - axs[2, 0].scatter(drrs, s12, 15, 'k') - axs[2, 0].scatter(drrs[longshort_indices], s12[longshort_indices], 15, 'c') - axs[2, 0].scatter(drrs[ectopic_indices], s12[ectopic_indices], 15, 'r') - axs[2, 0].scatter(drrs[extra_indices], s12[extra_indices], 15, 'm') - axs[2, 0].scatter(drrs[missed_indices], s12[missed_indices], 15, 'g') - axs[2, 0].add_patch(patches.Polygon([[-10, 5], [-10, -c1 * -10 + c2], [-1, -c1 * -1 + c2], [-1, 5]], alpha=0.05, color='k')) - axs[2, 0].add_patch(patches.Polygon([[1, -c1 * 1 - c2], [1, -5], [10, -5], [10, -c1 * 10 - c2]], alpha=0.05, color='k')) - axs[2, 0].set_title('Subspace S12') + axs[2, 0].scatter(drrs, s12, 15, "k") + axs[2, 0].scatter(drrs[longshort_indices], s12[longshort_indices], 15, "c") + axs[2, 0].scatter(drrs[ectopic_indices], s12[ectopic_indices], 15, "r") + axs[2, 0].scatter(drrs[extra_indices], s12[extra_indices], 15, "m") + axs[2, 0].scatter(drrs[missed_indices], s12[missed_indices], 15, "g") + axs[2, 0].add_patch( + patches.Polygon( + [[-10, 5], [-10, -c1 * -10 + c2], [-1, -c1 * -1 + c2], [-1, 5]], + alpha=0.05, + color="k", + ) + ) + axs[2, 0].add_patch( + patches.Polygon( + [[1, -c1 * 1 - c2], [1, -5], [10, -5], [10, -c1 * 10 - c2]], + alpha=0.05, + color="k", + ) + ) + axs[2, 0].set_title("Subspace S12") # Subspace S21 - axs[2, 1].scatter(drrs, s22, 15, 'k') - axs[2, 1].scatter(drrs[longshort_indices], s22[longshort_indices], 15, 'c') - axs[2, 1].scatter(drrs[ectopic_indices], s22[ectopic_indices], 15, 'r') - axs[2, 1].scatter(drrs[extra_indices], s22[extra_indices], 15, 'm') - axs[2, 1].scatter(drrs[missed_indices], s22[missed_indices], 15, 'g') - axs[2, 1].add_patch(patches.Polygon([[-10, 10], [-10, 1], [-1, 1], [-1, 10]], alpha=0.05, color='k')) - axs[2, 1].add_patch(patches.Polygon([[1, -1], [1, -10], [10, -10], [10, -1]], alpha=0.05, color='k')) - axs[2, 1].set_title('Subspace S21') + axs[2, 1].scatter(drrs, s22, 15, "k") + axs[2, 1].scatter(drrs[longshort_indices], s22[longshort_indices], 15, "c") + axs[2, 1].scatter(drrs[ectopic_indices], s22[ectopic_indices], 15, "r") + axs[2, 1].scatter(drrs[extra_indices], s22[extra_indices], 15, "m") + axs[2, 1].scatter(drrs[missed_indices], s22[missed_indices], 15, "g") + axs[2, 1].add_patch( + patches.Polygon([[-10, 10], [-10, 1], [-1, 1], [-1, 10]], alpha=0.05, color="k") + ) + axs[2, 1].add_patch( + patches.Polygon([[1, -1], [1, -10], [10, -10], [10, -1]], alpha=0.05, color="k") + ) + axs[2, 1].set_title("Subspace S21") plt.tight_layout() plt.show() + # função principal def fixpeaks(peaks, sampling_rate=1000, iterative=True, show=False): - '''FIXPEAKS: HRV time series artifact correction. + """FIXPEAKS: HRV time series artifact correction. Follows the approach by Lipponen et. al, 2019 [Lipp19]. Matlab implementation by Marek Sokol, 2022. @@ -2197,12 +2388,12 @@ def fixpeaks(peaks, sampling_rate=1000, iterative=True, show=False): Struct containing indices of detected artifacts. peaks_clean: array Vector of corrected peak values (indices) - + References ---------- - .. [Lipp19] Jukka A. Lipponen & Mika P. Tarvainen (2019): A robust algorithm for heart rate variability + .. [Lipp19] Jukka A. Lipponen & Mika P. Tarvainen (2019): A robust algorithm for heart rate variability time series artefact correction using novel beat classification, Journal of Medical Engineering & Technology - ''' + """ # check inputs if peaks is None: @@ -2228,14 +2419,14 @@ def fixpeaks(peaks, sampling_rate=1000, iterative=True, show=False): if show: plot_artifacts(artifacts, subspaces) - + return utils.ReturnTuple((artifacts, peaks_clean), ("artifacts", "peaks_clean")) def getQPositions(ecg_proc=None, show=False): """Different ECG Waves (Q, R, S, ...) are not present or are not so clear to identify in all ECG signals (I II III V1 V2 V3, ...) For Q wave we suggest to use signals I, aVL . Avoid II, III, V1, V2, V3, V4, aVR, aVF - + Parameters ---------- signal : object @@ -2252,7 +2443,9 @@ def getQPositions(ecg_proc=None, show=False): """ templates_ts = ecg_proc["templates_ts"] - template_r_position = np.argmin(np.abs(templates_ts - 0)) # R peak on the template is always on time instant 0 seconds + template_r_position = np.argmin( + np.abs(templates_ts - 0) + ) # R peak on the template is always on time instant 0 seconds Q_positions = [] Q_start_positions = [] Q_positions_template = [] @@ -2270,7 +2463,7 @@ def getQPositions(ecg_proc=None, show=False): Q_positions_template.append(mininums_from_template_left[0][-1]) except: pass - + # Get Q start position template_Q_left = each[0 : mininums_from_template_left[0][-1] + 1] maximum_from_template_Q_left = argrelextrema(template_Q_left, np.greater) @@ -2288,39 +2481,49 @@ def getQPositions(ecg_proc=None, show=False): plt.figure() plt.plot(ecg_proc["templates"].T) plt.axvline(x=template_r_position, color="r", label="R peak") - plt.axvline(x=Q_positions_template[0],color="yellow",label="Q positions") - for position in range(1,len(Q_positions_template)): + plt.axvline(x=Q_positions_template[0], color="yellow", label="Q positions") + for position in range(1, len(Q_positions_template)): plt.axvline( x=Q_positions_template[position], color="yellow", ) - plt.axvline(x=Q_start_positions_template[0],color="green",label="Q Start positions") - for position in range(1,len(Q_start_positions_template)): + plt.axvline( + x=Q_start_positions_template[0], color="green", label="Q Start positions" + ) + for position in range(1, len(Q_start_positions_template)): plt.axvline( x=Q_start_positions_template[position], color="green", ) plt.legend() plt.show() - + Q_positions = np.array(Q_positions) Q_start_positions = np.array(Q_start_positions) - - return utils.ReturnTuple((Q_positions, Q_start_positions,), ("Q_positions","Q_start_positions",)) + return utils.ReturnTuple( + ( + Q_positions, + Q_start_positions, + ), + ( + "Q_positions", + "Q_start_positions", + ), + ) def getSPositions(ecg_proc=None, show=False): """Different ECG Waves (Q, R, S, ...) are not present or are not so clear to identify in all ECG signals (I II III V1 V2 V3, ...) For S wave we suggest to use signals V1, V2, V3. Avoid I, V5, V6, aVR, aVL - + Parameters ---------- signal : object object return by the function ecg. show : bool, optional If True, show a plot of the S Positions on every signal sample/template. - + Returns ------- S_positions : array @@ -2330,7 +2533,9 @@ def getSPositions(ecg_proc=None, show=False): """ templates_ts = ecg_proc["templates_ts"] - template_r_position = np.argmin(np.abs(templates_ts - 0)) # R peak on the template is always on time instant 0 seconds + template_r_position = np.argmin( + np.abs(templates_ts - 0) + ) # R peak on the template is always on time instant 0 seconds S_positions = [] S_end_positions = [] S_positions_template = [] @@ -2341,11 +2546,13 @@ def getSPositions(ecg_proc=None, show=False): # Get S Position template_right = each[template_r_position : template_size + 1] mininums_from_template_right = argrelextrema(template_right, np.less) - + try: S_position = ecg_proc["rpeaks"][n] + mininums_from_template_right[0][0] S_positions.append(S_position) - S_positions_template.append(template_r_position + mininums_from_template_right[0][0]) + S_positions_template.append( + template_r_position + mininums_from_template_right[0][0] + ) except: pass # Get S end position @@ -2353,49 +2560,61 @@ def getSPositions(ecg_proc=None, show=False): try: S_end_position = ecg_proc["rpeaks"][n] + maximums_from_template_right[0][0] S_end_positions.append(S_end_position) - S_end_positions_template.append(template_r_position + maximums_from_template_right[0][0]) + S_end_positions_template.append( + template_r_position + maximums_from_template_right[0][0] + ) except: - pass + pass if show: plt.figure() plt.plot(ecg_proc["templates"].T) plt.axvline(x=template_r_position, color="r", label="R peak") - plt.axvline(x=S_positions_template[0],color="yellow",label="S positions") - for position in range(1,len(S_positions_template)): + plt.axvline(x=S_positions_template[0], color="yellow", label="S positions") + for position in range(1, len(S_positions_template)): plt.axvline( x=S_positions_template[position], color="yellow", ) - - plt.axvline(x=S_end_positions_template[0],color="green",label="S end positions") - for position in range(1,len(S_end_positions_template)): + + plt.axvline( + x=S_end_positions_template[0], color="green", label="S end positions" + ) + for position in range(1, len(S_end_positions_template)): plt.axvline( x=S_end_positions_template[position], color="green", ) - + plt.legend() plt.show() - + S_positions = np.array(S_positions) S_end_positions = np.array(S_end_positions) - return utils.ReturnTuple((S_positions, S_end_positions,), ("S_positions","S_end_positions",)) - + return utils.ReturnTuple( + ( + S_positions, + S_end_positions, + ), + ( + "S_positions", + "S_end_positions", + ), + ) def getPPositions(ecg_proc=None, show=False): """Different ECG Waves (Q, R, S, ...) are not present or are not so clear to identify in all ECG signals (I II III V1 V2 V3, ...) For P wave we suggest to use signals II, V1, aVF . Avoid I, III, V1, V2, V3, V4, V5, AVL - + Parameters ---------- signal : object object return by the function ecg. show : bool, optional If True, show a plot of the P Positions on every signal sample/template. - + Returns ------- P_positions : array @@ -2405,21 +2624,21 @@ def getPPositions(ecg_proc=None, show=False): P_end_ positions : array Array with all P end positions on the signal """ - + templates_ts = ecg_proc["templates_ts"] # R peak on the template is always on time instant 0 seconds - template_r_position = np.argmin(np.abs(templates_ts - 0)) + template_r_position = np.argmin(np.abs(templates_ts - 0)) # the P wave end is approximately 0.04 seconds before the R peak - template_p_position_max = np.argmin(np.abs(templates_ts - (-0.04))) - + template_p_position_max = np.argmin(np.abs(templates_ts - (-0.04))) + P_positions = [] P_start_positions = [] P_end_positions = [] - + P_positions_template = [] P_start_positions_template = [] P_end_positions_template = [] - + for n, each in enumerate(ecg_proc["templates"]): # Get P position template_left = each[0 : template_p_position_max + 1] @@ -2430,7 +2649,7 @@ def getPPositions(ecg_proc=None, show=False): ) P_positions.append(P_position) P_positions_template.append(max_from_template_left) - + # Get P start position template_P_left = each[0 : max_from_template_left + 1] mininums_from_template_left = argrelextrema(template_P_left, np.less) @@ -2445,11 +2664,11 @@ def getPPositions(ecg_proc=None, show=False): P_start_positions_template.append(mininums_from_template_left[0][-1]) except: pass - + # Get P end position template_P_right = each[max_from_template_left : template_p_position_max + 1] mininums_from_template_right = argrelextrema(template_P_right, np.less) - + try: P_end_position = ( ecg_proc["rpeaks"][n] @@ -2457,9 +2676,11 @@ def getPPositions(ecg_proc=None, show=False): + max_from_template_left + mininums_from_template_right[0][0] ) - + P_end_positions.append(P_end_position) - P_end_positions_template.append(max_from_template_left + mininums_from_template_right[0][0]) + P_end_positions_template.append( + max_from_template_left + mininums_from_template_right[0][0] + ) except: pass @@ -2467,46 +2688,57 @@ def getPPositions(ecg_proc=None, show=False): plt.figure() plt.plot(ecg_proc["templates"].T) plt.axvline(x=template_r_position, color="r", label="R peak") - plt.axvline(x=P_positions_template[0],color="yellow",label="P positions") - for position in range(1,len(P_positions_template)): + plt.axvline(x=P_positions_template[0], color="yellow", label="P positions") + for position in range(1, len(P_positions_template)): plt.axvline( x=P_positions_template[position], color="yellow", ) - plt.axvline(x=P_start_positions_template[0],color="green",label="P starts") - for position in range(1,len(P_start_positions_template)): + plt.axvline(x=P_start_positions_template[0], color="green", label="P starts") + for position in range(1, len(P_start_positions_template)): plt.axvline( x=P_start_positions_template[position], color="green", ) - plt.axvline(x=P_end_positions_template[0],color="green",label="P ends") - for position in range(1,len(P_end_positions_template)): + plt.axvline(x=P_end_positions_template[0], color="green", label="P ends") + for position in range(1, len(P_end_positions_template)): plt.axvline( x=P_end_positions_template[position], color="green", ) - + plt.legend() plt.show() - + P_positions = np.array(P_positions) P_start_positions = np.array(P_start_positions) P_end_positions = np.array(P_end_positions) - - return utils.ReturnTuple((P_positions, P_start_positions, P_end_positions,), ("P_positions","P_start_positions","P_end_positions",)) + + return utils.ReturnTuple( + ( + P_positions, + P_start_positions, + P_end_positions, + ), + ( + "P_positions", + "P_start_positions", + "P_end_positions", + ), + ) def getTPositions(ecg_proc=None, show=False): """Different ECG Waves (Q, R, S, ...) are not present or are not so clear to identify in all ECG signals (I II III V1 V2 V3, ...) For T wave we suggest to use signals V4, v5 (II, V3 have good results, but in less accuracy) . Avoid I, V1, V2, aVR, aVL - + Parameters ---------- signal : object object return by the function ecg. show : bool, optional If True, show a plot of the T Positions on every signal sample/template. - + Returns ------- T_positions : array @@ -2516,22 +2748,22 @@ def getTPositions(ecg_proc=None, show=False): T_end_ positions : array Array with all T end positions on the signal """ - + templates_ts = ecg_proc["templates_ts"] - + # R peak on the template is always on time instant 0 seconds - template_r_position = np.argmin(np.abs(templates_ts - 0)) + template_r_position = np.argmin(np.abs(templates_ts - 0)) # the T wave start is approximately 0.14 seconds after R-peak - template_T_position_min = np.argmin(np.abs(templates_ts - 0.14)) - + template_T_position_min = np.argmin(np.abs(templates_ts - 0.14)) + T_positions = [] T_start_positions = [] T_end_positions = [] - + T_positions_template = [] T_start_positions_template = [] T_end_positions_template = [] - + for n, each in enumerate(ecg_proc["templates"]): # Get T position template_right = each[template_T_position_min:] @@ -2543,7 +2775,7 @@ def getTPositions(ecg_proc=None, show=False): + template_T_position_min + max_from_template_right ) - + T_positions.append(T_position) T_positions_template.append(template_T_position_min + max_from_template_right) @@ -2552,20 +2784,22 @@ def getTPositions(ecg_proc=None, show=False): template_r_position : template_T_position_min + max_from_template_right ] min_from_template_T_left = argrelextrema(template_T_left, np.less) - + try: T_start_position = ecg_proc["rpeaks"][n] + min_from_template_T_left[0][-1] - + T_start_positions.append(T_start_position) - T_start_positions_template.append(template_r_position + min_from_template_T_left[0][-1]) + T_start_positions_template.append( + template_r_position + min_from_template_T_left[0][-1] + ) except: pass - + # Get T end position template_T_right = each[template_T_position_min + max_from_template_right :] mininums_from_template_T_right = argrelextrema(template_T_right, np.less) - + try: T_end_position = ( ecg_proc["rpeaks"][n] @@ -2574,43 +2808,57 @@ def getTPositions(ecg_proc=None, show=False): + max_from_template_right + mininums_from_template_T_right[0][0] ) - + T_end_positions.append(T_end_position) - T_end_positions_template.append(template_T_position_min+ max_from_template_right+ mininums_from_template_T_right[0][0]) + T_end_positions_template.append( + template_T_position_min + + max_from_template_right + + mininums_from_template_T_right[0][0] + ) except: pass - - if show: + + if show: plt.figure() plt.plot(ecg_proc["templates"].T) plt.axvline(x=template_r_position, color="r", label="R peak") - plt.axvline(x=T_positions_template[0],color="yellow",label="T positions") - for position in range(1,len(T_positions_template)): + plt.axvline(x=T_positions_template[0], color="yellow", label="T positions") + for position in range(1, len(T_positions_template)): plt.axvline( x=T_positions_template[position], color="yellow", ) - plt.axvline(x=T_start_positions_template[0],color="green",label="T starts") - for position in range(1,len(T_start_positions_template)): + plt.axvline(x=T_start_positions_template[0], color="green", label="T starts") + for position in range(1, len(T_start_positions_template)): plt.axvline( x=T_start_positions_template[position], color="green", ) - plt.axvline(x=T_end_positions_template[0],color="green",label="T ends") - for position in range(1,len(T_end_positions_template)): + plt.axvline(x=T_end_positions_template[0], color="green", label="T ends") + for position in range(1, len(T_end_positions_template)): plt.axvline( x=T_end_positions_template[position], color="green", ) plt.legend() plt.show() - + T_positions = np.array(T_positions) T_start_positions = np.array(T_start_positions) T_end_positions = np.array(T_end_positions) - - return utils.ReturnTuple((T_positions, T_start_positions, T_end_positions,), ("T_positions","T_start_positions","T_end_positions",)) + return utils.ReturnTuple( + ( + T_positions, + T_start_positions, + T_end_positions, + ), + ( + "T_positions", + "T_start_positions", + "T_end_positions", + ), + ) def bSQI(detector_1, detector_2, fs=1000.0, mode="simple", search_window=150): diff --git a/example.py b/example.py index 155a3ac3..803f5d0c 100644 --- a/example.py +++ b/example.py @@ -7,19 +7,26 @@ from biosppy.signals import ecg from biosppy.signals.acc import acc -warnings.simplefilter(action='ignore', category=FutureWarning) + +warnings.simplefilter(action="ignore", category=FutureWarning) # load raw ECG and ACC signals -ecg_signal, _ = storage.load_txt('./examples/ecg.txt') -acc_signal, _ = storage.load_txt('./examples/acc.txt') +ecg_signal, _ = storage.load_txt("./examples/ecg.txt") +acc_signal, _ = storage.load_txt("./examples/acc.txt") # Setting current path current_dir = os.path.dirname(sys.argv[0]) -ecg_plot_path = os.path.join(current_dir, 'ecg.png') -acc_plot_path = os.path.join(current_dir, 'acc.png') +ecg_plot_path = os.path.join(current_dir, "ecg.png") +acc_plot_path = os.path.join(current_dir, "acc.png") # Process it and plot. Set interactive=True to display an interactive window -out_ecg = ecg.ecg(signal=ecg_signal, sampling_rate=1000., path=ecg_plot_path) -out_acc = acc(signal=acc_signal, sampling_rate=1000., path=acc_plot_path) - +out_ecg = ecg.ecg( + signal=ecg_signal, + sampling_rate=1000.0, + path=ecg_plot_path, + segmenter="pan-tompkins", + interactive=False, + Pth=5, +) +out_acc = acc(signal=acc_signal, sampling_rate=1000.0, path=acc_plot_path)