Skip to content

Commit 1e1abae

Browse files
authored
Merge pull request #73 from tdincer/intan
Intan loader
2 parents f4312b3 + b7d1d70 commit 1e1abae

1 file changed

Lines changed: 287 additions & 0 deletions

File tree

element_interface/intan_loader.py

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
import numpy as np
2+
import os, sys, struct
3+
from pathlib import Path
4+
import matplotlib.pyplot as plt
5+
6+
7+
def read_qstring(fid):
8+
"""Read Qt style QString.
9+
10+
The first 32-bit unsigned number indicates the length of the string (in bytes).
11+
If this number equals 0xFFFFFFFF, the string is null.
12+
13+
Strings are stored as unicode.
14+
"""
15+
16+
(length,) = struct.unpack("<I", fid.read(4))
17+
if length == int("ffffffff", 16):
18+
return ""
19+
20+
if length > (os.fstat(fid.fileno()).st_size - fid.tell() + 1):
21+
print(length)
22+
raise Exception("Length too long.")
23+
24+
# convert length from bytes to 16-bit Unicode words
25+
length = int(length / 2)
26+
27+
data = []
28+
for i in range(0, length):
29+
(c,) = struct.unpack("<H", fid.read(2))
30+
data.append(c)
31+
32+
if sys.version_info >= (3, 0):
33+
a = "".join([chr(c) for c in data])
34+
else:
35+
a = "".join([unichr(c) for c in data])
36+
37+
return a
38+
39+
40+
def read_header(fid):
41+
"""Reads the Intan File Format header from the given file."""
42+
43+
# Check 'magic number' at beginning of file to make sure this is an Intan
44+
# Technologies RHD2000 data file.
45+
(magic_number,) = struct.unpack("<I", fid.read(4))
46+
47+
if magic_number != int("0xD69127AC", 16):
48+
raise Exception("Unrecognized file type.")
49+
50+
header = {}
51+
# Read version number.
52+
version = {}
53+
(version["major"], version["minor"]) = struct.unpack("<hh", fid.read(4))
54+
header["version"] = version
55+
56+
print("")
57+
print(
58+
"Reading Intan Technologies RHS2000 Data File, Version {}.{}".format(
59+
version["major"], version["minor"]
60+
)
61+
)
62+
print("")
63+
64+
# Read information of sampling rate and amplifier frequency settings.
65+
(header["sample_rate"],) = struct.unpack("<f", fid.read(4))
66+
(
67+
header["dsp_enabled"],
68+
header["actual_dsp_cutoff_frequency"],
69+
header["actual_lower_bandwidth"],
70+
header["actual_lower_settle_bandwidth"],
71+
header["actual_upper_bandwidth"],
72+
header["desired_dsp_cutoff_frequency"],
73+
header["desired_lower_bandwidth"],
74+
header["desired_lower_settle_bandwidth"],
75+
header["desired_upper_bandwidth"],
76+
) = struct.unpack("<hffffffff", fid.read(34))
77+
78+
# This tells us if a software 50/60 Hz notch filter was enabled during
79+
# the data acquisition.
80+
(notch_filter_mode,) = struct.unpack("<h", fid.read(2))
81+
header["notch_filter_frequency"] = 0
82+
if notch_filter_mode == 1:
83+
header["notch_filter_frequency"] = 50
84+
elif notch_filter_mode == 2:
85+
header["notch_filter_frequency"] = 60
86+
87+
(
88+
header["desired_impedance_test_frequency"],
89+
header["actual_impedance_test_frequency"],
90+
) = struct.unpack("<ff", fid.read(8))
91+
(header["amp_settle_mode"], header["charge_recovery_mode"]) = struct.unpack(
92+
"<hh", fid.read(4)
93+
)
94+
95+
frequency_parameters = {}
96+
frequency_parameters["amplifier_sample_rate"] = header["sample_rate"]
97+
frequency_parameters["board_adc_sample_rate"] = header["sample_rate"]
98+
frequency_parameters["board_dig_in_sample_rate"] = header["sample_rate"]
99+
frequency_parameters["desired_dsp_cutoff_frequency"] = header[
100+
"desired_dsp_cutoff_frequency"
101+
]
102+
frequency_parameters["actual_dsp_cutoff_frequency"] = header[
103+
"actual_dsp_cutoff_frequency"
104+
]
105+
frequency_parameters["dsp_enabled"] = header["dsp_enabled"]
106+
frequency_parameters["desired_lower_bandwidth"] = header["desired_lower_bandwidth"]
107+
frequency_parameters["desired_lower_settle_bandwidth"] = header[
108+
"desired_lower_settle_bandwidth"
109+
]
110+
frequency_parameters["actual_lower_bandwidth"] = header["actual_lower_bandwidth"]
111+
frequency_parameters["actual_lower_settle_bandwidth"] = header[
112+
"actual_lower_settle_bandwidth"
113+
]
114+
frequency_parameters["desired_upper_bandwidth"] = header["desired_upper_bandwidth"]
115+
frequency_parameters["actual_upper_bandwidth"] = header["actual_upper_bandwidth"]
116+
frequency_parameters["notch_filter_frequency"] = header["notch_filter_frequency"]
117+
frequency_parameters["desired_impedance_test_frequency"] = header[
118+
"desired_impedance_test_frequency"
119+
]
120+
frequency_parameters["actual_impedance_test_frequency"] = header[
121+
"actual_impedance_test_frequency"
122+
]
123+
124+
header["frequency_parameters"] = frequency_parameters
125+
126+
(
127+
header["stim_step_size"],
128+
header["recovery_current_limit"],
129+
header["recovery_target_voltage"],
130+
) = struct.unpack("fff", fid.read(12))
131+
132+
note1 = read_qstring(fid)
133+
note2 = read_qstring(fid)
134+
note3 = read_qstring(fid)
135+
header["notes"] = {"note1": note1, "note2": note2, "note3": note3}
136+
137+
(header["dc_amplifier_data_saved"], header["eval_board_mode"]) = struct.unpack(
138+
"<hh", fid.read(4)
139+
)
140+
141+
header["ref_channel_name"] = read_qstring(fid)
142+
143+
# Create structure arrays for each type of data channel.
144+
header["spike_triggers"] = []
145+
header["amplifier_channels"] = []
146+
header["board_adc_channels"] = []
147+
header["board_dac_channels"] = []
148+
header["board_dig_in_channels"] = []
149+
header["board_dig_out_channels"] = []
150+
151+
# Read signal summary from data file header.
152+
(number_of_signal_groups,) = struct.unpack("<h", fid.read(2))
153+
print("n signal groups {}".format(number_of_signal_groups))
154+
155+
for signal_group in range(1, number_of_signal_groups + 1):
156+
signal_group_name = read_qstring(fid)
157+
signal_group_prefix = read_qstring(fid)
158+
(
159+
signal_group_enabled,
160+
signal_group_num_channels,
161+
signal_group_num_amp_channels,
162+
) = struct.unpack("<hhh", fid.read(6))
163+
164+
if (signal_group_num_channels > 0) and (signal_group_enabled > 0):
165+
for signal_channel in range(0, signal_group_num_channels):
166+
new_channel = {
167+
"port_name": signal_group_name,
168+
"port_prefix": signal_group_prefix,
169+
"port_number": signal_group,
170+
}
171+
new_channel["native_channel_name"] = read_qstring(fid)
172+
new_channel["custom_channel_name"] = read_qstring(fid)
173+
(
174+
new_channel["native_order"],
175+
new_channel["custom_order"],
176+
signal_type,
177+
channel_enabled,
178+
new_channel["chip_channel"],
179+
command_stream,
180+
new_channel["board_stream"],
181+
) = struct.unpack(
182+
"<hhhhhhh", fid.read(14)
183+
) # ignore command_stream
184+
new_trigger_channel = {}
185+
(
186+
new_trigger_channel["voltage_trigger_mode"],
187+
new_trigger_channel["voltage_threshold"],
188+
new_trigger_channel["digital_trigger_channel"],
189+
new_trigger_channel["digital_edge_polarity"],
190+
) = struct.unpack("<hhhh", fid.read(8))
191+
(
192+
new_channel["electrode_impedance_magnitude"],
193+
new_channel["electrode_impedance_phase"],
194+
) = struct.unpack("<ff", fid.read(8))
195+
196+
if channel_enabled:
197+
if signal_type == 0:
198+
header["amplifier_channels"].append(new_channel)
199+
header["spike_triggers"].append(new_trigger_channel)
200+
elif signal_type == 1:
201+
raise Exception("Wrong signal type for the rhs format")
202+
# header['aux_input_channels'].append(new_channel)
203+
elif signal_type == 2:
204+
raise Exception("Wrong signal type for the rhs format")
205+
# header['supply_voltage_channels'].append(new_channel)
206+
elif signal_type == 3:
207+
header["board_adc_channels"].append(new_channel)
208+
elif signal_type == 4:
209+
header["board_dac_channels"].append(new_channel)
210+
elif signal_type == 5:
211+
header["board_dig_in_channels"].append(new_channel)
212+
elif signal_type == 6:
213+
header["board_dig_out_channels"].append(new_channel)
214+
else:
215+
raise Exception("Unknown channel type.")
216+
217+
# Summarize contents of data file.
218+
header["num_amplifier_channels"] = len(header["amplifier_channels"])
219+
header["num_board_adc_channels"] = len(header["board_adc_channels"])
220+
header["num_board_dac_channels"] = len(header["board_dac_channels"])
221+
header["num_board_dig_in_channels"] = len(header["board_dig_in_channels"])
222+
header["num_board_dig_out_channels"] = len(header["board_dig_out_channels"])
223+
224+
return header
225+
226+
227+
def load_rhs(folder: str, file_expr: str):
228+
"""Load rhs data
229+
230+
Example:
231+
# Read data
232+
>>> rhs_data = load_rhs("/home/inbox/organoids21/032520_US_885kHz_sham", file_expr="amp*dat")
233+
234+
# Plot data
235+
>>> plt.plot(rhs_data["time"], rhs_data["recordings"]["amp-B-000.dat"])
236+
>>> plt.xlabel("Time (s)")
237+
>>> plt.ylabel("Reading")
238+
>>> plt.show()
239+
240+
Args:
241+
folder (str): Folder that contains info.rhs, time.dat, and *.dat files
242+
file_expr (str): regex pattern of the file names to be read.
243+
244+
Returns:
245+
rhs_data (dict): RHS data.
246+
rhs_data["header"] (dict): Header.
247+
rhs_data["recordings"] (dict): Readings from various files
248+
rhs_data["timestamps"] (np.array_like): Relative timestamps in seconds.
249+
"""
250+
251+
header_filepath = next(Path(folder).glob("info.rhs"))
252+
with open(header_filepath, "rb") as fid:
253+
header = read_header(fid)
254+
255+
time_file = next(Path(folder).glob("time.dat"))
256+
257+
timestamps = (
258+
np.memmap(time_file, dtype=np.int32)
259+
/ header["frequency_parameters"]["amplifier_sample_rate"]
260+
)
261+
262+
rhs_data = dict(header=header, timestamps=timestamps, recordings={})
263+
264+
file_paths = Path(folder).glob(file_expr)
265+
file_paths = [x for x in file_paths if x.as_posix() != "time.dat"]
266+
267+
for file_path in file_paths:
268+
file_path = file_path.as_posix()
269+
if "amp" in file_path:
270+
signal = np.memmap(file_path, dtype=np.int16)
271+
signal = signal * 0.195 # Convert to microvolts
272+
elif "board-ANALOG-IN" in file_path or "board-ANALOG-OUT" in file_path:
273+
signal = np.memmap(file_path, dtype=np.uint16)
274+
signal = (signal - 32768) * 0.0003125 # Convert to volts
275+
elif "dc-" in file_path:
276+
signal = np.memmap(file_path, dtype=np.uint16)
277+
signal = (signal - 512) * 19.23 # Convert to milivolts
278+
elif "board-DIGITAL-IN" in file_path or "board-DIGITAL-OUT" in file_path:
279+
signal = np.memmap(file_path, dtype=np.uint16)
280+
elif "stim-" in file_path:
281+
data = np.memmap(file_path, dtype=np.uint16)
282+
i = np.bitwise_and(data, 255) * header["stim_step_size"]
283+
sign = (128 - np.bitwise_and(data, 255)) / 128
284+
signal = i * sign
285+
rhs_data["recordings"][Path(file_path).relative_to(folder).stem] = signal
286+
287+
return rhs_data

0 commit comments

Comments
 (0)