|
1 | 1 | import json |
| 2 | +import re |
2 | 3 | try: |
3 | 4 | from urllib.parse import urlparse |
4 | 5 | except ImportError: # Fall back to Python 2 |
|
16 | 17 | AZURE_GOV_SG = "login.sovcloud-identity.sg" |
17 | 18 |
|
18 | 19 | WORLD_WIDE = 'login.microsoftonline.com' # There was an alias login.windows.net |
19 | | -WELL_KNOWN_AUTHORITY_HOSTS = frozenset([ |
20 | | - WORLD_WIDE, |
21 | | - "login.microsoft.com", |
22 | | - "login.windows.net", |
23 | | - "sts.windows.net", |
24 | | - DEPRECATED_AZURE_CHINA, |
25 | | - "login.partner.microsoftonline.cn", |
26 | | - "login.microsoftonline.de", # deprecated |
27 | | - 'login-us.microsoftonline.com', |
28 | | - AZURE_US_GOVERNMENT, |
29 | | - "login.usgovcloudapi.net", |
30 | | - AZURE_GOV_FR, |
31 | | - AZURE_GOV_DE, |
32 | | - AZURE_GOV_SG, |
33 | | - ]) |
| 20 | + |
# Sovereign-cloud sentinels. Aliases of the same cloud map to the same value
# so callers can compare clouds with simple equality.
_CLOUD_PUBLIC = "PUBLIC"
_CLOUD_CHINA = "CHINA"
_CLOUD_GERMANY = "GERMANY"
_CLOUD_US_GOV = "US_GOV"
_CLOUD_US_ALT = "US_ALT"
_CLOUD_PPE = "PPE"
_CLOUD_BLEU = "BLEU"
_CLOUD_DELOS = "DELOS"
_CLOUD_GOV_SG = "GOV_SG"

# Single source of truth for known Microsoft authority hosts. Add an alias
# here and WELL_KNOWN_AUTHORITY_HOSTS / _KNOWN_HOST_TO_CLOUD pick it up.
_HOSTS_BY_CLOUD = {
    _CLOUD_PUBLIC: (
        AZURE_PUBLIC,
        "login.microsoft.com",
        "login.windows.net",
        "sts.windows.net",
    ),
    _CLOUD_CHINA: (
        "login.partner.microsoftonline.cn",
        DEPRECATED_AZURE_CHINA,
    ),
    _CLOUD_GERMANY: ("login.microsoftonline.de",),  # deprecated
    _CLOUD_US_GOV: (
        AZURE_US_GOVERNMENT,
        "login.usgovcloudapi.net",
    ),
    _CLOUD_US_ALT: ("login-us.microsoftonline.com",),
    _CLOUD_BLEU: (AZURE_GOV_FR,),
    _CLOUD_DELOS: (AZURE_GOV_DE,),
    _CLOUD_GOV_SG: (AZURE_GOV_SG,),
}

# Hosts that resolve to a cloud for the cross-cloud check but MUST NOT enter
# WELL_KNOWN_AUTHORITY_HOSTS (which gates instance-discovery skipping).
# - PPE: non-production.
# - ciamlogin.com: bare suffix, never a usable authority on its own; tenant
#   subdomains resolve to Public via _resolve_known_cloud's regional logic.
_EXTRA_HOSTS_BY_CLOUD = {
    _CLOUD_PPE: (
        "login.windows-ppe.net",
        "sts.windows-ppe.net",
        "login.microsoft-ppe.com",
    ),
    _CLOUD_PUBLIC: ("ciamlogin.com",),
}

# Derived from _HOSTS_BY_CLOUD so a new alias cannot drift out of sync.
WELL_KNOWN_AUTHORITY_HOSTS = frozenset(
    host for hosts in _HOSTS_BY_CLOUD.values() for host in hosts)

# Host -> cloud sentinel lookup covering both tables above.
_KNOWN_HOST_TO_CLOUD = {
    host: cloud
    for cloud, hosts in _HOSTS_BY_CLOUD.items()
    for host in hosts
}
_KNOWN_HOST_TO_CLOUD.update({
    host: cloud
    for cloud, hosts in _EXTRA_HOSTS_BY_CLOUD.items()
    for host in hosts
})

