Skip to content

Commit 8ba7849

Browse files
committed
add support for edge cases
1 parent f8af14a commit 8ba7849

1 file changed

Lines changed: 158 additions & 160 deletions

File tree

src/probeinterface/neuropixels_tools.py

Lines changed: 158 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,12 @@ def get_probe_contour_vertices(shank_width, tip_length, probe_length) -> list:
212212

213213
def read_imro(file_path: Union[str, Path]) -> Probe:
214214
"""
215-
Read probe position from the imro file used in input of SpikeGlx and Open-Ephys for neuropixels probes.
215+
Read a Neuropixels probe from an IMRO (Imec ReadOut) table file.
216+
217+
IMRO files (.imro) are used by SpikeGLX and Open Ephys to configure which electrodes
218+
are recorded and their acquisition settings (gains, references, filters). This function
219+
reads the file, determines the probe part number from the IMRO header, builds the full
220+
catalogue probe, and slices it to the active electrodes specified in the table.
216221
217222
Parameters
218223
----------
@@ -221,10 +226,15 @@ def read_imro(file_path: Union[str, Path]) -> Probe:
221226
222227
Returns
223228
-------
224-
probe : Probe object
229+
probe : Probe
230+
Probe object with geometry, contact annotations, and device channel mapping.
231+
232+
See Also
233+
--------
234+
https://billkarsh.github.io/SpikeGLX/help/imroTables/
225235
226236
"""
227-
# the input is an imro file
237+
# ===== 1. Read file and determine probe part number from IMRO header =====
228238
meta_file = Path(file_path)
229239
assert meta_file.suffix == ".imro", "'file' should point to the .imro file"
230240
with meta_file.open(mode="r") as f:
@@ -247,101 +257,39 @@ def read_imro(file_path: Union[str, Path]) -> Probe:
247257
if imDatPrb_type == probe_type:
248258
imDatPrb_pn = probe_part_number
249259

250-
return _read_imro_string(imro_str, imDatPrb_pn)
251-
252-
253-
def _make_npx_probe_from_description(probe_description, model_name, elec_ids, shank_ids, mux_info=None) -> Probe:
254-
# used by _read_imro_string and for generating the NP library
255-
256-
# compute position
257-
y_idx, x_idx = np.divmod(elec_ids, probe_description["cols_per_shank"])
258-
x_pitch = probe_description["electrode_pitch_horz_um"]
259-
y_pitch = probe_description["electrode_pitch_vert_um"]
260-
261-
raw_stagger = (
262-
probe_description["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"]
263-
- probe_description["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"]
264-
)
265-
266-
stagger = np.mod(y_idx + 1, 2) * raw_stagger
267-
x_pos = (x_idx * x_pitch + stagger).astype("float64")
268-
y_pos = (y_idx * y_pitch).astype("float64")
269-
270-
# if probe_description["shank_number"] > 1:
271-
if shank_ids is not None:
272-
# shank_ids = np.array(contact_info["shank_id"])
273-
shank_pitch = probe_description["shank_pitch_um"]
274-
contact_ids = [f"s{shank_id}e{elec_id}" for shank_id, elec_id in zip(shank_ids, elec_ids)]
275-
x_pos += np.array(shank_ids).astype(int) * shank_pitch
276-
else:
277-
# shank_ids = None
278-
contact_ids = [f"e{elec_id}" for elec_id in elec_ids]
279-
280-
positions = np.stack((x_pos, y_pos), axis=1)
281-
282-
# construct Probe object
283-
probe = Probe(ndim=2, si_units="um", model_name=model_name, manufacturer="imec")
284-
probe.description = probe_description["description"]
285-
probe.set_contacts(
286-
positions=positions,
287-
shapes="square",
288-
shank_ids=shank_ids,
289-
shape_params={"width": probe_description["electrode_size_horz_direction_um"]},
290-
)
260+
# ===== 2. Interpret IMRO table to extract recorded electrodes and acquisition settings =====
261+
imro_per_channel = _interpret_imro_string(imro_str, imDatPrb_pn)
291262

292-
probe.set_contact_ids(contact_ids)
293-
294-
# Add planar contour
295-
polygon = np.array(
296-
get_probe_contour_vertices(
297-
probe_description["shank_width_um"], probe_description["tip_length_um"], get_probe_length(model_name)
298-
)
299-
)
300-
301-
contour = []
302-
shank_pitch = probe_description["shank_pitch_um"]
303-
for shank_id in range(probe_description["num_shanks"]):
304-
shank_shift = np.array([shank_pitch * shank_id, 0])
305-
contour += list(polygon + shank_shift)
306-
307-
# final contour_shift
308-
middle_of_bottommost_electrode_to_top_of_shank_tip = 11
309-
contour_shift = np.array(
310-
[
311-
-probe_description["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"],
312-
-middle_of_bottommost_electrode_to_top_of_shank_tip,
313-
]
314-
)
315-
contour = np.array(contour) + contour_shift
316-
probe.set_planar_contour(contour)
263+
# ===== 3. Build full probe with all possible contacts =====
264+
full_probe = build_neuropixels_probe(probe_part_number=imDatPrb_pn)
317265

