diff --git a/neo/rawio/intanrawio.py b/neo/rawio/intanrawio.py index 85bde959f..81b23f8b4 100644 --- a/neo/rawio/intanrawio.py +++ b/neo/rawio/intanrawio.py @@ -8,7 +8,8 @@ RHS supported version 1.0 RHD supported version 1.0 1.1 1.2 1.3 2.0 3.0, 3.1 -RHD headerless binary support 3.1 +RHD headerless binary support 3.x +RHS headerless binary support 3.x See: * http://intantech.com/files/Intan_RHD2000_data_file_formats.pdf @@ -19,7 +20,7 @@ """ from pathlib import Path -from packaging.version import Version as V +from packaging.version import Version import warnings import numpy as np @@ -31,7 +32,6 @@ _signal_stream_dtype, _spike_channel_dtype, _event_channel_dtype, - _common_sig_characteristics, ) @@ -42,7 +42,7 @@ class IntanRawIO(BaseRawIO): Parameters ---------- filename: str, default: '' - name of the 'rhd' or 'rhs' data file + name of the 'rhd' or 'rhs' data/header file ignore_integrity_checks: bool, default: False If True, data that violates integrity assumptions will be loaded. At the moment the only integrity check we perform is that timestamps are continuous. Setting this to True will ignore this check and set @@ -50,10 +50,14 @@ class IntanRawIO(BaseRawIO): after parsing the header to see if the timestamps are continuous or not. Notes ----- - * Intan reader can handle two file formats 'rhd' and 'rhs'. It will automatically + * The Intan reader can handle two file formats 'rhd' and 'rhs'. It will automatically check for the file extension and will gather the header information based on the - extension. Additionally it functions with RHS v 1.0 and RHD 1.0, 1.1, 1.2, 1.3, 2.0, - 3.0, and 3.1 files. + extension. Additionally it functions with RHS v 1.0 and v 3.x and RHD 1.0, 1.1, 1.2, 1.3, 2.0, + 3.x files. + + * The Intan reader can also handle the headerless binary formats 'one-file-per-signal' and + 'one-file-per-channel' which have a header file called 'info.rhd' or 'info.rhs' and a series + of binary files with the '.dat' suffix * The reader can handle three file formats 'header-attached', 'one-file-per-signal' and 'one-file-per-channel'. @@ -68,8 +72,19 @@ class IntanRawIO(BaseRawIO): 4: 'USB board digital input channel', 5: 'USB board digital output channel' + And for RHS: + + 0: 'RHS2000 amplfier channel' + 3: 'USB board ADC input channel', + 4: 'USB board ADC output channel', + 5: 'USB board digital input channel', + 6: 'USB board digital output channel', + 10: 'DC Amplifier channel', + 11: 'Stim channel', + * For the "header-attached" and "one-file-per-signal" formats, the structure of the digital input and output channels is - one long vector, which must be post-processed to extract individual digital channel information. See the intantech website for more information on performing this post-processing. + one long vector, which must be post-processed to extract individual digital channel information. + See the intantech website for more information on performing this post-processing. Examples -------- @@ -104,6 +119,7 @@ def _parse_header(self): if not filename.exists() or not filename.is_file(): raise FileNotFoundError(f"{filename} does not exist") + # see comment below for RHD which explains the division between file types if self.filename.endswith(".rhs"): if filename.name == "info.rhs": if any((filename.parent / file).exists() for file in one_file_per_signal_filenames_rhs): @@ -117,7 +133,7 @@ def _parse_header(self): ( self._global_info, - self._ordered_channels, + self._ordered_channel_info, data_dtype, header_size, self._block_size, @@ -143,7 +159,7 @@ def _parse_header(self): ( self._global_info, - self._ordered_channels, + self._ordered_channel_info, data_dtype, header_size, self._block_size, @@ -206,7 +222,7 @@ def _parse_header(self): # signals signal_channels = [] - for c, chan_info in enumerate(self._ordered_channels): + for c, chan_info in enumerate(self._ordered_channel_info): name = chan_info["custom_channel_name"] channel_id = chan_info["native_channel_name"] sig_dtype = chan_info["dtype"] @@ -251,6 +267,7 @@ def _parse_header(self): # are in a list we just take the first channel in each list of channels else: self._max_sigs_length = max([raw_data[0].size for raw_data in self._raw_data.values()]) + # No events event_channels = [] event_channels = np.array(event_channels, dtype=_event_channel_dtype) @@ -270,6 +287,61 @@ def _parse_header(self): self._generate_minimal_annotations() + bl_annotations = self.raw_annotations["blocks"][0] + seg_annotations = bl_annotations["segments"][0] + + for signal_annotation in seg_annotations["signals"]: + # Add global annotations + signal_annotation["intan_version"] = ( + f"{self._global_info['major_version']}." f"{self._global_info['minor_version']}" + ) + global_keys_to_skip = [ + "major_version", + "minor_version", + "sampling_rate", + "magic_number", + "reference_channel", + ] + global_keys_to_annotate = set(self._global_info.keys()) - set(global_keys_to_skip) + for key in global_keys_to_annotate: + signal_annotation[key] = self._global_info[key] + + reference_channel = self._global_info.get("reference_channel", None) + # Following the pdf specification + reference_channel = "hardware" if reference_channel == "n/a" else reference_channel + + # Add channel annotations + array_annotations = signal_annotation["__array_annotations__"] + channel_ids = array_annotations["channel_ids"] + + # TODO refactor ordered channel dict to make this easier + # Use this to find which elements of the ordered channels correspond to the current signal + signal_type = int(signal_annotation["stream_id"]) + channel_info = next((info for info in self._ordered_channel_info if info["signal_type"] == signal_type)) + channel_keys_to_skip = [ + "signal_type", + "custom_channel_name", + "native_channel_name", + "gain", + "offset", + "channel_enabled", + "dtype", + "units", + "sampling_rate", + ] + + channel_keys_to_annotate = set(channel_info.keys()) - set(channel_keys_to_skip) + properties_dict = {key: [] for key in channel_keys_to_annotate} + for channel_id in channel_ids: + matching_info = next( + info for info in self._ordered_channel_info if info["native_channel_name"] == channel_id + ) + for key in channel_keys_to_annotate: + properties_dict[key].append(matching_info[key]) + + for key in channel_keys_to_annotate: + array_annotations[key] = properties_dict[key] + def _segment_t_start(self, block_index, seg_index): return 0.0 @@ -517,7 +589,7 @@ def read_rhs(filename, file_format: str): sr = global_info["sampling_rate"] # construct dtype by re-ordering channels by types - ordered_channels = [] + ordered_channel_info = [] if file_format == "header-attached": data_dtype = [("timestamp", "int32", BLOCK_SIZE)] else: @@ -537,7 +609,7 @@ def read_rhs(filename, file_format: str): chan_info["dtype"] = "uint16" else: chan_info["dtype"] = "int16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) if file_format == "header-attached": name = chan_info["native_channel_name"] data_dtype += [(name, "uint16", BLOCK_SIZE)] @@ -557,7 +629,7 @@ def read_rhs(filename, file_format: str): chan_info_dc["offset"] = -512 * 19.23 chan_info_dc["signal_type"] = 10 # put it in another group chan_info_dc["dtype"] = "uint16" - ordered_channels.append(chan_info_dc) + ordered_channel_info.append(chan_info_dc) if file_format == "header-attached": data_dtype += [(name + "_DC", "uint16", BLOCK_SIZE)] else: @@ -579,7 +651,7 @@ def read_rhs(filename, file_format: str): chan_info_stim["offset"] = 0.0 chan_info_stim["signal_type"] = 11 # put it in another group chan_info_stim["dtype"] = "uint16" - ordered_channels.append(chan_info_stim) + ordered_channel_info.append(chan_info_stim) if file_format == "header-attached": data_dtype += [(name + "_STIM", "uint16", BLOCK_SIZE)] else: @@ -598,7 +670,7 @@ def read_rhs(filename, file_format: str): chan_info["gain"] = 0.0003125 chan_info["offset"] = -32768 * 0.0003125 chan_info["dtype"] = "uint16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) if file_format == "header-attached": name = chan_info["native_channel_name"] data_dtype += [(name, "uint16", BLOCK_SIZE)] @@ -622,7 +694,7 @@ def read_rhs(filename, file_format: str): chan_info["gain"] = 1.0 chan_info["offset"] = 0.0 chan_info["dtype"] = "uint16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) if file_format == "header-attached": data_dtype += [(name, "uint16", BLOCK_SIZE)] else: @@ -635,12 +707,16 @@ def read_rhs(filename, file_format: str): chan_info["gain"] = 1.0 chan_info["offset"] = 0.0 chan_info["dtype"] = "uint16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) data_dtype[sig_type] = "uint16" - if global_info["notch_filter_mode"] == 2 and global_info["major_version"] >= V("3.0"): + # per discussion with Intan developers before version 3 of their software the 'notch_filter_mode' + # was a request for postprocessing to be done in one of their scripts. From version 3+ the notch + # filter is now applied to the data in realtime and only the post notched amplifier data is + # saved. + if global_info["notch_filter_mode"] == 2 and global_info["major_version"] >= Version("3.0"): global_info["notch_filter"] = "60Hz" - elif global_info["notch_filter_mode"] == 1 and global_info["major_version"] >= V("3.0"): + elif global_info["notch_filter_mode"] == 1 and global_info["major_version"] >= Version("3.0"): global_info["notch_filter"] = "50Hz" else: global_info["notch_filter"] = False @@ -650,7 +726,7 @@ def read_rhs(filename, file_format: str): data_dtype = {k: v for (k, v) in data_dtype.items() if len(v) > 0} channel_number_dict = {k: v for (k, v) in channel_number_dict.items() if v > 0} - return global_info, ordered_channels, data_dtype, header_size, BLOCK_SIZE, channel_number_dict + return global_info, ordered_channel_info, data_dtype, header_size, BLOCK_SIZE, channel_number_dict ############### @@ -745,22 +821,22 @@ def read_rhd(filename, file_format: str): global_info = read_variable_header(f, rhd_global_header_base) - version = V(f"{global_info['major_version']}.{global_info['minor_version']}") + version = Version(f"{global_info['major_version']}.{global_info['minor_version']}") # the header size depends on the version :-( header = list(rhd_global_header_part1) # make a copy - if version >= V("1.1"): + if version >= Version("1.1"): header = header + rhd_global_header_v11 else: global_info["num_temp_sensor_channels"] = 0 - if version >= V("1.3"): + if version >= Version("1.3"): header = header + rhd_global_header_v13 else: global_info["eval_board_mode"] = 0 - if version >= V("2.0"): + if version >= Version("2.0"): header = header + rhd_global_header_v20 else: global_info["reference_channel"] = "" @@ -789,14 +865,14 @@ def read_rhd(filename, file_format: str): sr = global_info["sampling_rate"] # construct the data block dtype and reorder channels - if version >= V("2.0"): + if version >= Version("2.0"): BLOCK_SIZE = 128 else: BLOCK_SIZE = 60 # 256 channels - ordered_channels = [] + ordered_channel_info = [] - if version >= V("1.2"): + if version >= Version("1.2"): if file_format == "header-attached": data_dtype = [("timestamp", "int32", BLOCK_SIZE)] else: @@ -820,7 +896,7 @@ def read_rhd(filename, file_format: str): else: chan_info["offset"] = 0.0 chan_info["dtype"] = "int16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) if file_format == "header-attached": name = chan_info["native_channel_name"] @@ -835,7 +911,7 @@ def read_rhd(filename, file_format: str): chan_info["gain"] = 0.0000374 chan_info["offset"] = 0.0 chan_info["dtype"] = "uint16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) if file_format == "header-attached": name = chan_info["native_channel_name"] data_dtype += [(name, "uint16", BLOCK_SIZE // 4)] @@ -849,7 +925,7 @@ def read_rhd(filename, file_format: str): chan_info["gain"] = 0.0000748 chan_info["offset"] = 0.0 chan_info["dtype"] = "uint16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) if file_format == "header-attached": name = chan_info["native_channel_name"] data_dtype += [(name, "uint16")] @@ -865,7 +941,7 @@ def read_rhd(filename, file_format: str): chan_info["gain"] = 0.001 chan_info["offset"] = 0.0 chan_info["dtype"] = "int16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) data_dtype += [(name, "int16")] # 3: USB board ADC input channel @@ -882,7 +958,7 @@ def read_rhd(filename, file_format: str): chan_info["gain"] = 0.0003125 chan_info["offset"] = -32768 * 0.0003125 chan_info["dtype"] = "uint16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) if file_format == "header-attached": name = chan_info["native_channel_name"] data_dtype += [(name, "uint16", BLOCK_SIZE)] @@ -906,7 +982,7 @@ def read_rhd(filename, file_format: str): chan_info["gain"] = 1.0 chan_info["offset"] = 0.0 chan_info["dtype"] = "uint16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) if file_format == "header-attached": data_dtype += [(name, "uint16", BLOCK_SIZE)] else: @@ -918,12 +994,16 @@ def read_rhd(filename, file_format: str): chan_info["gain"] = 1.0 chan_info["offset"] = 0.0 chan_info["dtype"] = "uint16" - ordered_channels.append(chan_info) + ordered_channel_info.append(chan_info) data_dtype[sig_type] = "uint16" - if global_info["notch_filter_mode"] == 2 and version >= V("3.0"): + # per discussion with Intan developers before version 3 of their software the 'notch_filter_mode' + # was a request for postprocessing to be done in one of their scripts. From version 3+ the notch + # filter is now applied to the data in realtime and only the post notched amplifier data is + # saved. + if global_info["notch_filter_mode"] == 2 and version >= Version("3.0"): global_info["notch_filter"] = "60Hz" - elif global_info["notch_filter_mode"] == 1 and version >= V("3.0"): + elif global_info["notch_filter_mode"] == 1 and version >= Version("3.0"): global_info["notch_filter"] = "50Hz" else: global_info["notch_filter"] = False @@ -933,7 +1013,7 @@ def read_rhd(filename, file_format: str): data_dtype = {k: v for (k, v) in data_dtype.items() if len(v) > 0} channel_number_dict = {k: v for (k, v) in channel_number_dict.items() if v > 0} - return global_info, ordered_channels, data_dtype, header_size, BLOCK_SIZE, channel_number_dict + return global_info, ordered_channel_info, data_dtype, header_size, BLOCK_SIZE, channel_number_dict ########################################################################## diff --git a/neo/test/rawiotest/test_intanrawio.py b/neo/test/rawiotest/test_intanrawio.py index 630aee1e2..b5e4549a6 100644 --- a/neo/test/rawiotest/test_intanrawio.py +++ b/neo/test/rawiotest/test_intanrawio.py @@ -1,7 +1,7 @@ import unittest +import numpy as np from neo.rawio.intanrawio import IntanRawIO - from neo.test.rawiotest.common_rawio_test import BaseTestRawIO @@ -23,6 +23,32 @@ class TestIntanRawIO( ] + def test_annotations(self): + + intan_reader = IntanRawIO(filename=self.get_local_path("intan/intan_rhd_test_1.rhd")) + intan_reader.parse_header() + + raw_annotations = intan_reader.raw_annotations + annotations = raw_annotations["blocks"][0]["segments"][0] # Intan is mono segment + signal_annotations = annotations["signals"][0] # As in the other exmaples, annotaions are duplicated + + + # Scalar annotations + exepcted_annotations = {'intan_version': '1.5', 'desired_impedance_test_frequency': 1000.0, 'desired_upper_bandwidth': 7500.0, 'note1': '', 'notch_filter_mode': 1, 'notch_filter': False, 'nb_signal_group': 7, + 'dsp_enabled': 1, 'actual_impedance_test_frequency': 1000.0, 'desired_lower_bandwidth': 0.1, 'note3': '', 'actual_dsp_cutoff_frequency': 1.165828, + 'desired_dsp_cutoff_frequency': 1.0, 'actual_lower_bandwidth': 0.0945291, 'eval_board_mode': 0, 'note2': '', 'num_temp_sensor_channels': 0} + + for key in exepcted_annotations: + if isinstance(exepcted_annotations[key], float): + self.assertAlmostEqual(signal_annotations[key], exepcted_annotations[key], places=2) + else: + self.assertEqual(signal_annotations[key], exepcted_annotations[key]) + + # Array annotations + signal_array_annotations = signal_annotations["__array_annotations__"] + np.testing.assert_array_equal(signal_array_annotations["native_order"][:10], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + np.testing.assert_array_equal(signal_array_annotations["spike_scope_digital_edge_polarity"][:10], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + np.testing.assert_array_equal(signal_array_annotations["board_stream_num"][:10], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) if __name__ == "__main__": unittest.main()