Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions src/segger/io/preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import polars as pl
import pandas as pd
import json
import csv
import warnings
import logging

Expand Down Expand Up @@ -56,6 +57,143 @@ def decorator(cls):
return cls
return decorator

def _lazyframe_column_names(lf: pl.LazyFrame) -> list[str]:
    """List the column names of a LazyFrame, whichever Polars API is available.

    The schema-based accessor (`collect_schema().names()`) is tried first.
    If that lookup raises `AttributeError`, the `columns` attribute of the
    frame is returned instead.

    Args:
        lf: The lazy frame whose column names are wanted.

    Returns:
        The column names reported by the frame.
    """
    try:
        schema = lf.collect_schema()
        names = schema.names()
    except AttributeError:
        # Fallback path for frames that do not expose `collect_schema`.
        names = lf.columns
    return names


def _first_existing(columns: list[str] | set[str], candidates: list[str]) -> str | None:
"""Return the first candidate column name present in `columns`."""
column_set = set(columns)
for candidate in candidates:
if candidate in column_set:
return candidate
return None


def _build_boundary_index(boundaries: pd.DataFrame) -> pd.Index:
    """Build the canonical string index for cell/nucleus boundary rows.

    Each key has the form ``"<id>_<suffix>"`` where the suffix is ``"0"`` for
    nucleus rows and ``"1"`` for cell rows. Rows without an id get
    ``"missing_<row label>"`` as their id part. When a key occurs more than
    once, the second and later occurrences receive a ``"_dup<k>"`` tail, with
    ``k`` counting repeats from 1.

    Args:
        boundaries: Frame holding the standard id and boundary-type columns.

    Returns:
        An object-dtype index with one key per row of `boundaries`.

    Raises:
        ValueError: If the boundary-type column contains a value that is
            neither the nucleus nor the cell value.
    """
    fields = StandardBoundaryFields()

    # Translate the boundary type into its one-character suffix.
    type_column = boundaries[fields.boundary_type]
    suffixes = type_column.map(
        {
            fields.nucleus_value: "0",
            fields.cell_value: "1",
        }
    )
    unmapped = suffixes.isnull()
    if unmapped.any():
        offenders = sorted({str(value) for value in type_column.loc[unmapped].unique()})
        raise ValueError(
            "Unsupported boundary_type values while building boundary index: "
            + ", ".join(offenders)
        )

    # Record which ids are absent *before* stringifying, then patch those
    # rows with a placeholder derived from the row label. `astype` returns a
    # new Series, so the caller's frame is not modified.
    raw_ids = boundaries[fields.id]
    absent = raw_ids.isnull()
    labels = raw_ids.astype(str)
    if absent.any():
        row_labels = pd.Series(boundaries.index, index=boundaries.index).astype(str)
        labels.loc[absent] = "missing_" + row_labels.loc[absent]

    # Disambiguate repeated keys: the first occurrence is kept verbatim.
    keys = labels + "_" + suffixes
    occurrence = keys.groupby(keys).cumcount()
    unique_keys = keys.mask(
        occurrence.ne(0),
        keys + "_dup" + occurrence.astype(str),
    )
    return pd.Index(unique_keys, dtype="object")


def _empty_boundaries() -> gpd.GeoDataFrame:
    """Create a zero-row boundary frame that still has the canonical schema.

    Returns:
        A GeoDataFrame with no rows, an object-dtype boundary-type column, an
        empty geometry column, and the standard id column set as the index.
    """
    fields = StandardBoundaryFields()
    columns = {
        name: pd.Series(dtype="object")
        for name in (fields.id, fields.boundary_type)
    }
    no_geometry = gpd.GeoSeries([], dtype="geometry")
    frame = gpd.GeoDataFrame(columns, geometry=no_geometry)
    return frame.set_index(fields.id)


def _clean_assignment_expr(column_name: str) -> pl.Expr:
    """Build an expression that tidies assignment values in `column_name`.

    The column is cast to string (non-strict) and stripped of surrounding
    characters via `str.strip_chars()`. Values that are null, empty, ``"-1"``,
    ``"-1.0"``, or (case-insensitively) one of the placeholder words below
    become null; every other value is returned in its stripped form.

    Args:
        column_name: Name of the column holding the assignment values.

    Returns:
        A Polars expression yielding the cleaned string or null.
    """
    text = pl.col(column_name).cast(pl.String, strict=False).str.strip_chars()

    is_missing = text.is_null()
    is_blank = text.eq("").fill_null(False)
    is_negative_one = (
        text.eq("-1").fill_null(False)
        | text.eq("-1.0").fill_null(False)
    )
    # Placeholder words are matched on the lowercased text.
    is_placeholder = (
        text.str.to_lowercase()
        .is_in(["none", "nan", "null", "na", "n/a", "unassigned", "unknown"])
        .fill_null(False)
    )

    null_like = is_missing | is_blank | is_negative_one | is_placeholder
    return pl.when(null_like).then(None).otherwise(text)


def _platform_tiebreak(data_dir: Path, candidates: list[str]) -> str | None:
"""Break platform inference ties using directory markers and transcript schema."""
tx_columns: list[str] = []
tx_path = data_dir / "transcripts.parquet"
if tx_path.exists():
try:
tx_columns = _lazyframe_column_names(
pl.scan_parquet(tx_path, parallel="row_groups")
)
except Exception:
tx_columns = []

scores: dict[str, int] = {name: 0 for name in candidates}

if "nanostring_cosmx" in scores:
if len(list(data_dir.glob("CompartmentLabels"))) == 1:
scores["nanostring_cosmx"] += 100
if len(list(data_dir.glob("CellLabels"))) == 1:
scores["nanostring_cosmx"] += 100
if {"x_global_px", "y_global_px", "target"}.issubset(set(tx_columns)):
scores["nanostring_cosmx"] += 50
if "CellComp" in tx_columns:
scores["nanostring_cosmx"] += 20

if "vizgen_merscope" in scores:
if len(list(data_dir.glob("detected_transcripts.csv"))) == 1:
scores["vizgen_merscope"] += 100
if {"global_x", "global_y", "gene"}.issubset(set(tx_columns)):
scores["vizgen_merscope"] += 50
if "nucleus_boundaries_id" in tx_columns:
scores["vizgen_merscope"] += 20

if "10x_xenium" in scores:
if len(list(data_dir.glob("cell_boundaries.parquet"))) == 1:
scores["10x_xenium"] += 100
if len(list(data_dir.glob("nucleus_boundaries.parquet"))) == 1:
scores["10x_xenium"] += 100
if {"x_location", "y_location", "feature_name"}.issubset(set(tx_columns)):
scores["10x_xenium"] += 50
if "overlaps_nucleus" in tx_columns:
scores["10x_xenium"] += 20

if not scores:
return None
best = max(scores.values())
if best <= 0:
return None
winners = [name for name, score in scores.items() if score == best]
if len(winners) == 1:
return winners[0]
return None



class ISTPreprocessor(ABC):
"""
Abstract base class for platform-specific preprocessing of spatial
Expand Down Expand Up @@ -554,6 +692,9 @@ def _infer_platform(data_dir: Path) -> str:
f"Could not infer platform from data directory: {err_str}."
)
elif len(matches) > 1:
tie_break = _platform_tiebreak(data_dir, matches)
if tie_break is not None:
return tie_break
conflicting_platforms = ", ".join(matches)
raise ValueError(
f"Ambiguous data directory: Multiple platforms match: "
Expand Down