318-
# shank tips : minimum of the polygon
319-
shank_tips = []
320-
for shank_id in range(probe_description["num_shanks"]):
321-
shank_shift = np.array([shank_pitch * shank_id, 0])
322-
shank_tip = np.array(polygon[2]) + contour_shift + shank_shift
323-
shank_tips.append(shank_tip.tolist())
266+
# ===== 4. Build contact IDs for active electrodes =====
267+
elec_ids = imro_per_channel["electrode"]
268+
shank_ids = imro_per_channel.get("shank", [None] * len(elec_ids))
269+
active_contact_ids = [
270+
_build_canonical_contact_id(elec_id, shank_id) for shank_id, elec_id in zip(shank_ids, elec_ids)
271+
]
324272

325-
probe.annotate(shank_tips=shank_tips)
273+
# ===== 5. Slice full probe to active electrodes =====
274+
contact_id_to_index = {cid: i for i, cid in enumerate(full_probe.contact_ids)}
275+
selected_indices = np.array([contact_id_to_index[cid] for cid in active_contact_ids])
276+
probe = full_probe.get_slice(selected_indices)
326277

327-
# wire it
328-
probe.set_device_channel_indices(np.arange(positions.shape[0]))
278+
# ===== 6. Annotate probe with recording-specific metadata =====
279+
adc_sampling_table = probe.annotations.get("adc_sampling_table")
280+
_annotate_probe_with_adc_sampling_info(probe, adc_sampling_table)
329281

330-
# set other key metadata annotations
331-
probe.annotate(
332-
adc_bit_depth=int(probe_description["adc_bit_depth"]),
333-
num_readout_channels=int(probe_description["num_readout_channels"]),
334-
ap_sample_frequency_hz=float(probe_description["ap_sample_frequency_hz"]),
335-
lf_sample_frequency_hz=float(probe_description["lf_sample_frequency_hz"]),
336-
)
282+
# Scalar annotations
283+
probe_type = imro_str.strip().split(")")[0].split(",")[0][1:]
284+
probe.annotate(probe_type=probe_type)
337285

338-
# annotate with MUX table
339-
if mux_info is not None:
340-
# annotate each contact with its mux channel
341-
num_adcs, num_channels_per_adc, adc_groups_array = make_mux_table_array(mux_info)
342-
probe.annotate(num_adcs=num_adcs)
343-
probe.annotate(num_channels_per_adc=num_channels_per_adc)
344-
_annotate_contacts_from_mux_table(probe, adc_groups_array)
286+
# Vector annotations from IMRO fields
287+
vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt")
288+
vector_properties_available = {}
289+
for k, v in imro_per_channel.items():
290+
if k in vector_properties and len(v) > 0:
291+
vector_properties_available[imro_field_to_pi_field.get(k)] = v
292+
probe.annotate_contacts(**vector_properties_available)
345293

346294
return probe
347295

