Skip to content
57 changes: 56 additions & 1 deletion msal/authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(
performed.
"""
self._http_client = http_client
self._oidc_authority_url = oidc_authority_url
if oidc_authority_url:
logger.debug("Initializing with OIDC authority: %s", oidc_authority_url)
tenant_discovery_endpoint = self._initialize_oidc_authority(
Expand Down Expand Up @@ -95,11 +96,19 @@ def __init__(
raise ValueError(error_message)
logger.debug(
'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config)
self._issuer = openid_config.get('issuer')
self.authorization_endpoint = openid_config['authorization_endpoint']
self.token_endpoint = openid_config['token_endpoint']
self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
_, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID

# Validate the issuer if using OIDC authority
if self._oidc_authority_url and not self.has_valid_issuer():
raise ValueError((
"The issuer '{iss}' does not match the authority '{auth}' or a known pattern. "
Comment thread
Avery-Dunn marked this conversation as resolved.
"When using the 'oidc_authority' parameter in ClientApplication, the authority "
"will be validated against the issuer from {auth}/.well-known/openid-configuration ."
).format(iss=self._issuer, auth=oidc_authority_url))
def _initialize_oidc_authority(self, oidc_authority_url):
authority, self.instance, tenant = canonicalize(oidc_authority_url)
self.is_adfs = tenant.lower() == 'adfs' # As a convention
Expand Down Expand Up @@ -174,6 +183,53 @@ def user_realm_discovery(self, username, correlation_id=None, response=None):
self.__class__._domains_without_user_realm_discovery.add(self.instance)
return {} # This can guide the caller to fall back normal ROPC flow

def has_valid_issuer(self) -> bool:
"""
Returns True if the issuer from OIDC discovery is valid for this authority.

An issuer is valid if one of the following is true:
- It exactly matches the authority URL
- It has a known Microsoft host (e.g., login.microsoftonline.com)
- It has the same scheme and host as the authority (path can be different)
- For CIAM, the issuer follows the pattern of {tenant}.ciamlogin.com (tenant comes from the authority)
"""
if self._oidc_authority_url == self._issuer:
# The issuer matches the authority URL exactly
return True

issuer = urlparse(self._issuer) if self._issuer else None
if not issuer:
return False

# Check if issuer has a known Microsoft host
if issuer.hostname in WELL_KNOWN_AUTHORITY_HOSTS:
return True

# Check if issuer has the same scheme and host as the authority
authority = urlparse(self._oidc_authority_url)
if authority.scheme == issuer.scheme and authority.netloc == issuer.netloc:
return True

# Check CIAM scenario: issuer follows the pattern {tenant}.ciamlogin.com
# Extract tenant from authority URL - could be in the host or path
tenant = None
if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX):
# Normal CIAM host: {tenant}.ciamlogin.com
tenant = authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0]
else:
# Custom CIAM host: extract tenant from path
parts = authority.path.split('/')
if len(parts) >= 2 and parts[1]:
tenant = parts[1] # First path segment after the initial '/'

if tenant and issuer.hostname.endswith(_CIAM_DOMAIN_SUFFIX):
# Check if issuer follows the pattern {tenant}.ciamlogin.com
expected_issuer_host = f"{tenant}{_CIAM_DOMAIN_SUFFIX}"
if issuer.hostname == expected_issuer_host:
return True

# None of the conditions matched
return False

def canonicalize(authority_or_auth_endpoint):
# Returns (url_parsed_result, hostname_in_lowercase, tenant)
Expand Down Expand Up @@ -222,4 +278,3 @@ def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
resp.raise_for_status()
raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op
"Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text))

1 change: 1 addition & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ def test_should_fallback_when_pymsalruntime_failed_to_initialize_broker(self):
@patch("msal.authority.tenant_discovery", new=Mock(return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
"issuer": "https://contoso.com/placeholder",
}))
@patch("msal.application._init_broker", new=Mock()) # Pretend pymsalruntime installed and working
class TestBrokerFallbackWithDifferentAuthorities(unittest.TestCase):
Expand Down
127 changes: 115 additions & 12 deletions tests/test_authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import msal
from msal.authority import *
from msal.authority import _CIAM_DOMAIN_SUFFIX # Explicitly import the private constant
from tests import unittest
from tests.http_client import MinimalHttpClient

Expand Down Expand Up @@ -100,12 +101,12 @@ def test_authority_with_path_should_be_used_as_is(self, oidc_discovery):


@patch("msal.authority._instance_discovery")
@patch("msal.authority.tenant_discovery", return_value={
"authorization_endpoint": "https://contoso.com/authorize",
"token_endpoint": "https://contoso.com/token",
})
@patch("msal.authority.tenant_discovery") # Moved return_value out of the decorator
class OidcAuthorityTestCase(unittest.TestCase):
authority = "https://contoso.com/tenant"
authorization_endpoint = "https://contoso.com/authorize"
token_endpoint = "https://contoso.com/token"
issuer = "https://contoso.com/tenant" # Added as class variable for inheritance

def setUp(self):
# setUp() gives subclass a dynamic setup based on their authority
Expand All @@ -115,25 +116,37 @@ def setUp(self):
# Here the test is to confirm the OIDC endpoint contains no "/v2.0"
self.authority + "/.well-known/openid-configuration")

def setup_tenant_discovery(self, tenant_discovery):
Comment thread
Avery-Dunn marked this conversation as resolved.
"""Configure the tenant_discovery mock with class-specific values"""
tenant_discovery.return_value = {
"authorization_endpoint": self.authorization_endpoint,
"token_endpoint": self.token_endpoint,
"issuer": self.issuer,
}

def test_authority_obj_should_do_oidc_discovery_and_skip_instance_discovery(
self, oidc_discovery, instance_discovery):
self.setup_tenant_discovery(oidc_discovery)

c = MinimalHttpClient()
a = Authority(None, c, oidc_authority_url=self.authority)
instance_discovery.assert_not_called()
oidc_discovery.assert_called_once_with(self.oidc_discovery_endpoint, c)
self.assertEqual(a.authorization_endpoint, 'https://contoso.com/authorize')
self.assertEqual(a.token_endpoint, 'https://contoso.com/token')
self.assertEqual(a.authorization_endpoint, self.authorization_endpoint)
self.assertEqual(a.token_endpoint, self.token_endpoint)

def test_application_obj_should_do_oidc_discovery_and_skip_instance_discovery(
self, oidc_discovery, instance_discovery):
self.setup_tenant_discovery(oidc_discovery)

app = msal.ClientApplication(
"id", authority=None, oidc_authority=self.authority)
instance_discovery.assert_not_called()
oidc_discovery.assert_called_once_with(
self.oidc_discovery_endpoint, app.http_client)
self.assertEqual(
app.authority.authorization_endpoint, 'https://contoso.com/authorize')
self.assertEqual(app.authority.token_endpoint, 'https://contoso.com/token')
app.authority.authorization_endpoint, self.authorization_endpoint)
self.assertEqual(app.authority.token_endpoint, self.token_endpoint)


class DstsAuthorityTestCase(OidcAuthorityTestCase):
Expand All @@ -144,14 +157,14 @@ class DstsAuthorityTestCase(OidcAuthorityTestCase):
"https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize")
token_endpoint = (
"https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token")
issuer = "https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common"

@patch("msal.authority._instance_discovery")
@patch("msal.authority.tenant_discovery", return_value={
"authorization_endpoint": authorization_endpoint,
"token_endpoint": token_endpoint,
}) # We need to create new patches (i.e. mocks) for non-inherited test cases
@patch("msal.authority.tenant_discovery") # Remove the hard-coded return_value
def test_application_obj_should_accept_dsts_url_as_an_authority(
self, oidc_discovery, instance_discovery):
self.setup_tenant_discovery(oidc_discovery)

app = msal.ClientApplication("id", authority=self.authority)
instance_discovery.assert_not_called()
oidc_discovery.assert_called_once_with(
Expand Down Expand Up @@ -274,3 +287,93 @@ def _test_turning_off_instance_discovery_should_skip_authority_validation_and_in
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_not_called()


class TestAuthorityIssuerValidation(unittest.TestCase):
"""Test cases for authority.has_valid_issuer method """

def setUp(self):
self.http_client = MinimalHttpClient()

def _create_authority_with_issuer(self, oidc_authority_url, issuer, tenant_discovery_mock):
tenant_discovery_mock.return_value = {
"authorization_endpoint": "https://example.com/oauth2/authorize",
"token_endpoint": "https://example.com/oauth2/token",
"issuer": issuer,
}
authority = Authority(
None,
self.http_client,
oidc_authority_url=oidc_authority_url
)
return authority

@patch("msal.authority.tenant_discovery")
def test_exact_match_issuer(self, tenant_discovery_mock):
"""Test when issuer exactly matches the OIDC authority URL"""
authority_url = "https://example.com/tenant"
authority = self._create_authority_with_issuer(authority_url, authority_url, tenant_discovery_mock)
self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it exactly matches the authority URL")

@patch("msal.authority.tenant_discovery")
def test_no_issuer(self, tenant_discovery_mock):
"""Test when no issuer is returned from OIDC discovery"""
authority_url = "https://example.com/tenant"
tenant_discovery_mock.return_value = {
"authorization_endpoint": "https://example.com/oauth2/authorize",
"token_endpoint": "https://example.com/oauth2/token",
# No issuer key
}
# Since initialization now checks for valid issuer, we expect it to raise ValueError
with self.assertRaises(ValueError) as context:
Authority(None, self.http_client, oidc_authority_url=authority_url)
self.assertIn("issuer", str(context.exception).lower())

@patch("msal.authority.tenant_discovery")
def test_microsoft_host_issuer(self, tenant_discovery_mock):
"""Test when issuer has a known Microsoft host"""
authority_url = "https://custom-domain.com/tenant"
issuer = f"https://{WORLD_WIDE}/tenant"
authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock)
self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it has a known Microsoft host")

@patch("msal.authority.tenant_discovery")
def test_same_scheme_and_host_different_path(self, tenant_discovery_mock):
"""Test when issuer has same scheme and host but different path"""
authority_url = "https://example.com/tenant"
issuer = "https://example.com/different/path"
authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock)
self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it has the same scheme and host")

@patch("msal.authority.tenant_discovery")
def test_ciam_authority_with_matching_tenant(self, tenant_discovery_mock):
"""Test CIAM authority with matching tenant in path"""
authority_url = "https://custom-domain.com/tenant_name"
issuer = f"https://tenant_name{_CIAM_DOMAIN_SUFFIX}"
authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock)
self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid for CIAM pattern with matching tenant")

@patch("msal.authority.tenant_discovery")
def test_ciam_authority_with_host_tenant(self, tenant_discovery_mock):
"""Test CIAM authority with tenant in hostname"""
tenant_name = "tenant_name"
authority_url = f"https://{tenant_name}{_CIAM_DOMAIN_SUFFIX}/custom/path"
issuer = f"https://{tenant_name}{_CIAM_DOMAIN_SUFFIX}"
authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock)
self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid for CIAM pattern with tenant in hostname")

@patch("msal.authority.tenant_discovery")
def test_invalid_issuer(self, tenant_discovery_mock):
"""Test when issuer is completely different from authority"""
authority_url = "https://example.com/tenant"
issuer = "https://malicious-site.com/tenant"
tenant_discovery_mock.return_value = {
"authorization_endpoint": "https://example.com/oauth2/authorize",
"token_endpoint": "https://example.com/oauth2/token",
"issuer": issuer,
}
# Since initialization now checks for valid issuer, we expect it to raise ValueError
with self.assertRaises(ValueError) as context:
Authority(None, self.http_client, oidc_authority_url=authority_url)
self.assertIn("issuer", str(context.exception).lower())
self.assertIn(issuer, str(context.exception))
self.assertIn(authority_url, str(context.exception))
Loading