diff --git a/src/model_api/models/__init__.py b/src/model_api/models/__init__.py index 2dbed021..2df1af31 100644 --- a/src/model_api/models/__init__.py +++ b/src/model_api/models/__init__.py @@ -29,6 +29,7 @@ from .ssd import SSD from .utils import ( OutputTransform, + ResizeMetadata, add_rotated_rects, get_contours, ) @@ -78,6 +79,7 @@ "OutputTransform", "PredictedMask", "Prompt", + "ResizeMetadata", "RotatedSegmentationResult", "SAMDecoder", "SAMImageEncoder", diff --git a/src/model_api/models/action_classification.py b/src/model_api/models/action_classification.py index 9dd944c8..4b66eff5 100644 --- a/src/model_api/models/action_classification.py +++ b/src/model_api/models/action_classification.py @@ -119,7 +119,7 @@ def _get_inputs(self) -> list[str]: ) return image_blob_names - def preprocess( + def base_preprocess( self, inputs: np.ndarray, ) -> tuple[dict[str, np.ndarray], dict[str, tuple[int, ...]]]: diff --git a/src/model_api/models/anomaly.py b/src/model_api/models/anomaly.py index f3366a82..2c43da3b 100644 --- a/src/model_api/models/anomaly.py +++ b/src/model_api/models/anomaly.py @@ -68,13 +68,7 @@ def __init__( super().__init__(inference_adapter, configuration, preload) self._check_io_number(1, (1, 4)) - def preprocess(self, inputs: np.ndarray) -> list[dict]: - """Data preprocess method for Anomalib models. - - Anomalib models typically expect inputs in [0,1] range as float32. - """ - original_shape = inputs.shape - + def _resize_image(self, image: np.ndarray) -> tuple[np.ndarray, dict]: if ( self._is_dynamic and getattr(self.inference_adapter, "device", "") == "NPU" @@ -83,40 +77,12 @@ def preprocess(self, inputs: np.ndarray) -> list[dict]: _, self.c, self.h, self.w = self.inference_adapter.compiled_model.inputs[0].get_shape() self._is_dynamic = False - if self._is_dynamic: - h, w, c = inputs.shape - resized_shape = (w, h, c) + return super()._resize_image(image) - # For anomalib models, convert to float32 and normalize to [0,1] if needed - if inputs.dtype == np.uint8: - processed_image = inputs.astype(np.float32) / 255.0 - else: - processed_image = inputs.astype(np.float32) - - # Apply layout change but skip InputTransform (which might apply wrong normalization) - processed_image = self._change_layout(processed_image) - else: - resized_shape = (self.w, self.h, self.c) - # For fixed models, use standard preprocessing - if self.params.embedded_processing: - processed_image = inputs[None] - else: - # Resize image to expected model input dimensions - resized_image = self.resize(inputs, (self.w, self.h)) - # Convert to float32 and normalize for anomalib - if resized_image.dtype == np.uint8: - processed_image = resized_image.astype(np.float32) / 255.0 - else: - processed_image = resized_image.astype(np.float32) - processed_image = self._change_layout(processed_image) - - return [ - {self.image_blob_name: processed_image}, - { - "original_shape": original_shape, - "resized_shape": resized_shape, - }, - ] + def _input_transform(self, image: np.ndarray) -> np.ndarray: + if image.dtype == np.uint8: + return image.astype(np.float32) / 255.0 + return image.astype(np.float32) def postprocess(self, outputs: dict[str, np.ndarray], meta: dict[str, Any]) -> AnomalyResult: """Post-processes the outputs and returns the results. diff --git a/src/model_api/models/classification.py b/src/model_api/models/classification.py index 662e84aa..5e5d0e8e 100644 --- a/src/model_api/models/classification.py +++ b/src/model_api/models/classification.py @@ -60,36 +60,41 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {} self._verify_single_output() self.raw_scores_name = _raw_scores_name + if self.params.hierarchical: - self._embedded_processing = True - self.out_layer_names = _get_non_xai_names(self.outputs.keys()) - _append_xai_names(self.outputs.keys(), self.out_layer_names) - hierarchical_config = self.params.hierarchical_config - if not hierarchical_config: - self.raise_error("Hierarchical classification config is empty.") - self.raw_scores_name = self.out_layer_names[0] - self.hierarchical_info = json.loads(hierarchical_config) - - if self.params.hierarchical_postproc == "probabilistic": - self.labels_resolver = ProbabilisticLabelsResolver( - self.hierarchical_info, - ) - else: - self.labels_resolver = GreedyLabelsResolver(self.hierarchical_info) + self._setup_hierarchical() + elif self.params.multilabel: + self._setup_multilabel() + else: + self._setup_single_label() - if preload: - self.load() - return + _append_xai_names(self.outputs.keys(), self.out_layer_names) + if preload: + self.load() - if self.params.multilabel: - self._embedded_processing = True - self.out_layer_names = _get_non_xai_names(self.outputs.keys()) - _append_xai_names(self.outputs.keys(), self.out_layer_names) - self.raw_scores_name = self.out_layer_names[0] - if preload: - self.load() - return + def _setup_hierarchical(self) -> None: + """Configure model for hierarchical classification.""" + self._embedded_processing = True + self.out_layer_names = _get_non_xai_names(self.outputs.keys()) + hierarchical_config = self.params.hierarchical_config + if not hierarchical_config: + self.raise_error("Hierarchical classification config is empty.") + self.raw_scores_name = self.out_layer_names[0] + self.hierarchical_info = json.loads(hierarchical_config) + + if self.params.hierarchical_postproc == "probabilistic": + self.labels_resolver = ProbabilisticLabelsResolver(self.hierarchical_info) + else: + self.labels_resolver = GreedyLabelsResolver(self.hierarchical_info) + + def _setup_multilabel(self) -> None: + """Configure model for multi-label classification.""" + self._embedded_processing = True + self.out_layer_names = _get_non_xai_names(self.outputs.keys()) + self.raw_scores_name = self.out_layer_names[0] + def _setup_single_label(self) -> None: + """Configure model for single-label classification with TopK.""" try: addOrFindSoftmaxAndTopkOutputs( self.inference_adapter, @@ -114,10 +119,6 @@ def __init__(self, inference_adapter: InferenceAdapter, configuration: dict = {} self.embedded_processing = True - _append_xai_names(self.outputs.keys(), self.out_layer_names) - if preload: - self.load() - def _load_labels(self, labels_file: str) -> list: with Path(labels_file).open() as f: labels = [] diff --git a/src/model_api/models/detection_model.py b/src/model_api/models/detection_model.py index 84b89aba..1b744bd7 100644 --- a/src/model_api/models/detection_model.py +++ b/src/model_api/models/detection_model.py @@ -8,7 +8,7 @@ from .image_model import ImageModel from .parameters import ParameterRegistry from .result import DetectionResult -from .utils import load_labels +from .utils import ResizeMetadata, load_labels class DetectionModel(ImageModel): @@ -58,6 +58,18 @@ def parameters(cls): ) return parameters + def preprocess(self, dict_inputs: dict, meta: dict) -> tuple[dict, dict]: + input_img_height, input_img_width = meta["original_shape"][:2] + resize_meta = ResizeMetadata.compute( + original_width=input_img_width, + original_height=input_img_height, + model_width=self.w, + model_height=self.h, + resize_type=self.params.resize_type, + ) + meta["resize_info"] = resize_meta.to_dict() + return dict_inputs, meta + def _resize_detections(self, detection_result: DetectionResult, meta: dict): """Resizes detection bounding boxes according to initial image shape. @@ -68,26 +80,24 @@ def _resize_detections(self, detection_result: DetectionResult, meta: dict): detection_result (DetectionList): detection result with coordinates in normalized form meta (dict): the input metadata obtained from `preprocess` method """ - input_img_height, input_img_widht = meta["original_shape"][:2] - inverted_scale_x = input_img_widht / self.w - inverted_scale_y = input_img_height / self.h - pad_left = 0 - pad_top = 0 - resize_type = self.params.resize_type - if resize_type == "fit_to_window" or resize_type == "fit_to_window_letterbox": - inverted_scale_x = inverted_scale_y = max( - inverted_scale_x, - inverted_scale_y, + input_img_height, input_img_width = meta["original_shape"][:2] + + if "resize_info" in meta: + resize_meta = ResizeMetadata.from_dict(meta["resize_info"]) + else: + resize_meta = ResizeMetadata.compute( + original_width=input_img_width, + original_height=input_img_height, + model_width=self.w, + model_height=self.h, + resize_type=self.params.resize_type, ) - if resize_type == "fit_to_window_letterbox": - pad_left = (self.w - round(input_img_widht / inverted_scale_x)) // 2 - pad_top = (self.h - round(input_img_height / inverted_scale_y)) // 2 boxes = detection_result.bboxes - boxes[:, 0::2] = (boxes[:, 0::2] * self.w - pad_left) * inverted_scale_x - boxes[:, 1::2] = (boxes[:, 1::2] * self.h - pad_top) * inverted_scale_y + boxes[:, 0::2] = (boxes[:, 0::2] * self.w - resize_meta.pad_left) * resize_meta.inverted_scale_x + boxes[:, 1::2] = (boxes[:, 1::2] * self.h - resize_meta.pad_top) * resize_meta.inverted_scale_y np.round(boxes, out=boxes) - boxes[:, 0::2] = np.clip(boxes[:, 0::2], 0, input_img_widht) + boxes[:, 0::2] = np.clip(boxes[:, 0::2], 0, input_img_width) boxes[:, 1::2] = np.clip(boxes[:, 1::2], 0, input_img_height) detection_result.bboxes = boxes.astype(np.int32) diff --git a/src/model_api/models/image_model.py b/src/model_api/models/image_model.py index 158cf4de..06ae53cc 100644 --- a/src/model_api/models/image_model.py +++ b/src/model_api/models/image_model.py @@ -148,7 +148,7 @@ def _get_inputs(self) -> tuple[list[str], ...]: ) return image_blob_names, image_info_blob_names - def preprocess(self, inputs: np.ndarray) -> list[dict]: + def base_preprocess(self, inputs: np.ndarray) -> list[dict]: """Data preprocess method It performs basic preprocessing of a single image: @@ -173,35 +173,61 @@ def preprocess(self, inputs: np.ndarray) -> list[dict]: } - the input metadata, which might be used in `postprocess` method """ - original_shape = inputs.shape - if self.params.embedded_processing: - processed_image = inputs[None] - if self._is_dynamic: - h, w, c = inputs.shape - resized_shape = (w, h, c) - else: - resized_shape = (self.w, self.h, self.c) - elif self._is_dynamic: + dict_inputs, meta = self._preprocess_embedded(inputs) + dict_inputs, meta = self.preprocess(dict_inputs, meta) + return [dict_inputs, meta] + + # 1. Resize + resized_image, meta = self._resize_image(inputs) + + # 2. Transform + processed_image = self._input_transform(resized_image) + + # 3. Layout + processed_image = self._change_layout(processed_image) + + # 4. Pack + dict_inputs = {self.image_blob_name: processed_image} + + # 5. Model-specific preprocess + dict_inputs, meta = self.preprocess(dict_inputs, meta) + + return [dict_inputs, meta] + + def _preprocess_embedded(self, inputs: np.ndarray) -> tuple[dict, dict]: + original_shape = inputs.shape + processed_image = inputs[None] + if self._is_dynamic: h, w, c = inputs.shape resized_shape = (w, h, c) - processed_image = self.input_transform(inputs) - processed_image = self._change_layout(processed_image) else: - # Fixed model without embedded preprocessing resized_shape = (self.w, self.h, self.c) - resized_image = self.resize(inputs, (self.w, self.h), pad_value=self.params.pad_value) - processed_image = self.input_transform(resized_image) - processed_image = self._change_layout(processed_image) - - return [ + return ( {self.image_blob_name: processed_image}, { "original_shape": original_shape, "resized_shape": resized_shape, }, - ] + ) + + def _resize_image(self, image: np.ndarray) -> tuple[np.ndarray, dict]: + original_shape = image.shape + if self._is_dynamic: + h, w, c = image.shape + resized_shape = (w, h, c) + return image, {"original_shape": original_shape, "resized_shape": resized_shape} + + resized_shape = (self.w, self.h, self.c) + resized_image = self.resize(image, (self.w, self.h), pad_value=self.params.pad_value) + return resized_image, {"original_shape": original_shape, "resized_shape": resized_shape} + + def _input_transform(self, image: np.ndarray) -> np.ndarray: + return self.input_transform(image) + + def preprocess(self, dict_inputs: dict, meta: dict) -> tuple[dict, dict]: + return dict_inputs, meta def _change_layout(self, image: np.ndarray) -> np.ndarray: """Changes the input image layout to fit the layout of the model input layer. diff --git a/src/model_api/models/instance_segmentation.py b/src/model_api/models/instance_segmentation.py index c6e104af..920ce79a 100644 --- a/src/model_api/models/instance_segmentation.py +++ b/src/model_api/models/instance_segmentation.py @@ -11,7 +11,7 @@ from .image_model import ImageModel from .parameters import ParameterRegistry from .result import InstanceSegmentationResult -from .utils import load_labels +from .utils import ResizeMetadata, load_labels class MaskRCNNModel(ImageModel): @@ -95,8 +95,7 @@ def _get_segmentoly_outputs(self) -> dict: ) return outputs - def preprocess(self, inputs: np.ndarray) -> list[dict]: - dict_inputs, meta = super().preprocess(inputs) + def preprocess(self, dict_inputs: dict, meta: dict) -> tuple[dict, dict]: input_image_size = meta["resized_shape"][:2] if self.is_segmentoly: assert len(self.image_info_blob_names) == 1 @@ -105,7 +104,7 @@ def preprocess(self, inputs: np.ndarray) -> list[dict]: dtype=np.float32, ) dict_inputs[self.image_info_blob_names[0]] = input_image_info - return [dict_inputs, meta] + return dict_inputs, meta def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: if ( @@ -141,20 +140,21 @@ def postprocess(self, outputs: dict, meta: dict) -> InstanceSegmentationResult: meta["original_shape"][1], meta["original_shape"][0], ) - invertedScaleX, invertedScaleY = ( - inputImgWidth / self.orig_width, - inputImgHeight / self.orig_height, + resize_meta = ResizeMetadata.compute( + original_width=inputImgWidth, + original_height=inputImgHeight, + model_width=self.orig_width, + model_height=self.orig_height, + resize_type=self.params.resize_type, + ) + + boxes -= (resize_meta.pad_left, resize_meta.pad_top, resize_meta.pad_left, resize_meta.pad_top) + boxes *= ( + resize_meta.inverted_scale_x, + resize_meta.inverted_scale_y, + resize_meta.inverted_scale_x, + resize_meta.inverted_scale_y, ) - padLeft, padTop = 0, 0 - resize_type = self.params.resize_type - if resize_type == "fit_to_window" or resize_type == "fit_to_window_letterbox": - invertedScaleX = invertedScaleY = max(invertedScaleX, invertedScaleY) - if resize_type == "fit_to_window_letterbox": - padLeft = (self.orig_width - round(inputImgWidth / invertedScaleX)) // 2 - padTop = (self.orig_height - round(inputImgHeight / invertedScaleY)) // 2 - - boxes -= (padLeft, padTop, padLeft, padTop) - boxes *= (invertedScaleX, invertedScaleY, invertedScaleX, invertedScaleY) np.around(boxes, out=boxes) np.clip( boxes, diff --git a/src/model_api/models/keypoint_detection.py b/src/model_api/models/keypoint_detection.py index 90a39997..b3cbafd8 100644 --- a/src/model_api/models/keypoint_detection.py +++ b/src/model_api/models/keypoint_detection.py @@ -12,6 +12,7 @@ from .image_model import ImageModel from .parameters import ParameterRegistry from .result import DetectedKeypoints, DetectionResult +from .utils import ResizeMetadata class KeypointDetectionModel(ImageModel): @@ -31,6 +32,24 @@ def __init__(self, inference_adapter, configuration: dict = {}, preload=False): super().__init__(inference_adapter, configuration, preload) self._check_io_number(1, 2) + def preprocess(self, dict_inputs: dict, meta: dict) -> tuple[dict, dict]: + orig_h, orig_w = meta["original_shape"][:2] + resize_meta = ResizeMetadata.compute( + original_width=orig_w, + original_height=orig_h, + model_width=self.w, + model_height=self.h, + resize_type=self.params.resize_type, + ) + # Store with keypoint-specific naming for backward compatibility + meta["resize_info"] = { + "kp_scale_h": resize_meta.inverted_scale_y, + "kp_scale_w": resize_meta.inverted_scale_x, + "pad_left": resize_meta.pad_left, + "pad_top": resize_meta.pad_top, + } + return dict_inputs, meta + def postprocess( self, outputs: dict[str, np.ndarray], @@ -51,20 +70,30 @@ def postprocess( encoded_kps[1], apply_softmax=self.params.apply_softmax, ) - orig_h, orig_w = meta["original_shape"][:2] - kp_scale_h = orig_h / self.h - kp_scale_w = orig_w / self.w - batch_keypoints = batch_keypoints.squeeze() + if "resize_info" in meta: + info = meta["resize_info"] + kp_scale_h = info["kp_scale_h"] + kp_scale_w = info["kp_scale_w"] + pad_left = info["pad_left"] + pad_top = info["pad_top"] + else: + orig_h, orig_w = meta["original_shape"][:2] + resize_meta = ResizeMetadata.compute( + original_width=orig_w, + original_height=orig_h, + model_width=self.w, + model_height=self.h, + resize_type=self.params.resize_type, + ) + kp_scale_h = resize_meta.inverted_scale_y + kp_scale_w = resize_meta.inverted_scale_x + pad_left = resize_meta.pad_left + pad_top = resize_meta.pad_top - resize_type = self.params.resize_type - if resize_type in ["fit_to_window", "fit_to_window_letterbox"]: - inverted_scale = max(kp_scale_h, kp_scale_w) - kp_scale_h = kp_scale_w = inverted_scale - if resize_type == "fit_to_window_letterbox": - pad_left = (self.w - round(orig_w / inverted_scale)) // 2 - pad_top = (self.h - round(orig_h / inverted_scale)) // 2 - batch_keypoints -= np.array([pad_left, pad_top]) + batch_keypoints = batch_keypoints.squeeze() + if pad_left != 0 or pad_top != 0: + batch_keypoints -= np.array([pad_left, pad_top]) batch_keypoints *= np.array([kp_scale_w, kp_scale_h]) diff --git a/src/model_api/models/model.py b/src/model_api/models/model.py index b6bf79c4..2a62a4f0 100644 --- a/src/model_api/models/model.py +++ b/src/model_api/models/model.py @@ -350,7 +350,7 @@ def raise_error(cls, message) -> NoReturn: """ raise WrapperError(cls.__model__, message) - def preprocess(self, inputs): + def base_preprocess(self, inputs): """Interface for preprocess method. Args: @@ -368,6 +368,19 @@ def preprocess(self, inputs): """ raise NotImplementedError + def preprocess(self, dict_inputs, meta): + """Interface for preprocess hook. + + Args: + dict_inputs: preprocessed data + meta: input metadata + + Returns: + - the preprocessed data + - the input metadata + """ + return dict_inputs, meta + def postprocess(self, outputs: dict[str, Any], meta: dict[str, Any]): """Interface for postprocess method. @@ -437,7 +450,7 @@ def __call__(self, inputs: ndarray): """ self.perf.total_time.update() self.perf.preprocess_time.update() - dict_data, input_meta = self.preprocess(inputs) + dict_data, input_meta = self.base_preprocess(inputs) self.perf.preprocess_time.update() self.perf.inference_time.update() raw_result = self.infer_sync(dict_data) @@ -555,7 +568,7 @@ def infer_async(self, input_data: dict, user_data: Any): ) self.perf.total_time.update() self.perf.preprocess_time.update() - dict_data, meta = self.preprocess(input_data) + dict_data, meta = self.base_preprocess(input_data) self.perf.preprocess_time.update() self.perf.inference_time.update() self.inference_adapter.infer_async( diff --git a/src/model_api/models/sam_models.py b/src/model_api/models/sam_models.py index 55fb805e..a80eb7f0 100644 --- a/src/model_api/models/sam_models.py +++ b/src/model_api/models/sam_models.py @@ -50,14 +50,10 @@ def parameters(cls) -> dict[str, Any]: ) return parameters - def preprocess( - self, - inputs: np.ndarray, - ) -> list[dict]: + def preprocess(self, dict_inputs: dict, meta: dict) -> tuple[dict, dict]: """Update meta for image encoder.""" - dict_inputs, meta = super().preprocess(inputs) meta["resize_type"] = self.params.resize_type - return [dict_inputs, meta] + return dict_inputs, meta def postprocess( self, @@ -122,7 +118,7 @@ def parameters(cls) -> dict[str, Any]: def _get_outputs(self) -> str: return "upscaled_masks" - def preprocess(self, inputs: dict[str, Any]) -> list[dict]: + def base_preprocess(self, inputs: dict[str, Any]) -> list[dict]: """Preprocess prompts.""" processed_prompts: list[dict[str, Any]] = [] for prompt_name in ["bboxes", "points"]: diff --git a/src/model_api/models/ssd.py b/src/model_api/models/ssd.py index 1e223216..a5cb0b30 100644 --- a/src/model_api/models/ssd.py +++ b/src/model_api/models/ssd.py @@ -159,8 +159,7 @@ def __init__(self, inference_adapter, configuration: dict = {}, preload=False): self.image_info_blob_name = self.image_info_blob_names[0] if len(self.image_info_blob_names) == 1 else None self.output_parser = self._get_output_parser(self.image_blob_name) - def preprocess(self, inputs): - dict_inputs, meta = super().preprocess(inputs) + def preprocess(self, dict_inputs: dict, meta: dict) -> tuple[dict, dict]: if self.image_info_blob_name: dict_inputs[self.image_info_blob_name] = np.array([[self.h, self.w, 1]]) return dict_inputs, meta diff --git a/src/model_api/models/utils.py b/src/model_api/models/utils.py index e72a7217..fb7c9cd4 100644 --- a/src/model_api/models/utils.py +++ b/src/model_api/models/utils.py @@ -5,6 +5,7 @@ from __future__ import annotations +from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING @@ -17,6 +18,96 @@ from model_api.models.result.detection import DetectionResult +@dataclass +class ResizeMetadata: + """Image resize transformation metadata. + + Contains parameters needed to transform coordinates (e.g., bounding boxes, + keypoints) from model input space back to the original image space. It handles different + resize strategies including standard resize, fit-to-window, and letterbox modes. + + Attributes: + inverted_scale_x: Scale factor to multiply x-coordinates to map from model to original space. + inverted_scale_y: Scale factor to multiply y-coordinates to map from model to original space. + pad_left: Left padding added during letterbox resize (0 for other resize types). + pad_top: Top padding added during letterbox resize (0 for other resize types). + """ + + inverted_scale_x: float + inverted_scale_y: float + pad_left: int = 0 + pad_top: int = 0 + + @classmethod + def compute( + cls, + original_width: int, + original_height: int, + model_width: int, + model_height: int, + resize_type: str, + ) -> "ResizeMetadata": + """Compute resize metadata for coordinate transformation. + + Args: + original_width: Width of the original input image. + original_height: Height of the original input image. + model_width: Width of the model input (after resize). + model_height: Height of the model input (after resize). + resize_type: Type of resize applied ("standard", "fit_to_window", "fit_to_window_letterbox"). + + Returns: + ResizeMetadata instance with computed scale factors and padding. + """ + inverted_scale_x = original_width / model_width + inverted_scale_y = original_height / model_height + pad_left = 0 + pad_top = 0 + + if resize_type in ("fit_to_window", "fit_to_window_letterbox"): + inverted_scale_x = inverted_scale_y = max(inverted_scale_x, inverted_scale_y) + if resize_type == "fit_to_window_letterbox": + pad_left = (model_width - round(original_width / inverted_scale_x)) // 2 + pad_top = (model_height - round(original_height / inverted_scale_y)) // 2 + + return cls( + inverted_scale_x=inverted_scale_x, + inverted_scale_y=inverted_scale_y, + pad_left=pad_left, + pad_top=pad_top, + ) + + def to_dict(self) -> dict[str, float | int]: + """Convert to dictionary for storage in metadata. + + Returns: + Dictionary with keys matching the legacy resize_info format. + """ + return { + "inverted_scale_x": self.inverted_scale_x, + "inverted_scale_y": self.inverted_scale_y, + "pad_left": self.pad_left, + "pad_top": self.pad_top, + } + + @classmethod + def from_dict(cls, data: dict[str, float | int]) -> "ResizeMetadata": + """Create from dictionary (e.g., from metadata). + + Args: + data: Dictionary with resize info keys. + + Returns: + ResizeMetadata instance. + """ + return cls( + inverted_scale_x=data["inverted_scale_x"], + inverted_scale_y=data["inverted_scale_y"], + pad_left=int(data.get("pad_left", 0)), + pad_top=int(data.get("pad_top", 0)), + ) + + def add_rotated_rects(inst_seg_result: InstanceSegmentationResult) -> RotatedSegmentationResult: objects_with_rects = [] for mask in inst_seg_result.masks: diff --git a/src/model_api/models/visual_prompting.py b/src/model_api/models/visual_prompting.py index 7958ed4e..e06cd381 100644 --- a/src/model_api/models/visual_prompting.py +++ b/src/model_api/models/visual_prompting.py @@ -70,9 +70,9 @@ def infer( outputs: list[dict[str, Any]] = [] - processed_image, meta = self.encoder.preprocess(image) + processed_image, meta = self.encoder.base_preprocess(image) image_embeddings = self.encoder.infer_sync(processed_image) - processed_prompts = self.decoder.preprocess( + processed_prompts = self.decoder.base_preprocess( { "bboxes": [box.data for box in boxes] if boxes else None, "points": [point.data for point in points] if points else None, @@ -229,7 +229,7 @@ def learn( if reset_features or not self.has_reference_features(): self.reset_reference_info() - processed_prompts = self.decoder.preprocess( + processed_prompts = self.decoder.base_preprocess( { "bboxes": [box.data for box in boxes] if boxes else None, "points": [point.data for point in points] if points else None, diff --git a/src/model_api/models/yolo.py b/src/model_api/models/yolo.py index f958a35a..b232cb1b 100644 --- a/src/model_api/models/yolo.py +++ b/src/model_api/models/yolo.py @@ -13,7 +13,7 @@ from .parameters import ParameterRegistry from .result import DetectionResult from .types import BooleanValue, ListValue -from .utils import clip_detections, multiclass_nms, nms +from .utils import ResizeMetadata, clip_detections, multiclass_nms, nms DetectionBox = namedtuple("DetectionBox", ["x", "y", "w", "h"]) @@ -527,8 +527,7 @@ def parameters(cls): parameters["confidence_threshold"].update_default_value(0.5) return parameters - def preprocess(self, inputs): - image = inputs + def _resize_image(self, image: np.ndarray) -> tuple[np.ndarray, dict]: resized_image = resize_image_ocv( image, (self.w, self.h), @@ -540,19 +539,10 @@ def preprocess(self, inputs): meta = { "original_shape": image.shape, + "resized_shape": padded_image.shape, "scale": min(self.w / image.shape[1], self.h / image.shape[0]), } - - preprocessed_image = self.input_transform(padded_image) - preprocessed_image = preprocessed_image.transpose( - (2, 0, 1), - ) # Change data layout from HWC to CHW - preprocessed_image = preprocessed_image.reshape( - (self.n, self.c, self.h, self.w), - ) - - dict_inputs = {self.image_blob_name: preprocessed_image} - return dict_inputs, meta + return padded_image, meta def postprocess(self, outputs, meta) -> DetectionResult: output = outputs[self.output_blob_name][0] @@ -666,37 +656,30 @@ def parameters(cls): parameters["confidence_threshold"].update_default_value(0.5) return parameters - def preprocess(self, inputs): - image = inputs - dict_inputs = {} - meta = {"original_shape": image.shape} + def _resize_image(self, image: np.ndarray) -> tuple[np.ndarray, dict]: + if self._is_dynamic: + return super()._resize_image(image) - if self.params.embedded_processing: - meta.update({"resized_shape": (self.w, self.h)}) + resized_image = self.resize( + image, + (self.w, self.h), + interpolation=INTERPOLATION_TYPES["CUBIC"], + ) + meta = { + "original_shape": image.shape, + "resized_shape": resized_image.shape, + } + return resized_image, meta - dict_inputs = { - self.image_blob_name: np.expand_dims(image, axis=0), - self.image_info_blob_name: np.array( - [[image.shape[0], image.shape[1]]], - dtype=np.float32, - ), - } - else: - resized_image = self.resize( - image, - (self.w, self.h), - interpolation=INTERPOLATION_TYPES["CUBIC"], - ) - meta.update({"resized_shape": resized_image.shape}) - resized_image = self._change_layout(resized_image) - dict_inputs = { - self.image_blob_name: resized_image, - self.image_info_blob_name: np.array( - [[image.shape[0], image.shape[1]]], - dtype=np.float32, - ), - } + def _input_transform(self, image: np.ndarray) -> np.ndarray: + return image + def preprocess(self, dict_inputs: dict, meta: dict) -> tuple[dict, dict]: + h, w = meta["original_shape"][:2] + dict_inputs[self.image_info_blob_name] = np.array( + [[h, w]], + dtype=np.float32, + ) return dict_inputs, meta def postprocess(self, outputs, meta) -> DetectionResult: @@ -832,20 +815,21 @@ def postprocess(self, outputs, meta) -> DetectionResult: boxes, _ = multiclass_nms(boxes, iou_threshold, keep_top_k) # type: ignore[attr-defined] inputImgWidth = meta["original_shape"][1] inputImgHeight = meta["original_shape"][0] - invertedScaleX, invertedScaleY = ( - inputImgWidth / self.orig_width, - inputImgHeight / self.orig_height, + resize_meta = ResizeMetadata.compute( + original_width=inputImgWidth, + original_height=inputImgHeight, + model_width=self.orig_width, + model_height=self.orig_height, + resize_type=self.params.resize_type, ) - padLeft, padTop = 0, 0 - resize_type = self.params.resize_type - if resize_type == "fit_to_window" or resize_type == "fit_to_window_letterbox": - invertedScaleX = invertedScaleY = max(invertedScaleX, invertedScaleY) - if resize_type == "fit_to_window_letterbox": - padLeft = (self.orig_width - round(inputImgWidth / invertedScaleX)) // 2 - padTop = (self.orig_height - round(inputImgHeight / invertedScaleY)) // 2 coords = boxes[:, 2:] - coords -= (padLeft, padTop, padLeft, padTop) - coords *= (invertedScaleX, invertedScaleY, invertedScaleX, invertedScaleY) + coords -= (resize_meta.pad_left, resize_meta.pad_top, resize_meta.pad_left, resize_meta.pad_top) + coords *= ( + resize_meta.inverted_scale_x, + resize_meta.inverted_scale_y, + resize_meta.inverted_scale_x, + resize_meta.inverted_scale_y, + ) intboxes = np.round(coords, out=coords).astype(np.int32) np.clip( diff --git a/src/model_api/pipelines/async_pipeline.py b/src/model_api/pipelines/async_pipeline.py index 3bbd0651..3575ac8f 100644 --- a/src/model_api/pipelines/async_pipeline.py +++ b/src/model_api/pipelines/async_pipeline.py @@ -26,7 +26,7 @@ def callback(self, request, callback_args): def submit_data(self, inputs, id, meta={}): self.model.perf.preprocess_time.update() - inputs, preprocessing_meta = self.model.preprocess(inputs) + inputs, preprocessing_meta = self.model.base_preprocess(inputs) self.model.perf.preprocess_time.update() self.model.perf.inference_time.update() diff --git a/src/model_api/tilers/instance_segmentation.py b/src/model_api/tilers/instance_segmentation.py index 7bcc30f6..d0dbae03 100644 --- a/src/model_api/tilers/instance_segmentation.py +++ b/src/model_api/tilers/instance_segmentation.py @@ -57,7 +57,7 @@ def _filter_tiles(self, image, tile_coords, confidence_threshold=0.35): keep_coords = [] for i, coord in enumerate(tile_coords): tile_img = self._crop_tile(image, coord) - tile_dict, _ = self.model.preprocess(tile_img) + tile_dict, _ = self.model.base_preprocess(tile_img) cls_outputs = self.tile_classifier_model.infer_sync(tile_dict) if i == 0 or cls_outputs["tile_prob"] > confidence_threshold: keep_coords.append(coord)