@@ -571,65 +519,6 @@ def _annotate_probe_with_adc_sampling_info(probe: Probe, adc_sampling_table: str
571519
_annotate_contacts_from_mux_table(probe, adc_groups_array)
572520

573521

574-
def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe:
575-
"""
576-
Parse the IMRO table when presented as a string and create a Probe object.
577-
578-
Parameters
579-
----------
580-
imro_str : str
581-
IMRO table as a string.
582-
imDatPrb_pn : str, optional
583-
Probe number, by default None.
584-
585-
Returns
586-
-------
587-
Probe
588-
A Probe object built from the parsed IMRO table data.
589-
590-
See Also
591-
--------
592-
https://billkarsh.github.io/SpikeGLX/help/imroTables/
593-
594-
"""
595-
596-
# Extract probe_type from the IMRO header "(probe_type,num_chans)"
597-
probe_type = imro_str.strip().split(")")[0].split(",")[0][1:]
598-
599-
# Parse the IMRO table into per-channel data (same parser used by read_spikeglx)
600-
imro_per_channel = _parse_imro_string(imro_str, imDatPrb_pn)
601-
602-
# Build full catalogue probe and slice to active electrodes
603-
full_probe = build_neuropixels_probe(probe_part_number=imDatPrb_pn)
604-
605-
elec_ids = imro_per_channel["electrode"]
606-
shank_ids = imro_per_channel.get("shank", [None] * len(elec_ids))
607-
active_contact_ids = [
608-
_build_canonical_contact_id(elec_id, shank_id) for shank_id, elec_id in zip(shank_ids, elec_ids)
609-
]
610-
611-
contact_id_to_index = {cid: i for i, cid in enumerate(full_probe.contact_ids)}
612-
selected_indices = np.array([contact_id_to_index[cid] for cid in active_contact_ids])
613-
probe = full_probe.get_slice(selected_indices)
614-
615-
# ADC sampling annotations
616-
adc_sampling_table = probe.annotations.get("adc_sampling_table")
617-
_annotate_probe_with_adc_sampling_info(probe, adc_sampling_table)
618-
619-
# Scalar annotations
620-
probe.annotate(probe_type=probe_type)
621-
622-
# Vector annotations from IMRO fields
623-
vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt")
624-
vector_properties_available = {}
625-
for k, v in imro_per_channel.items():
626-
if k in vector_properties and len(v) > 0:
627-
vector_properties_available[imro_field_to_pi_field.get(k)] = v
628-
probe.annotate_contacts(**vector_properties_available)
629-
630-
return probe
631-
632-
633522
def get_probe_metadata_from_probe_features(probe_features: dict, imDatPrb_pn: str):
634523
"""
635524
Parses the `probe_features` dict, to cast string to appropriate types
@@ -781,7 +670,7 @@ def _build_canonical_contact_id(electrode_id: int, shank_id: int | None = None)
781670
return f"e{electrode_id}"
782671

783672

784-
def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict:
673+
def _interpret_imro_string(imro_table_string: str, probe_part_number: str) -> dict:
785674
"""
786675
Parse IMRO (Imec ReadOut) table string into structured per-channel data.
787676
@@ -826,18 +715,127 @@ def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict:
826715
for field, field_value in zip(imro_fields, values):
827716
imro_per_channel[field].append(field_value)
828717

829-
# Ensure "electrode" key always exists with physical electrode IDs
830-
# Different probe types encode electrode selection differently
718+
# Resolve electrode IDs for probe types whose IMRO format does not include them.
719+
# NP2.x+ probes have "electrode" directly in the IMRO table. NP1.x probes encode
720+
# electrode selection indirectly and need computation.
721+
if "electrode" not in imro_per_channel:
722+
_resolve_active_contacts_for_np1(imro_per_channel)
831723
if "electrode" not in imro_per_channel:
832-
# NP1.0: Bank-based addressing (physical_electrode_id = bank * 384 + channel)
833-
readout_channel_ids = np.array(imro_per_channel["channel"])
834-
bank_key = "bank" if "bank" in imro_per_channel else "bank_mask"
835-
bank_indices = np.array(imro_per_channel[bank_key])
836-
imro_per_channel["electrode"] = (bank_indices * 384 + readout_channel_ids).tolist()
724+
_resolve_active_contacts_for_np1110(imro_per_channel, imro_table_string)
837725

838726
return imro_per_channel
839727

840728

729+
def _resolve_active_contacts_for_np1(imro_per_channel: dict) -> None:
730+
"""
731+
Compute electrode IDs for NP 1.0-like probes that use simple bank addressing.
732+
733+
These probes (IMRO types 0, 1020, 1030, 1100, 1120-1123, 1200, 1300) have
734+
"channel" and "bank" fields in their IMRO table but no "electrode" field.
735+
The electrode ID is computed as: electrode = bank * 384 + channel.
736+
737+
Modifies imro_per_channel in place, adding the "electrode" key.
738+
739+
Parameters
740+
----------
741+
imro_per_channel : dict
742+
Parsed IMRO data from _interpret_imro_string. Modified in place.
743+
"""
744+
if "channel" not in imro_per_channel:
745+
return
746+
747+
readout_channel_ids = np.array(imro_per_channel["channel"])
748+
bank_key = "bank" if "bank" in imro_per_channel else "bank_mask"
749+
bank_indices = np.array(imro_per_channel[bank_key])
750+
imro_per_channel["electrode"] = (bank_indices * 384 + readout_channel_ids).tolist()
751+
752+
753+
def _resolve_active_contacts_for_np1110(imro_per_channel: dict, imro_table_string: str) -> None:
754+
"""
755+
Compute electrode IDs for NP1110 (UHD2 active) probes that use group-based addressing.
756+
757+
NP1110 has 6144 electrodes in an 8x768 grid, 384 readout channels, 24 groups, and 16 banks.
758+
The IMRO table has 24 per-group entries (group, bankA, bankB) and a header with col_mode.
759+
Each group covers 16 channels. For each channel, the group index is deterministic, the
760+
effective bank is selected from (bankA, bankB) based on col_mode, and the electrode ID
761+
is computed from channel + bank using column/row lookup tables.
762+
763+
Modifies imro_per_channel in place, adding the "electrode" key with 384 electrode IDs.
764+
765+
Parameters
766+
----------
767+
imro_per_channel : dict
768+
Parsed IMRO data with keys "group", "bankA", "bankB" (24 entries each).
769+
imro_table_string : str
770+
Raw IMRO table string, needed to extract col_mode from the header.
771+
772+
References
773+
----------
774+
https://github.com/billkarsh/SpikeGLX/blob/51b96c70204c025748d69c9a588e07406728f9eb/Src-imro/IMROTbl_T1110.cpp
775+
"""
776+
if "group" not in imro_per_channel:
777+
return
778+
779+
# Extract col_mode from IMRO header: (type,col_mode,ref_id,ap_gain,lf_gain,ap_hipas_flt)
780+
header_str = imro_table_string.strip().split(")")[0][1:] # remove leading "("
781+
header_fields = header_str.split(",")
782+
col_mode = int(header_fields[1]) # 0=INNER, 1=OUTER, 2=ALL
783+
784+
groups_bankA = imro_per_channel["bankA"]
785+
groups_bankB = imro_per_channel["bankB"]
786+
787+
col_tbl = [0, 3, 1, 2, 1, 2, 0, 3]
788+
789+
def _grp_idx(ch):
790+
return 2 * ((ch % 384) // 32) + ((ch % 384) & 1)
791+
792+
def _col(ch, bank):
793+
grp_index = _grp_idx(ch)
794+
grp_col = col_tbl[4 * (bank & 1) + (grp_index % 4)]
795+
crossed = (bank // 4) & 1
796+
ingrp_col = ((((ch % 64) % 32) // 2) & 1) ^ crossed
797+
if ch & 1:
798+
return 2 * grp_col + (1 - ingrp_col)
799+
else:
800+
return 2 * grp_col + ingrp_col
801+
802+
def _row(ch, bank):
803+
grp_index = _grp_idx(ch)
804+
grp_row = grp_index // 4
805+
ingrp_row = ((ch % 64) % 32) // 4
806+
if ch & 1:
807+
b0_row = 8 * grp_row + (7 - ingrp_row)
808+
else:
809+
b0_row = 8 * grp_row + ingrp_row
810+
return 48 * bank + b0_row
811+
812+
def _bank_for_channel(ch, bankA, bankB):
813+
if col_mode == 2: # ALL
814+
return bankA
815+
# INNER (0) or OUTER (1): choose bankA or bankB based on column position
816+
c = _col(ch, bankA)
817+
if c < 4:
818+
use_bankA = (c % 2 == 0) if col_mode == 1 else (c % 2 == 1)
819+
else:
820+
use_bankA = (c % 2 == 1) if col_mode == 1 else (c % 2 == 0)
821+
return bankA if use_bankA else bankB
822+
823+
electrode_ids = []
824+
for ch in range(384):
825+
grp = _grp_idx(ch)
826+
bankA = groups_bankA[grp]
827+
bankB = groups_bankB[grp]
828+
bank = _bank_for_channel(ch, bankA, bankB)
829+
row = _row(ch, bank)
830+
col = _col(ch, bank)
831+
electrode_ids.append(8 * row + col)
832+
833+
imro_per_channel["electrode"] = electrode_ids
834+
# Also add the "channel" key (0-383) since the IMRO entries are per-group, not per-channel
835+
imro_per_channel["channel"] = list(range(384))
836+
837+
838+
841839
def read_spikeglx(file: str | Path) -> Probe:
842840
"""
843841
Read probe geometry and configuration from a SpikeGLX metadata file.
@@ -888,7 +886,7 @@ def read_spikeglx(file: str | Path) -> Probe:
888886
# Specifies which electrodes were selected for recording (e.g., 384 of 960) plus their
889887
# acquisition settings (gains, references, filters). See: https://billkarsh.github.io/SpikeGLX/help/imroTables/
890888
imro_table_string = meta["imroTbl"]
891-
imro_per_channel = _parse_imro_string(imro_table_string, imDatPrb_pn)
889+
imro_per_channel = _interpret_imro_string(imro_table_string, imDatPrb_pn)
892890

893891
# ===== 4. Build contact IDs for active electrodes =====
894892
# Convert physical electrode IDs to probeinterface canonical contact ID strings

0 commit comments

Comments
 (0)