Skip to content

Commit c264b08

Browse files
committed
Add rule E3064: duplicate Interface VPC Endpoint with PrivateDnsEnabled
Detect when multiple AWS::EC2::VPCEndpoint resources in the same template share the same VpcId, ServiceName, VpcEndpointType: Interface, and PrivateDnsEnabled: true. AWS rejects the second endpoint with a conflicting DNS domain error. Fixes #4352
1 parent a1b97ea commit c264b08

2 files changed

Lines changed: 418 additions & 0 deletions

File tree

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
"""
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: MIT-0
4+
"""
5+
6+
from __future__ import annotations
7+
8+
from collections import deque
9+
from typing import Any
10+
11+
from cfnlint.context.conditions import Unsatisfiable
12+
from cfnlint.helpers import ensure_list, is_function
13+
from cfnlint.jsonschema import ValidationError, ValidationResult, Validator
14+
from cfnlint.rules.helpers import get_value_from_path
15+
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword
16+
17+
18+
class VpcEndpointPrivateDnsDuplicate(CfnLintKeyword):
19+
id = "E3064"
20+
shortdesc = "Validate unique PrivateDnsEnabled per service per VPC"
21+
description = (
22+
"Only one Interface VPC Endpoint per service can have "
23+
"PrivateDnsEnabled set to true in a VPC. A second endpoint "
24+
"with the same service and PrivateDnsEnabled will fail to create "
25+
"due to a conflicting DNS domain."
26+
)
27+
source_url = (
28+
"https://docs.aws.amazon.com/vpc/latest/privatelink/manage-dns-names.html"
29+
)
30+
tags = ["resources", "ec2", "vpc"]
31+
32+
def __init__(self) -> None:
33+
super().__init__(
34+
keywords=[
35+
"Resources/AWS::EC2::VPCEndpoint/Properties",
36+
],
37+
)
38+
self._endpoints: dict[
39+
tuple[str, str], list[tuple[list[str | int], dict[str, bool]]]
40+
] = {}
41+
42+
def initialize(self, cfn):
43+
self._endpoints = {}
44+
return super().initialize(cfn)
45+
46+
@staticmethod
47+
def _resolve_key(value: Any) -> str | None:
48+
if isinstance(value, str):
49+
return value
50+
if not isinstance(value, dict):
51+
return None
52+
fn_k, fn_v = is_function(value)
53+
if fn_k == "Ref":
54+
return f"Ref:{fn_v}"
55+
if fn_k == "Fn::GetAtt":
56+
parts = ensure_list(fn_v)
57+
return f"GetAtt:{parts[0]}"
58+
if fn_k == "Fn::Sub":
59+
if isinstance(fn_v, str):
60+
return f"Sub:{fn_v}"
61+
if isinstance(fn_v, list) and fn_v:
62+
return f"Sub:{fn_v[0]}"
63+
return None
64+
65+
def validate(
66+
self,
67+
validator: Validator,
68+
keywords: Any,
69+
instance: Any,
70+
schema: dict[str, Any],
71+
) -> ValidationResult:
72+
for endpoint_type, type_validator in get_value_from_path(
73+
validator, instance, deque(["VpcEndpointType"])
74+
):
75+
if endpoint_type != "Interface":
76+
continue
77+
78+
for private_dns, dns_validator in get_value_from_path(
79+
type_validator, instance, deque(["PrivateDnsEnabled"])
80+
):
81+
if private_dns is not True:
82+
continue
83+
84+
for vpc_id, vpc_validator in get_value_from_path(
85+
dns_validator, instance, deque(["VpcId"])
86+
):
87+
vpc_key = self._resolve_key(vpc_id)
88+
if vpc_key is None:
89+
continue
90+
91+
for service_name, svc_validator in get_value_from_path(
92+
vpc_validator, instance, deque(["ServiceName"])
93+
):
94+
svc_key = self._resolve_key(service_name)
95+
if svc_key is None:
96+
continue
97+
98+
group_key = (vpc_key, svc_key)
99+
if group_key not in self._endpoints:
100+
self._endpoints[group_key] = []
101+
102+
conditions = svc_validator.context.conditions.status
103+
104+
for _, saved_conditions in self._endpoints[group_key]:
105+
try:
106+
svc_validator.evolve(
107+
context=svc_validator.context.evolve(
108+
conditions=svc_validator.context.conditions.evolve(
109+
saved_conditions
110+
)
111+
)
112+
)
113+
except Unsatisfiable:
114+
continue
115+
116+
yield ValidationError(
117+
"Only one Interface VPC Endpoint per service "
118+
"can have 'PrivateDnsEnabled' set to true in "
119+
"a VPC. A second endpoint will fail to create "
120+
"due to a conflicting private DNS domain.",
121+
rule=self,
122+
)
123+
124+
self._endpoints[group_key].append(
125+
(
126+
list(svc_validator.context.path.path),
127+
conditions,
128+
)
129+
)

0 commit comments

Comments
 (0)