Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openephys legacy format : handle gaps more correctly #1387

Merged
merged 9 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 117 additions & 63 deletions neo/rawio/openephysrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,31 @@ class OpenEphysRawIO(BaseRawIO):
Limitation :
* Works only if all continuous channels have the same sampling rate, which is a reasonable
hypothesis.
* When the recording is stopped and restarted all continuous files will contain gaps.
Ideally this would lead to a new Segment but this use case is not implemented due to its
complexity.
Instead it will raise an error.

Special cases:
* Normally all continuous files have the same first timestamp and length. In situations
where it is not the case all files are clipped to the smallest one so that they are all
aligned,
and a warning is emitted.
* A recording can contain gaps due to USB stream loss when high CPU load when recording.
Theses gaps are checked channel per channel which make the parse_header() slow.
If gaps are detected then they are filled with zeros but the the reading will be much slower for getting signals.

Parameters
----------
dirname: str
The directory where the files are stored.
ignore_timestamps_errors: bool
(deprecated) This parameter is not used anymore.
fill_gap_value: int
When gaps are detected in continuous files, the gap is filled with this value.
Default is 0.

"""
# file formats used by openephys
extensions = ['continuous', 'openephys', 'spikes', 'events', 'xml']
rawmode = 'one-dir'

def __init__(self, dirname='', ignore_timestamps_errors=False):
def __init__(self, dirname='', ignore_timestamps_errors=None, fill_gap_value=0):
BaseRawIO.__init__(self)
self.dirname = dirname
self._ignore_timestamps_errors = ignore_timestamps_errors
self.fill_gap_value = int(fill_gap_value)
if ignore_timestamps_errors is not None:
self.logger.warning("OpenEphysRawIO ignore_timestamps_errors=True/False is not used anymore")

def _source_name(self):
return self.dirname
Expand All @@ -84,12 +90,14 @@ def _parse_header(self):
self._sigs_memmap = {}
self._sig_length = {}
self._sig_timestamp0 = {}
self._sig_has_gap = {}
self._gap_mode = False
signal_channels = []
oe_indices = sorted(list(info['continuous'].keys()))
for seg_index, oe_index in enumerate(oe_indices):
self._sigs_memmap[seg_index] = {}
self._sig_has_gap[seg_index] = {}

all_sigs_length = []
all_first_timestamps = []
all_last_timestamps = []
all_samplerate = []
Expand All @@ -109,18 +117,18 @@ def _parse_header(self):
dtype=continuous_dtype, shape=(size, ))
self._sigs_memmap[seg_index][chan_index] = data_chan

all_sigs_length.append(data_chan.size * RECORD_SIZE)
all_first_timestamps.append(data_chan[0]['timestamp'])
all_last_timestamps.append(data_chan[-1]['timestamp'])
all_last_timestamps.append(data_chan[-1]['timestamp'] + RECORD_SIZE)
all_samplerate.append(chan_info['sampleRate'])

# check for continuity (no gaps)
diff = np.diff(data_chan['timestamp'])
if not np.all(diff == RECORD_SIZE) and not self._ignore_timestamps_errors:
raise ValueError(
'Not continuous timestamps for {}. ' \
'Maybe because recording was paused/stopped.'.format(continuous_filename)
)
channel_has_gaps = not np.all(diff == RECORD_SIZE)
self._sig_has_gap[seg_index][chan_index] = channel_has_gaps

if channel_has_gaps:
# protect against strange timestamp block like in file 'OpenEphys_SampleData_3' CH32
assert np.median(diff) == RECORD_SIZE, f"This file has a non valid data block size for channel {chan_id}, this case cannot be handled"

if seg_index == 0:
# add in channel list
Expand All @@ -130,46 +138,39 @@ def _parse_header(self):
units = 'V'
signal_channels.append((ch_name, chan_id, chan_info['sampleRate'],
'int16', units, chan_info['bitVolts'], 0., processor_id))

# In some cases, continuous do not have the same length because
# one record block is missing when the "OE GUI is freezing"
# So we need to clip to the smallest files
if not all(all_sigs_length[0] == e for e in all_sigs_length) or\
not all(all_first_timestamps[0] == e for e in all_first_timestamps):


if any(self._sig_has_gap[seg_index].values()):
channel_with_gaps = list(self._sig_has_gap[seg_index].keys())
self.logger.warning(f"This OpenEphys dataset contains gaps for some channels {channel_with_gaps} in segment {seg_index} the read will be slow")
self._gap_mode = True


if not all(all_first_timestamps[0] == e for e in all_first_timestamps) or \
not all(all_last_timestamps[0] == e for e in all_last_timestamps):
# In some cases, continuous do not have the same length because
# we need to clip
self.logger.warning('Continuous files do not have aligned timestamps; '
'clipping to make them aligned.')

first, last = -np.inf, np.inf
first = max(all_first_timestamps)
last = min(all_last_timestamps)
for chan_index in self._sigs_memmap[seg_index]:
data_chan = self._sigs_memmap[seg_index][chan_index]
if data_chan[0]['timestamp'] > first:
first = data_chan[0]['timestamp']
if data_chan[-1]['timestamp'] < last:
last = data_chan[-1]['timestamp']

all_sigs_length = []
all_first_timestamps = []
all_last_timestamps = []
for chan_index in self._sigs_memmap[seg_index]:
data_chan = self._sigs_memmap[seg_index][chan_index]
keep = (data_chan['timestamp'] >= first) & (data_chan['timestamp'] <= last)
keep = (data_chan['timestamp'] >= first) & (data_chan['timestamp'] < last)
data_chan = data_chan[keep]
self._sigs_memmap[seg_index][chan_index] = data_chan
all_sigs_length.append(data_chan.size * RECORD_SIZE)
all_first_timestamps.append(data_chan[0]['timestamp'])
all_last_timestamps.append(data_chan[-1]['timestamp'])

# check that all signals have the same length and timestamp0 for this segment
assert all(all_sigs_length[0] == e for e in all_sigs_length),\
'Not all signals have the same length'
assert all(all_first_timestamps[0] == e for e in all_first_timestamps),\
'Not all signals have the same first timestamp'
else:
# no clip
first = all_first_timestamps[0]
last = all_last_timestamps[0]


# check unique sampling rate
assert all(all_samplerate[0] == e for e in all_samplerate),\
'Not all signals have the same sample rate'

self._sig_length[seg_index] = all_sigs_length[0]
self._sig_timestamp0[seg_index] = all_first_timestamps[0]
self._sig_length[seg_index] = last - first
self._sig_timestamp0[seg_index] = first

if len(signal_channels) > 0:
signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype)
Expand Down Expand Up @@ -316,23 +317,73 @@ def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop,
if i_stop is None:
i_stop = self._sig_length[seg_index]

block_start = i_start // RECORD_SIZE
block_stop = i_stop // RECORD_SIZE + 1
sl0 = i_start % RECORD_SIZE
sl1 = sl0 + (i_stop - i_start)

stream_id = self.header['signal_streams'][stream_index]['id']
mask = self.header['signal_channels']['stream_id']
global_channel_indexes, = np.nonzero(mask == stream_id)
if channel_indexes is None:
channel_indexes = slice(None)
global_channel_indexes = global_channel_indexes[channel_indexes]

sigs_chunk = np.zeros((i_stop - i_start, len(global_channel_indexes)), dtype='int16')
for i, global_chan_index in enumerate(global_channel_indexes):
data = self._sigs_memmap[seg_index][global_chan_index]
sub = data[block_start:block_stop]
sigs_chunk[:, i] = sub['samples'].flatten()[sl0:sl1]
if not self._gap_mode:
sigs_chunk = np.zeros((i_stop - i_start, len(global_channel_indexes)), dtype='int16')
# previous behavior block index are linear
block_start = i_start // RECORD_SIZE
block_stop = i_stop // RECORD_SIZE + 1
sl0 = i_start % RECORD_SIZE
sl1 = sl0 + (i_stop - i_start)

for i, global_chan_index in enumerate(global_channel_indexes):
data = self._sigs_memmap[seg_index][global_chan_index]
sub = data[block_start:block_stop]
sigs_chunk[:, i] = sub['samples'].flatten()[sl0:sl1]
else:
sigs_chunk = np.full(shape=(i_stop - i_start, len(global_channel_indexes)),
fill_value=self.fill_gap_value,
dtype='int16')
# slow mode
for i, global_chan_index in enumerate(global_channel_indexes):
data = self._sigs_memmap[seg_index][global_chan_index]
timestamp0 = data[0]['timestamp']

# find first block
block0 = np.searchsorted(data['timestamp'], timestamp0 + i_start, side='right') - 1
block0_pos = data[block0]['timestamp'] - timestamp0

if i_start - block0_pos > RECORD_SIZE:
# the block has gap!!
pos = - ((i_start - block0_pos) % RECORD_SIZE)
block_index = block0 + 1
else:
# the first block do not have gaps
shift0 = i_start - block0_pos

if shift0 + (i_stop - i_start) < RECORD_SIZE:
# protect when only one small block
pos = (i_stop - i_start)
sigs_chunk[:, i][:pos] = data[block0]['samples'][shift0:shift0 + pos]
else:

pos = RECORD_SIZE - shift0
sigs_chunk[:, i][:pos] = data[block0]['samples'][shift0:]
block_index = block0 + 1

# full block
while block_index < data.size and data[block_index]['timestamp'] - timestamp0 < i_stop - RECORD_SIZE:
diff = data[block_index]['timestamp'] - data[block_index - 1]['timestamp']
if diff > RECORD_SIZE:
# gap detected need jump
pos += diff - RECORD_SIZE

sigs_chunk[:, i][pos:pos + RECORD_SIZE] = data[block_index]['samples'][:]
pos += RECORD_SIZE
block_index += 1

# last block
if pos < i_stop - i_start:
diff = data[block_index]['timestamp'] - data[block_index - 1]['timestamp']
if diff == RECORD_SIZE:
# ensure no gaps for last block
sigs_chunk[:, i][pos:] = data[block_index]['samples'][:i_stop - i_start - pos]

return sigs_chunk

Expand Down Expand Up @@ -524,9 +575,12 @@ def explore_folder(dirname):
chan_ids_by_type[chan_type] = [chan_id]
filenames_by_type[chan_type] = [continuous_filename]
chan_types = list(chan_ids_by_type.keys())
if chan_types[0] == 'ADC':
# put ADC at last position
chan_types = chan_types[1:] + chan_types[0:1]

if 'CH' in chan_types:
# force CH at beginning
chan_types.remove('CH')
chan_types = ['CH'] + chan_types

ordered_continuous_filenames = []
for chan_type in chan_types:
local_order = np.argsort(chan_ids_by_type[chan_type])
Expand Down
1 change: 1 addition & 0 deletions neo/test/rawiotest/rawio_compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ def read_analogsignals(reader):
chunks.append(raw_chunk)
i_start += chunksize
chunk_raw_sigs = np.concatenate(chunks, axis=0)

np.testing.assert_array_equal(ref_raw_sigs, chunk_raw_sigs)


Expand Down
18 changes: 3 additions & 15 deletions neo/test/rawiotest/test_openephysrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,11 @@ class TestOpenEphysRawIO(BaseTestRawIO, unittest.TestCase, ):
]
entities_to_test = [
'openephys/OpenEphys_SampleData_1',
# 'OpenEphys_SampleData_2_(multiple_starts)', # This not implemented this raise error
# 'OpenEphys_SampleData_3',
# this file has gaps and this is now handle corretly
'openephys/OpenEphys_SampleData_2_(multiple_starts)',
# 'openephys/OpenEphys_SampleData_3',
]

def test_raise_error_if_discontinuous_files(self):
# the case of discontinuous signals is NOT cover by the IO for the moment
# It must raise an error
reader = OpenEphysRawIO(dirname=self.get_local_path(
'openephys/OpenEphys_SampleData_2_(multiple_starts)'))
with self.assertRaises(ValueError):
reader.parse_header()
# if ignore_timestamps_errors=True, no exception is raised
reader = OpenEphysRawIO(dirname=self.get_local_path(
'openephys/OpenEphys_SampleData_2_(multiple_starts)'),
ignore_timestamps_errors=True)
reader.parse_header()


def test_raise_error_if_strange_timestamps(self):
# In this dataset CH32 have strange timestamps
Expand Down
Loading