diff --git a/backends/nxp/aten_passes/convert_1d_conv_to_2d.py b/backends/nxp/aten_passes/convert_1d_conv_to_2d.py
new file mode 100644
index 00000000000..cb4030d8b0f
--- /dev/null
+++ b/backends/nxp/aten_passes/convert_1d_conv_to_2d.py
@@ -0,0 +1,396 @@
+# Copyright 2026 NXP
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+import torch
+from executorch.backends.nxp.backend.edge_helper import (
+    try_get_tensor_constant_from_node,
+)
+from torch._subclasses import FakeTensor, FakeTensorMode
+from torch.ao.quantization.fx.utils import get_new_attr_name_with_prefix
+from torch.export.unflatten import _assign_attr, _AttrKind
+from torch.fx import GraphModule, Node
+from torch.fx.passes.infra.pass_base import PassBase, PassResult
+
+
+Conv1dArgs = tuple[Node, Node, (Node | None), list[int], list[int], list[int], int]
+Conv1dTranspArgs = tuple[
+    Node, Node, (Node | None), list[int], list[int], list[int], int, list[int]
+]
+
+
+class ConvertConv1dToConv2dPass(PassBase):
+    r"""
+    The NXP backend supports only 2D convolutions. Rewrite 1D convolutions into an equivalent 2D form by
+    inserting a singleton spatial dimension and then removing it again.
+    If batch norm is present after the convolution, it is also converted from 1D to 2D.
+ + Without batch norm: + + x W x W + [N, C1, H] [I/O, I/O, k] [N, C1, H] [I/O, I/O, 1, k] + │ │ │ │ + │ │ ┌─────────▼──────────┐ │ + │ │ │ unsqueeze(x, -2) │ │ + │ │ └─────────▼──────────┘ │ + │ │ │ │ + │ │ [N, C1, 1, H ] │ + │ │ │ │ + └────────┐ ┌────────┘ └──────────┐ ┌──────────┘ + │ │ │ │ + ┌────────▼───────▼───────┐ ┌────────▼─────▼────────┐ + │ convolution ◄──B [O] replace │ convolution ◄──B [O] + │ (1D/transposed 1D) │ ────────────────► │ (2D/transposed 2D) │ + └────────────┬───────────┘ with └───────────┬───────────┘ + │ │ + │ [N, C2, 1, H] + │ │ + │ ┌─────────▼──────────┐ + │ │ squeeze(x, -2) │ + │ └─────────┬──────────┘ + │ │ + ▼ ▼ + [N, C2, H] [N, C2, H] + y y + + With batch norm: + + x W x W + [N, C1, H] [I/O, I/O, k] [N, C1, H] [I/O, I/O, 1, k] + │ │ │ │ + │ │ ┌─────────▼──────────┐ │ + │ │ │ unsqueeze(x, -2) │ │ + │ │ └─────────▼──────────┘ │ + │ │ │ │ + │ │ [N, C1, 1, H] │ + │ │ │ │ + └────────┐ ┌────────┘ └──────────┐ ┌──────────┘ + │ │ │ │ + ┌────────▼───────▼───────┐ ┌────────▼─────▼────────┐ + │ convolution ◄──B [O] replace │ convolution ◄──B [O] + │ (1D/transposed 1D) │ ────────────────► │ (2D/transposed 2D) │ + └────────────┬───────────┘ with └───────────┬───────────┘ + │ │ + [N, C2, 1, H] [N, C2, 1, H] + │ │ + ┌───────▼───────┐ ┌───────▼───────┐ + │ batch_norm │ │ batch_norm │ + │ (1D) │ │ (2D) │ + └───────┬───────┘ └───────┬───────┘ + │ │ + │ [N, C3, 1, H] + │ │ + │ ┌───────▼────────┐ + │ │ squeeze(-2) │ + │ └───────┬────────┘ + │ │ + ▼ ▼ + [N, C3, H] [N, C3, H] + y y + """ + + @staticmethod + def _is_conv_1d(node: Node) -> bool: + return node.target == torch.ops.aten.conv1d.default + + @staticmethod + def _is_conv_transposed_1d(node: Node) -> bool: + return node.target == torch.ops.aten.conv_transpose1d.default + + @staticmethod + def _is_batch_norm(node: Node) -> bool: + return node.target == torch.ops.aten.batch_norm.default + + @staticmethod + def _listify(x: int | list[int] | tuple[int]) -> list[int]: + if isinstance(x, int): + return 
[x] + + return list(x) + + def _get_node_shape(self, node: Node): + node_t = try_get_tensor_constant_from_node(self.graph_module, node) + if node_t is not None: + return node_t.shape + + return node.meta["val"].shape if hasattr(node, "meta") else node.shape + + def _get_node_dtype(self, node: Node): + node_t = try_get_tensor_constant_from_node(self.graph_module, node) + + if node_t is not None: + return node_t.dtype + + return node.meta["val"].dtype if hasattr(node, "meta") else node.dtype + + def _convert_w_node_to_static_attr(self, node: Node): + t_node = try_get_tensor_constant_from_node(self.graph_module, node) + if t_node is None: + # should not occur + raise RuntimeError( + "Node cannot be converted to `get_attr` since it is not static." + ) + t_node = t_node.unsqueeze(-2) + + t_name = get_new_attr_name_with_prefix(node.name)(self.graph_module) + _assign_attr( + torch.nn.Parameter(t_node), + self.graph_module, + t_name, + _AttrKind.PARAMETER, + ) + + get_attr_node = self.graph_module.graph.create_node("get_attr", t_name, (), {}) + fake_mode = node.meta["val"].fake_mode + get_attr_node.meta["val"] = fake_mode.from_tensor(t_node, static_shapes=True) + + return get_attr_node + + def _create_fake_tensor_for_node_args( + self, node_args: list[Node | None], mode: FakeTensorMode + ): + fake_node_args = [ + ( + FakeTensor.from_tensor( + torch.empty( + self._get_node_shape(arg), dtype=self._get_node_dtype(arg) + ), + mode, + ) + if arg is not None + else None + ) + for arg in node_args + ] + + return fake_node_args + + def _create_batch_norm_2d_node(self, *bn_args): + bn_target = torch.ops.aten.batch_norm.default + bn_node = self.graph_module.graph.call_function(bn_target, bn_args) + + bn_node.meta["source_fn_stack"] = [(bn_node.name, bn_target)] + + node_args = bn_args[:5] + scalar_args = bn_args[5:] + + with FakeTensorMode() as mode: + fake_node_args = self._create_fake_tensor_for_node_args(node_args, mode) + output = bn_target(*fake_node_args, *scalar_args) + + 
bn_node.meta["val"] = FakeTensor.from_tensor( + torch.empty(output.shape, dtype=output.dtype), mode + ) + + return bn_node + + def _create_some_conv_2d_node(self, target, *conv_args): + # some_conv_2d_node = could be regular 2d conv or transposed 2d conv + some_conv_node = self.graph_module.graph.call_function(target, conv_args) + some_conv_node.meta["source_fn_stack"] = [(some_conv_node.name, target)] + + node_args = conv_args[:3] + scalar_args = conv_args[3:] + + with FakeTensorMode() as mode: + fake_node_args = self._create_fake_tensor_for_node_args(node_args, mode) + output = target(*fake_node_args, *scalar_args) + + some_conv_node.meta["val"] = FakeTensor.from_tensor( + torch.empty(output.shape, dtype=output.dtype), mode + ) + + return some_conv_node + + def _create_sq_or_unsq_node(self, target, *sq_or_unsq_args) -> Node: + sq_or_unsq_node = self.graph_module.graph.call_function(target, sq_or_unsq_args) + + sq_or_unsq_node.meta["source_fn_stack"] = [(sq_or_unsq_node.name, target)] + with FakeTensorMode() as mode: + inp_node = sq_or_unsq_args[0] + fake_input = FakeTensor.from_tensor( + torch.empty( + self._get_node_shape(inp_node), dtype=self._get_node_dtype(inp_node) + ), + mode, + ) + + output = target(fake_input, *sq_or_unsq_args[1:]) + sq_or_unsq_node.meta["val"] = FakeTensor.from_tensor( + torch.empty(output.shape, dtype=output.dtype), mode + ) + + return sq_or_unsq_node + + @staticmethod + def _get_conv_1d_transp_args(node: Node): + args = node.args + listify_fn = ConvertConv1dToConv2dPass._listify + + b_node = None if len(args) < 3 else args[2] + stride = [1] if len(args) < 4 else listify_fn(args[3]) + padding = [0] if len(args) < 5 else listify_fn(args[4]) + output_padding = [0] if len(args) < 6 else listify_fn(args[5]) + groups = 1 if len(args) < 7 else args[6] + dilation = [1] if len(args) < 8 else listify_fn(args[7]) + + return ( + args[0], + args[1], + b_node, + stride, + padding, + output_padding, + groups, + dilation, + ) + + @staticmethod + def 
_get_conv_1d_args(node: Node) -> Conv1dArgs: + args = node.args + listify_fn = ConvertConv1dToConv2dPass._listify + + b_node = None if len(args) < 3 else args[2] + stride = [1] if len(args) < 4 else listify_fn(args[3]) + padding = [0] if len(args) < 5 else listify_fn(args[4]) + dilation = [1] if len(args) < 6 else listify_fn(args[5]) + groups = 1 if len(args) < 7 else args[6] + + return args[0], args[1], b_node, stride, padding, dilation, groups + + def _convert_scalar_1d_args_to_2d(self, old_1d_node: Node): + if self._is_conv_transposed_1d(old_1d_node): + _, _, _, stride, pad, output_pad, groups, dil = ( + self._get_conv_1d_transp_args(old_1d_node) + ) + + # conversion of 1d args to 2d, ie. padding with default values + stride = [1] + stride + pad = [0] + pad + output_pad = [0] + output_pad + dil = [1] + dil + + return stride, pad, output_pad, groups, dil + + else: + _, _, _, stride, pad, dil, groups = self._get_conv_1d_args(old_1d_node) + + # conversion of 1d args to 2d, ie. padding with default values + stride = [1] + stride + pad = [0] + pad + dil = [1] + dil + + return stride, pad, dil, groups + + def _convert_node_1d_args_to_2d(self, old_1d_node: Node): + if self._is_conv_transposed_1d(old_1d_node): + input_node, w_node, b_node, _, _, _, _, _ = self._get_conv_1d_transp_args( + old_1d_node + ) + else: + input_node, w_node, b_node, _, _, _, _ = self._get_conv_1d_args(old_1d_node) + + with self.graph_module.graph.inserting_before(old_1d_node): + # weights = [i/o, i/o, k] => [i/o, i/o, 1, k] and converted to `get_attr` node + w_node = self._convert_w_node_to_static_attr(w_node) + + # input = [n, c, h] => [n, c, 1, h] + unsqueeze_target = torch.ops.aten.unsqueeze.default + inp_unsq_args = (input_node, -2) + inp_unsq_node = self._create_sq_or_unsq_node( + unsqueeze_target, *inp_unsq_args + ) + + return (inp_unsq_node, w_node, b_node) + + def call(self, graph_module: GraphModule) -> PassResult: + self.graph_module = graph_module + made_changes = False + + for node 
in list(graph_module.graph.nodes): + is_conv_1d = self._is_conv_1d(node) + is_conv_1d_transp = self._is_conv_transposed_1d(node) + + # some_1d_conv = regular 1d conv or 1d transposed conv + is_some_1d_conv = is_conv_1d or is_conv_1d_transp + if not is_some_1d_conv: + continue + + old_1d_node = node + + # invalid number of args + if len(old_1d_node.args) < 2: + continue + + conv_1d_w = old_1d_node.args[1] + conv_1d_b = old_1d_node.args[2] if len(old_1d_node.args) > 2 else None + + # non-static weights are not supported + if try_get_tensor_constant_from_node(graph_module, conv_1d_w) is None: + continue + + # non-static bias is not supported + if ( + conv_1d_b is not None + and try_get_tensor_constant_from_node(graph_module, conv_1d_b) is None + ): + continue + + # get input, weight and bias arguments for the new 2d conv + node_args = self._convert_node_1d_args_to_2d(old_1d_node) + # get stride, padding etc. arguments for the new 2d conv + scalar_args = self._convert_scalar_1d_args_to_2d(old_1d_node) + + new_2d_target = ( + torch.ops.aten.conv_transpose2d.input + if is_conv_1d_transp + else torch.ops.aten.conv2d.default + ) + + # create the new conv 2d and unsqueeze the input and weights + with self.graph_module.graph.inserting_before(old_1d_node): + new_2d_args = node_args + scalar_args + new_2d_node = self._create_some_conv_2d_node( + new_2d_target, *new_2d_args + ) + + old_1d_conv_users = list(old_1d_node.users.keys()) + if len(old_1d_conv_users) == 1 and self._is_batch_norm( + old_1d_conv_users[0] + ): + bn_1d_node = old_1d_conv_users[0] + + # also convert batch_norm 1d to 2d + with self.graph_module.graph.inserting_after(new_2d_node): + bn_2d_args = (new_2d_node,) + bn_1d_node.args[1:] + bn_2d_node = self._create_batch_norm_2d_node(*bn_2d_args) + + with self.graph_module.graph.inserting_after(bn_2d_node): + squeeze_target = torch.ops.aten.squeeze.dim + + out_sq_args = (bn_2d_node, 2) + out_sq_node = self._create_sq_or_unsq_node( + squeeze_target, *out_sq_args + ) 
+ + bn_1d_node.replace_all_uses_with(out_sq_node) + self.graph_module.graph.erase_node(bn_1d_node) + + else: + with self.graph_module.graph.inserting_after(new_2d_node): + squeeze_target = torch.ops.aten.squeeze.dim + + out_sq_args = (new_2d_node, -2) + out_sq_node = self._create_sq_or_unsq_node( + squeeze_target, *out_sq_args + ) + + old_1d_node.replace_all_uses_with(out_sq_node) + + graph_module.graph.erase_node(old_1d_node) + made_changes = True + + graph_module.recompile() + graph_module.graph.eliminate_dead_code() + return PassResult(graph_module, made_changes) diff --git a/backends/nxp/aten_passes/neutron_aten_pass_manager.py b/backends/nxp/aten_passes/neutron_aten_pass_manager.py index 703a8cf03a5..4f1ff2648aa 100644 --- a/backends/nxp/aten_passes/neutron_aten_pass_manager.py +++ b/backends/nxp/aten_passes/neutron_aten_pass_manager.py @@ -7,6 +7,9 @@ import torch +from executorch.backends.nxp.aten_passes.convert_1d_conv_to_2d import ( + ConvertConv1dToConv2dPass, +) from executorch.backends.nxp.aten_passes.convert_div_to_mul import ConvertDivToMulPass from executorch.backends.nxp.aten_passes.decompose_split_to_slices_pass import ( DecomposeSplitToSlicesPass, @@ -49,6 +52,7 @@ def _get_default_passes(neutron_target_spec, qat_mode: bool = False) -> list[Pas FuseLinearAndAddPass(), MoveActivationBeforeConcat(neutron_target_spec), ConvertDivToMulPass(), + ConvertConv1dToConv2dPass(), ] if not qat_mode: diff --git a/backends/nxp/backend/ir/converter/node_converters/ops_converters/convolution_converter.py b/backends/nxp/backend/ir/converter/node_converters/ops_converters/convolution_converter.py index 148b90a331e..5fa994be7ae 100644 --- a/backends/nxp/backend/ir/converter/node_converters/ops_converters/convolution_converter.py +++ b/backends/nxp/backend/ir/converter/node_converters/ops_converters/convolution_converter.py @@ -15,7 +15,6 @@ from executorch.backends.nxp.backend.ir.converter.conversion import ( aten_translator, common, - translator, ) from 
executorch.backends.nxp.backend.ir.converter.conversion.common import try_get_input from executorch.backends.nxp.backend.ir.converter.conversion.translator import ( @@ -42,7 +41,6 @@ from executorch.backends.nxp.backend.ir.tflite_generator.builtin_options import ( conv_2d_options, depthwise_conv_2d_options, - reshape_options, transpose_conv_options, ) from executorch.backends.nxp.backend.neutron_target_spec import NeutronTargetSpec @@ -70,8 +68,9 @@ def _is_supported_on_target( return False if conv_params.transposed: - # TransposeConv1d is not supported on Neutron - if len(conv_params.dilation) == 1: + # TransposeConv2d with groups > 1 is not supported + # TODO: split into multiple convs with groups = 1 + if conv_params.groups > 1: return False if not node_is_effectively_static_tensor(weights, parameters_mapping): # Only supported if the weights are static, because TFLite `TransposeConv` uses permuted @@ -187,99 +186,6 @@ def _get_convolution_arguments( groups, ) - def _convert_1d_conv( - self, t_op: tflite_model.Operator, conv_params: ConvParameters - ) -> list[tflite_model.Operator]: - """Convert the 'Conv' operator with a 1D kernel to TFLite 'Conv2D'. - TFLite doesn't support 1D convolution, but this behaviour can be represented using - Reshape -> Conv2D -> Reshape. - The first reshape introduces a 4th dimension with size 1. The second Reshape removes the temporary dimension. 
- """ - # -- Calculate the shapes for equivalent 2D convolution -- - conv_2d_input_shape = translator.nhc_dimensions_to_nhwc( - t_op.tmp_inputs[0].shape.vector - ) - conv_2d_weight_shape = translator.nhc_dimensions_to_nhwc( - t_op.tmp_inputs[1].shape.vector - ) - conv_2d_output_shape = translator.nhc_dimensions_to_nhwc( - t_op.tmp_outputs[0].shape.vector - ) - - # -- Generate tensors taking part in the conversion -- - reshape1_input = t_op.tmp_inputs[0] - - reshape1_output = self.builder.duplicate_tensor( - reshape1_input, name_suffix="_4D_" - ) - reshape1_output.shape = tflite_model.Shape(conv_2d_input_shape) - - reshape2_input = self.builder.duplicate_tensor( - t_op.tmp_outputs[0], name_suffix="_4D_" - ) - reshape2_input.shape = tflite_model.Shape(conv_2d_output_shape) - - reshape2_output = t_op.tmp_outputs[0] - - pre_reshapes = [] - - # Extend the weights tensor to 4D - weights_tensor = t_op.tmp_inputs[1] - if tensor_has_data(weights_tensor): - # Do it statically - weights_tensor.shape = tflite_model.Shape(conv_2d_weight_shape) - weights_tensor.tmp_buffer.data = weights_tensor.tmp_buffer.data.reshape( - conv_2d_weight_shape - ) - - else: - # Add a Reshape before the weights tensor - new_weights_tensor = self.builder.duplicate_tensor( - weights_tensor, name_suffix="_4D_" - ) - new_weights_tensor.shape = tflite_model.Shape(conv_2d_weight_shape) - - weight_reshape = tflite_model.Operator( - builtin_options=reshape_options.Reshape(conv_2d_weight_shape) - ) - weight_reshape.tmp_inputs = [weights_tensor] - weight_reshape.tmp_outputs = [new_weights_tensor] - - pre_reshapes.append(weight_reshape) - - # Save the new weights tensor, to assign it later. 
- weights_tensor = new_weights_tensor - - # -- Create the new operators -- - reshape1 = tflite_model.Operator( - builtin_options=reshape_options.Reshape(conv_2d_input_shape) - ) - reshape1.tmp_inputs = [reshape1_input] - reshape1.tmp_outputs = [reshape1_output] - pre_reshapes.append(reshape1) - - reshape2 = tflite_model.Operator( - builtin_options=reshape_options.Reshape(reshape2_output.shape.vector) - ) - reshape2.tmp_inputs = [reshape2_input] - reshape2.tmp_outputs = [reshape2_output] - - # Assign the new input and output of the Conv2D - t_op.tmp_inputs = [reshape1_output, weights_tensor] + t_op.tmp_inputs[ - 2: - ] # Add bias as well, if present - t_op.tmp_outputs = [reshape2_input] - - # Extend all Conv attributes to 2D - common.extend_1d_stride_to_2d(conv_params.stride) - common.extend_1d_dilation_to_2d(conv_params.dilation) - common.extend_1d_padding_to_2d(conv_params.padding) - - # Convert the now 2D Conv - converted_conv_ops = self._convert_2d_conv(t_op, conv_params) - - return pre_reshapes + converted_conv_ops + [reshape2] - # noinspection PyPep8Naming def _convert_unpadded_2D( self, t_op: tflite_model.Operator, conv_params: ConvParameters @@ -523,9 +429,7 @@ def convert(self, node: Node): ) rank = t_op.tmp_inputs[1].shape.len() - if rank == 3: # Conv1D - ops_to_add = self._convert_1d_conv(t_op, conv_params) - elif rank == 4: # Conv2D + if rank == 4: # Conv2D ops_to_add = self._convert_2d_conv(t_op, conv_params) else: raise NotImplementedError( diff --git a/backends/nxp/quantizer/neutron_quantizer.py b/backends/nxp/quantizer/neutron_quantizer.py index 73c3167d728..0c46678b25a 100644 --- a/backends/nxp/quantizer/neutron_quantizer.py +++ b/backends/nxp/quantizer/neutron_quantizer.py @@ -23,7 +23,6 @@ BMMPattern, CatPattern, ClampPattern, - Conv1dPattern, Conv2dPattern, ConvTranspose2dPattern, DropoutPattern, @@ -266,9 +265,10 @@ def __init__(self, neutron_target_spec: NeutronTargetSpec, is_qat: bool = False) OpQuantizer(BMMPattern(is_qat=is_qat), 
static_qconfig), OpQuantizer(CatPattern(is_qat=is_qat), static_qconfig), OpQuantizer(ClampPattern(is_qat=is_qat), static_qconfig), - OpQuantizer(Conv1dPattern(is_qat=is_qat), static_qconfig), OpQuantizer(Conv2dPattern(self, is_qat=is_qat), static_qconfig), - OpQuantizer(ConvTranspose2dPattern(is_qat=is_qat), static_qconfig), + OpQuantizer( + ConvTranspose2dPattern(self, is_qat=is_qat), static_qconfig + ), OpQuantizer(DropoutPattern(is_qat=is_qat), static_qconfig), OpQuantizer(FlattenPattern(is_qat=is_qat), static_qconfig), OpQuantizer(HardTanhPattern(is_qat=is_qat), static_qconfig), diff --git a/backends/nxp/quantizer/patterns.py b/backends/nxp/quantizer/patterns.py index 60afa6bf4d2..cf0026094df 100644 --- a/backends/nxp/quantizer/patterns.py +++ b/backends/nxp/quantizer/patterns.py @@ -7,10 +7,14 @@ from abc import ABC, abstractmethod from dataclasses import dataclass, field +from functools import partial import torch -from executorch.backends.nxp.quantizer.utils import get_bias_qparams +from executorch.backends.nxp.quantizer.utils import ( + get_bias_qparams, + get_padded_bias_qparams, +) from torch import fx from torch._ops import OpOverload from torch.fx import Node @@ -482,16 +486,6 @@ def get_anchors( ) -class Conv1dPattern(ConvPattern): - def partition_types(self) -> list[OpOverload]: - return [torch.ops.aten.conv1d.default] - - -class ConvTranspose1dPattern(ConvPattern): - def partition_types(self) -> list[OpOverload]: - return [torch.ops.aten.conv_transpose1d.default] - - class Conv2dPattern(ConvPattern): def __init__(self, neutron_quantizer, is_qat: bool = False): super().__init__(is_qat=is_qat) @@ -572,6 +566,14 @@ def get_anchors( class ConvTranspose2dPattern(QuantizationPattern): + def __init__(self, neutron_quantizer, is_qat: bool = False): + super().__init__(is_qat=is_qat) + + self.neutron_quantizer = neutron_quantizer + self.neutron_target_info = ( + self.neutron_quantizer.neutron_target_spec.neutron_target_info + ) + def partition_types(self) -> 
list[OpOverload]: return [torch.ops.aten.conv_transpose2d.input] @@ -580,12 +582,25 @@ def get_anchors( ) -> PartitionAnchors: conv_node = fused_partition[0].nodes[-1] + # When `groups` > 1, the per-channel weight qparams have shape (`out_channels` / `groups`), + # but bias qparams have shape (`out_channels`) - not divided by `groups`. + # So the weight qparams must be expanded to match the shape correctly. + groups = 1 if len(conv_node.args) < 7 else conv_node.args[6] + if groups > 1: + out_channels = conv_node.meta["val"].shape[1] + derive_qparams_fn = partial( + get_padded_bias_qparams, out_channels=out_channels + ) + + else: + derive_qparams_fn = get_bias_qparams + bias_quantization_qspec = DerivedQuantizationSpec( derived_from=[ (conv_node.args[0], conv_node), (conv_node.args[1], conv_node), ], - derive_qparams_fn=get_bias_qparams, + derive_qparams_fn=derive_qparams_fn, dtype=torch.int32, quant_min=-(2**31) + 1, quant_max=2**31 - 1, @@ -593,14 +608,21 @@ def get_anchors( ch_axis=0, ) - weight_observer_or_fake_quant_ctr = PerChannelMinMaxObserver + w_ch_axis = 1 + weight_observer_or_fake_quant_ctr = ( + FakeQuantize.with_args( + observer=MovingAveragePerChannelMinMaxObserver, ch_axis=w_ch_axis + ) + if self.is_qat + else PerChannelMinMaxObserver.with_args(ch_axis=w_ch_axis) + ) weight_quantization_spec = QuantizationSpec( dtype=torch.int8, observer_or_fake_quant_ctr=weight_observer_or_fake_quant_ctr, quant_min=-127, quant_max=127, qscheme=torch.per_channel_symmetric, - ch_axis=1, + ch_axis=w_ch_axis, ) # Keep bias empty if not supplied @@ -608,20 +630,33 @@ def get_anchors( if len(conv_node.args) > 2 and conv_node.args[2] is not None: bias = [(conv_node, NodeArgsIdx(2), bias_quantization_qspec)] - output_specs = [(conv_node,)] + # If the following node is a fusable activation, quantize together with activation + output = [(conv_node,)] + if len( + conv_node.users + ) == 1 and self.neutron_target_info.is_supported_fused_activation__aten( + activation := 
next(iter(conv_node.users)) + ): + activation_quantizer = self.neutron_quantizer.op_to_quantizer[ + activation.target + ] + activation_quantizer.annotate(gm) + output = [] + activation.meta["quantization_annotation"].input_qspec_map = {} + # In order for QAT to be numerically correct, there should be no quantization between # convolution node and batch norm node. if self.is_qat: conv_users = conv_node.users possibly_bn = list(conv_users.keys())[0] if len(conv_users) == 1 else None if possibly_bn and _is_batch_norm(possibly_bn): - output_specs = [] + output = [] return PartitionAnchors( inputs=[(conv_node, NodeArgsIdx(0))], weights=[(conv_node, NodeArgsIdx(1), weight_quantization_spec)], biases=bias, - output=output_specs, + output=output, ) diff --git a/backends/nxp/quantizer/utils.py b/backends/nxp/quantizer/utils.py index da2448fb773..fccd29b245e 100644 --- a/backends/nxp/quantizer/utils.py +++ b/backends/nxp/quantizer/utils.py @@ -73,6 +73,32 @@ def get_bias_qparams( return bias_scale, bias_zero_point +def get_padded_bias_qparams( + obs_or_fqs: List[ObserverOrFakeQuantize], + out_channels: int | None = None, +) -> Tuple[torch.Tensor, torch.Tensor]: + act_scale, _ = obs_or_fqs[0].calculate_qparams() + weight_scale, _ = obs_or_fqs[1].calculate_qparams() + + # It may happen that `torch.ao` incorrectly sets the weight qparams, not matching bias qparams. + # If `out_channels` is given, ensure bias qparams are per-output-channel: + # So for example w = [w1, w2, w3] -> [w1, w2, w3, w1, w2, w3, ...] + if out_channels is not None: + weight_scale = weight_scale.flatten() + if weight_scale.numel() != out_channels: + if out_channels % weight_scale.numel() != 0: + raise RuntimeError( + "Weight qparams cannot be repeated if not divisible by `out_channels`." 
+ ) + weight_scale = weight_scale.repeat(out_channels // weight_scale.numel()) + + act_scale = act_scale.flatten()[0] + + bias_scale = act_scale * weight_scale + bias_zero_point = torch.zeros_like(bias_scale, dtype=torch.int64) + return bias_scale, bias_zero_point + + def get_aten_node_target_partitions( graph: torch.fx.Graph, wanted_original_aten_op: List[OpOverload], diff --git a/backends/nxp/tests/ir/converter/node_converter/test_bmm_converter.py b/backends/nxp/tests/ir/converter/node_converter/test_bmm_converter.py index 937954b42a9..8f4134d9c57 100644 --- a/backends/nxp/tests/ir/converter/node_converter/test_bmm_converter.py +++ b/backends/nxp/tests/ir/converter/node_converter/test_bmm_converter.py @@ -9,19 +9,10 @@ from executorch.backends.nxp.backend.edge_program_converter import ( EdgeProgramToIRConverter, ) -from executorch.backends.nxp.backend.ir.converter.conversion import translator -from executorch.backends.nxp.backend.neutron_operator_support import ( - transposition_is_supported_on_neutron, -) -from executorch.backends.nxp.tests.executorch_pipeline import ( - neutron_target_spec, - to_quantized_edge_program, -) +from executorch.backends.nxp.tests.executorch_pipeline import to_quantized_edge_program from executorch.backends.nxp.tests.executors import ( convert_run_compare, graph_contains_any_of_ops, - ToChannelFirstPreprocess, - ToChannelLastPreprocess, ) from executorch.backends.nxp.tests.models import BatchMatMulConvModel, BatchMatMulModel from executorch.backends.nxp.tests.use_qat import * # noqa F403 @@ -105,31 +96,6 @@ def test_convert_bmm__unsupported_shape(input_shape_x1, input_shape_x2, use_qat) assert graph_contains_any_of_ops(delegated_ep.graph, [Bmm]) -def test_convert_bmm__unsupported_dim_order(mocker, use_qat): - n1 = n2 = 5 - w1 = c2 = 16 - c1 = 8 - w2 = 24 - - x_input_shape = (n1, c1, w1) - y_input_shape = (n2, c2, w2) - - model = BatchMatMulConvModel(in_channels=c1, out_channels=c1) - - delegated_ep = to_quantized_edge_program( - 
model, - [x_input_shape, y_input_shape], - use_neutron_for_format_conversion=False, - use_qat=use_qat, - ).exported_program() - - # Make sure the `bmm` was NOT delegated. - # For `bmm` to work in channels-first order, support for 3D `transpose` is needed, - # which is not implemented in NXP Executorch backend yet. - assert not graph_contains_any_of_ops(delegated_ep.graph, [ExecutorchDelegateCall]) - assert graph_contains_any_of_ops(delegated_ep.graph, [Bmm]) - - def test_convert_bmm__channels_first(mocker, use_qat): # These must match: # - `n1 = n2` @@ -145,19 +111,6 @@ def test_convert_bmm__channels_first(mocker, use_qat): x_input_shape = (n1, c1, w1) y_input_shape = (n2, c2, w2) - # Channels-last shape of the output before the newly-inserted `transpose` - # converts it to channels-first - output_shape = (n1, w2, c1) - - perm = translator.create_channels_first_to_channels_last_permutation( - len(output_shape), return_list=True - ) - transp_not_supported = not transposition_is_supported_on_neutron( - output_shape, perm, neutron_target_spec - ) - if transp_not_supported: - pytest.skip("3D dim order swap not implemented.") - model = BatchMatMulConvModel(in_channels=c1, out_channels=c1) converter_spy = mocker.spy(EdgeProgramToIRConverter, "convert_program") @@ -198,6 +151,4 @@ def test_convert_bmm__channels_first(mocker, use_qat): tfl_model=neutron_ir_model, input_data=input_data, atol=1, - tflite_input_preprocess=ToChannelLastPreprocess(), - tflite_output_preprocess=ToChannelFirstPreprocess(), ) diff --git a/backends/nxp/tests/ir/converter/node_converter/test_conv_converter.py b/backends/nxp/tests/ir/converter/node_converter/test_conv_converter.py index 785bd5cc854..5580d0ca729 100644 --- a/backends/nxp/tests/ir/converter/node_converter/test_conv_converter.py +++ b/backends/nxp/tests/ir/converter/node_converter/test_conv_converter.py @@ -27,7 +27,7 @@ ToChannelFirstPreprocess, ToChannelLastPreprocess, ) -from executorch.backends.nxp.tests.models import Conv1dModule, 
Conv2dModule +from executorch.backends.nxp.tests.models import Conv2dModule from executorch.exir.dialects._ops import ops as exir_ops from torch.export import ExportedProgram from executorch.backends.nxp.tests.use_qat import * # noqa F403 @@ -39,218 +39,6 @@ def reseed_model_per_test_run(): np.random.seed(23) -@pytest.mark.parametrize("bias", [False, True]) -@pytest.mark.parametrize("stride", [1, 2]) -@pytest.mark.parametrize("dilation", [2, 1]) -@pytest.mark.parametrize("kernel_size", [(1,), (3,)]) -def test_conv1d_quant_conversion(bias, stride, dilation, kernel_size, mocker, use_qat): - input_shape = (1, 4, 16) - model = Conv1dModule( - bias=bias, stride=stride, dilation=dilation, kernel_size=kernel_size - ) - converter_spy = mocker.spy(EdgeProgramToIRConverter, "convert_program") - ops_spy = mocker.spy(ModelBuilder, "finish") - - # Run conversion - _ = to_quantized_edge_program(model, input_shape, use_qat=use_qat) - - # Capture generated model - tflite_flatbuffers_model, io_formats = converter_spy.spy_return - - # Capture converted program - exported_program: ExportedProgram = converter_spy.call_args.args[1] - - input_data = (np.random.random(input_shape).astype(np.float32) * 50).astype(np.int8) - - convert_run_compare( - exported_program, - tflite_input_preprocess=ToChannelLastPreprocess(), - tfl_model=tflite_flatbuffers_model, - tflite_output_preprocess=ToChannelFirstPreprocess(), - input_data=input_data, - atol=1.0, - ) - - # Capture IR model ops - conversion_result = ops_spy.spy_return - ops = conversion_result.sub_graphs[0].operators.vector - - assert len(ops) == 3 - assert ops[0].builtin_options.operator_type == BuiltinOperator.RESHAPE - assert ops[1].builtin_options.operator_type == BuiltinOperator.CONV_2D - assert ops[2].builtin_options.operator_type == BuiltinOperator.RESHAPE - - -@pytest.mark.parametrize("stride", [1, 2]) -@pytest.mark.parametrize("dilation", [2, 1]) -@pytest.mark.parametrize("kernel_size", [(1,), (3,)]) 
-@pytest.mark.parametrize("padding", [(1,), 2]) -def test_conv1d_quant_conversion__padded( - stride, dilation, kernel_size, padding, mocker, use_qat -): - input_shape = (1, 4, 16) - model = Conv1dModule( - stride=stride, dilation=dilation, kernel_size=kernel_size, padding=padding - ) - converter_spy = mocker.spy(EdgeProgramToIRConverter, "convert_program") - ops_spy = mocker.spy(ModelBuilder, "finish") - - # Run conversion - _ = to_quantized_edge_program(model, input_shape, use_qat=use_qat) - - # Capture generated model - tflite_flatbuffers_model, io_formats = converter_spy.spy_return - - # Capture converted program - exported_program: ExportedProgram = converter_spy.call_args.args[1] - - input_data = (np.random.random(input_shape).astype(np.float32) * 50).astype(np.int8) - - convert_run_compare( - exported_program, - tflite_input_preprocess=ToChannelLastPreprocess(), - tfl_model=tflite_flatbuffers_model, - tflite_output_preprocess=ToChannelFirstPreprocess(), - input_data=input_data, - atol=1.0, - ) - - # Capture IR model ops - conversion_result = ops_spy.spy_return - ops = conversion_result.sub_graphs[0].operators.vector - - assert len(ops) == 4 - assert ops[0].builtin_options.operator_type == BuiltinOperator.RESHAPE - assert ops[1].builtin_options.operator_type == BuiltinOperator.PADV2 - assert ops[2].builtin_options.operator_type == BuiltinOperator.CONV_2D - assert ops[3].builtin_options.operator_type == BuiltinOperator.RESHAPE - - # Make sure the padding used the `zero-point`. - pad_value = ops[1].tmp_inputs[2].tmp_buffer.data.item() - assert ( - pad_value == ops[1].tmp_inputs[0].quantization.zero_point[0] - ) # `Pad` input zp. - assert ( - pad_value == ops[1].tmp_outputs[0].quantization.zero_point[0] - ) # `Pad` output zp. - assert ( - pad_value == ops[2].tmp_inputs[0].quantization.zero_point[0] - ) # `Conv` input zp. 
- - -@pytest.mark.parametrize("bias", [False, True]) -@pytest.mark.parametrize("stride", [1, 2]) -@pytest.mark.parametrize("dilation", [2, 1]) -@pytest.mark.parametrize("kernel_size", [(1,), (3,)]) -def test_conv1d_quant_conversion__depthwise( - bias, stride, dilation, kernel_size, mocker, use_qat -): - input_shape = (1, 4, 16) - group = input_shape[1] - model = Conv1dModule( - bias=bias, - group=group, - in_channels=group, - out_channels=group, - stride=stride, - dilation=dilation, - kernel_size=kernel_size, - ) - converter_spy = mocker.spy(EdgeProgramToIRConverter, "convert_program") - ops_spy = mocker.spy(ModelBuilder, "finish") - - # Run conversion - _ = to_quantized_edge_program(model, input_shape, use_qat=use_qat) - - # Capture generated model - tflite_flatbuffers_model, io_formats = converter_spy.spy_return - - # Capture converted program - exported_program: ExportedProgram = converter_spy.call_args.args[1] - - input_data = (np.random.random(input_shape).astype(np.float32) * 50).astype(np.int8) - - convert_run_compare( - exported_program, - tflite_input_preprocess=ToChannelLastPreprocess(), - tfl_model=tflite_flatbuffers_model, - tflite_output_preprocess=ToChannelFirstPreprocess(), - input_data=input_data, - atol=1.0, - ) - - # Capture IR model ops - ops = ops_spy.spy_return.sub_graphs[0].operators.vector - - assert len(ops) == 3 - assert ops[0].builtin_options.operator_type == BuiltinOperator.RESHAPE - assert ops[1].builtin_options.operator_type == BuiltinOperator.DEPTHWISE_CONV_2D - assert ops[2].builtin_options.operator_type == BuiltinOperator.RESHAPE - - -@pytest.mark.parametrize("stride", [1, 2]) -@pytest.mark.parametrize("dilation", [2, 1]) -@pytest.mark.parametrize("kernel_size", [(1,), (3,)]) -@pytest.mark.parametrize("padding", [(1,), 2]) -def test_conv1d_quant_conversion__depthwise__padded( - stride, dilation, kernel_size, padding, mocker, use_qat -): - input_shape = (1, 4, 16) - group = input_shape[1] - model = Conv1dModule( - group=group, - 
in_channels=group, - out_channels=group, - stride=stride, - dilation=dilation, - kernel_size=kernel_size, - padding=padding, - ) - converter_spy = mocker.spy(EdgeProgramToIRConverter, "convert_program") - ops_spy = mocker.spy(ModelBuilder, "finish") - - # Run conversion - _ = to_quantized_edge_program(model, input_shape, use_qat=use_qat) - - # Capture generated model - tflite_flatbuffers_model, io_formats = converter_spy.spy_return - - # Capture converted program - exported_program: ExportedProgram = converter_spy.call_args.args[1] - - input_data = (np.random.random(input_shape).astype(np.float32) * 50).astype(np.int8) - - convert_run_compare( - exported_program, - tflite_input_preprocess=ToChannelLastPreprocess(), - tfl_model=tflite_flatbuffers_model, - tflite_output_preprocess=ToChannelFirstPreprocess(), - input_data=input_data, - atol=1.0, - ) - - # Capture IR model ops - ops = ops_spy.spy_return.sub_graphs[0].operators.vector - - assert len(ops) == 4 - assert ops[0].builtin_options.operator_type == BuiltinOperator.RESHAPE - assert ops[1].builtin_options.operator_type == BuiltinOperator.PADV2 - assert ops[2].builtin_options.operator_type == BuiltinOperator.DEPTHWISE_CONV_2D - assert ops[3].builtin_options.operator_type == BuiltinOperator.RESHAPE - - # Make sure the padding used the `zero-point`. - pad_value = ops[1].tmp_inputs[2].tmp_buffer.data.item() - assert ( - pad_value == ops[1].tmp_inputs[0].quantization.zero_point[0] - ) # `Pad` input zp. - assert ( - pad_value == ops[1].tmp_outputs[0].quantization.zero_point[0] - ) # `Pad` output zp. - assert ( - pad_value == ops[2].tmp_inputs[0].quantization.zero_point[0] - ) # `Conv` input zp. 
- - @pytest.mark.parametrize( "model, input_shape", [ diff --git a/backends/nxp/tests/models.py b/backends/nxp/tests/models.py index 17bea708352..f4ad2b68b9d 100644 --- a/backends/nxp/tests/models.py +++ b/backends/nxp/tests/models.py @@ -14,14 +14,14 @@ class Conv1dModule(torch.nn.Module): def __init__( self, - bias: bool = True, - dilation: Union[int, tuple[int, int]] = 1, in_channels: int = 4, - kernel_size: Union[int, tuple[int, int]] = 3, out_channels: int = 8, - padding: Union[str, int, Collection[int]] = 0, + kernel_size: Union[int, tuple[int, int]] = 3, stride: Union[int, tuple[int, int]] = 2, - group: int = 1, + padding: Union[str, int, tuple[int]] = 0, + dilation: Union[int, tuple[int, int]] = 1, + groups: int = 1, + bias: bool = True, ): super().__init__() @@ -33,13 +33,44 @@ def __init__( padding=padding, dilation=dilation, bias=bias, - groups=group, + groups=groups, ) def forward(self, x): return self.conv(x) +class ConvTranspose1dModule(torch.nn.Module): + def __init__( + self, + in_channels: int = 4, + out_channels: int = 8, + kernel_size: Union[int, tuple[int, int]] = 3, + stride: Union[int, tuple[int, int]] = 1, + padding: Union[int, tuple[int]] = 0, + output_padding: Union[int, tuple[int]] = 0, + groups: int = 1, + bias: bool = True, + dilation: Union[int, tuple[int, int]] = 1, + ): + super().__init__() + + self.conv_transp = torch.nn.ConvTranspose1d( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=kernel_size, + stride=stride, + padding=padding, + output_padding=output_padding, + groups=groups, + bias=bias, + dilation=dilation, + ) + + def forward(self, x): + return self.conv_transp(x) + + class Conv2dModule(torch.nn.Module): def __init__( self, diff --git a/backends/nxp/tests/test_batch_norm_fusion.py b/backends/nxp/tests/test_batch_norm_fusion.py index 02014aae752..5648f29b9be 100644 --- a/backends/nxp/tests/test_batch_norm_fusion.py +++ b/backends/nxp/tests/test_batch_norm_fusion.py @@ -112,7 +112,7 @@ def 
test_batch_norm_conv_fusing__full_pipeline__1d(bias: bool): module, tuple(input_shape) ).exported_program() - assert len(edge_program.graph.nodes) == 15 + assert len(edge_program.graph.nodes) == 21 assert not graph_contains_any_of_ops(edge_program.graph, batch_norm_target_ops) diff --git a/backends/nxp/tests/test_convert_1d_conv_to_2d.py b/backends/nxp/tests/test_convert_1d_conv_to_2d.py new file mode 100644 index 00000000000..5c26313c6e2 --- /dev/null +++ b/backends/nxp/tests/test_convert_1d_conv_to_2d.py @@ -0,0 +1,383 @@ +# Copyright 2026 NXP +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +import numpy as np +import pytest +import torch +from executorch.backends.nxp.aten_passes.neutron_aten_pass_manager import ( + ConvertConv1dToConv2dPass, + NeutronAtenPassManager, +) + +from executorch.backends.nxp.backend.edge_program_converter import ( + EdgeProgramToIRConverter, +) +from executorch.backends.nxp.tests.executorch_pipeline import ( + neutron_target_spec, + to_quantized_edge_program, +) +from executorch.backends.nxp.tests.executors import ( + convert_run_compare, + graph_contains_any_of_ops, +) +from executorch.backends.nxp.tests.models import Conv1dModule, ConvTranspose1dModule +from executorch.exir.dialects._ops import ops as exir_ops +from torch.export import ExportedProgram + + +@pytest.fixture(autouse=True) +def reseed_model_per_test_run(): + torch.manual_seed(23) + np.random.seed(23) + + +AtenConv1d = torch.ops.aten.conv1d.default +AtenConv2d = torch.ops.aten.conv2d.default +AtenConvTranspose1d = torch.ops.aten.conv_transpose1d.default +AtenConvTranspose2d = torch.ops.aten.conv_transpose2d.input +AtenSqueeze = torch.ops.aten.squeeze.dim +AtenUnsqueeze = torch.ops.aten.unsqueeze.default + +EdgeConvolution = exir_ops.edge.aten.convolution.default +ExecutorchDelegateCall = torch.ops.higher_order.executorch_call_delegate + + +@pytest.mark.parametrize( + "input_shape, 
kernel_size, stride, padding, dilation, groups, bias", + [ + pytest.param((3, 7, 23), 3, 1, 0, 1, 1, True, id="All default."), + pytest.param( + (3, 7, 23), 2, 1, 0, 1, 1, True, id="kernel_size=2, otherwise all default." + ), + pytest.param( + (3, 7, 23), 3, 2, 0, 1, 1, True, id="stride=2, otherwise all default." + ), + pytest.param( + (3, 7, 23), 3, 1, 1, 1, 1, True, id="pad=1, otherwise all default." + ), + pytest.param( + (3, 7, 23), 3, 1, 0, 2, 1, True, id="dilation=2, otherwise all default." + ), + pytest.param( + (3, 7, 23), 3, 1, 0, 1, 7, True, id="group=7, otherwise all default." + ), + pytest.param( + (3, 7, 23), 3, 1, 0, 1, 1, False, id="bias=False, otherwise all default." + ), + pytest.param((3, 7, 23), 5, 3, 2, 3, 7, False, id="Nothing is default."), + ], +) +def test_convert_conv_1d_to_conv2d( + input_shape, kernel_size, stride, padding, dilation, groups, bias +): + in_channels = input_shape[1] + out_channels = 14 + model = Conv1dModule( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=kernel_size, + stride=stride, + padding=padding, + dilation=dilation, + groups=groups, + bias=bias, + ) + example_input = torch.rand(input_shape) + + exir_program_aten = torch.export.export(model, (example_input,)).module() + + # Make sure `aten.conv1d` is present. + assert graph_contains_any_of_ops(exir_program_aten.graph, [AtenConv1d]) + outputs_before = [o.detach().numpy() for o in exir_program_aten(example_input)] + + # Apply the optimization. + NeutronAtenPassManager(neutron_target_spec, [ConvertConv1dToConv2dPass()])( + exir_program_aten + ) + + # Make sure no `aten.conv1d` nodes are in the model. + assert not graph_contains_any_of_ops( + exir_program_aten.graph, + [ + AtenConv1d, + ], + ) + + # Check correct count and placement. 
+ nodes = list(exir_program_aten.graph.nodes) + + conv_nodes = [i for i, n in enumerate(nodes) if n.target == AtenConv2d] + assert len(conv_nodes) == 1 + i = conv_nodes[0] + + assert nodes[i - 1].target == AtenUnsqueeze + assert nodes[i].target == AtenConv2d + assert nodes[i + 1].target == AtenSqueeze + + outputs_after = [o.detach().numpy() for o in exir_program_aten(example_input)] + + # Make sure the model still produces the exact same output. + assert len(outputs_before) == len(outputs_after) + for i in range(len(outputs_before)): + assert np.allclose(outputs_before[i], outputs_after[i]) + + +# Note: The first case is the default; the remaining cases are chosen to test various parameter combinations. +# To satisfy requirements for delegation, some parameters could not be chosen arbitrarily. +@pytest.mark.parametrize( + "input_shape, kernel_size, stride, padding, output_padding, groups, bias, dilation", + [ + pytest.param((3, 7, 23), 3, 1, 0, 0, 1, True, 1, id="All default."), + pytest.param( + (3, 7, 23), + 2, + 1, + 0, + 0, + 1, + True, + 1, + id="kernel_size=2, otherwise all default.", + ), + pytest.param( + (3, 7, 23), 3, 2, 0, 0, 1, True, 1, id="stride=2, otherwise all default." + ), + pytest.param( + (3, 7, 23), 3, 1, 1, 0, 1, True, 1, id="pad=1, otherwise all default." + ), + pytest.param( + (3, 7, 23), + 3, + 2, + 0, + 1, + 1, + True, + 1, + id="output_padding=1 (stride=2 - restriction from definition), otherwise all default.", + ), + pytest.param( + (3, 7, 23), 3, 1, 0, 0, 7, True, 1, id="group=7, otherwise all default." + ), + pytest.param( + (3, 7, 23), 3, 1, 0, 0, 1, False, 1, id="bias=False, otherwise all default." + ), + pytest.param( + (3, 7, 23), 3, 1, 0, 0, 1, True, 2, id="dilation=2, otherwise all default." 
+ ), + pytest.param((3, 7, 23), 5, 3, 2, 1, 7, False, 3, id="Nothing is default."), + ], +) +def test_convert_conv_1d_transp_to_conv2d_transp( + input_shape, kernel_size, stride, padding, output_padding, groups, bias, dilation +): + in_channels = input_shape[1] + out_channels = 14 + model = ConvTranspose1dModule( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=kernel_size, + stride=stride, + padding=padding, + output_padding=output_padding, + groups=groups, + bias=bias, + dilation=dilation, + ) + example_input = torch.rand(input_shape) + + exir_program_aten = torch.export.export(model, (example_input,)).module() + + # Make sure `aten.conv_transpose1d` is present. + assert graph_contains_any_of_ops(exir_program_aten.graph, [AtenConvTranspose1d]) + outputs_before = [o.detach().numpy() for o in exir_program_aten(example_input)] + + # Apply the optimization. + NeutronAtenPassManager(neutron_target_spec, [ConvertConv1dToConv2dPass()])( + exir_program_aten + ) + + # Make sure no `aten.conv_transpose1d` nodes are in the model. + assert not graph_contains_any_of_ops( + exir_program_aten.graph, + [ + AtenConvTranspose1d, + ], + ) + + # Check correct count and placement. + nodes = list(exir_program_aten.graph.nodes) + + conv_nodes = [i for i, n in enumerate(nodes) if n.target == AtenConvTranspose2d] + assert len(conv_nodes) == 1 + i = conv_nodes[0] + + assert nodes[i - 1].target == AtenUnsqueeze + assert nodes[i].target == AtenConvTranspose2d + assert nodes[i + 1].target == AtenSqueeze + + outputs_after = [o.detach().numpy() for o in exir_program_aten(example_input)] + + # Make sure the model still produces the exact same output. + assert len(outputs_before) == len(outputs_after) + for i in range(len(outputs_before)): + assert np.allclose(outputs_before[i], outputs_after[i]) + + +# Note: The first case is the default; the remaining cases are chosen to test various parameter combinations. 
+# To satisfy requirements for delegation, some parameters could not be chosen arbitrarily. +@pytest.mark.parametrize("input_shape", [(1, 8, 24), (8, 24)]) +@pytest.mark.parametrize("use_qat", [True, False]) +@pytest.mark.parametrize( + "kernel_size, stride, padding, dilation, groups, bias", + [ + pytest.param(3, 1, 1, 1, 1, True, id="All default, except for padding = 1."), + pytest.param(1, 1, 0, 1, 1, True, id="kernel_size = 1"), + pytest.param(3, 2, 5, 1, 1, True, id="stride = 2"), + pytest.param(3, 1, 2, 2, 1, True, id="dilation = 2"), + pytest.param(3, 1, 1, 1, 1, False, id="bias = False, padding = 1"), + ], +) +def test_convert_conv_1d_to_conv2d_full_pipeline( + mocker, input_shape, kernel_size, stride, padding, dilation, groups, bias, use_qat +): + converter_spy = mocker.spy(EdgeProgramToIRConverter, "convert_program") + + in_channels = input_shape[1] if len(input_shape) == 3 else input_shape[0] + out_channels = 16 + + model = Conv1dModule( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=kernel_size, + stride=stride, + padding=padding, + dilation=dilation, + groups=groups, + bias=bias, + ) + + delegated_ep = to_quantized_edge_program( + model, input_shape, use_qat=use_qat + ).exported_program() + + # Make sure no `conv1d` nodes are in the model. + assert not graph_contains_any_of_ops( + delegated_ep.graph, + [ + AtenConv1d, + ], + ) + + # Check correct count and placement. + nodes = list(delegated_ep.graph.nodes) + assert len(nodes) == 7 + assert nodes[3].target == ExecutorchDelegateCall + + # Capture generated model. + neutron_ir_model = converter_spy.spy_return[0] + exported_program: ExportedProgram = converter_spy.call_args.args[1] + + # Make sure `edge.aten.convolution.default` is in the model. 
+ assert graph_contains_any_of_ops( + exported_program.graph, + [EdgeConvolution], + ) + + example_input = (np.random.random(input_shape).astype(np.float32) * 50).astype( + np.int8 + ) + convert_run_compare( + exported_program, + input_data=example_input, + tfl_model=neutron_ir_model, + ) + + +# Note: The first case is the default; the remaining cases are chosen to test various parameter combinations. +# To satisfy requirements for delegation, some parameters could not be chosen arbitrarily. +@pytest.mark.parametrize("input_shape", [(1, 8, 24), (8, 24)]) +@pytest.mark.parametrize("use_qat", [False, True]) +@pytest.mark.parametrize( + "kernel_size, stride, padding, output_padding, groups, bias, dilation", + [ + pytest.param(2, 2, 0, 0, 1, True, 1, id="All default."), + pytest.param(4, 2, 1, 0, 1, True, 1, id="kernel_size = 4 (and padding = 1)"), + pytest.param(4, 4, 0, 0, 1, True, 1, id="stride = 4 (and kernel_size = 4)"), + pytest.param( + 4, + 4, + 1, + 2, + 1, + True, + 1, + id="output_padding = 2 (and kernel_size = 4, stride = 4, padding = 1)", + ), + pytest.param(2, 2, 0, 0, 1, False, 1, id="bias=False"), + ], +) +def test_convert_conv_1d_to_conv2d_transp_full_pipeline( + mocker, + input_shape, + kernel_size, + stride, + padding, + output_padding, + groups, + bias, + dilation, + use_qat, +): + converter_spy = mocker.spy(EdgeProgramToIRConverter, "convert_program") + + in_channels = input_shape[1] if len(input_shape) == 3 else input_shape[0] + out_channels = 16 + model = ConvTranspose1dModule( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=kernel_size, + stride=stride, + padding=padding, + output_padding=output_padding, + groups=groups, + bias=bias, + dilation=dilation, + ) + + # Run conversion. + delegated_ep = to_quantized_edge_program( + model, input_shape, use_qat=use_qat + ).exported_program() + + # Make sure no `aten.conv_transpose1d` nodes are in the model. 
+ assert not graph_contains_any_of_ops( + delegated_ep.graph, + [AtenConvTranspose1d], + ) + + # Check correct count and placement. + nodes = list(delegated_ep.graph.nodes) + assert len(nodes) == 7 + assert nodes[3].target == ExecutorchDelegateCall + + # Capture generated model. + neutron_ir_model = converter_spy.spy_return[0] + exported_program: ExportedProgram = converter_spy.call_args.args[1] + + # Make sure `edge.aten.convolution.default` is in the model. + assert graph_contains_any_of_ops( + exported_program.graph, + [EdgeConvolution], + ) + + example_input = (np.random.random(input_shape).astype(np.float32) * 50).astype( + np.int8 + ) + convert_run_compare( + exported_program, + input_data=example_input, + tfl_model=neutron_ir_model, + ) diff --git a/backends/nxp/tests/test_quantizer.py b/backends/nxp/tests/test_quantizer.py index 5ab724bf28f..7fd07993dd5 100644 --- a/backends/nxp/tests/test_quantizer.py +++ b/backends/nxp/tests/test_quantizer.py @@ -713,4 +713,10 @@ def is_conv(node): ) for arg in conv_node_args ) - assert len(graph_nodes) == 15 + + # if model with `conv1d` or `conv_transpose1d` is used, then it is converted to the 2d variant + # and additional nodes, such as `squeeze` and `unsqueeze` are inserted. + if len(input_shape) == 3 or len(input_shape) == 2: + assert len(graph_nodes) == 21 + else: + assert len(graph_nodes) == 15 diff --git a/backends/nxp/tests/test_split_group_convolution.py b/backends/nxp/tests/test_split_group_convolution.py index 4baae4cf592..78cdd790a9b 100644 --- a/backends/nxp/tests/test_split_group_convolution.py +++ b/backends/nxp/tests/test_split_group_convolution.py @@ -151,7 +151,7 @@ def test_split_group_convolution__1d( in_channels=input_shape[1], out_channels=8 * group, # Make sure the output channels are multiple of 8, so the `cat` can be delegated. - group=group, + groups=group, stride=1, ) graph_module = torch.export.export(module, example_input).module()