55 from urlparse import urlparse
66import logging
77
8-
98logger = logging .getLogger (__name__ )
10-
119# Endpoints were copied from here
1210# https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-national-cloud#azure-ad-authentication-endpoints
1311AZURE_US_GOVERNMENT = "login.microsoftonline.us"
2119 'login-us.microsoftonline.com' ,
2220 AZURE_US_GOVERNMENT ,
2321 ])
22+
23+ # Trusted issuer hosts for OIDC issuer validation
24+ # Includes all well-known Microsoft identity provider hosts and national clouds
25+ TRUSTED_ISSUER_HOSTS = frozenset ([
26+ # Global/Public cloud
27+ "login.microsoftonline.com" ,
28+ "login.microsoft.com" ,
29+ "login.windows.net" ,
30+ "sts.windows.net" ,
31+ # China cloud
32+ "login.chinacloudapi.cn" ,
33+ "login.partner.microsoftonline.cn" ,
34+ # Germany cloud (legacy)
35+ "login.microsoftonline.de" ,
36+ # US Government clouds
37+ "login.microsoftonline.us" ,
38+ "login.usgovcloudapi.net" ,
39+ "login-us.microsoftonline.com" ,
40+ "https://login.sovcloud-identity.fr" , # AzureBleu
41+ "https://login.sovcloud-identity.de" , # AzureDelos
42+ "https://login.sovcloud-identity.sg" , # AzureGovSG
43+ ])
44+
2445WELL_KNOWN_B2C_HOSTS = [
2546 "b2clogin.com" ,
2647 "b2clogin.cn" ,
@@ -67,12 +88,12 @@ def __init__(
6788 performed.
6889 """
6990 self ._http_client = http_client
91+ self ._oidc_authority_url = oidc_authority_url
7092 if oidc_authority_url :
7193 logger .debug ("Initializing with OIDC authority: %s" , oidc_authority_url )
7294 tenant_discovery_endpoint = self ._initialize_oidc_authority (
7395 oidc_authority_url )
7496 else :
75- logger .debug ("Initializing with Entra authority: %s" , authority_url )
7697 tenant_discovery_endpoint = self ._initialize_entra_authority (
7798 authority_url , validate_authority , instance_discovery )
7899 try :
@@ -93,14 +114,22 @@ def __init__(
93114 .format (authority_url )
94115 ) + " Also please double check your tenant name or GUID is correct."
95116 raise ValueError (error_message )
96- openid_config .pop ("issuer" , None ) # Not used in MSAL.py, so remove it therefore no need to validate it
97- logger .debug (
98- 'openid_config("%s") = %s' , tenant_discovery_endpoint , openid_config )
117+ self ._issuer = openid_config .get ('issuer' )
99118 self .authorization_endpoint = openid_config ['authorization_endpoint' ]
100119 self .token_endpoint = openid_config ['token_endpoint' ]
101120 self .device_authorization_endpoint = openid_config .get ('device_authorization_endpoint' )
102121 _ , _ , self .tenant = canonicalize (self .token_endpoint ) # Usually a GUID
103122
123+ # Validate the issuer if using OIDC authority
124+ if self ._oidc_authority_url and not self .has_valid_issuer ():
125+ raise ValueError ((
126+ "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. "
127+ "When using the 'oidc_authority' parameter in ClientApplication, the authority "
128+ "will be validated against the issuer from {auth}/.well-known/openid-configuration ."
129+ "If using a known Entra authority (e.g. login.microsoftonline.com) the "
130+ "'authority' parameter should be used instead of 'oidc_authority'. "
131+ ""
132+ ).format (iss = self ._issuer , auth = oidc_authority_url ))
104133 def _initialize_oidc_authority (self , oidc_authority_url ):
105134 authority , self .instance , tenant = canonicalize (oidc_authority_url )
106135 self .is_adfs = tenant .lower () == 'adfs' # As a convention
@@ -175,6 +204,60 @@ def user_realm_discovery(self, username, correlation_id=None, response=None):
175204 self .__class__ ._domains_without_user_realm_discovery .add (self .instance )
176205 return {} # This can guide the caller to fall back normal ROPC flow
177206
207+ def has_valid_issuer (self ):
208+ """
209+ Returns True if the issuer from OIDC discovery is valid for this authority.
210+
211+ An issuer is valid if one of the following is true:
212+ - It exactly matches the authority URL (with/without trailing slash)
213+ - It has the same scheme and host as the authority (path can be different)
214+ - The issuer host is a well-known Microsoft authority host
215+ - The issuer host is a regional variant of a well-known host (e.g., westus2.login.microsoft.com)
216+ - For CIAM, hosts that end with well-known B2C hosts (e.g., tenant.b2clogin.com) are accepted as valid issuers
217+ """
218+ if not self ._issuer or not self ._oidc_authority_url :
219+ return False
220+
221+ # Case 1: Exact match (most common case, normalized for trailing slashes)
222+ if self ._issuer .rstrip ("/" ) == self ._oidc_authority_url .rstrip ("/" ):
223+ return True
224+
225+ issuer_parsed = urlparse (self ._issuer )
226+ authority_parsed = urlparse (self ._oidc_authority_url )
227+ issuer_host = issuer_parsed .hostname .lower () if issuer_parsed .hostname else None
228+
229+ if not issuer_host :
230+ return False
231+
232+ # Case 2: Issuer is from a trusted Microsoft host - O(1) lookup
233+ if issuer_host in TRUSTED_ISSUER_HOSTS :
234+ return True
235+
236+ # Case 3: Regional variant check - O(1) lookup
237+ # e.g., westus2.login.microsoft.com -> extract "login.microsoft.com"
238+ dot_index = issuer_host .find ("." )
239+ if dot_index > 0 :
240+ potential_base = issuer_host [dot_index + 1 :]
241+ if "." not in issuer_host [:dot_index ]:
242+ # 3a: Base host is a trusted Microsoft host
243+ if potential_base in TRUSTED_ISSUER_HOSTS :
244+ return True
245+ # 3b: Issuer has a region prefix on the authority host
246+ # e.g. issuer=us.someweb.com, authority=someweb.com
247+ authority_host = authority_parsed .hostname .lower () if authority_parsed .hostname else ""
248+ if potential_base == authority_host :
249+ return True
250+
251+ # Case 4: Same scheme and host (path can differ)
252+ if (authority_parsed .scheme == issuer_parsed .scheme and
253+ authority_parsed .netloc == issuer_parsed .netloc ):
254+ return True
255+
256+ # Case 5: Check if issuer host ends with any well-known B2C host (e.g., tenant.b2clogin.com)
257+ if any (issuer_host .endswith (h ) for h in WELL_KNOWN_B2C_HOSTS ):
258+ return True
259+
260+ return False
178261
179262def canonicalize (authority_or_auth_endpoint ):
180263 # Returns (url_parsed_result, hostname_in_lowercase, tenant)
@@ -223,4 +306,3 @@ def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
223306 resp .raise_for_status ()
224307 raise RuntimeError ( # A fallback here, in case resp.raise_for_status() is no-op
225308 "Unable to complete OIDC Discovery: %d, %s" % (resp .status_code , resp .text ))
226-
0 commit comments