diff --git a/biosppy/storage.py b/biosppy/storage.py index 2e6393b5..96fd6c0c 100644 --- a/biosppy/storage.py +++ b/biosppy/storage.py @@ -20,6 +20,7 @@ import json import os import zipfile +import struct # 3rd party import h5py @@ -438,6 +439,168 @@ def load_txt(path): return data, mdata +def load_edf(path): + """Load data from an EDF+ (European Data Format) file. + + Parameters + ---------- + path : str + Path to the EDF file. + + Returns + ------- + signals : array + Array of signals read from the EDF file. Each column represents a signal. + mdata : dict + Metadata extracted from the EDF file, including: + - version : str + - patient_id : str + - recording_id : str + - start_date : str + - start_time : str + - header_bytes : str + - reserved : str + - num_data_records : int + - duration_per_data_record : float + - num_signals : int + - labels : list of str + - units : list of str + - sampling_rates : list of int + - physical_min : list of float + - physical_max : list of float + - digital_min : list of int + - digital_max : list of int + - annotations : list of tuples (onset, duration, annotation) + + Notes + ----- + This function reads the EDF file header and data records, scales the signals + into physical units, and parses the annotations according to the EDF+ specification. + """ + + def parse_annotations(data): + annotations = [] + i = 0 + while i < len(data): + if data[i] == 0: + break + onset = '' + duration = '' + while data[i] != 20: + onset += chr(data[i]) + i += 1 + i += 1 + if data[i] == 21: + i += 1 + while data[i] != 20: + duration += chr(data[i]) + i += 1 + i += 1 + annotation = '' + while data[i] != 0: + if data[i] == 20: + # convert to string in HH:MM:SS format + onset = float(onset) # seconds + onset = str(datetime.timedelta(seconds=onset)) + + duration = float(duration) if duration else 0 + duration = str(datetime.timedelta(seconds=duration)) + + # remove leading and trailing white space + annotation = annotation.strip() + if annotation != '': + annotations.append((onset, duration, annotation)) + annotation = '' + i += 1 + else: + annotation += chr(data[i]) + i += 1 + i += 1 + return annotations + + with open(path, 'rb') as f: + # Read the header + header = f.read(256) + + # Extract fixed fields + version = header[:8].decode('ascii').strip() + patient_id = header[8:88].decode('ascii').strip() + recording_id = header[88:168].decode('ascii').strip() + start_date = header[168:176].decode('ascii').strip() + start_time = header[176:184].decode('ascii').strip() + header_bytes = header[184:192].decode('ascii').strip() + reserved = header[192:236].decode('ascii').strip() + num_data_records = int(header[236:244].decode('ascii').strip()) + duration_per_data_record = float(header[244:252].decode('ascii').strip()) + num_signals = int(header[252:256].decode('ascii').strip()) + + # Read signal metadata + labels = [f.read(16).decode('ascii').strip() for _ in range(num_signals)] + transducer_types = [f.read(80).decode('ascii').strip() for _ in range(num_signals)] + units = [f.read(8).decode('ascii').strip() for _ in range(num_signals)] + physical_min = [float(f.read(8).decode('ascii').strip()) for _ in range(num_signals)] + physical_max = [float(f.read(8).decode('ascii').strip()) for _ in range(num_signals)] + digital_min = [int(f.read(8).decode('ascii').strip()) for _ in range(num_signals)] + digital_max = [int(f.read(8).decode('ascii').strip()) for _ in range(num_signals)] + prefiltering = [f.read(80).decode('ascii').strip() for _ in range(num_signals)] + num_samples_per_data_record = [int(f.read(8).decode('ascii').strip()) for _ in range(num_signals)] + reserved_space = f.read(32 * num_signals).decode('ascii').strip() + sampling_rates = [int(num_samples / duration_per_data_record) for num_samples in num_samples_per_data_record] + + # Read data records + signals = [[] for _ in range(num_signals)] + annotations = [] + for _ in range(num_data_records): + for i in range(num_signals): + num_samples = num_samples_per_data_record[i] + if labels[i] == 'EDF Annotations': + annotation_data = f.read(num_samples * 2) + annotations.extend(parse_annotations(annotation_data)) + else: + for _ in range(num_samples): + signals[i].append(struct.unpack('=1.2.0 shortuuid>=0.5.0 six>=1.11.0 joblib>=0.11 -pywavelets>=1.4.1 +pywavelets>=1.4.1 \ No newline at end of file