# Catch a duplicated host at import time rather than in production. A host
# listed twice collapses into one dict key, so the flat list ends up longer
# than the lookup table. This is an explicit raise rather than an ``assert``
# statement so the check still runs when Python is started with -O, which
# strips assert statements.
_all_listed_hosts = [
    h for hosts in _HOSTS_BY_CLOUD.values() for h in hosts
] + [
    h for hosts in _EXTRA_HOSTS_BY_CLOUD.values() for h in hosts
]
if len(_all_listed_hosts) != len(_KNOWN_HOST_TO_CLOUD):
    raise AssertionError(
        "Duplicate host in cloud tables: {}".format(
            sorted(h for h in set(_all_listed_hosts)
                   if _all_listed_hosts.count(h) > 1)))
del _all_listed_hosts
34 | 96 |
|
35 | 97 | WELL_KNOWN_B2C_HOSTS = [ |
36 | 98 | "b2clogin.com", |
|
41 | 103 | ] |
42 | 104 | _CIAM_DOMAIN_SUFFIX = ".ciamlogin.com" |
43 | 105 |
|
# RFC 1035 / RFC 1123 DNS label: 1-63 chars, no leading/trailing hyphen.
# Used as a shape gate on the region prefix; not an allow-list of regions.
# Anchored with \Z rather than $: "$" also matches just before a trailing
# newline, which would let a prefix such as "westus\n" through the gate.
_REGION_PREFIX_PATTERN = re.compile(
    r"^[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\Z")


def _resolve_known_cloud(host):
    """Return the cloud sentinel for *host*, or None.

    Trust gate for the cross-cloud check (used by `_are_in_same_cloud`,
    `_ensure_endpoint_same_cloud_as_authority`, and Rule 2 of
    `has_valid_issuer`). Tightening here is safe; loosening here weakens
    all three call sites at once.

    Matches an alias in :data:`_KNOWN_HOST_TO_CLOUD` or a ``{region}.{alias}``
    sub-host where ``{region}`` is exactly one label that matches
    :data:`_REGION_PREFIX_PATTERN`. Hosts are lowercased here, so callers
    may pass either the raw ``urlparse(...).hostname`` or any untrusted
    string.

    :param host: Host name to classify; ``None`` and ``""`` yield ``None``.
    :return: The cloud sentinel from :data:`_KNOWN_HOST_TO_CLOUD`, or
        ``None`` when the host is neither a known alias nor a single
        region-shaped label in front of one.
    """
    if not host:
        return None
    host = host.lower()
    cloud = _KNOWN_HOST_TO_CLOUD.get(host)
    if cloud is not None:
        return cloud
    # Split off the first label only: "a.b.alias" leaves base "b.alias",
    # which is not a known alias, so multi-label prefixes are rejected.
    prefix, dot, base = host.partition(".")
    if not dot or not prefix:
        return None  # No dot at all, or a leading dot.
    if _REGION_PREFIX_PATTERN.match(prefix):
        return _KNOWN_HOST_TO_CLOUD.get(base)
    return None
| 139 | + |
| 140 | + |
def _are_in_same_cloud(host_a, host_b):
    """Tell whether two hosts belong to the same known cloud.

    Default-deny: the answer is True only when both hosts resolve through
    `_resolve_known_cloud` and the two sentinels are equal. If *host_a*
    does not resolve, *host_b* is not looked at.
    """
    first = _resolve_known_cloud(host_a)
    if first is None:
        return False
    second = _resolve_known_cloud(host_b)
    return second is not None and first == second
| 150 | + |
| 151 | + |
def _ensure_endpoint_same_cloud_as_authority(
        authority_url, endpoint_url, endpoint_name):
    """Refuse a discovered endpoint that sits in a different cloud.

    Returns ``None`` without checking anything when *endpoint_url* is
    empty, when it lacks a scheme or a host, or when the host of
    *authority_url* does not resolve to a known cloud. Otherwise the
    endpoint host must resolve to the same cloud as the authority host.

    :param authority_url: The configured authority URL (may be falsy).
    :param endpoint_url: The endpoint URL to vet.
    :param endpoint_name: Label for the endpoint, used in the error text.
    :raises ValueError: When the endpoint host resolves to no known cloud
        or to a different cloud than the authority. The message contains
        the authority URL, the endpoint label and the endpoint URL.
    """
    if not endpoint_url:
        return
    parsed_endpoint = urlparse(endpoint_url)
    if not (parsed_endpoint.scheme and parsed_endpoint.hostname):
        return  # Let downstream parsing surface a non-absolute URL

    authority_host = None
    if authority_url:
        authority_host = urlparse(authority_url).hostname
    expected_cloud = _resolve_known_cloud(authority_host)
    if expected_cloud is None:
        return  # Authority is not a known host: nothing to enforce.

    # expected_cloud is never None here, so an unresolved endpoint (None)
    # fails this equality just like a mismatching cloud does.
    if _resolve_known_cloud(parsed_endpoint.hostname) == expected_cloud:
        return

    message = (
        "OIDC discovery for authority '{authority}' returned a "
        "{name} '{endpoint}' whose host is not in the same Microsoft "
        "sovereign cloud as the authority. MSAL refused to use that "
        "endpoint. Verify the OIDC discovery endpoint is not being "
        "intercepted and that the configured authority points at the "
        "correct sovereign cloud."
    ).format(
        authority=authority_url,
        name=endpoint_name,
        endpoint=endpoint_url,
    )
    raise ValueError(message)
