From 0181299576d2d045f377170aefff3fcc3a2e5c67 Mon Sep 17 00:00:00 2001 From: Elihei2 Date: Mon, 1 Jun 2026 13:29:09 +0200 Subject: [PATCH] fix(io): platform-detection helpers + tiebreak (shared base) Add the small shared building blocks used by the CosMX/MERSCOPE readers: column/schema resolution helpers (_lazyframe_column_names, _first_existing), boundary-index + empty-boundaries helpers, assignment-id cleaning, and a directory/schema-based _platform_tiebreak wired into _infer_platform so an ambiguous directory resolves instead of erroring. No behavior change for unambiguous Xenium dirs. Base for bugfix/read-cosmx and bugfix/read-merscope. --- src/segger/io/preprocessor.py | 141 ++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/src/segger/io/preprocessor.py b/src/segger/io/preprocessor.py index 9bbe62d..e78f5bf 100644 --- a/src/segger/io/preprocessor.py +++ b/src/segger/io/preprocessor.py @@ -8,6 +8,7 @@ import polars as pl import pandas as pd import json +import csv import warnings import logging @@ -56,6 +57,143 @@ def decorator(cls): return cls return decorator +def _lazyframe_column_names(lf: pl.LazyFrame) -> list[str]: + """Return column names for a LazyFrame across Polars versions.""" + try: + return lf.collect_schema().names() + except AttributeError: + return lf.columns + + +def _first_existing(columns: list[str] | set[str], candidates: list[str]) -> str | None: + """Return the first candidate column name present in `columns`.""" + column_set = set(columns) + for candidate in candidates: + if candidate in column_set: + return candidate + return None + + +def _build_boundary_index(boundaries: pd.DataFrame) -> pd.Index: + """Return the canonical string index used for cell/nucleus boundaries.""" + std = StandardBoundaryFields() + boundary_suffix = boundaries[std.boundary_type].map({ + std.nucleus_value: "0", + std.cell_value: "1", + }) + if boundary_suffix.isnull().any(): + unknown_values = sorted( + { + str(value) + for value in boundaries.loc[boundary_suffix.isnull(), std.boundary_type].unique() + } + ) + raise ValueError( + "Unsupported boundary_type values while building boundary index: " + + ", ".join(unknown_values) + ) + boundary_ids = boundaries[std.id].copy() + missing_ids = boundary_ids.isnull() + boundary_ids = boundary_ids.astype(str) + if missing_ids.any(): + fallback = pd.Series(boundaries.index, index=boundaries.index).astype(str) + boundary_ids.loc[missing_ids] = "missing_" + fallback.loc[missing_ids] + + boundary_index = boundary_ids + "_" + boundary_suffix + duplicate_counts = boundary_index.groupby(boundary_index).cumcount() + boundary_index = boundary_index.where( + duplicate_counts.eq(0), + boundary_index + "_dup" + duplicate_counts.astype(str), + ) + return pd.Index(boundary_index, dtype="object") + + +def _empty_boundaries() -> gpd.GeoDataFrame: + """Return an empty boundary frame with the canonical schema.""" + std = StandardBoundaryFields() + empty = gpd.GeoDataFrame( + { + std.id: pd.Series(dtype="object"), + std.boundary_type: pd.Series(dtype="object"), + }, + geometry=gpd.GeoSeries([], dtype="geometry"), + ) + return empty.set_index(std.id) + + +def _clean_assignment_expr(column_name: str) -> pl.Expr: + """Normalize assignment values and map null-like tokens to null.""" + cleaned = pl.col(column_name).cast(pl.String, strict=False).str.strip_chars() + lowered = cleaned.str.to_lowercase() + return ( + pl.when( + cleaned.is_null() + | cleaned.eq("").fill_null(False) + | cleaned.eq("-1").fill_null(False) + | cleaned.eq("-1.0").fill_null(False) + | lowered.is_in( + ["none", "nan", "null", "na", "n/a", "unassigned", "unknown"] + ).fill_null(False) + ) + .then(None) + .otherwise(cleaned) + ) + + +def _platform_tiebreak(data_dir: Path, candidates: list[str]) -> str | None: + """Break platform inference ties using directory markers and transcript schema.""" + tx_columns: list[str] = [] + tx_path = data_dir / "transcripts.parquet" + if tx_path.exists(): + try: + tx_columns = _lazyframe_column_names( + pl.scan_parquet(tx_path, parallel="row_groups") + ) + except Exception: + tx_columns = [] + + scores: dict[str, int] = {name: 0 for name in candidates} + + if "nanostring_cosmx" in scores: + if len(list(data_dir.glob("CompartmentLabels"))) == 1: + scores["nanostring_cosmx"] += 100 + if len(list(data_dir.glob("CellLabels"))) == 1: + scores["nanostring_cosmx"] += 100 + if {"x_global_px", "y_global_px", "target"}.issubset(set(tx_columns)): + scores["nanostring_cosmx"] += 50 + if "CellComp" in tx_columns: + scores["nanostring_cosmx"] += 20 + + if "vizgen_merscope" in scores: + if len(list(data_dir.glob("detected_transcripts.csv"))) == 1: + scores["vizgen_merscope"] += 100 + if {"global_x", "global_y", "gene"}.issubset(set(tx_columns)): + scores["vizgen_merscope"] += 50 + if "nucleus_boundaries_id" in tx_columns: + scores["vizgen_merscope"] += 20 + + if "10x_xenium" in scores: + if len(list(data_dir.glob("cell_boundaries.parquet"))) == 1: + scores["10x_xenium"] += 100 + if len(list(data_dir.glob("nucleus_boundaries.parquet"))) == 1: + scores["10x_xenium"] += 100 + if {"x_location", "y_location", "feature_name"}.issubset(set(tx_columns)): + scores["10x_xenium"] += 50 + if "overlaps_nucleus" in tx_columns: + scores["10x_xenium"] += 20 + + if not scores: + return None + best = max(scores.values()) + if best <= 0: + return None + winners = [name for name, score in scores.items() if score == best] + if len(winners) == 1: + return winners[0] + return None + + + class ISTPreprocessor(ABC): """ Abstract base class for platform-specific preprocessing of spatial @@ -554,6 +692,9 @@ def _infer_platform(data_dir: Path) -> str: f"Could not infer platform from data directory: {err_str}." ) elif len(matches) > 1: + tie_break = _platform_tiebreak(data_dir, matches) + if tie_break is not None: + return tie_break conflicting_platforms = ", ".join(matches) raise ValueError( f"Ambiguous data directory: Multiple platforms match: "