diff --git a/src/model_api/models/classification.py b/src/model_api/models/classification.py index 5e5d0e8e..ecb02b25 100644 --- a/src/model_api/models/classification.py +++ b/src/model_api/models/classification.py @@ -12,14 +12,11 @@ from typing import TYPE_CHECKING import numpy as np -from openvino import Model, Type -from openvino import opset10 as opset -from openvino.preprocess import PrePostProcessor from model_api.models.image_model import ImageModel from model_api.models.parameters import ParameterRegistry from model_api.models.result import ClassificationResult, Label -from model_api.models.utils import softmax +from model_api.models.utils import is_softmaxed, softmax, top_k if TYPE_CHECKING: from model_api.adapters.inference_adapter import InferenceAdapter @@ -95,27 +92,8 @@ def _setup_multilabel(self) -> None: def _setup_single_label(self) -> None: """Configure model for single-label classification with TopK.""" - try: - addOrFindSoftmaxAndTopkOutputs( - self.inference_adapter, - self.params.topk, - self.params.output_raw_scores, - ) - self.embedded_topk = True - self.out_layer_names = ["indices", "scores"] - if self.params.output_raw_scores: - self.out_layer_names.append(self.raw_scores_name) - except (RuntimeError, AttributeError): - # exception means we have a non-ov model - # with already inserted softmax and topk - if self.params.embedded_processing and len(self.outputs) >= 2: - self.embedded_topk = True - self.out_layer_names = ["indices", "scores"] - self.raw_scores_name = _raw_scores_name - else: # likely a non-ov model - self.embedded_topk = False - self.out_layer_names = _get_non_xai_names(self.outputs.keys()) - self.raw_scores_name = self.out_layer_names[0] + self.out_layer_names = _get_non_xai_names(self.outputs.keys()) + self.raw_scores_name = self.out_layer_names[0] self.embedded_processing = True @@ -228,10 +206,9 @@ def get_all_probs(self, logits: np.ndarray) -> np.ndarray: if cls_heads_info["num_multilabel_classes"]: logits_begin = cls_heads_info["num_single_label_classes"] probs[logits_begin:] = sigmoid_numpy(logits[logits_begin:]) - elif self.embedded_topk: - probs = logits.reshape(-1) else: - probs = softmax(logits.reshape(-1)) + logits_flattened = logits.reshape(-1) + probs = logits_flattened if is_softmaxed(logits_flattened, axis=0) else softmax(logits_flattened) return probs def get_hierarchical_predictions(self, logits: np.ndarray) -> list[Label]: @@ -277,68 +254,18 @@ def get_multilabel_predictions(self, logits: np.ndarray) -> list[Label]: return list(starmap(Label, zip(indices, labels, scores))) def get_multiclass_predictions(self, outputs: dict) -> list[Label]: + axis = 1 + logits = outputs[self.out_layer_names[0]] + if not is_softmaxed(logits, axis=axis): + logits = softmax(logits, axis=axis) + top_k_result = top_k(logits, self.params.topk, axis=axis) + scores = top_k_result.values[0] # noqa: PD011 # silencing false positive - it's not pandas code + indices = top_k_result.indices[0] + labels_list = self.params.labels - if self.embedded_topk: - indicesTensor = outputs[self.out_layer_names[0]][0] - scoresTensor = outputs[self.out_layer_names[1]][0] - labels = [labels_list[i] if labels_list else "" for i in indicesTensor] - else: - scoresTensor = softmax(outputs[self.out_layer_names[0]][0]) - indicesTensor = [int(np.argmax(scoresTensor))] - labels = [labels_list[i] if labels_list else "" for i in indicesTensor] - return list(starmap(Label, zip(indicesTensor, labels, scoresTensor))) - - -def addOrFindSoftmaxAndTopkOutputs(inference_adapter: InferenceAdapter, topk: int, output_raw_scores: bool) -> None: - softmaxNode = None - for i in range(len(inference_adapter.model.outputs)): - output_node = inference_adapter.model.get_output_op(i).input(0).get_source_output().get_node() - if output_node.get_type_name() == "Softmax": - softmaxNode = output_node - elif output_node.get_type_name() == "TopK": - return - - if softmaxNode is None: - logitsNode = inference_adapter.model.get_output_op(0).input(0).get_source_output().get_node() - softmaxNode = opset.softmax(logitsNode.output(0), 1) - k = opset.constant(topk, np.int32) - topkNode = opset.topk(softmaxNode, k, 1, "max", "value") - - indices = topkNode.output(0) - scores = topkNode.output(1) - results_descr = [indices, scores] - if output_raw_scores: - raw_scores = softmaxNode.output(0) - results_descr.append(raw_scores) - for output in inference_adapter.model.outputs: - if _saliency_map_name in output.get_names() or _feature_vector_name in output.get_names(): - results_descr.append(output) - - source_rt_info = inference_adapter.get_model().get_rt_info() - inference_adapter.model = Model( - results_descr, - inference_adapter.model.get_parameters(), - "classification", - ) - - if "model_info" in source_rt_info: - source_rt_info = source_rt_info["model_info"] - for k in source_rt_info: - inference_adapter.model.set_rt_info(source_rt_info[k], ["model_info", k]) - - # manually set output tensors name for created topK node - inference_adapter.model.outputs[0].tensor.set_names({"scores"}) - inference_adapter.model.outputs[1].tensor.set_names({"indices"}) - if output_raw_scores: - inference_adapter.model.outputs[2].tensor.set_names({_raw_scores_name}) - - # set output precisions - ppp = PrePostProcessor(inference_adapter.model) - ppp.output("indices").tensor().set_element_type(Type.i32) - ppp.output("scores").tensor().set_element_type(Type.f32) - if output_raw_scores: - ppp.output(_raw_scores_name).tensor().set_element_type(Type.f32) - inference_adapter.model = ppp.build() + labels = [labels_list[i] if labels_list else "" for i in indices] + + return list(starmap(Label, zip(indices, labels, scores))) def sigmoid_numpy(x: np.ndarray) -> np.ndarray: diff --git a/src/model_api/models/utils.py b/src/model_api/models/utils.py index fb7c9cd4..e02b77bb 100644 --- a/src/model_api/models/utils.py +++ b/src/model_api/models/utils.py @@ -5,6 +5,7 @@ from __future__ import annotations +from collections import namedtuple from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING @@ -14,6 +15,9 @@ from model_api.models.result import Contour, InstanceSegmentationResult, RotatedSegmentationResult +topk_namedtuple = namedtuple("topk_namedtuple", ["values", "indices"]) + + if TYPE_CHECKING: from model_api.models.result.detection import DetectionResult @@ -284,6 +288,29 @@ def multiclass_nms( return det, keep +def is_softmaxed(array: np.ndarray, axis: int, atol: float = 1e-5) -> bool: + """Check if the input array is softmaxed along the specified axis.""" + # Check values are in [0, 1] + if not np.all((array >= 0) & (array <= 1)): + return False + # Check sum along axis is close to 1 + sums = np.sum(array, axis=axis) + return np.allclose(sums, 1.0, atol=atol) + + def softmax(logits: np.ndarray, eps: float = 1e-9, axis=None, keepdims: bool = False) -> np.ndarray: exp = np.exp(logits - np.max(logits)) return exp / (np.sum(exp, axis=axis, keepdims=keepdims) + eps) + + +def top_k(array: np.ndarray, k: int, axis: int) -> topk_namedtuple: + """Returns the top k values and their indices along the specified axis.""" + # Get indices of the top k elements + indices = np.take(np.argpartition(array, -k, axis=axis), range(-k, 0), axis=axis) + # Gather the top k values + topk_values = np.take_along_axis(array, indices, axis=axis) + # Sort the top k values and indices in descending order + sorted_order = np.argsort(-topk_values, axis=axis) + topk_values = np.take_along_axis(topk_values, sorted_order, axis=axis) + indices = np.take_along_axis(indices, sorted_order, axis=axis) + return topk_namedtuple(values=topk_values, indices=indices) diff --git a/tests/accuracy/public_scope.json b/tests/accuracy/public_scope.json index 5586aec4..4397ee47 100644 --- a/tests/accuracy/public_scope.json +++ b/tests/accuracy/public_scope.json @@ -346,7 +346,7 @@ { "id": 105, "name": "194", - "confidence": 0.06216677650809288 + "confidence": 0.4564049541950226 } ], "raw_scores": [ diff --git a/tests/accuracy/test_accuracy.py b/tests/accuracy/test_accuracy.py index 4c6d32c1..ea161210 100644 --- a/tests/accuracy/test_accuracy.py +++ b/tests/accuracy/test_accuracy.py @@ -99,7 +99,7 @@ def create_models(model_type, model_path, download_dir, force_onnx_adapter=False model = create_core().read_model(model_path) if model.has_rt_info(["model_info", "model_type"]): wrapper_type = model_type.get_model_class( - create_core().read_model(model_path).get_rt_info(["model_info", "model_type"]).astype(str), + model.get_rt_info(["model_info", "model_type"]).astype(str), ) model = wrapper_type(OpenvinoAdapter(create_core(), model_path, device=device)) model.load()