| 183 | + |
44 | 184 |
|
def _get_instance_discovery_host(instance):
    """Pick the host to use for instance discovery.

    Returns *instance* unchanged when it is listed in
    WELL_KNOWN_AUTHORITY_HOSTS; any other value maps to WORLD_WIDE.
    """
    if instance in WELL_KNOWN_AUTHORITY_HOSTS:
        return instance
    return WORLD_WIDE
@@ -118,16 +258,29 @@ def __init__( |
118 | 258 | self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint') |
119 | 259 | _, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID |
120 | 260 |
|
121 | | - # Validate the issuer if using OIDC authority |
122 | | - if self._oidc_authority_url and not self.has_valid_issuer(): |
123 | | - raise ValueError(( |
124 | | - "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. " |
125 | | - "When using the 'oidc_authority' parameter in ClientApplication, the authority " |
126 | | - "will be validated against the issuer from {auth}/.well-known/openid-configuration ." |
127 | | - "If using a known Entra authority (e.g. login.microsoftonline.com) the " |
128 | | - "'authority' parameter should be used instead of 'oidc_authority'. " |
129 | | - "" |
130 | | - ).format(iss=self._issuer, auth=oidc_authority_url)) |
| 261 | + # Validate the issuer and enforce same-cloud endpoints (OIDC only). |
| 262 | + # See #5927 for the cross-cloud hardening. |
| 263 | + if self._oidc_authority_url: |
| 264 | + if not self.has_valid_issuer(): |
| 265 | + raise ValueError(( |
| 266 | + "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. " |
| 267 | + "When using the 'oidc_authority' parameter in ClientApplication, the authority " |
| 268 | + "will be validated against the issuer from {auth}/.well-known/openid-configuration ." |
| 269 | + "If using a known Entra authority (e.g. login.microsoftonline.com) the " |
| 270 | + "'authority' parameter should be used instead of 'oidc_authority'. " |
| 271 | + "" |
| 272 | + ).format(iss=self._issuer, auth=self._oidc_authority_url)) |
| 273 | + _ensure_endpoint_same_cloud_as_authority( |
| 274 | + self._oidc_authority_url, self.token_endpoint, "token_endpoint") |
| 275 | + _ensure_endpoint_same_cloud_as_authority( |
| 276 | + self._oidc_authority_url, self.authorization_endpoint, |
| 277 | + "authorization_endpoint") |
| 278 | + if self.device_authorization_endpoint: |
| 279 | + _ensure_endpoint_same_cloud_as_authority( |
| 280 | + self._oidc_authority_url, |
| 281 | + self.device_authorization_endpoint, |
| 282 | + "device_authorization_endpoint") |
| 283 | + |
131 | 284 | def _initialize_oidc_authority(self, oidc_authority_url): |
132 | 285 | authority, self.instance, tenant = canonicalize(oidc_authority_url) |
133 | 286 | self.is_adfs = tenant.lower() == 'adfs' # As a convention |
@@ -201,58 +354,93 @@ def user_realm_discovery(self, username, correlation_id=None, response=None): |
201 | 354 | return {} # This can guide the caller to fall back normal ROPC flow |
202 | 355 |
|
def has_valid_issuer(self):
    """Return True if the OIDC issuer is valid for this authority.

    Reads ``self._issuer`` and ``self._oidc_authority_url``; returns False
    when either is empty or when the issuer URL has no host.

    Steps below are evaluated in this order; the bracketed labels are
    the historical rule names retained for cross-reference with the
    MSAL.NET port (#5927). Order is security-sensitive.

    Step 1 [Case 1]: Exact match (trailing slashes ignored).
    Step 2 [Case 4]: Same scheme + netloc (paths may differ).
    Step 3 [Rule 3]: CIAM tenant pattern (cross-host only). Must run
        before Step 4 so a ``<x>.ciamlogin.com`` issuer cannot bypass
        tenant matching via Rule 2b (CIAM resolves to Public).
    Step 4 [Rule 2]: Same Microsoft cloud, HTTPS issuer only. 2a accepts
        any known-MS issuer under a custom-domain authority (#5927
        federation); 2b accepts a known-MS issuer under a known-MS
        authority only when the two clouds are identical.
    Step 5 [Case 3b]: Region-shaped prefix on the authority host. Needs
        a non-empty authority host, so a host-less authority URL can
        never match an issuer such as ``https://evil./``.
    Step 6 [Case 5]: B2C subdomain (excluding ``.ciamlogin.com``,
        handled by Step 3).
    """
    if not self._issuer or not self._oidc_authority_url:
        return False

    # Step 1 [Case 1]: exact match (trailing slash insensitive)
    if self._issuer.rstrip("/") == self._oidc_authority_url.rstrip("/"):
        return True

    issuer_parsed = urlparse(self._issuer)
    authority_parsed = urlparse(self._oidc_authority_url)
    issuer_host = (
        issuer_parsed.hostname.lower() if issuer_parsed.hostname else None)
    authority_host = (
        authority_parsed.hostname.lower() if authority_parsed.hostname else "")

    if not issuer_host:
        return False

    # Step 2 [Case 4]: same scheme + host. Runs before Step 3 so a CIAM
    # authority/issuer pair on the same host (different paths) passes.
    if (authority_parsed.scheme == issuer_parsed.scheme and
            authority_parsed.netloc == issuer_parsed.netloc):
        return True

    # Step 3 [Rule 3]: cross-host CIAM issuer. Tenant must match
    # authority's first path segment (or first hostname label). Must run
    # before Step 4 to block the Rule 2b CIAM bypass.
    if issuer_host.endswith(_CIAM_DOMAIN_SUFFIX):
        issuer_tenant = issuer_host[:-len(_CIAM_DOMAIN_SUFFIX)]
        auth_path_parts = [p for p in authority_parsed.path.split("/") if p]
        if auth_path_parts:
            authority_tenant = auth_path_parts[0].lower()
        else:
            authority_tenant = authority_host.split(".", 1)[0]
        if issuer_tenant and issuer_tenant == authority_tenant:
            normalized_issuer_path = issuer_parsed.path.rstrip("/").lower()
            if normalized_issuer_path in (
                    "",
                    "/" + issuer_tenant,
                    "/" + issuer_tenant + "/v2.0"):
                return True
        # Tenant (or issuer path) mismatch: reject. A CIAM issuer never
        # reaches the later, looser steps.
        return False

    # Step 4 [Rule 2]: known Microsoft issuer over HTTPS.
    # 2a: custom-domain authority -> accept (#5927 federation).
    # 2b: known-MS authority -> accept only if same cloud.
    issuer_cloud = _resolve_known_cloud(issuer_host)
    if issuer_cloud is not None and issuer_parsed.scheme == "https":
        authority_cloud = _resolve_known_cloud(authority_host)
        if authority_cloud is None:
            return True  # 2a
        if authority_cloud == issuer_cloud:
            return True  # 2b
        # Cross-cloud: fall through to reject.

    # Step 5 [Case 3b]: region-shaped prefix on the authority host
    # (e.g. issuer=us.someweb.com, authority=someweb.com).
    # authority_host is "" when the authority URL has no host; without
    # the non-empty check an issuer like "evil." (empty base) would equal
    # it and be accepted.
    dot_index = issuer_host.find(".")
    if dot_index > 0 and authority_host:
        prefix = issuer_host[:dot_index]
        potential_base = issuer_host[dot_index + 1:]
        if (_REGION_PREFIX_PATTERN.match(prefix)
                and potential_base == authority_host):
            return True

    # Step 6 [Case 5]: B2C subdomain. .ciamlogin.com handled by Step 3.
    if any(
            issuer_host.endswith("." + h)
            for h in WELL_KNOWN_B2C_HOSTS
            if h != "ciamlogin.com"):
        return True

    return False
|
0 commit comments