|
20 | 20 | _TF_VAR_REF_RE = re.compile(r"var\.(\w+)") |
21 | 21 | _TF_LOCAL_REF_RE = re.compile(r"local\.(\w+)") |
22 | 22 | _TF_DATA_REF_RE = re.compile(r"data\.(\w+)\.(\w+)") |
23 | | -_TF_RESOURCE_REF_RE = re.compile(r"(?<![.\w])(\w+)\.(\w+)\.(\w+)") |
| 23 | +_TF_RESOURCE_REF_RE = re.compile(r'(?<![.\w"])(\w+)\.(\w+)\.(\w+)') |
24 | 24 | _TF_MODULE_REF_RE = re.compile(r"module\.(\w+)") |
25 | 25 |
|
26 | 26 | _TF_SOURCE_RE = re.compile(r'^\s*source\s*=\s*"([^"]+)"', re.MULTILINE) |
@@ -74,15 +74,89 @@ def _collect_tf_dirs_and_sources(tf_files: list[Path]) -> tuple[set[Path], set[s |
74 | 74 | return tf_dirs, module_sources |
75 | 75 |
|
76 | 76 |
|
77 | | -def _is_in_module(candidate: Path, module_sources: set[str], tf_dirs: set[Path]) -> bool: |
| 77 | +def _is_in_module(candidate: Path, module_sources: set[str], tf_dirs: set[Path], repo_root: Path | None = None) -> bool: |
78 | 78 | for src in module_sources: |
79 | 79 | for tf_dir in tf_dirs: |
80 | 80 | try: |
81 | 81 | module_path = (tf_dir / src).resolve() |
82 | 82 | if candidate.is_relative_to(module_path): |
83 | 83 | return True |
84 | 84 | except (ValueError, OSError): |
85 | | - continue |
| 85 | + pass |
| 86 | + if repo_root: |
| 87 | + try: |
| 88 | + clean_src = src.lstrip("./") |
| 89 | + module_path = (repo_root / clean_src).resolve() |
| 90 | + if candidate.is_relative_to(module_path): |
| 91 | + return True |
| 92 | + except (ValueError, OSError): |
| 93 | + pass |
| 94 | + return False |
| 95 | + |
| 96 | + |
| 97 | +def _extract_qualified_defs(content: str) -> set[str]: |
| 98 | + defs: set[str] = set() |
| 99 | + for match in _TF_VARIABLE_RE.finditer(content): |
| 100 | + defs.add(match.group(1)) |
| 101 | + for match in _TF_RESOURCE_RE.finditer(content): |
| 102 | + defs.add(f"{match.group(1)}.{match.group(2)}") |
| 103 | + for match in _TF_DATA_RE.finditer(content): |
| 104 | + defs.add(f"{match.group(1)}.{match.group(2)}") |
| 105 | + for local_key in _extract_locals(content): |
| 106 | + defs.add(local_key) |
| 107 | + for match in _TF_MODULE_RE.finditer(content): |
| 108 | + defs.add(match.group(1)) |
| 109 | + return defs |
| 110 | + |
| 111 | + |
| 112 | +_TF_GENERIC_NAMES = frozenset( |
| 113 | + { |
| 114 | + "name", |
| 115 | + "region", |
| 116 | + "tags", |
| 117 | + "environment", |
| 118 | + "env", |
| 119 | + "description", |
| 120 | + "enabled", |
| 121 | + "type", |
| 122 | + "value", |
| 123 | + "default", |
| 124 | + "count", |
| 125 | + "id", |
| 126 | + "arn", |
| 127 | + "vpc_id", |
| 128 | + "subnet_id", |
| 129 | + "key", |
| 130 | + "project", |
| 131 | + "owner", |
| 132 | + "stage", |
| 133 | + } |
| 134 | +) |
| 135 | + |
| 136 | + |
| 137 | +def _candidate_references_changed_defs_strict(content: str, changed_defs: set[str]) -> bool: |
| 138 | + for match in _TF_VAR_REF_RE.finditer(content): |
| 139 | + name = match.group(1) |
| 140 | + if name in changed_defs and name not in _TF_GENERIC_NAMES: |
| 141 | + return True |
| 142 | + for match in _TF_LOCAL_REF_RE.finditer(content): |
| 143 | + name = match.group(1) |
| 144 | + if name in changed_defs and name not in _TF_GENERIC_NAMES: |
| 145 | + return True |
| 146 | + for match in _TF_DATA_REF_RE.finditer(content): |
| 147 | + data_type, data_name = match.group(1), match.group(2) |
| 148 | + if f"{data_type}.{data_name}" in changed_defs or data_name in changed_defs: |
| 149 | + return True |
| 150 | + for match in _TF_MODULE_REF_RE.finditer(content): |
| 151 | + if match.group(1) in changed_defs: |
| 152 | + return True |
| 153 | + skip_types = {"var", "local", "data", "module", "path", "terraform", "each", "self", "count"} |
| 154 | + for match in _TF_RESOURCE_REF_RE.finditer(content): |
| 155 | + res_type, res_name, _ = match.groups() |
| 156 | + if res_type in skip_types: |
| 157 | + continue |
| 158 | + if f"{res_type}.{res_name}" in changed_defs or res_name in changed_defs: |
| 159 | + return True |
86 | 160 | return False |
87 | 161 |
|
88 | 162 |
|
@@ -117,13 +191,37 @@ def discover_related_files( |
117 | 191 | return [] |
118 | 192 |
|
119 | 193 | tf_dirs, module_sources = _collect_tf_dirs_and_sources(tf_files) |
| 194 | + |
| 195 | + changed_defs: set[str] = set() |
| 196 | + changed_contents: list[str] = [] |
| 197 | + for tf in tf_files: |
| 198 | + try: |
| 199 | + content = tf.read_text(encoding="utf-8") |
| 200 | + changed_defs.update(_extract_qualified_defs(content)) |
| 201 | + changed_contents.append(content) |
| 202 | + except (OSError, UnicodeDecodeError): |
| 203 | + pass |
| 204 | + |
120 | 205 | changed_set = set(changed_files) |
121 | 206 | discovered: list[Path] = [] |
122 | 207 |
|
123 | 208 | for candidate in all_candidate_files: |
124 | 209 | if candidate in changed_set or not _is_terraform_file(candidate): |
125 | 210 | continue |
126 | | - if candidate.parent in tf_dirs or _is_in_module(candidate, module_sources, tf_dirs): |
| 211 | + if _is_in_module(candidate, module_sources, tf_dirs, repo_root): |
| 212 | + discovered.append(candidate) |
| 213 | + continue |
| 214 | + if candidate.parent not in tf_dirs: |
| 215 | + continue |
| 216 | + try: |
| 217 | + content = candidate.read_text(encoding="utf-8") |
| 218 | + except (OSError, UnicodeDecodeError): |
| 219 | + continue |
| 220 | + if _candidate_references_changed_defs_strict(content, changed_defs): |
| 221 | + discovered.append(candidate) |
| 222 | + continue |
| 223 | + candidate_defs = _extract_qualified_defs(content) |
| 224 | + if candidate_defs and any(_candidate_references_changed_defs_strict(c, candidate_defs) for c in changed_contents): |
127 | 225 | discovered.append(candidate) |
128 | 226 |
|
129 | 227 | return discovered |
|
0 commit comments