diff --git a/src/model_api/models/action_classification.py b/src/model_api/models/action_classification.py index 8af041ab..9dd944c8 100644 --- a/src/model_api/models/action_classification.py +++ b/src/model_api/models/action_classification.py @@ -13,7 +13,7 @@ from model_api.models.result import ClassificationResult, Label from .model import Model -from .types import BooleanValue, ListValue, NumericalValue, StringValue +from .parameters import ParameterRegistry from .utils import load_labels if TYPE_CHECKING: @@ -65,26 +65,19 @@ def __init__( self.image_blob_names = self._get_inputs() self.image_blob_name = self.image_blob_names[0] self.nscthw_layout = "NSCTHW" in self.inputs[self.image_blob_name].layout - self.labels: list[str] - self.path_to_labels: str - self.mean_values: list[int | float] - self.pad_value: int - self.resize_type: str - self.reverse_input_channels: bool - self.scale_values: list[int | float] if self.nscthw_layout: self.n, self.s, self.c, self.t, self.h, self.w = self.inputs[self.image_blob_name].shape else: self.n, self.s, self.t, self.h, self.w, self.c = self.inputs[self.image_blob_name].shape - self.resize = RESIZE_TYPES[self.resize_type] + self.resize = RESIZE_TYPES[self.params.resize_type] self.input_transform = InputTransform( - self.reverse_input_channels, - self.mean_values, - self.scale_values, + self.params.reverse_input_channels, + self.params.mean_values, + self.params.scale_values, ) - if self.path_to_labels: - self.labels = load_labels(self.path_to_labels) + if self.params.path_to_labels: + self._labels = load_labels(self.params.path_to_labels) @property def clip_size(self) -> int: @@ -94,39 +87,11 @@ def clip_size(self) -> int: def parameters(cls) -> dict[str, Any]: parameters = super().parameters() parameters.update( - { - "labels": ListValue(description="List of class labels"), - "path_to_labels": StringValue( - description="Path to file with labels. Overrides the labels, if they sets via 'labels' parameter", - ), - "mean_values": ListValue( - description=( - "Normalization values, which will be subtracted from image channels " - "for image-input layer during preprocessing" - ), - default_value=[], - ), - "pad_value": NumericalValue( - int, - min=0, - max=255, - description="Pad value for resize_image_letterbox embedded into a model", - default_value=0, - ), - "resize_type": StringValue( - default_value="standard", - choices=tuple(RESIZE_TYPES.keys()), - description="Type of input image resizing", - ), - "reverse_input_channels": BooleanValue( - default_value=False, - description="Reverse the input channel order", - ), - "scale_values": ListValue( - default_value=[], - description="Normalization values, which will divide the image channels for image-input layer", - ), - }, + ParameterRegistry.merge( + ParameterRegistry.LABELS, + ParameterRegistry.IMAGE_RESIZE, + ParameterRegistry.IMAGE_PREPROCESSING, + ), ) return parameters @@ -193,7 +158,7 @@ def preprocess( "original_shape": inputs.shape, "resized_shape": (self.n, self.s, self.c, self.t, self.h, self.w), } - resized_inputs = [self.resize(frame, (self.w, self.h), pad_value=self.pad_value) for frame in inputs] + resized_inputs = [self.resize(frame, (self.w, self.h), pad_value=self.params.pad_value) for frame in inputs] np_frames = self._change_layout( [self.input_transform(inputs) for inputs in resized_inputs], ) @@ -222,8 +187,9 @@ def postprocess( """Post-process.""" logits = next(iter(outputs.values())).squeeze() index = np.argmax(logits) + labels = self.params.labels return ClassificationResult( - [Label(int(index), self.labels[index], logits[index])], + [Label(int(index), labels[index], logits[index])], np.ndarray(0), np.ndarray(0), np.ndarray(0), diff --git a/src/model_api/models/anomaly.py b/src/model_api/models/anomaly.py index 29eb0443..f3366a82 100644 --- a/src/model_api/models/anomaly.py +++ b/src/model_api/models/anomaly.py @@ -15,8 +15,8 @@ import numpy as np from model_api.models.image_model import ImageModel +from model_api.models.parameters import ParameterRegistry from model_api.models.result import AnomalyResult -from model_api.models.types import ListValue, NumericalValue, StringValue if TYPE_CHECKING: from model_api.adapters.inference_adapter import InferenceAdapter @@ -67,11 +67,6 @@ def __init__( ) -> None: super().__init__(inference_adapter, configuration, preload) self._check_io_number(1, (1, 4)) - self.normalization_scale: float - self.image_threshold: float - self.pixel_threshold: float - self.task: str - self.labels: list[str] def preprocess(self, inputs: np.ndarray) -> list[dict]: """Data preprocess method for Anomalib models. @@ -103,7 +98,7 @@ def preprocess(self, inputs: np.ndarray) -> list[dict]: else: resized_shape = (self.w, self.h, self.c) # For fixed models, use standard preprocessing - if self.embedded_processing: + if self.params.embedded_processing: processed_image = inputs[None] else: # Resize image to expected model input dimensions @@ -148,16 +143,17 @@ def postprocess(self, outputs: dict[str, np.ndarray], meta: dict[str, Any]) -> A anomaly_map = predictions.squeeze() npred_score = anomaly_map.reshape(-1).max() - pred_label = self.labels[1] if npred_score > self.image_threshold else self.labels[0] + labels_list = self.params.labels + pred_label = labels_list[1] if npred_score > self.params.image_threshold else labels_list[0] assert anomaly_map is not None - pred_mask = (anomaly_map >= self.pixel_threshold).astype(np.uint8) - anomaly_map = self._normalize(anomaly_map, self.pixel_threshold) + pred_mask = (anomaly_map >= self.params.pixel_threshold).astype(np.uint8) + anomaly_map = self._normalize(anomaly_map, self.params.pixel_threshold) # normalize - npred_score = self._normalize(npred_score, self.image_threshold) + npred_score = self._normalize(npred_score, self.params.image_threshold) - if pred_label == self.labels[0]: # normal + if pred_label == labels_list[0]: # normal npred_score = 1 - npred_score # Score of normal is 1 - score of anomaly pred_score = npred_score.item() else: @@ -180,7 +176,7 @@ def postprocess(self, outputs: dict[str, np.ndarray], meta: dict[str, Any]) -> A (meta["original_shape"][1], meta["original_shape"][0]), ) - if self.task == "detection": + if self.params.task == "detection": pred_boxes = self._get_boxes(pred_mask) return AnomalyResult( @@ -194,33 +190,13 @@ def postprocess(self, outputs: dict[str, np.ndarray], meta: dict[str, Any]) -> A @classmethod def parameters(cls) -> dict: parameters = super().parameters() - parameters.update( - { - "image_threshold": NumericalValue( - description="Image threshold", - min=0.0, - default_value=0.5, - ), - "pixel_threshold": NumericalValue( - description="Pixel threshold", - min=0.0, - default_value=0.5, - ), - "normalization_scale": NumericalValue( - description="Value used for normalization", - ), - "task": StringValue( - description="Task type", - default_value="segmentation", - ), - "labels": ListValue(description="List of class labels", value_type=str), - }, - ) + parameters.update(ParameterRegistry.ANOMALY) + parameters.update(ParameterRegistry.LABELS) return parameters def _normalize(self, tensor: np.ndarray, threshold: float) -> np.ndarray: """Currently supports only min-max normalization.""" - normalized = ((tensor - threshold) / self.normalization_scale) + 0.5 + normalized = ((tensor - threshold) / self.params.normalization_scale) + 0.5 return np.clip(normalized, 0, 1) @staticmethod diff --git a/src/model_api/models/classification.py b/src/model_api/models/classification.py index c0b1884a..662e84aa 100644 --- a/src/model_api/models/classification.py +++ b/src/model_api/models/classification.py @@ -17,8 +17,8 @@ from openvino.preprocess import PrePostProcessor from model_api.models.image_model import ImageModel +from model_api.models.parameters import ParameterRegistry from model_api.models.result import ClassificationResult, Label -from model_api.models.types import BooleanValue, ListValue, NumericalValue, StringValue from model_api.models.utils import softmax if TYPE_CHECKING: @@ -51,34 +51,26 @@ class ClassificationModel(ImageModel): def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None: super().__init__(inference_adapter, configuration, preload=False) - self.topk: int - self.labels: list[str] - self.path_to_labels: str - self.multilabel: bool - self.hierarchical: bool - self.hierarchical_config: str - self.confidence_threshold: float - self.output_raw_scores: bool - self.hierarchical_postproc: str self.labels_resolver: GreedyLabelsResolver | ProbabilisticLabelsResolver self._check_io_number(1, (1, 2, 3, 4, 5)) - if self.path_to_labels: - self.labels = self._load_labels(self.path_to_labels) + if self.params.path_to_labels: + self._labels = self._load_labels(self.params.path_to_labels) if len(self.outputs) == 1: self._verify_single_output() self.raw_scores_name = _raw_scores_name - if self.hierarchical: - self.embedded_processing = True + if self.params.hierarchical: + self._embedded_processing = True self.out_layer_names = _get_non_xai_names(self.outputs.keys()) _append_xai_names(self.outputs.keys(), self.out_layer_names) - if not self.hierarchical_config: + hierarchical_config = self.params.hierarchical_config + if not hierarchical_config: self.raise_error("Hierarchical classification config is empty.") self.raw_scores_name = self.out_layer_names[0] - self.hierarchical_info = json.loads(self.hierarchical_config) + self.hierarchical_info = json.loads(hierarchical_config) - if self.hierarchical_postproc == "probabilistic": + if self.params.hierarchical_postproc == "probabilistic": self.labels_resolver = ProbabilisticLabelsResolver( self.hierarchical_info, ) @@ -89,8 +81,8 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {} self.load() return - if self.multilabel: - self.embedded_processing = True + if self.params.multilabel: + self._embedded_processing = True self.out_layer_names = _get_non_xai_names(self.outputs.keys()) _append_xai_names(self.outputs.keys(), self.out_layer_names) self.raw_scores_name = self.out_layer_names[0] @@ -101,17 +93,17 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {} try: addOrFindSoftmaxAndTopkOutputs( self.inference_adapter, - self.topk, - self.output_raw_scores, + self.params.topk, + self.params.output_raw_scores, ) self.embedded_topk = True self.out_layer_names = ["indices", "scores"] - if self.output_raw_scores: + if self.params.output_raw_scores: self.out_layer_names.append(self.raw_scores_name) except (RuntimeError, AttributeError): # exception means we have a non-ov model # with already inserted softmax and topk - if self.embedded_processing and len(self.outputs) >= 2: + if self.params.embedded_processing and len(self.outputs) >= 2: self.embedded_topk = True self.out_layer_names = ["indices", "scores"] self.raw_scores_name = _raw_scores_name @@ -150,67 +142,40 @@ def _verify_single_output(self) -> None: "The Classification model wrapper supports topologies only with 4D " "output which has last two dimensions of size 1", ) - if self.labels: - if layer_shape[1] == len(self.labels) + 1: - self.labels.insert(0, "other") + labels = self.params.labels + if labels: + if layer_shape[1] == len(labels) + 1: + labels.insert(0, "other") + self._labels = labels self.logger.warning("\tInserted 'other' label as first.") - if layer_shape[1] > len(self.labels): + if layer_shape[1] > len(labels): self.raise_error( "Model's number of classes must be greater then " - f"number of parsed labels ({layer_shape[1]}, {len(self.labels)})", + f"number of parsed labels ({layer_shape[1]}, {len(labels)})", ) @classmethod def parameters(cls) -> dict: parameters = super().parameters() parameters.update( - { - "topk": NumericalValue( - value_type=int, - default_value=1, - min=1, - description="Number of most likely labels", - ), - "labels": ListValue(description="List of class labels", value_type=str), - "path_to_labels": StringValue( - description="Path to file with labels. Overrides the labels, if they sets via 'labels' parameter", - ), - "multilabel": BooleanValue( - default_value=False, - description="Predict a set of labels per image", - ), - "hierarchical": BooleanValue( - default_value=False, - description="Predict a hierarchy if labels per image", - ), - "hierarchical_config": StringValue( - default_value="", - description="Extra config for decoding hierarchical predictions", - ), - "confidence_threshold": NumericalValue( - default_value=0.5, - description="Predict a set of labels per image", - ), - "output_raw_scores": BooleanValue( - default_value=False, - description="Output all scores for multiclass classification", - ), - "hierarchical_postproc": StringValue( - default_value="greedy", - choices=("probabilistic", "greedy"), - description="Type of hierarchical postprocessing", - ), - }, + ParameterRegistry.merge( + ParameterRegistry.TOP_K, + ParameterRegistry.LABELS, + ParameterRegistry.MULTILABEL, + ParameterRegistry.HIERARCHICAL, + ParameterRegistry.CONFIDENCE_THRESHOLD, + ParameterRegistry.OUTPUT_RAW_SCORES, + ), ) return parameters def postprocess(self, outputs: dict, meta: dict) -> ClassificationResult: del meta # unused - if self.multilabel: + if self.params.multilabel: result = self.get_multilabel_predictions( outputs[self.out_layer_names[0]].squeeze(), ) - elif self.hierarchical: + elif self.params.hierarchical: result = self.get_hierarchical_predictions( outputs[self.out_layer_names[0]].squeeze(), ) @@ -218,7 +183,7 @@ def postprocess(self, outputs: dict, meta: dict) -> ClassificationResult: result = self.get_multiclass_predictions(outputs) raw_scores = np.ndarray(0) - if self.output_raw_scores: + if self.params.output_raw_scores: raw_scores = self.get_all_probs(outputs[self.raw_scores_name]) return ClassificationResult( @@ -233,22 +198,23 @@ def get_saliency_maps(self, outputs: dict) -> np.ndarray: to match the order of labels in .XML meta. """ saliency_maps = outputs.get(_saliency_map_name, np.ndarray(0)) - if not self.hierarchical: + if not self.params.hierarchical: return saliency_maps reordered_saliency_maps: list[list[np.ndarray]] = [[] for _ in range(len(saliency_maps))] model_classes = self.hierarchical_info["cls_heads_info"]["class_to_group_idx"] label_to_model_out_idx = {lbl: i for i, lbl in enumerate(model_classes.keys())} + labels = self.params.labels for batch in range(len(saliency_maps)): - for label in self.labels: + for label in labels: idx = label_to_model_out_idx[label] reordered_saliency_maps[batch].append(saliency_maps[batch][idx]) return np.array(reordered_saliency_maps) def get_all_probs(self, logits: np.ndarray) -> np.ndarray: - if self.multilabel: + if self.params.multilabel: probs = sigmoid_numpy(logits.reshape(-1)) - elif self.hierarchical: + elif self.params.hierarchical: logits = logits.reshape(-1) probs = np.copy(logits) cls_heads_info = self.hierarchical_info["cls_heads_info"] @@ -285,8 +251,9 @@ def get_hierarchical_predictions(self, logits: np.ndarray) -> list[Label]: head_logits = logits[logits_begin:] head_logits = sigmoid_numpy(head_logits) + conf_threshold = self.params.confidence_threshold for i in range(head_logits.shape[0]): - if head_logits[i] > self.confidence_threshold: + if head_logits[i] > conf_threshold: label_str = cls_heads_info["all_groups"][cls_heads_info["num_multiclass_heads"] + i][0] predicted_labels.append(label_str) predicted_scores.append(head_logits[i]) @@ -298,23 +265,26 @@ def get_multilabel_predictions(self, logits: np.ndarray) -> list[Label]: logits = sigmoid_numpy(logits) scores = [] indices = [] + conf_threshold = self.params.confidence_threshold for i in range(logits.shape[0]): - if logits[i] > self.confidence_threshold: + if logits[i] > conf_threshold: indices.append(i) scores.append(logits[i]) - labels = [self.labels[i] if self.labels else "" for i in indices] + labels_list = self.params.labels + labels = [labels_list[i] if labels_list else "" for i in indices] return list(starmap(Label, zip(indices, labels, scores))) def get_multiclass_predictions(self, outputs: dict) -> list[Label]: + labels_list = self.params.labels if self.embedded_topk: indicesTensor = outputs[self.out_layer_names[0]][0] scoresTensor = outputs[self.out_layer_names[1]][0] - labels = [self.labels[i] if self.labels else "" for i in indicesTensor] + labels = [labels_list[i] if labels_list else "" for i in indicesTensor] else: scoresTensor = softmax(outputs[self.out_layer_names[0]][0]) indicesTensor = [int(np.argmax(scoresTensor))] - labels = [self.labels[i] if self.labels else "" for i in indicesTensor] + labels = [labels_list[i] if labels_list else "" for i in indicesTensor] return list(starmap(Label, zip(indicesTensor, labels, scoresTensor))) diff --git a/src/model_api/models/detection_model.py b/src/model_api/models/detection_model.py index 1ee176ae..84b89aba 100644 --- a/src/model_api/models/detection_model.py +++ b/src/model_api/models/detection_model.py @@ -6,8 +6,8 @@ import numpy as np from .image_model import ImageModel +from .parameters import ParameterRegistry from .result import DetectionResult -from .types import ListValue, NumericalValue, StringValue from .utils import load_labels @@ -39,32 +39,23 @@ def __init__(self, inference_adapter, configuration: dict = {}, preload=False): WrapperError: if the model has more than 1 image inputs """ super().__init__(inference_adapter, configuration, preload) - self.path_to_labels: str - self.confidence_threshold: float if not self.image_blob_name: self.raise_error( f"The Wrapper supports only one image input, but {len(self.image_blob_names)} found", ) - if self.path_to_labels: - self.labels = load_labels(self.path_to_labels) + if self.params.path_to_labels: + self._labels = load_labels(self.params.path_to_labels) @classmethod def parameters(cls): parameters = super().parameters() parameters.update( - { - "confidence_threshold": NumericalValue( - default_value=0.5, - description="Probability threshold value for bounding box filtering", - ), - "labels": ListValue(description="List of class labels", value_type=str), - "path_to_labels": StringValue( - description="Path to file with labels. Overrides the labels, if they sets via 'labels' parameter", - ), - }, + ParameterRegistry.merge( + ParameterRegistry.CONFIDENCE_THRESHOLD, + ParameterRegistry.LABELS, + ), ) - return parameters def _resize_detections(self, detection_result: DetectionResult, meta: dict): @@ -82,12 +73,13 @@ def _resize_detections(self, detection_result: DetectionResult, meta: dict): inverted_scale_y = input_img_height / self.h pad_left = 0 pad_top = 0 - if self.resize_type == "fit_to_window" or self.resize_type == "fit_to_window_letterbox": + resize_type = self.params.resize_type + if resize_type == "fit_to_window" or resize_type == "fit_to_window_letterbox": inverted_scale_x = inverted_scale_y = max( inverted_scale_x, inverted_scale_y, ) - if self.resize_type == "fit_to_window_letterbox": + if resize_type == "fit_to_window_letterbox": pad_left = (self.w - round(input_img_widht / inverted_scale_x)) // 2 pad_top = (self.h - round(input_img_height / inverted_scale_y)) // 2 @@ -110,7 +102,7 @@ def _filter_detections(self, detection_result: DetectionResult, box_area_thresho - list of detections with confidence above the threshold """ keep = (detection_result.get_obj_sizes() > box_area_threshold) & ( - detection_result.scores > self.confidence_threshold + detection_result.scores > self.params.confidence_threshold ) detection_result.bboxes = detection_result.bboxes[keep] detection_result.labels = detection_result.labels[keep] diff --git a/src/model_api/models/image_model.py b/src/model_api/models/image_model.py index 9e5dd138..158cf4de 100644 --- a/src/model_api/models/image_model.py +++ b/src/model_api/models/image_model.py @@ -9,7 +9,7 @@ from model_api.adapters.utils import RESIZE_TYPES, InputTransform from model_api.models.model import Model -from model_api.models.types import BooleanValue, ListValue, NumericalValue, StringValue +from model_api.models.parameters import ParameterRegistry if TYPE_CHECKING: import numpy as np @@ -58,15 +58,6 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {} super().__init__(inference_adapter, configuration, preload) self.image_blob_names, self.image_info_blob_names = self._get_inputs() self.image_blob_name = self.image_blob_names[0] - self.orig_height: int - self.orig_width: int - self.pad_value: int - self.resize_type: str - self.mean_values: list - self.scale_values: list - self.reverse_input_channels: bool - self.embedded_processing: bool - self.labels: list[str] self.nchw_layout = self.inputs[self.image_blob_name].layout == "NCHW" if self.nchw_layout: @@ -78,77 +69,38 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {} if self.h == -1 or self.w == -1: self._is_dynamic = True - self.resize = RESIZE_TYPES[self.resize_type] + self.resize = RESIZE_TYPES[self.params.resize_type] self.input_transform = InputTransform( - self.reverse_input_channels, - self.mean_values, - self.scale_values, + self.params.reverse_input_channels, + self.params.mean_values, + self.params.scale_values, ) layout = self.inputs[self.image_blob_name].layout - if self.embedded_processing: - self.h, self.w = self.orig_height, self.orig_width + if self.params.embedded_processing: + self.h, self.w = self.params.orig_height, self.params.orig_width elif not self._is_dynamic: inference_adapter.embed_preprocessing( layout=layout, - resize_mode=self.resize_type, + resize_mode=self.params.resize_type, interpolation_mode="LINEAR", target_shape=(self.w, self.h), - pad_value=self.pad_value, - brg2rgb=self.reverse_input_channels, - mean=self.mean_values, - scale=self.scale_values, + pad_value=self.params.pad_value, + brg2rgb=self.params.reverse_input_channels, + mean=self.params.mean_values, + scale=self.params.scale_values, ) - self.embedded_processing = True + self._embedded_processing = True self.orig_height, self.orig_width = self.h, self.w @classmethod def parameters(cls) -> dict[str, Any]: parameters = super().parameters() parameters.update( - { - "embedded_processing": BooleanValue( - description="Flag that pre/postprocessing embedded", - default_value=False, - ), - "mean_values": ListValue( - description=( - "Normalization values, which will be subtracted from image " - "channels for image-input layer during preprocessing" - ), - default_value=[], - ), - "orig_height": NumericalValue( - int, - description="Model input height before embedding processing", - default_value=None, - ), - "orig_width": NumericalValue( - int, - description="Model input width before embedding processing", - default_value=None, - ), - "pad_value": NumericalValue( - int, - min=0, - max=255, - description="Pad value for resize_image_letterbox embedded into a model", - default_value=0, - ), - "resize_type": StringValue( - default_value="standard", - choices=tuple(RESIZE_TYPES.keys()), - description="Type of input image resizing", - ), - "reverse_input_channels": BooleanValue( - default_value=False, - description="Reverse the input channel order", - ), - "scale_values": ListValue( - default_value=[], - description="Normalization values, which will divide the image channels for image-input layer", - ), - }, + ParameterRegistry.merge( + ParameterRegistry.IMAGE_PREPROCESSING, + ParameterRegistry.IMAGE_RESIZE, + ), ) return parameters @@ -163,11 +115,12 @@ def get_label_name(self, label_id: int) -> str: Returns: str: label name. """ - if self.labels is None: + labels = self.params.labels + if labels is None: return f"#{label_id}" - if label_id >= len(self.labels): + if label_id >= len(labels): return f"#{label_id}" - return self.labels[label_id] + return labels[label_id] def _get_inputs(self) -> tuple[list[str], ...]: """Defines the model inputs for images and additional info. @@ -222,7 +175,7 @@ def preprocess(self, inputs: np.ndarray) -> list[dict]: """ original_shape = inputs.shape - if self.embedded_processing: + if self.params.embedded_processing: processed_image = inputs[None] if self._is_dynamic: h, w, c = inputs.shape @@ -238,7 +191,7 @@ def preprocess(self, inputs: np.ndarray) -> list[dict]: # Fixed model without embedded preprocessing resized_shape = (self.w, self.h, self.c) - resized_image = self.resize(inputs, (self.w, self.h), pad_value=self.pad_value) + resized_image = self.resize(inputs, (self.w, self.h), pad_value=self.params.pad_value) processed_image = self.input_transform(resized_image) processed_image = self._change_layout(processed_image) diff --git a/src/model_api/models/instance_segmentation.py b/src/model_api/models/instance_segmentation.py index 2b78d471..c6e104af 100644 --- a/src/model_api/models/instance_segmentation.py +++ b/src/model_api/models/instance_segmentation.py @@ -9,8 +9,8 @@ from model_api.adapters.inference_adapter import InferenceAdapter from .image_model import ImageModel +from .parameters import ParameterRegistry from .result import InstanceSegmentationResult -from .types import BooleanValue, ListValue, NumericalValue, StringValue from .utils import load_labels @@ -21,13 +21,8 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {} super().__init__(inference_adapter, configuration, preload) self._check_io_number((1, 2), (3, 4, 5, 6, 8)) - self.confidence_threshold: float - self.labels: list[str] - self.path_to_labels: str - self.postprocess_semantic_masks: bool - - if self.path_to_labels: - self.labels = load_labels(self.path_to_labels) + if self.params.path_to_labels: + self._labels = load_labels(self.params.path_to_labels) self.is_segmentoly = len(self.inputs) == 2 self.output_blob_name = self._get_outputs() @@ -35,20 +30,11 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {} def parameters(cls) -> dict: parameters = super().parameters() parameters.update( - { - "confidence_threshold": NumericalValue( - default_value=0.5, - description="Probability threshold value for bounding box filtering", - ), - "labels": ListValue(description="List of class labels", value_type=str), - "path_to_labels": StringValue( - description="Path to file with labels. Overrides the labels, if they sets via `labels` parameter", - ), - "postprocess_semantic_masks": BooleanValue( - description="Resize and apply 0.5 threshold to instance segmentation masks", - default_value=True, - ), - }, + ParameterRegistry.merge( + ParameterRegistry.CONFIDENCE_THRESHOLD, + ParameterRegistry.LABELS, + ParameterRegistry.INSTANCE_SEGMENTATION, + ), ) return parameters @@ -160,9 +146,10 @@ def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: inputImgHeight / self.orig_height, ) padLeft, padTop = 0, 0 - if self.resize_type == "fit_to_window" or self.resize_type == "fit_to_window_letterbox": + resize_type = self.params.resize_type + if resize_type == "fit_to_window" or resize_type == "fit_to_window_letterbox": invertedScaleX = invertedScaleY = max(invertedScaleX, invertedScaleY) - if self.resize_type == "fit_to_window_letterbox": + if resize_type == "fit_to_window_letterbox": padLeft = (self.orig_width - round(inputImgWidth / invertedScaleX)) // 2 padTop = (self.orig_height - round(inputImgHeight / invertedScaleY)) // 2 @@ -177,10 +164,11 @@ def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: ) has_feature_vector_name = _feature_vector_name in self.outputs + labels_list = self.params.labels if has_feature_vector_name: - if not self.labels: + if not labels_list: self.raise_error("Can't get number of classes because labels are empty") - saliency_maps: list = [[] for _ in range(len(self.labels))] + saliency_maps: list = [[] for _ in range(len(labels_list))] else: saliency_maps = [] @@ -192,10 +180,10 @@ def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: # Use float64 for area calculation to prevent overflow, then check if area > 1 box_areas = box_widths.astype(np.float64) * box_heights.astype(np.float64) - keep = (scores > self.confidence_threshold) & (box_areas > 1) + keep = (scores > self.params.confidence_threshold) & (box_areas > 1) - if self.labels: - keep &= labels < len(self.labels) + if labels_list: + keep &= labels < len(labels_list) boxes = boxes[keep].astype(np.int32) scores = scores[keep] @@ -204,11 +192,11 @@ def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: resized_masks, label_names = [], [] for box, label_idx, raw_mask in zip(boxes, labels, masks): - if self.labels: - label_names.append(self.labels[label_idx]) + if labels_list: + label_names.append(labels_list[label_idx]) raw_cls_mask = raw_mask[label_idx, ...] if self.is_segmentoly else raw_mask - if self.postprocess_semantic_masks or has_feature_vector_name: + if self.params.postprocess_semantic_masks or has_feature_vector_name: resized_mask = _segm_postprocess( box, raw_cls_mask, @@ -217,7 +205,7 @@ def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: else: resized_mask = raw_cls_mask - output_mask = resized_mask if self.postprocess_semantic_masks else raw_cls_mask + output_mask = resized_mask if self.params.postprocess_semantic_masks else raw_cls_mask resized_masks.append(output_mask) if has_feature_vector_name: saliency_maps[label_idx - 1].append(resized_mask) diff --git a/src/model_api/models/keypoint_detection.py b/src/model_api/models/keypoint_detection.py index 52fab1b5..90a39997 100644 --- a/src/model_api/models/keypoint_detection.py +++ b/src/model_api/models/keypoint_detection.py @@ -10,8 +10,8 @@ import numpy as np from .image_model import ImageModel +from .parameters import ParameterRegistry from .result import DetectedKeypoints, DetectionResult -from .types import BooleanValue, ListValue class KeypointDetectionModel(ImageModel): @@ -30,7 +30,6 @@ def __init__(self, inference_adapter, configuration: dict = {}, preload=False): """ super().__init__(inference_adapter, configuration, preload) self._check_io_number(1, 2) - self.apply_softmax: bool def postprocess( self, @@ -50,7 +49,7 @@ def postprocess( batch_keypoints, batch_scores = _decode_simcc( encoded_kps[0], encoded_kps[1], - apply_softmax=self.apply_softmax, + apply_softmax=self.params.apply_softmax, ) orig_h, orig_w = meta["original_shape"][:2] kp_scale_h = orig_h / self.h @@ -58,10 +57,11 @@ def postprocess( batch_keypoints = batch_keypoints.squeeze() - if self.resize_type in ["fit_to_window", "fit_to_window_letterbox"]: + resize_type = self.params.resize_type + if resize_type in ["fit_to_window", "fit_to_window_letterbox"]: inverted_scale = max(kp_scale_h, kp_scale_w) kp_scale_h = kp_scale_w = inverted_scale - if self.resize_type == "fit_to_window_letterbox": + if resize_type == "fit_to_window_letterbox": pad_left = (self.w - round(orig_w / inverted_scale)) // 2 pad_top = (self.h - round(orig_h / inverted_scale)) // 2 batch_keypoints -= np.array([pad_left, pad_top]) @@ -74,17 +74,10 @@ def postprocess( def parameters(cls) -> dict: parameters = super().parameters() parameters.update( - { - "labels": ListValue( - description="List of class labels", - value_type=str, - default_value=[], - ), - "apply_softmax": BooleanValue( - default_value=True, - description="Whether to apply softmax on the heatmap.", - ), - }, + ParameterRegistry.merge( + ParameterRegistry.LABELS, + ParameterRegistry.SOFTMAX, + ), ) return parameters diff --git a/src/model_api/models/model.py b/src/model_api/models/model.py index f771b091..b6bf79c4 100644 --- a/src/model_api/models/model.py +++ b/src/model_api/models/model.py @@ -17,6 +17,7 @@ get_user_config, ) from model_api.metrics import PerformanceMetrics +from model_api.models.parameters import ParameterDescriptor if TYPE_CHECKING: from os import PathLike @@ -55,9 +56,11 @@ class Model: inputs (dict): keeps the model inputs names and `Metadata` structure for each one outputs (dict): keeps the model outputs names and `Metadata` structure for each one model_loaded (bool): a flag whether the model is loaded to device + params (ParameterDescriptor): provides attribute-style access to model parameters """ __model__: str = "Model" + params = ParameterDescriptor() # Class-level descriptor def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None: """Model constructor @@ -90,13 +93,40 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {} self.inputs = self.inference_adapter.get_input_layers() self.outputs = self.inference_adapter.get_output_layers() - for name, parameter in self.parameters().items(): - self.__setattr__(name, parameter.default_value) + self._parameters_cache: dict | None = None self._load_config(configuration) self.model_loaded = False if preload: self.load() self.callback_fn = lambda _: None + # params is now a class-level descriptor, no need to instantiate + + def get_param(self, name: str) -> Any: + """Gets a parameter value, either from instance attribute (if set by config) or from parameter default. + + Args: + name (str): parameter name + + Returns: + Any: parameter value + """ + if hasattr(self, f"_{name}"): + return getattr(self, f"_{name}") + if self._parameters_cache is None: + self._parameters_cache = self.parameters() + if name in self._parameters_cache: + return self._parameters_cache[name].default_value + return self.raise_error(f"Parameter '{name}' not found") + + def get_cached_parameters(self) -> dict[str, Any]: + """Get cached parameters, initializing cache if needed. + + Returns: + dict: Dictionary of parameter definitions + """ + if self._parameters_cache is None: + self._parameters_cache = self.parameters() + return self._parameters_cache def get_model(self) -> Any: """ @@ -287,7 +317,7 @@ def _load_config(self, config: dict[str, Any]) -> None: value = param.from_str( self.inference_adapter.get_rt_info(["model_info", name]).astype(str), ) - self.__setattr__(name, value) + self.__setattr__(f"_{name}", value) except RuntimeError as error: missing_rt_info = "Cannot get runtime attribute. Path to runtime attribute is incorrect." in str(error) if not missing_rt_info: @@ -304,7 +334,7 @@ def _load_config(self, config: dict[str, Any]) -> None: self.logger.error(f"\t{_error}") self.raise_error("Incorrect user configuration") value = parameters[name].get_value(value) - self.__setattr__(name, value) + self.__setattr__(f"_{name}", value) else: self.logger.warning( f'The parameter "{name}" not found in {self.__model__} wrapper, will be omitted', @@ -602,7 +632,9 @@ def save(self, path: str, weights_path: str | None = None, version: str | None = "model_type": self.__model__, } for name in self.parameters(): - model_info[name] = getattr(self, name) + value = getattr(self.params, name) + if value is not None: + model_info[name] = value self.inference_adapter.update_model_info(model_info) self.inference_adapter.save_model(path, weights_path, version) diff --git a/src/model_api/models/parameters.py b/src/model_api/models/parameters.py new file mode 100644 index 00000000..54dc830b --- /dev/null +++ b/src/model_api/models/parameters.py @@ -0,0 +1,273 @@ +# +# Copyright (C) 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +# + +"""Parameter registry and decorators for model configuration.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, ClassVar + +from model_api.adapters.utils import RESIZE_TYPES +from model_api.models.types import BooleanValue, ListValue, NumericalValue, StringValue + +if TYPE_CHECKING: + from model_api.models.model import Model + + +class ParameterDescriptor: + """Descriptor that provides parameter access for models.""" + + def __get__(self, obj, objtype=None): + """Return ParameterView when accessed from model instance.""" + if obj is None: + return self + return ParameterView(obj) + + +class ParameterView: + """Provides attribute-style access to model parameters.""" + + def __init__(self, model: "Model") -> None: + self._model = model + + def __getattr__(self, name: str) -> Any: + """Get parameter value by attribute name.""" + try: + return self._model.get_param(name) + except Exception as e: + msg = f"Parameter '{name}' not found" + raise AttributeError(msg) from e + + def __dir__(self) -> list[str]: + """Return available parameter names""" + try: + parameters = self._model.get_cached_parameters() + return list(parameters.keys()) + except (AttributeError, TypeError, ValueError): + return [] + + +class ParameterRegistry: + """Registry for common parameter groups used across models. + + This centralizes parameter definitions to reduce duplication and ensure + consistency across model classes. + """ + + # Confidence threshold for filtering predictions + CONFIDENCE_THRESHOLD: ClassVar[dict[str, Any]] = { + "confidence_threshold": NumericalValue( + default_value=0.5, + description="Probability threshold value for filtering", + ), + } + + # Label-related parameters + LABELS: ClassVar[dict[str, Any]] = { + "labels": ListValue( + description="List of class labels", + value_type=str, + ), + "path_to_labels": StringValue( + description="Path to file with labels. Overrides the labels parameter if both are provided", + ), + } + + # Image preprocessing parameters + IMAGE_PREPROCESSING: ClassVar[dict[str, Any]] = { + "embedded_processing": BooleanValue( + description="Flag that pre/postprocessing is embedded in the model", + default_value=False, + ), + "mean_values": ListValue( + description="Normalization values to subtract from image channels during preprocessing", + default_value=[], + ), + "scale_values": ListValue( + description="Scale values to divide image channels during preprocessing", + default_value=[], + ), + "reverse_input_channels": BooleanValue( + default_value=False, + description="Reverse the input channel order (e.g., RGB to BGR)", + ), + } + + # Image resizing parameters + IMAGE_RESIZE: ClassVar[dict[str, Any]] = { + "resize_type": StringValue( + default_value="standard", + choices=tuple(RESIZE_TYPES.keys()), + description="Type of input image resizing", + ), + "pad_value": NumericalValue( + int, + min=0, + max=255, + description="Pad value for resize_image_letterbox embedded into a model", + default_value=0, + ), + "orig_height": NumericalValue( + int, + description="Model input height before embedding processing", + default_value=None, + ), + "orig_width": NumericalValue( + int, + description="Model input width before embedding processing", + default_value=None, + ), + } + + # Top-k classification parameters + TOP_K: ClassVar[dict[str, Any]] = { + "topk": NumericalValue( + value_type=int, + default_value=1, + min=1, + description="Number of most likely labels to return", + ), + } + + # Multi-label classification parameters + MULTILABEL: ClassVar[dict[str, Any]] = { + "multilabel": BooleanValue( + default_value=False, + description="Predict a set of labels per image", + ), + } + + # Hierarchical classification parameters + HIERARCHICAL: ClassVar[dict[str, Any]] = { + "hierarchical": BooleanValue( + default_value=False, + description="Predict a hierarchy of labels per image", + ), + "hierarchical_config": StringValue( + default_value="", + description="Extra config for decoding hierarchical predictions", + ), + "hierarchical_postproc": StringValue( + default_value="greedy", + choices=("probabilistic", "greedy"), + description="Type of hierarchical postprocessing", + ), + } + + # Output control parameters + OUTPUT_RAW_SCORES: ClassVar[dict[str, Any]] = { + "output_raw_scores": BooleanValue( + default_value=False, + description="Output all scores for multiclass classification", + ), + } + + # Segmentation parameters + SEGMENTATION_POSTPROCESS: ClassVar[dict[str, Any]] = { + "blur_strength": NumericalValue( + value_type=int, + description="Blurring kernel size. -1 value means no blurring and no soft_threshold", + default_value=-1, + ), + "soft_threshold": NumericalValue( + value_type=float, + description="Probability threshold for pixel filtering. -inf means no thresholding", + default_value=float("-inf"), + ), + "return_soft_prediction": BooleanValue( + description="Return raw resized model prediction in addition to processed one", + default_value=True, + ), + } + + # Instance segmentation parameters + INSTANCE_SEGMENTATION: ClassVar[dict[str, Any]] = { + "postprocess_semantic_masks": BooleanValue( + description="Resize and apply 0.5 threshold to instance segmentation masks", + default_value=True, + ), + } + + # NMS parameters + NMS: ClassVar[dict[str, Any]] = { + "iou_threshold": NumericalValue( + default_value=0.5, + description="Threshold for non-maximum suppression (NMS) intersection over union (IOU) filtering", + ), + } + + # Anomaly detection parameters + ANOMALY: ClassVar[dict[str, Any]] = { + "image_threshold": NumericalValue( + description="Image-level anomaly threshold", + default_value=0.5, + ), + "pixel_threshold": NumericalValue( + description="Pixel-level anomaly threshold", + default_value=0.5, + ), + "normalization_scale": NumericalValue( + description="Scale factor for normalization", + default_value=1.0, + ), + "task": StringValue( + description="Task type: classification, segmentation, or detection", + default_value="classification", + choices=("classification", "segmentation", "detection"), + ), + } + + # Tiler base parameters + TILER: ClassVar[dict[str, Any]] = { + "tile_size": NumericalValue( + value_type=int, + default_value=400, + min=1, + description="Size of one tile", + ), + "tiles_overlap": NumericalValue( + value_type=float, + default_value=0.5, + min=0.0, + max=1.0, + description="Overlap of tiles", + ), + "tile_with_full_img": BooleanValue( + default_value=True, + description="Whether to include full image as a tile", + ), + } + + # Detection tiler parameters + DETECTION_TILER: ClassVar[dict[str, Any]] = { + "max_pred_number": NumericalValue( + value_type=int, + default_value=100, + min=1, + description="Maximum numbers of prediction per image", + ), + "iou_threshold": NumericalValue( + value_type=float, + default_value=0.45, + min=0, + max=1.0, + description="IoU threshold which is used to apply NMS to bounding boxes", + ), + } + + SOFTMAX: ClassVar[dict[str, Any]] = { + "apply_softmax": BooleanValue( + default_value=True, + description="Whether to apply softmax on the heatmap.", + ), + } + + @staticmethod + def merge(*param_groups: dict[str, Any]) -> dict[str, Any]: + """Merge multiple parameter groups into a single dictionary.""" + result = {} + for group in param_groups: + result.update(group) + return result diff --git a/src/model_api/models/sam_models.py b/src/model_api/models/sam_models.py index 7a30d922..55fb805e 100644 --- a/src/model_api/models/sam_models.py +++ b/src/model_api/models/sam_models.py @@ -34,7 +34,6 @@ def __init__( super().__init__(inference_adapter, configuration, preload) self.output_name: str = next(iter(self.outputs.keys())) self.resize_type: str - self.image_size: int @classmethod def parameters(cls) -> dict[str, Any]: @@ -57,7 +56,7 @@ def preprocess( ) -> list[dict]: """Update meta for image encoder.""" dict_inputs, meta = super().preprocess(inputs) - meta["resize_type"] = self.resize_type + meta["resize_type"] = self.params.resize_type return [dict_inputs, meta] def postprocess( @@ -83,9 +82,6 @@ def __init__( self.mask_input = np.zeros((1, 1, 256, 256), dtype=np.float32) self.has_mask_input = np.zeros((1, 1), dtype=np.float32) - self.image_size: int - self.mask_threshold: float - self.embed_dim: int @classmethod def parameters(cls) -> dict[str, Any]: @@ -171,7 +167,7 @@ def apply_coords( ) -> np.ndarray: """Process coords according to preprocessed image size using image meta.""" old_h, old_w = orig_size - new_h, new_w = self._get_preprocess_shape(old_h, old_w, self.image_size) + new_h, new_w = self._get_preprocess_shape(old_h, old_w, self.params.image_size) coords = deepcopy(coords).astype(np.float32) coords[..., 0] = coords[..., 0] * (new_w / old_w) coords[..., 1] = coords[..., 1] * (new_h / old_h) @@ -219,7 +215,7 @@ def postprocess( """ outputs = deepcopy(outputs) probability = np.clip(outputs["scores"], 0.0, 1.0) - hard_prediction = outputs[self.output_blob_name].squeeze(0) > self.mask_threshold + hard_prediction = outputs[self.output_blob_name].squeeze(0) > self.params.mask_threshold soft_prediction = hard_prediction * probability.reshape(-1, 1, 1) outputs["hard_prediction"] = hard_prediction diff --git a/src/model_api/models/segmentation.py b/src/model_api/models/segmentation.py index 167fc8d6..96c7fd27 100644 --- a/src/model_api/models/segmentation.py +++ b/src/model_api/models/segmentation.py @@ -11,8 +11,8 @@ import numpy as np from model_api.models.image_model import ImageModel +from model_api.models.parameters import ParameterRegistry from model_api.models.result import Contour, ImageResultWithSoftPrediction -from model_api.models.types import BooleanValue, ListValue, NumericalValue, StringValue from model_api.models.utils import load_labels if TYPE_CHECKING: @@ -77,13 +77,8 @@ class SegmentationModel(ImageModel): def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {}, preload: bool = False) -> None: super().__init__(inference_adapter, configuration, preload) self._check_io_number(1, (1, 2)) - self.labels: list[str] - self.path_to_labels: str - self.blur_strength: int - self.soft_threshold: float - self.return_soft_prediction: bool - if self.path_to_labels: - self.labels = load_labels(self.path_to_labels) + if self.params.path_to_labels: + self._labels = load_labels(self.params.path_to_labels) self.output_blob_name = self._get_outputs() @@ -116,29 +111,10 @@ def _get_outputs(self) -> str: def parameters(cls) -> dict: parameters = super().parameters() parameters.update( - { - "labels": ListValue(description="List of class labels", value_type=str), - "path_to_labels": StringValue( - description="Path to file with labels. Overrides the labels, if they sets via 'labels' parameter", - ), - "blur_strength": NumericalValue( - value_type=int, - description="Blurring kernel size. -1 value means no blurring and no soft_threshold", - default_value=-1, - ), - "soft_threshold": NumericalValue( - value_type=float, - description=( - "Probability threshold value for bounding box filtering. " - "inf value means no blurring and no soft_threshold" - ), - default_value=float("-inf"), - ), - "return_soft_prediction": BooleanValue( - description="Return raw resized model prediction in addition to processed one", - default_value=True, - ), - }, + ParameterRegistry.merge( + ParameterRegistry.LABELS, + ParameterRegistry.SEGMENTATION_POSTPROCESS, + ), ) return parameters @@ -154,8 +130,8 @@ def postprocess(self, outputs: dict, meta: dict) -> ImageResultWithSoftPredictio hard_prediction = create_hard_prediction_from_soft_prediction( soft_prediction=soft_prediction, - soft_threshold=self.soft_threshold, - blur_strength=self.blur_strength, + soft_threshold=self.params.soft_threshold, + blur_strength=self.params.blur_strength, ) hard_prediction = cv2.resize( @@ -166,7 +142,7 @@ def postprocess(self, outputs: dict, meta: dict) -> ImageResultWithSoftPredictio interpolation=cv2.INTER_NEAREST, ) - if self.return_soft_prediction: + if self.params.return_soft_prediction: soft_prediction = cv2.resize( soft_prediction, (input_image_width, input_image_height), diff --git a/src/model_api/models/types.py b/src/model_api/models/types.py index 6b7bc29d..5b0eab45 100644 --- a/src/model_api/models/types.py +++ b/src/model_api/models/types.py @@ -66,6 +66,8 @@ def from_str(self, value: str) -> Any: return self.default_value if not value and self.default_value is None: return None + if value == "None": + return None return self.value_type(value) def validate(self, value): @@ -121,7 +123,9 @@ def __init__( msg = f"Incorrect option in choice list - {choice}." raise ValueError(msg) - def from_str(self, value: str) -> str: + def from_str(self, value: str) -> str | None: + if value == "None": + return None return value def validate(self, value): @@ -155,7 +159,9 @@ class BooleanValue(BaseValue): def __init__(self, **kwargs) -> None: super().__init__(**kwargs) - def from_str(self, value: str) -> bool: + def from_str(self, value: str) -> bool | None: + if value == "None": + return None return value == "YES" or value == "True" def validate(self, value): @@ -181,7 +187,9 @@ def __init__( super().__init__(description, default_value) self.value_type = value_type - def from_str(self, value: str) -> list[Any]: + def from_str(self, value: str) -> list[Any] | None: + if value == "None": + return None if self.value_type is str or self.value_type is StringValue: return value.split() try: @@ -248,3 +256,26 @@ def validate(self, value): ), ) return errors + + +def get_python_type(base_value: BaseValue) -> type | type[Any]: + """Helper function that extracts the corresponding Python type from a BaseValue instance. + + Args: + base_value: An instance of BaseValue or its subclass + + Returns: + The corresponding Python type (int, float, bool, str, list, dict, or object) + """ + if isinstance(base_value, NumericalValue): + return base_value.value_type if hasattr(base_value, "value_type") else float + if isinstance(base_value, BooleanValue): + return bool + if isinstance(base_value, StringValue): + return str + if isinstance(base_value, ListValue): + return list + if isinstance(base_value, DictValue): + return dict + + return object diff --git a/src/model_api/models/visual_prompting.py b/src/model_api/models/visual_prompting.py index 3437b6ee..7958ed4e 100644 --- a/src/model_api/models/visual_prompting.py +++ b/src/model_api/models/visual_prompting.py @@ -170,7 +170,7 @@ def __init__( raise ValueError(msg) self._num_bg_points: int = 1 self._default_threshold_target: float = 0.0 - self._image_size: int = self.encoder.image_size + self._image_size: int = self.encoder.params.image_size self._downsizing: int = 64 self._default_threshold_reference: float = 0.3 @@ -288,7 +288,7 @@ def learn( feats=processed_embedding, masks=ref_mask, threshold_mask=cur_default_threshold_reference, - image_size=self.encoder.image_size, + image_size=self.encoder.params.image_size, ) cur_default_threshold_reference -= 0.05 @@ -429,7 +429,7 @@ def infer( def reset_reference_info(self) -> None: """Initialize reference information.""" self._reference_features = np.zeros( - (0, 1, self.decoder.embed_dim), + (0, 1, self.decoder.params.embed_dim), dtype=np.float32, ) self._used_indices = np.array([], dtype=np.int64) @@ -534,7 +534,7 @@ def _predict_masks( prediction["iou_predictions"], prediction["low_res_masks"], ) - masks = upscaled_masks > self.decoder.mask_threshold + masks = upscaled_masks > self.decoder.params.mask_threshold _, masks, _ = _decide_masks(masks, logits, scores) return {"upscaled_masks": masks} diff --git a/src/model_api/models/yolo.py b/src/model_api/models/yolo.py index a7a17bbf..f958a35a 100644 --- a/src/model_api/models/yolo.py +++ b/src/model_api/models/yolo.py @@ -10,8 +10,9 @@ from model_api.adapters.utils import INTERPOLATION_TYPES, resize_image_ocv from .detection_model import DetectionModel +from .parameters import ParameterRegistry from .result import DetectionResult -from .types import BooleanValue, ListValue, NumericalValue +from .types import BooleanValue, ListValue from .utils import clip_detections, multiclass_nms, nms DetectionBox = namedtuple("DetectionBox", ["x", "y", "w", "h"]) @@ -177,14 +178,7 @@ def _get_output_info(self): @classmethod def parameters(cls): parameters = super().parameters() - parameters.update( - { - "iou_threshold": NumericalValue( - default_value=0.5, - description="Threshold for non-maximum suppression (NMS) intersection over union (IOU) filtering", - ), - }, - ) + parameters.update(ParameterRegistry.NMS) parameters["resize_type"].update_default_value("fit_to_window_letterbox") parameters["confidence_threshold"].update_default_value(0.5) return parameters @@ -210,7 +204,7 @@ def _parse_yolo_region(self, predictions, input_size, params) -> DetectionResult class_probabilities = self._get_probabilities(prediction, params.classes) # filter out the proposals with low confidence score - keep_idxs = np.nonzero(class_probabilities > self.confidence_threshold)[0] + keep_idxs = np.nonzero(class_probabilities > self.params.confidence_threshold)[0] class_probabilities = class_probabilities[keep_idxs] obj_indx = keep_idxs // params.classes class_idx = keep_idxs % params.classes @@ -367,7 +361,7 @@ def _parse_outputs(self, outputs, meta) -> DetectionResult: scores=scores, ) - return self._filter(detection_result, self.iou_threshold) # type: ignore[attr-defined] + return self._filter(detection_result, self.params.iou_threshold) class YoloV4(YOLO): @@ -527,14 +521,9 @@ def __init__(self, inference_adapter, configuration: dict = {}, preload=False): @classmethod def parameters(cls): parameters = super().parameters() - parameters.update( - { - "iou_threshold": NumericalValue( - default_value=0.65, - description="Threshold for non-maximum suppression (NMS) intersection over union (IOU) filtering", - ), - }, - ) + parameters.update(ParameterRegistry.NMS) + # Override default iou_threshold for YOLOX + parameters["iou_threshold"].update_default_value(0.65) parameters["confidence_threshold"].update_default_value(0.5) return parameters @@ -572,11 +561,12 @@ def postprocess(self, outputs, meta) -> DetectionResult: output[..., :2] = (output[..., :2] + self.grids) * self.expanded_strides output[..., 2:4] = np.exp(output[..., 2:4]) * self.expanded_strides - valid_predictions = output[output[..., 4] > self.confidence_threshold] + conf_threshold = self.params.confidence_threshold + valid_predictions = output[output[..., 4] > conf_threshold] valid_predictions[:, 5:] *= valid_predictions[:, 4:5] boxes = xywh2xyxy(valid_predictions[:, :4]) / meta["scale"] - i, j = (valid_predictions[:, 5:] > self.confidence_threshold).nonzero() + i, j = (valid_predictions[:, 5:] > conf_threshold).nonzero() x_mins, y_mins, x_maxs, y_maxs = boxes[i].T scores = valid_predictions[i, j + 5] @@ -586,7 +576,7 @@ def postprocess(self, outputs, meta) -> DetectionResult: x_maxs, y_maxs, scores, - self.iou_threshold, # type: ignore[attr-defined] + self.params.iou_threshold, include_boundaries=True, ) @@ -633,7 +623,7 @@ def __init__(self, inference_adapter, configuration: dict = {}, preload=False): self.indices_blob_name, ) = self._get_outputs() - if self.embedded_processing: + if self.params.embedded_processing: layout = "NHWC" if self.nchw_layout else "NCHW" inference_adapter.embed_preprocessing( image_layout=layout, @@ -681,7 +671,7 @@ def preprocess(self, inputs): dict_inputs = {} meta = {"original_shape": image.shape} - if self.embedded_processing: + if self.params.embedded_processing: meta.update({"resized_shape": (self.w, self.h)}) dict_inputs = { @@ -737,7 +727,7 @@ def _parse_outputs(self, outputs) -> DetectionResult: x_maxs = _boxes[:, 3] y_maxs = _boxes[:, 2] _boxes = np.stack((x_mins, y_mins, x_maxs, y_maxs)).T - mask = np.array(out_scores) > self.confidence_threshold + mask = np.array(out_scores) > self.params.confidence_threshold if mask.size == 0: return DetectionResult( @@ -793,15 +783,11 @@ def parameters(cls): ), default_value=False, ), - "iou_threshold": NumericalValue( - float, - min=0.0, - max=1.0, - default_value=0.7, - description="Threshold for non-maximum suppression (NMS) intersection over union (IOU) filtering", - ), }, ) + parameters.update(ParameterRegistry.NMS) + # Override default iou_threshold for YOLOv5 + parameters["iou_threshold"].update_default_value(0.7) return parameters def postprocess(self, outputs, meta) -> DetectionResult: @@ -818,7 +804,8 @@ def postprocess(self, outputs, meta) -> DetectionResult: msg = "the first dim of the output must be 1" raise RuntimeError(msg) LABELS_START = 4 - filtered = prediction[0].T[(prediction[:, LABELS_START:] > self.confidence_threshold).any(1)[0]] + conf_threshold = self.params.confidence_threshold + filtered = prediction[0].T[(prediction[:, LABELS_START:] > conf_threshold).any(1)[0]] confidences = filtered[:, LABELS_START:] labels = confidences.argmax(1, keepdims=True) confidences = np.take_along_axis(confidences, labels, 1) @@ -828,6 +815,7 @@ def postprocess(self, outputs, meta) -> DetectionResult: dtype=np.float32, ) keep_top_k = 30000 + iou_threshold = self.params.iou_threshold if self.agnostic_nms: # type: ignore[attr-defined] boxes = boxes[ nms( @@ -836,12 +824,12 @@ def postprocess(self, outputs, meta) -> DetectionResult: boxes[:, 4], boxes[:, 5], boxes[:, 1], - self.iou_threshold, # type: ignore[attr-defined] + iou_threshold, # type: ignore[attr-defined] keep_top_k=keep_top_k, ) ] else: - boxes, _ = multiclass_nms(boxes, self.iou_threshold, keep_top_k) # type: ignore[attr-defined] + boxes, _ = multiclass_nms(boxes, iou_threshold, keep_top_k) # type: ignore[attr-defined] inputImgWidth = meta["original_shape"][1] inputImgHeight = meta["original_shape"][0] invertedScaleX, invertedScaleY = ( @@ -849,9 +837,10 @@ def postprocess(self, outputs, meta) -> DetectionResult: inputImgHeight / self.orig_height, ) padLeft, padTop = 0, 0 - if self.resize_type == "fit_to_window" or self.resize_type == "fit_to_window_letterbox": + resize_type = self.params.resize_type + if resize_type == "fit_to_window" or resize_type == "fit_to_window_letterbox": invertedScaleX = invertedScaleY = max(invertedScaleX, invertedScaleY) - if self.resize_type == "fit_to_window_letterbox": + if resize_type == "fit_to_window_letterbox": padLeft = (self.orig_width - round(inputImgWidth / invertedScaleX)) // 2 padTop = (self.orig_height - round(inputImgHeight / invertedScaleY)) // 2 coords = boxes[:, 2:] diff --git a/src/model_api/tilers/detection.py b/src/model_api/tilers/detection.py index 4c570a11..9b6fb366 100644 --- a/src/model_api/tilers/detection.py +++ b/src/model_api/tilers/detection.py @@ -7,7 +7,7 @@ import numpy as np from model_api.models import DetectionResult -from model_api.models.types import NumericalValue +from model_api.models.parameters import ParameterRegistry from model_api.models.utils import multiclass_nms from .tiler import Tiler @@ -30,23 +30,7 @@ def parameters(cls): - the dictionary with defined wrapper tiler parameters """ parameters = super().parameters() - parameters.update( - { - "max_pred_number": NumericalValue( - value_type=int, - default_value=100, - min=1, - description="Maximum numbers of prediction per image", - ), - "iou_threshold": NumericalValue( - value_type=float, - default_value=0.45, - min=0, - max=1.0, - description="IoU threshold which is used to apply NMS to bounding boxes", - ), - }, - ) + parameters.update(ParameterRegistry.DETECTION_TILER) return parameters def _postprocess_tile( diff --git a/src/model_api/tilers/instance_segmentation.py b/src/model_api/tilers/instance_segmentation.py index 302a1815..7bcc30f6 100644 --- a/src/model_api/tilers/instance_segmentation.py +++ b/src/model_api/tilers/instance_segmentation.py @@ -192,13 +192,13 @@ def __call__(self, inputs): def setup_maskrcnn(*args, **kwds): postprocess_state = None if isinstance(self.model, MaskRCNNModel): - postprocess_state = self.model.postprocess_semantic_masks - self.model.postprocess_semantic_masks = False + postprocess_state = self.model.params.postprocess_semantic_masks + self.model._postprocess_semantic_masks = False # noqa: SLF001 try: yield finally: if isinstance(self.model, MaskRCNNModel): - self.model.postprocess_semantic_masks = postprocess_state + self.model._postprocess_semantic_masks = postprocess_state # noqa: SLF001 with setup_maskrcnn(): return super().__call__(inputs) diff --git a/src/model_api/tilers/semantic_segmentation.py b/src/model_api/tilers/semantic_segmentation.py index 22ce6589..32f40990 100644 --- a/src/model_api/tilers/semantic_segmentation.py +++ b/src/model_api/tilers/semantic_segmentation.py @@ -51,7 +51,7 @@ def _merge_results( ImageResultWithSoftPrediction: merged predictions """ height, width = shape[:2] - num_classes = len(self.model.labels) + num_classes = len(self.model.params.labels) full_logits_mask = np.zeros((height, width, num_classes), dtype=np.float32) vote_mask = np.zeros((height, width), dtype=np.int32) for result in results: @@ -74,13 +74,13 @@ def __call__(self, inputs): def setup_segm_model(): return_soft_prediction_state = None if isinstance(self.model, SegmentationModel): - return_soft_prediction_state = self.model.return_soft_prediction - self.model.return_soft_prediction = True + return_soft_prediction_state = self.model.params.return_soft_prediction + self.model._return_soft_prediction = True # noqa: SLF001 try: yield finally: if isinstance(self.model, SegmentationModel): - self.model.return_soft_prediction = return_soft_prediction_state + self.model._return_soft_prediction = return_soft_prediction_state # noqa: SLF001 with setup_segm_model(): return super().__call__(inputs) diff --git a/src/model_api/tilers/tiler.py b/src/model_api/tilers/tiler.py index d9a9468b..20cf929a 100644 --- a/src/model_api/tilers/tiler.py +++ b/src/model_api/tilers/tiler.py @@ -7,7 +7,7 @@ import logging as log from itertools import product -from model_api.models.types import BooleanValue, NumericalValue +from model_api.models.parameters import ParameterRegistry from model_api.pipelines import AsyncPipeline @@ -63,29 +63,7 @@ def parameters(cls): Returns: - the dictionary with defined wrapper tiler parameters """ - parameters = {} - parameters.update( - { - "tile_size": NumericalValue( - value_type=int, - default_value=400, - min=1, - description="Size of one tile", - ), - "tiles_overlap": NumericalValue( - value_type=float, - default_value=0.5, - min=0.0, - max=1.0, - description="Overlap of tiles", - ), - "tile_with_full_img": BooleanValue( - default_value=True, - description="Whether to include full image as a tile", - ), - }, - ) - return parameters + return ParameterRegistry.TILER.copy() def _load_config(self, config): """Reads the configuration and creates data attributes diff --git a/tests/functional/test_save.py b/tests/functional/test_save.py index 13b2f41e..e62756b3 100644 --- a/tests/functional/test_save.py +++ b/tests/functional/test_save.py @@ -23,7 +23,7 @@ def test_detector_save(tmp_path, data): assert deserialized.get_model().get_rt_info(["model_info", "embedded_processing"]).astype(bool) assert type(detector) is type(deserialized) for attr in detector.parameters(): - assert getattr(detector, attr) == getattr(deserialized, attr) + assert getattr(detector.params, attr) == getattr(deserialized.params, attr) def test_classifier_save(tmp_path, data): @@ -37,7 +37,7 @@ def test_classifier_save(tmp_path, data): assert deserialized.get_model().get_rt_info(["model_info", "embedded_processing"]).astype(bool) assert type(classifier) is type(deserialized) for attr in classifier.parameters(): - assert getattr(classifier, attr) == getattr(deserialized, attr) + assert getattr(classifier.params, attr) == getattr(deserialized.params, attr) def test_segmentor_save(tmp_path, data): @@ -51,7 +51,7 @@ def test_segmentor_save(tmp_path, data): assert deserialized.get_model().get_rt_info(["model_info", "embedded_processing"]).astype(bool) assert type(segmenter) is type(deserialized) for attr in segmenter.parameters(): - assert getattr(segmenter, attr) == getattr(deserialized, attr) + assert getattr(segmenter.params, attr) == getattr(deserialized.params, attr) def test_onnx_save(tmp_path, data): @@ -69,7 +69,7 @@ def test_onnx_save(tmp_path, data): assert load_parameters_from_onnx(onnx.load(onnx_path))["model_info"]["embedded_processing"] == "True" assert type(cls_model) is type(deserialized) for attr in cls_model.parameters(): - assert getattr(cls_model, attr) == getattr(deserialized, attr) + assert getattr(cls_model.params, attr) == getattr(deserialized.params, attr) def test_padim_save(tmp_path, data): @@ -83,7 +83,7 @@ def test_padim_save(tmp_path, data): assert not deserialized.get_model().get_rt_info(["model_info", "embedded_processing"]).astype(bool) assert type(padim_model) is type(deserialized) for attr in padim_model.parameters(): - assert getattr(padim_model, attr) == getattr(deserialized, attr) + assert getattr(padim_model.params, attr) == getattr(deserialized.params, attr) def test_stfpm_save(tmp_path, data): @@ -97,7 +97,7 @@ def test_stfpm_save(tmp_path, data): assert not deserialized.get_model().get_rt_info(["model_info", "embedded_processing"]).astype(bool) assert type(stfpm_model) is type(deserialized) for attr in stfpm_model.parameters(): - assert getattr(stfpm_model, attr) == getattr(deserialized, attr) + assert getattr(stfpm_model.params, attr) == getattr(deserialized.params, attr) def test_uflow_save(tmp_path, data): @@ -111,4 +111,4 @@ def test_uflow_save(tmp_path, data): assert not deserialized.get_model().get_rt_info(["model_info", "embedded_processing"]).astype(bool) assert type(uflow_model) is type(deserialized) for attr in uflow_model.parameters(): - assert getattr(uflow_model, attr) == getattr(deserialized, attr) + assert getattr(uflow_model.params, attr) == getattr(deserialized.params, attr)