|
99 | 99 | from azure.cli.command_modules.acs._consts import ( |
100 | 100 | CONST_OUTBOUND_TYPE_LOAD_BALANCER, |
101 | 101 | CONST_OUTBOUND_TYPE_MANAGED_NAT_GATEWAY, |
| 102 | + CONST_OUTBOUND_TYPE_USER_DEFINED_ROUTING, |
| 103 | + CONST_OUTBOUND_TYPE_USER_ASSIGNED_NAT_GATEWAY, |
102 | 104 | DecoratorEarlyExitException, |
103 | 105 | DecoratorMode, |
104 | 106 | ) |
@@ -4705,6 +4707,96 @@ def test_get_outbound_type(self): |
4705 | 4707 | CONST_OUTBOUND_TYPE_MANAGED_NAT_GATEWAY_V2, |
4706 | 4708 | ) |
4707 | 4709 |
|
| 4710 | + def test_get_outbound_type_update_udr_byo_vnet(self): |
| 4711 | + """Test that updating to UDR succeeds when the cluster has a BYO VNet (vnet_subnet_id is set on agentpool).""" |
| 4712 | + ctx = AKSPreviewManagedClusterContext( |
| 4713 | + self.cmd, |
| 4714 | + AKSManagedClusterParamDict({"outbound_type": "userDefinedRouting"}), |
| 4715 | + self.models, |
| 4716 | + decorator_mode=DecoratorMode.UPDATE, |
| 4717 | + ) |
| 4718 | + self.create_attach_agentpool_context(ctx) |
| 4719 | + # Simulate a BYO VNet cluster: agentpool has vnet_subnet_id set |
| 4720 | + agentpool = self.models.ManagedClusterAgentPoolProfile( |
| 4721 | + name="nodepool1", |
| 4722 | + vnet_subnet_id="/subscriptions/test/resourceGroups/rg/providers/Microsoft.Network/virtualNetworks/vnet/subnets/subnet", |
| 4723 | + ) |
| 4724 | + mc = self.models.ManagedCluster( |
| 4725 | + location="test_location", |
| 4726 | + agent_pool_profiles=[agentpool], |
| 4727 | + network_profile=self.models.ContainerServiceNetworkProfile( |
| 4728 | + load_balancer_sku="standard", |
| 4729 | + ), |
| 4730 | + ) |
| 4731 | + ctx.attach_mc(mc) |
| 4732 | + ctx.agentpool_context.attach_agentpool(agentpool) |
| 4733 | + # Should succeed — BYO VNet cluster can update to UDR |
| 4734 | + outbound_type = ctx._get_outbound_type(enable_validation=True) |
| 4735 | + self.assertEqual(outbound_type, CONST_OUTBOUND_TYPE_USER_DEFINED_ROUTING) |
| 4736 | + |
| 4737 | + def test_get_outbound_type_update_udr_managed_vnet(self): |
| 4738 | + """Test that updating to UDR fails with clear error when the cluster uses managed VNet (no vnet_subnet_id).""" |
| 4739 | + ctx = AKSPreviewManagedClusterContext( |
| 4740 | + self.cmd, |
| 4741 | + AKSManagedClusterParamDict({"outbound_type": "userDefinedRouting"}), |
| 4742 | + self.models, |
| 4743 | + decorator_mode=DecoratorMode.UPDATE, |
| 4744 | + ) |
| 4745 | + self.create_attach_agentpool_context(ctx) |
| 4746 | + # Simulate a managed VNet cluster: agentpool has no vnet_subnet_id |
| 4747 | + agentpool = self.models.ManagedClusterAgentPoolProfile( |
| 4748 | + name="nodepool1", |
| 4749 | + ) |
| 4750 | + mc = self.models.ManagedCluster( |
| 4751 | + location="test_location", |
| 4752 | + agent_pool_profiles=[agentpool], |
| 4753 | + network_profile=self.models.ContainerServiceNetworkProfile( |
| 4754 | + load_balancer_sku="standard", |
| 4755 | + ), |
| 4756 | + ) |
| 4757 | + ctx.attach_mc(mc) |
| 4758 | + ctx.agentpool_context.attach_agentpool(agentpool) |
| 4759 | + # Should fail with InvalidArgumentValueError for managed VNet clusters |
| 4760 | + with self.assertRaises(InvalidArgumentValueError): |
| 4761 | + ctx._get_outbound_type(enable_validation=True) |
| 4762 | + |
| 4763 | + def test_get_outbound_type_update_user_assigned_nat_gw_managed_vnet(self): |
| 4764 | + """Test that updating to userAssignedNATGateway fails with clear error when using managed VNet.""" |
| 4765 | + ctx = AKSPreviewManagedClusterContext( |
| 4766 | + self.cmd, |
| 4767 | + AKSManagedClusterParamDict({"outbound_type": "userAssignedNATGateway"}), |
| 4768 | + self.models, |
| 4769 | + decorator_mode=DecoratorMode.UPDATE, |
| 4770 | + ) |
| 4771 | + self.create_attach_agentpool_context(ctx) |
| 4772 | + agentpool = self.models.ManagedClusterAgentPoolProfile( |
| 4773 | + name="nodepool1", |
| 4774 | + ) |
| 4775 | + mc = self.models.ManagedCluster( |
| 4776 | + location="test_location", |
| 4777 | + agent_pool_profiles=[agentpool], |
| 4778 | + network_profile=self.models.ContainerServiceNetworkProfile( |
| 4779 | + load_balancer_sku="standard", |
| 4780 | + ), |
| 4781 | + ) |
| 4782 | + ctx.attach_mc(mc) |
| 4783 | + ctx.agentpool_context.attach_agentpool(agentpool) |
| 4784 | + with self.assertRaises(InvalidArgumentValueError): |
| 4785 | + ctx._get_outbound_type(enable_validation=True) |
| 4786 | + |
| 4787 | + def test_get_outbound_type_create_udr_no_subnet(self): |
| 4788 | + """Test that creating with UDR but no vnet_subnet_id raises RequiredArgumentMissingError.""" |
| 4789 | + ctx = AKSPreviewManagedClusterContext( |
| 4790 | + self.cmd, |
| 4791 | + AKSManagedClusterParamDict({"outbound_type": "userDefinedRouting"}), |
| 4792 | + self.models, |
| 4793 | + decorator_mode=DecoratorMode.CREATE, |
| 4794 | + ) |
| 4795 | + self.create_attach_agentpool_context(ctx) |
| 4796 | + # Should fail with RequiredArgumentMissingError during create |
| 4797 | + with self.assertRaises(RequiredArgumentMissingError): |
| 4798 | + ctx._get_outbound_type(enable_validation=True) |
| 4799 | + |
4708 | 4800 | def test_get_enable_gateway_api(self): |
4709 | 4801 | # default value |
4710 | 4802 | ctx_1 = AKSPreviewManagedClusterContext( |
|
0 commit comments