From ff3ee09a8dd15bdcce3bb707f0d819245ac20342 Mon Sep 17 00:00:00 2001 From: Elihei2 Date: Mon, 1 Jun 2026 14:37:33 +0200 Subject: [PATCH] fix(io): robustly read NanoString CosMX datasets Generalize CosMXPreprocessor so CosMX exports actually read: resolve the transcripts source (CSV glob or transcripts.parquet fallback), resolve x/y/feature/assignment/compartment columns by candidate names, map CosMX compartments to the standard schema, and read cell/nucleus polygons from masks. Bundles the small column/boundary resolution helpers it needs, so it is self-contained and independent of the platform-detection feature. What to review: - src/segger/io/preprocessor.py: the helper functions + the CosMXPreprocessor rewrite (validate / transcripts / boundaries). - src/segger/io/fields.py: CosMx fallback_filename. - Verify against a real CosMX export. Minimal: no qv/z or synthetic boundaries. Base: main (does not depend on feat/io-platform-detection). --- src/segger/io/fields.py | 1 + src/segger/io/preprocessor.py | 377 ++++++++++++++++++++++++++++------ 2 files changed, 318 insertions(+), 60 deletions(-) diff --git a/src/segger/io/fields.py b/src/segger/io/fields.py index 8dc30a9..85259a8 100644 --- a/src/segger/io/fields.py +++ b/src/segger/io/fields.py @@ -72,6 +72,7 @@ class MerscopeBoundaryFields: @dataclass class CosMxTranscriptFields: filename: str = '*_tx_file.csv' + fallback_filename: str = 'transcripts.parquet' x: str = 'x_global_px' y: str = 'y_global_px' feature: str = 'target' diff --git a/src/segger/io/preprocessor.py b/src/segger/io/preprocessor.py index 9bbe62d..0eb9781 100644 --- a/src/segger/io/preprocessor.py +++ b/src/segger/io/preprocessor.py @@ -8,6 +8,7 @@ import polars as pl import pandas as pd import json +import csv import warnings import logging @@ -56,6 +57,90 @@ def decorator(cls): return cls return decorator +def _lazyframe_column_names(lf: pl.LazyFrame) -> list[str]: + """Return column names for a LazyFrame across Polars versions.""" + try: + return lf.collect_schema().names() + except AttributeError: + return lf.columns + + +def _first_existing(columns: list[str] | set[str], candidates: list[str]) -> str | None: + """Return the first candidate column name present in `columns`.""" + column_set = set(columns) + for candidate in candidates: + if candidate in column_set: + return candidate + return None + + +def _build_boundary_index(boundaries: pd.DataFrame) -> pd.Index: + """Return the canonical string index used for cell/nucleus boundaries.""" + std = StandardBoundaryFields() + boundary_suffix = boundaries[std.boundary_type].map({ + std.nucleus_value: "0", + std.cell_value: "1", + }) + if boundary_suffix.isnull().any(): + unknown_values = sorted( + { + str(value) + for value in boundaries.loc[boundary_suffix.isnull(), std.boundary_type].unique() + } + ) + raise ValueError( + "Unsupported boundary_type values while building boundary index: " + + ", ".join(unknown_values) + ) + boundary_ids = boundaries[std.id].copy() + missing_ids = boundary_ids.isnull() + boundary_ids = boundary_ids.astype(str) + if missing_ids.any(): + fallback = pd.Series(boundaries.index, index=boundaries.index).astype(str) + boundary_ids.loc[missing_ids] = "missing_" + fallback.loc[missing_ids] + + boundary_index = boundary_ids + "_" + boundary_suffix + duplicate_counts = boundary_index.groupby(boundary_index).cumcount() + boundary_index = boundary_index.where( + duplicate_counts.eq(0), + boundary_index + "_dup" + duplicate_counts.astype(str), + ) + return pd.Index(boundary_index, dtype="object") + + +def _empty_boundaries() -> gpd.GeoDataFrame: + """Return an empty boundary frame with the canonical schema.""" + std = StandardBoundaryFields() + empty = gpd.GeoDataFrame( + { + std.id: pd.Series(dtype="object"), + std.boundary_type: pd.Series(dtype="object"), + }, + geometry=gpd.GeoSeries([], dtype="geometry"), + ) + return empty.set_index(std.id) + + +def _clean_assignment_expr(column_name: str) -> pl.Expr: + """Normalize assignment values and map null-like tokens to null.""" + cleaned = pl.col(column_name).cast(pl.String, strict=False).str.strip_chars() + lowered = cleaned.str.to_lowercase() + return ( + pl.when( + cleaned.is_null() + | cleaned.eq("").fill_null(False) + | cleaned.eq("-1").fill_null(False) + | cleaned.eq("-1.0").fill_null(False) + | lowered.is_in( + ["none", "nan", "null", "na", "n/a", "unassigned", "unknown"] + ).fill_null(False) + ) + .then(None) + .otherwise(cleaned) + ) + + + class ISTPreprocessor(ABC): """ Abstract base class for platform-specific preprocessing of spatial @@ -231,94 +316,270 @@ class CosMXPreprocessor(ISTPreprocessor): Preprocessor for NanoString CosMX datasets. """ @staticmethod - def _validate_directory(data_dir: Path): + def _assignment_candidates() -> list[str]: + raw = CosMxTranscriptFields() + return [ + raw.cell_id, + "cell_id", + "cell_ID", + "EntityID", + "entity_id", + "nucleus_boundaries_id", + "cell_boundaries_id", + ] - # Check required files/directories + @staticmethod + def _has_mask_inputs(data_dir: Path) -> bool: bd_fields = CosMxBoundaryFields() + return ( + len(list(data_dir.glob(bd_fields.compartment_labels_dirname))) == 1 + and len(list(data_dir.glob(bd_fields.cell_labels_dirname))) == 1 + and len(list(data_dir.glob(bd_fields.fov_positions_filename))) == 1 + ) + + @staticmethod + def _has_native_schema(columns: list[str] | set[str]) -> bool: + raw = CosMxTranscriptFields() + return {raw.x, raw.y, raw.feature}.issubset(set(columns)) + + @staticmethod + def _validate_directory(data_dir: Path): + tx_fields = CosMxTranscriptFields() - for pat in [ - tx_fields.filename, - bd_fields.compartment_labels_dirname, - bd_fields.cell_labels_dirname, - bd_fields.fov_positions_filename, - ]: - num_matches = len(list(data_dir.glob(pat))) - if not num_matches == 1: + tx_path = CosMXPreprocessor._resolve_transcripts_path(data_dir) + tx_columns = _lazyframe_column_names( + CosMXPreprocessor._scan_transcripts_file(tx_path) + ) + + # Keep auto-inference strict: only match CosMX when native markers exist. + has_native_file = len(list(data_dir.glob(tx_fields.filename))) == 1 + has_native_schema = CosMXPreprocessor._has_native_schema(tx_columns) + has_native_masks = CosMXPreprocessor._has_mask_inputs(data_dir) + if not (has_native_file or has_native_schema or has_native_masks): + raise IOError( + "Directory does not look like a CosMX output layout " + "(missing native transcript schema and mask markers)." + ) + + x_col = _first_existing( + tx_columns, + [tx_fields.x, "x", "x_location", "global_x"], + ) + y_col = _first_existing( + tx_columns, + [tx_fields.y, "y", "y_location", "global_y"], + ) + feature_col = _first_existing( + tx_columns, + [tx_fields.feature, "feature_name", "gene"], + ) + assignment_col = _first_existing( + tx_columns, + CosMXPreprocessor._assignment_candidates(), + ) + + if x_col is None or y_col is None or feature_col is None or assignment_col is None: + missing_tx_columns: list[str] = [] + if x_col is None: + missing_tx_columns.append("x") + if y_col is None: + missing_tx_columns.append("y") + if feature_col is None: + missing_tx_columns.append("feature") + if assignment_col is None: + missing_tx_columns.append("cell_or_nucleus_assignment") + raise IOError( + f"CosMx transcripts file '{tx_path.name}' is missing minimum usable columns " + f"{missing_tx_columns}." + ) + + @staticmethod + def _resolve_transcripts_path(data_dir: Path) -> Path: + tx_fields = CosMxTranscriptFields() + + matches_by_pattern: dict[str, list[Path]] = {} + for pattern in (tx_fields.filename, tx_fields.fallback_filename): + matches = sorted(data_dir.glob(pattern)) + if len(matches) > 1: raise IOError( - f"CosMx sample directory must contain exactly 1 file or " - f"directory matching {pat}, but found {num_matches}." + f"CosMx sample directory must contain at most one file " + f"matching '{pattern}', but found {len(matches)}." ) + matches_by_pattern[pattern] = matches + + primary = matches_by_pattern[tx_fields.filename] + fallback = matches_by_pattern[tx_fields.fallback_filename] + if len(primary) == 1: + return primary[0] + if len(fallback) == 1: + return fallback[0] + raise IOError( + "CosMx sample directory must contain either " + f"'{tx_fields.filename}' or '{tx_fields.fallback_filename}'." + ) + + @staticmethod + def _scan_transcripts_file(path: Path) -> pl.LazyFrame: + if path.suffix.lower() == ".csv": + return pl.scan_csv(path) + if path.suffix.lower() == ".parquet": + return pl.scan_parquet(path, parallel="row_groups") + raise ValueError(f"Unsupported CosMx transcript file format: {path}") @cached_property def transcripts(self) -> pl.DataFrame: - # Field names raw = CosMxTranscriptFields() std = StandardTranscriptFields() - return ( - # Read in lazily - pl.scan_csv(next(self.data_dir.glob(raw.filename))) - .with_row_index(name=std.row_index) - # Filter data - .filter(pl.col(raw.feature).str.contains( - '|'.join(raw.filter_substrings)).not_() + source_path = self._resolve_transcripts_path(self.data_dir) + lf = self._scan_transcripts_file(source_path).with_row_index(name=std.row_index) + columns = _lazyframe_column_names(lf) + + x_col = _first_existing(columns, [raw.x, "x", "x_location", "global_x"]) + y_col = _first_existing(columns, [raw.y, "y", "y_location", "global_y"]) + feature_col = _first_existing(columns, [raw.feature, "feature_name", "gene"]) + assignment_col = _first_existing(columns, self._assignment_candidates()) + compartment_col = _first_existing(columns, [raw.compartment, "cell_compartment", "compartment"]) + + if x_col is None or y_col is None or feature_col is None or assignment_col is None: + raise ValueError( + "CosMx transcripts require minimum usable data columns: " + "x, y, feature, and cell/nucleus assignment." ) - # Standardize compartment labels - .with_columns( - pl.col(raw.compartment) - .replace_strict( - { - raw.nucleus_value: std.nucleus_value, - raw.membrane_value: std.cytoplasmic_value, - raw.cytoplasmic_value: std.cytoplasmic_value, - raw.extracellular_value: std.extracellular_value, - None: std.extracellular_value, - }, - return_dtype=pl.Int8, + + if ( + x_col != raw.x + or y_col != raw.y + or feature_col != raw.feature + or assignment_col != raw.cell_id + ): + warnings.warn( + "CosMx transcripts are being parsed in compatibility mode " + f"(x='{x_col}', y='{y_col}', feature='{feature_col}', assignment='{assignment_col}')." + ) + + # Filter technical controls when feature labels look CosMx-like. + lf = lf.filter( + pl.col(feature_col).str.contains("|".join(raw.filter_substrings)).not_() + ) + + assignment_expr = _clean_assignment_expr(assignment_col) + if compartment_col is not None: + compartment_raw = pl.col(compartment_col).cast(pl.String, strict=False).str.strip_chars() + compartment_lower = compartment_raw.str.to_lowercase() + compartment_expr = ( + pl.when( + compartment_raw == raw.nucleus_value + ) + .then(std.nucleus_value) + .when( + compartment_lower == "nucleus" + ) + .then(std.nucleus_value) + .when( + compartment_lower == "nuclear" + ) + .then(std.nucleus_value) + .when( + compartment_raw.is_in( + [raw.membrane_value, raw.cytoplasmic_value] + ).fill_null(False) + ) + .then(std.cytoplasmic_value) + .when( + compartment_lower.is_in(["cytoplasm", "cytoplasmic", "membrane"]).fill_null(False) ) + .then(std.cytoplasmic_value) + .otherwise(std.extracellular_value) .alias(std.compartment) ) - # Standardize cell IDs + else: + warnings.warn( + "CosMx transcripts have no compartment column. Using assignment-only " + "compartment fallback." + ) + compartment_expr = ( + pl.when(assignment_expr.is_not_null()) + .then(std.cytoplasmic_value) + .otherwise(std.extracellular_value) + .alias(std.compartment) + ) + + cell_id_expr = assignment_expr + lf = ( + lf + .with_columns(compartment_expr) .with_columns( pl.when(pl.col(std.compartment) != std.extracellular_value) - .then(pl.col(raw.cell_id)) + .then(cell_id_expr) .otherwise(None) .alias(std.cell_id) ) - # Map to standard field names - .rename({raw.x: std.x, raw.y: std.y, raw.feature: std.feature}) - - # Subset to necessary fields - .select([std.row_index, std.x, std.y, std.feature, std.cell_id, - std.compartment]) + ) + + rename_map = {x_col: std.x, y_col: std.y, feature_col: std.feature} + select_cols = [std.row_index, std.x, std.y, std.feature, std.cell_id, std.compartment] - # Add numeric index - .with_row_index() + return ( + lf + .rename(rename_map) + .select(select_cols) .collect() ) @cached_property def boundaries(self) -> gpd.GeoDataFrame: - - # Field names raw = CosMxBoundaryFields() std = StandardBoundaryFields() + std_tx = StandardTranscriptFields() - # Join boundary datasets - cells = get_cosmx_polygons(self.data_dir, 'cell').reset_index( - drop=False, names=std.id) - cells = fix_invalid_geometry(cells) - cells[std.boundary_type] = std.cell_value + has_mask_inputs = self._has_mask_inputs(self.data_dir) - nuclei = get_cosmx_polygons(self.data_dir, 'nucleus').reset_index( - drop=False, names=std.id) - nuclei = fix_invalid_geometry(nuclei) - nuclei[std.boundary_type] = std.nucleus_value + cells = _empty_boundaries() + nuclei = _empty_boundaries() + if has_mask_inputs: + try: + cells = get_cosmx_polygons(self.data_dir, "cell").reset_index( + drop=False, names=std.id + ) + cells = fix_invalid_geometry(cells) + cells[std.boundary_type] = std.cell_value - bd = pd.concat([cells, nuclei]) + nuclei = get_cosmx_polygons(self.data_dir, "nucleus").reset_index( + drop=False, names=std.id + ) + nuclei = fix_invalid_geometry(nuclei) + nuclei[std.boundary_type] = std.nucleus_value + except Exception as exc: + warnings.warn( + "CosMx boundary masks were found but could not be parsed. " + f"Falling back to synthetic boundaries ({exc})." + ) + cells = _empty_boundaries() + nuclei = _empty_boundaries() + + if len(cells) == 0 and len(nuclei) > 0: + cells = nuclei.copy() + cells[std.boundary_type] = std.cell_value + if len(nuclei) == 0 and len(cells) > 0: + nuclei = cells.copy() + nuclei[std.boundary_type] = std.nucleus_value + if len(cells) == 0 and len(nuclei) == 0: + raise ValueError( + "Could not construct CosMx boundaries. Minimum usable transcripts " + "must include x/y plus cell or nucleus assignment." + ) + + bd = pd.concat( + [ + cells.reset_index(drop=False, names=std.id), + nuclei.reset_index(drop=False, names=std.id), + ], + ignore_index=True, + ) + bd[std.id] = bd[std.id].astype(str) - # Add nucleus column bd[std.contains_nucleus] = bd[std.id].map( pl.from_pandas(bd[[std.id, std.boundary_type]]) .group_by(std.id) @@ -327,11 +588,7 @@ def boundaries(self) -> gpd.GeoDataFrame: .set_index(std.id) .get(std.boundary_type) ) - # Convert index to string type (to join on AnnData) - bd.index = bd[std.id] + '_' + bd[std.boundary_type].map({ - std.nucleus_value: '0', - std.cell_value: '1', - }) + bd.index = _build_boundary_index(bd) return bd def _get_anndata(self, transcripts, label):