-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
212 lines (166 loc) · 7.92 KB
/
app.py
File metadata and controls
212 lines (166 loc) · 7.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import streamlit as st
import os
import sys
import tempfile
import numpy as np
import SimpleITK as sitk
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
# ===========================================================================
# 0. HACK PARA JOBLIB (El "Fantasma" de __main__)
# Al clonar la clase aquí, joblib la encontrará cuando despierte el modelo
# ===========================================================================
from sklearn.base import clone, BaseEstimator, ClassifierMixin
class ClasificadorOrdinalFrankHall(BaseEstimator, ClassifierMixin):
"""Convierte cualquier clasificador en un modelo Ordinal de gravedad clínica."""
def __init__(self, estimator):
self.estimator = estimator
def fit(self, X, y):
self.classes_ = np.sort(np.unique(y))
self.estimators_ = []
for i in range(len(self.classes_) - 1):
y_binario = (y > self.classes_[i]).astype(int)
clon = clone(self.estimator)
clon.fit(X, y_binario)
self.estimators_.append(clon)
return self
def predict_proba(self, X):
probs_mayor_que = [est.predict_proba(X)[:, 1] for est in self.estimators_]
probs = np.zeros((X.shape[0], len(self.classes_)))
probs[:, 0] = 1.0 - probs_mayor_que[0]
for i in range(1, len(self.classes_) - 1):
probs[:, i] = probs_mayor_que[i - 1] - probs_mayor_que[i]
probs[:, -1] = probs_mayor_que[-1]
probs = np.clip(probs, 0.0, 1.0)
return probs / probs.sum(axis=1, keepdims=True)
def predict(self, X):
return self.classes_[np.argmax(self.predict_proba(X), axis=1)]
# ==========================================
# 1. Configuración y Rutas del Proyecto
# ==========================================
st.set_page_config(
page_title="NodeQuant AI",
page_icon="🫁",
layout="wide",
initial_sidebar_state="expanded"
)
# Ajusta tu ruta raíz (Puedes dejarla fija o hacerla dinámica)
ruta_raiz = r"C:\Users\korev\Documents\Cursos\Samsung Innovation Campus\NodeQuantAI"
if ruta_raiz not in sys.path:
sys.path.insert(0, ruta_raiz)
# Importamos tu motor de inferencia
from dashboard.motor_inferencia import MotorInferenciaNodeQuant
def generar_figura_fusion(ruta_img, ruta_mask):
try:
ct_itk = sitk.ReadImage(ruta_img)
mask_itk = sitk.ReadImage(ruta_mask)
resampler = sitk.ResampleImageFilter()
resampler.SetReferenceImage(ct_itk)
resampler.SetInterpolator(sitk.sitkNearestNeighbor)
resampler.SetDefaultPixelValue(0)
resampler.SetTransform(sitk.Transform())
mask_alineada = resampler.Execute(mask_itk)
img_data = sitk.GetArrayFromImage(ct_itk)
mask_data = sitk.GetArrayFromImage(mask_alineada)
# Buscando el área más grande de la máscara
pixeles_por_slice = np.sum(mask_data > 0, axis=(1, 2))
if pixeles_por_slice.max() > 0:
slice_idx = np.argmax(pixeles_por_slice)
else:
slice_idx = img_data.shape[0] // 2 # Por si la máscara viene vacía
img_slice = np.rot90(img_data[slice_idx, :, :], 2)
mask_slice = np.rot90(mask_data[slice_idx, :, :], 2)
valor_fondo = img_slice.min()
# Consideramos como "cuerpo del paciente" todo lo que supere ese vacío
mascara_cuerpo = img_slice > (valor_fondo + 20)
filas_validas = np.any(mascara_cuerpo, axis=1)
cols_validas = np.any(mascara_cuerpo, axis=0)
if np.any(filas_validas) and np.any(cols_validas):
rmin, rmax = np.where(filas_validas)[0][[0, -1]]
cmin, cmax = np.where(cols_validas)[0][[0, -1]]
# Damos un margen para que no quede pegado a los bordes
margen = 20
rmin, rmax = max(0, rmin - margen), min(img_slice.shape[0], rmax + margen)
cmin, cmax = max(0, cmin - margen), min(img_slice.shape[1], cmax + margen)
# Aplicamos el recorte a tomografía y máscara al mismo tiempo
img_slice = img_slice[rmin:rmax, cmin:cmax]
mask_slice = mask_slice[rmin:rmax, cmin:cmax]
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
fig.patch.set_alpha(0.0)
vmin, vmax = np.percentile(img_slice, [2, 98])
axes[0].imshow(img_slice, cmap='gray', vmin=vmin, vmax=vmax)
axes[0].set_title(f'Tomografía (Corte {slice_idx})', color='white')
axes[0].axis('off')
axes[1].imshow(mask_slice, cmap='bone', vmin=0, vmax=1)
axes[1].set_title('Máscara del Ganglio', color='white')
axes[1].axis('off')
axes[2].imshow(img_slice, cmap='gray', vmin=vmin, vmax=vmax)
cmap_rojo = ListedColormap(['#ff1111'])
mask_overlay = np.ma.masked_where(mask_slice == 0, mask_slice)
axes[2].imshow(mask_overlay, cmap=cmap_rojo, alpha=0.5, interpolation='none')
axes[2].set_title('Superposición', color='white')
axes[2].axis('off')
plt.tight_layout()
return fig
except Exception as e:
st.error(f"Error visual: {e}")
return None
# ==========================================
# 2. Carga del Motor en Caché (¡Muy rápido!)
# ==========================================
@st.cache_resource
def cargar_motor():
ruta_reg = os.path.join(ruta_raiz, "regression", "joblib")
ruta_clf = os.path.join(ruta_raiz, "classification", "joblib")
return MotorInferenciaNodeQuant(ruta_reg, ruta_clf)
motor = cargar_motor()
def guardar_archivo_temporal(archivo_subido):
if archivo_subido is not None:
directorio_temp = tempfile.mkdtemp()
ruta_completa = os.path.join(directorio_temp, archivo_subido.name)
with open(ruta_completa, "wb") as f:
f.write(archivo_subido.getbuffer())
return ruta_completa
return None
# ==========================================
# 3. Interfaz Visual
# ==========================================
st.title("🫁 NodeQuant AI")
st.markdown("Sube la tomografía (CT) y la máscara de segmentación para iniciar el análisis radiómico.")
st.divider()
col1, col2 = st.columns(2)
with col1:
archivo_img = st.file_uploader("Tomografía (image.nii.gz)", type=['nii', 'gz'])
with col2:
archivo_mask = st.file_uploader("Máscara (mask.nii.gz)", type=['nii', 'gz'])
# 4. Acción de Procesamiento
if archivo_img and archivo_mask:
if st.button("Procesar Paciente", type="primary", use_container_width=True):
with st.spinner("Analizando biomarcadores y calculando predicciones..."):
# 1. Guardar en temporales
ruta_img = guardar_archivo_temporal(archivo_img)
ruta_mask = guardar_archivo_temporal(archivo_mask)
try:
# 2. Llamada a TU MOTOR
reporte = motor.predecir_paciente(ruta_img, ruta_mask)
# 3. Llamada al Visor Espacial
figura = generar_figura_fusion(ruta_img, ruta_mask)
# 4. Despliegue de Resultados Clínicos (Ajustado solo a Volumen y Riesgo)
st.success("¡Análisis completado exitosamente!")
st.subheader("Reporte Clínico")
m1, m2 = st.columns(2)
# Métrica 1: Volumen Tumoral
vol_str = f"{reporte['volumen']['valor']:,.1f} {reporte['volumen']['unidad']}"
m1.metric("Volumen Tumoral Estimado", vol_str)
# Métrica 2: Riesgo con Lógica de Colores
riesgo = reporte.get('riesgo', 'Desconocido')
iconos_riesgo = {"Bajo": "🟢", "Intermedio": "🟡", "Crítico": "🔴"}
icono = iconos_riesgo.get(riesgo, "⚪")
m2.metric("Riesgo de Malignidad", f"{icono} {riesgo}")
# 5. Despliegue de la imagen de fusión
st.divider()
st.subheader("Visualización Radiológica")
if figura:
st.pyplot(figura, use_container_width=True)
except Exception as e:
st.error(f"Error en el análisis: {e}")