Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions biosppy/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import json
import os
import zipfile
import struct

# 3rd party
import h5py
Expand Down Expand Up @@ -438,6 +439,168 @@ def load_txt(path):
return data, mdata


def load_edf(path):
"""Load data from an EDF+ (European Data Format) file.

Parameters
----------
path : str
Path to the EDF file.

Returns
-------
signals : array
Array of signals read from the EDF file. Each column represents a signal.
mdata : dict
Metadata extracted from the EDF file, including:
- version : str
- patient_id : str
- recording_id : str
- start_date : str
- start_time : str
- header_bytes : str
- reserved : str
- num_data_records : int
- duration_per_data_record : float
- num_signals : int
- labels : list of str
- units : list of str
- sampling_rates : list of int
- physical_min : list of float
- physical_max : list of float
- digital_min : list of int
- digital_max : list of int
- annotations : list of tuples (onset, duration, annotation)

Notes
-----
This function reads the EDF file header and data records, scales the signals
into physical units, and parses the annotations according to the EDF+ specification.
"""

def parse_annotations(data):
annotations = []
i = 0
while i < len(data):
if data[i] == 0:
break
onset = ''
duration = ''
while data[i] != 20:
onset += chr(data[i])
i += 1
i += 1
if data[i] == 21:
i += 1
while data[i] != 20:
duration += chr(data[i])
i += 1
i += 1
annotation = ''
while data[i] != 0:
if data[i] == 20:
# convert to string in HH:MM:SS format
onset = float(onset) # seconds
onset = str(datetime.timedelta(seconds=onset))

duration = float(duration) if duration else 0
duration = str(datetime.timedelta(seconds=duration))

# remove leading and trailing white space
annotation = annotation.strip()
if annotation != '':
annotations.append((onset, duration, annotation))
annotation = ''
i += 1
else:
annotation += chr(data[i])
i += 1
i += 1
return annotations

with open(path, 'rb') as f:
# Read the header
header = f.read(256)

# Extract fixed fields
version = header[:8].decode('ascii').strip()
patient_id = header[8:88].decode('ascii').strip()
recording_id = header[88:168].decode('ascii').strip()
start_date = header[168:176].decode('ascii').strip()
start_time = header[176:184].decode('ascii').strip()
header_bytes = header[184:192].decode('ascii').strip()
reserved = header[192:236].decode('ascii').strip()
num_data_records = int(header[236:244].decode('ascii').strip())
duration_per_data_record = float(header[244:252].decode('ascii').strip())
num_signals = int(header[252:256].decode('ascii').strip())

# Read signal metadata
labels = [f.read(16).decode('ascii').strip() for _ in range(num_signals)]
transducer_types = [f.read(80).decode('ascii').strip() for _ in range(num_signals)]
units = [f.read(8).decode('ascii').strip() for _ in range(num_signals)]
physical_min = [float(f.read(8).decode('ascii').strip()) for _ in range(num_signals)]
physical_max = [float(f.read(8).decode('ascii').strip()) for _ in range(num_signals)]
digital_min = [int(f.read(8).decode('ascii').strip()) for _ in range(num_signals)]
digital_max = [int(f.read(8).decode('ascii').strip()) for _ in range(num_signals)]
prefiltering = [f.read(80).decode('ascii').strip() for _ in range(num_signals)]
num_samples_per_data_record = [int(f.read(8).decode('ascii').strip()) for _ in range(num_signals)]
reserved_space = f.read(32 * num_signals).decode('ascii').strip()
sampling_rates = [int(num_samples / duration_per_data_record) for num_samples in num_samples_per_data_record]

# Read data records
signals = [[] for _ in range(num_signals)]
annotations = []
for _ in range(num_data_records):
for i in range(num_signals):
num_samples = num_samples_per_data_record[i]
if labels[i] == 'EDF Annotations':
annotation_data = f.read(num_samples * 2)
annotations.extend(parse_annotations(annotation_data))
else:
for _ in range(num_samples):
signals[i].append(struct.unpack('<h', f.read(2))[0])

# Scale the signals into physical units
for i in range(num_signals-1):
signals[i] = np.array(signals[i])
signals[i] = (signals[i] - digital_min[i]) / (digital_max[i] - digital_min[i]) * (physical_max[i] - physical_min[i]) + physical_min[i]

# remove annotation from signals
if 'EDF Annotations' in labels:
num_signals -= 1
signals = signals[:-1]
labels = labels[:-1]
units = units[:-1]
sampling_rates = sampling_rates[:-1]
physical_min = physical_min[:-1]
physical_max = physical_max[:-1]
digital_min = digital_min[:-1]
digital_max = digital_max[:-1]

mdata = {
'version': version,
'patient_id': patient_id,
'recording_id': recording_id,
'start_date': start_date,
'start_time': start_time,
'header_bytes': header_bytes,
'reserved': reserved,
'num_data_records': num_data_records,
'duration_per_data_record': duration_per_data_record,
'num_signals': num_signals,
'labels': labels,
'units': units,
'sampling_rates': sampling_rates,
'physical_min': physical_min,
'physical_max': physical_max,
'digital_min': digital_min,
'digital_max': digital_max,
'annotations': annotations
}

return np.array(signals).T, mdata


class HDF(object):
"""Wrapper class to operate on BioSPPy HDF5 files.

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ scipy>=1.2.0
shortuuid>=0.5.0
six>=1.11.0
joblib>=0.11
pywavelets>=1.4.1
pywavelets>=1.4.1