Skip to content

Add OIDC issuer validation #840

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion msal/authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(
performed.
"""
self._http_client = http_client
self._oidc_authority_url = oidc_authority_url
if oidc_authority_url:
logger.debug("Initializing with OIDC authority: %s", oidc_authority_url)
tenant_discovery_endpoint = self._initialize_oidc_authority(
Expand Down Expand Up @@ -95,11 +96,19 @@ def __init__(
raise ValueError(error_message)
logger.debug(
'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config)
self._issuer = openid_config.get('issuer')
self.authorization_endpoint = openid_config['authorization_endpoint']
self.token_endpoint = openid_config['token_endpoint']
self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
_, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID

# Validate the issuer if using OIDC authority
if self._oidc_authority_url and not self.has_valid_issuer():
raise ValueError((
"The issuer '{iss}' does not match the authority '{auth}' or a known pattern. "
"When using the 'oidc_authority' parameter in ClientApplication, the authority "
"will be validated against the issuer from {auth}/.well-known/openid-configuration ."
).format(iss=self._issuer, auth=oidc_authority_url))
def _initialize_oidc_authority(self, oidc_authority_url):
authority, self.instance, tenant = canonicalize(oidc_authority_url)
self.is_adfs = tenant.lower() == 'adfs' # As a convention
Expand Down Expand Up @@ -174,6 +183,53 @@ def user_realm_discovery(self, username, correlation_id=None, response=None):
self.__class__._domains_without_user_realm_discovery.add(self.instance)
return {} # This can guide the caller to fall back normal ROPC flow

def has_valid_issuer(self) -> bool:
"""
Returns True if the issuer from OIDC discovery is valid for this authority.

An issuer is valid if one of the following is true:
- It exactly matches the authority URL
- It has a known Microsoft host (e.g., login.microsoftonline.com)
- It has the same scheme and host as the authority (path can be different)
- For CIAM, the issuer follows the pattern of {tenant}.ciamlogin.com (tenant comes from the authority)
"""
if self._oidc_authority_url == self._issuer:
# The issuer matches the authority URL exactly
return True

issuer = urlparse(self._issuer) if self._issuer else None
if not issuer:
return False

# Check if issuer has a known Microsoft host
if issuer.hostname in WELL_KNOWN_AUTHORITY_HOSTS:
return True

# Check if issuer has the same scheme and host as the authority
authority = urlparse(self._oidc_authority_url)
if authority.scheme == issuer.scheme and authority.netloc == issuer.netloc:
return True

# Check CIAM scenario: issuer follows the pattern {tenant}.ciamlogin.com
# Extract tenant from authority URL - could be in the host or path
tenant = None
if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX):
# Normal CIAM host: {tenant}.ciamlogin.com
tenant = authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0]
else:
# Custom CIAM host: extract tenant from path
parts = authority.path.split('/')
if len(parts) >= 2 and parts[1]:
tenant = parts[1] # First path segment after the initial '/'

if tenant and issuer.hostname.endswith(_CIAM_DOMAIN_SUFFIX):
# Check if issuer follows the pattern {tenant}.ciamlogin.com
expected_issuer_host = f"{tenant}{_CIAM_DOMAIN_SUFFIX}"
if issuer.hostname == expected_issuer_host:
return True

# None of the conditions matched
return False

def canonicalize(authority_or_auth_endpoint):
# Returns (url_parsed_result, hostname_in_lowercase, tenant)
Expand Down Expand Up @@ -222,4 +278,3 @@ def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
resp.raise_for_status()
raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op
"Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text))

1 change: 1 addition & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ def test_should_fallback_when_pymsalruntime_failed_to_initialize_broker(self):
@patch("msal.authority.tenant_discovery", new=Mock(return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
"issuer": "https://contoso.com/placeholder",
}))
@patch("msal.application._init_broker", new=Mock()) # Pretend pymsalruntime installed and working
class TestBrokerFallbackWithDifferentAuthorities(unittest.TestCase):
Expand Down
127 changes: 115 additions & 12 deletions tests/test_authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import msal
from msal.authority import *
from msal.authority import _CIAM_DOMAIN_SUFFIX # Explicitly import the private constant
from tests import unittest
from tests.http_client import MinimalHttpClient

Expand Down Expand Up @@ -100,12 +101,12 @@ def test_authority_with_path_should_be_used_as_is(self, oidc_discovery):


@patch("msal.authority._instance_discovery")
@patch("msal.authority.tenant_discovery", return_value={
"authorization_endpoint": "https://contoso.com/authorize",
"token_endpoint": "https://contoso.com/token",
})
@patch("msal.authority.tenant_discovery") # Moved return_value out of the decorator
class OidcAuthorityTestCase(unittest.TestCase):
authority = "https://contoso.com/tenant"
authorization_endpoint = "https://contoso.com/authorize"
token_endpoint = "https://contoso.com/token"
issuer = "https://contoso.com/tenant" # Added as class variable for inheritance

def setUp(self):
# setUp() gives subclass a dynamic setup based on their authority
Expand All @@ -115,25 +116,37 @@ def setUp(self):
# Here the test is to confirm the OIDC endpoint contains no "/v2.0"
self.authority + "/.well-known/openid-configuration")

def setup_tenant_discovery(self, tenant_discovery):
"""Configure the tenant_discovery mock with class-specific values"""
tenant_discovery.return_value = {
"authorization_endpoint": self.authorization_endpoint,
"token_endpoint": self.token_endpoint,
"issuer": self.issuer,
}

def test_authority_obj_should_do_oidc_discovery_and_skip_instance_discovery(
self, oidc_discovery, instance_discovery):
self.setup_tenant_discovery(oidc_discovery)

c = MinimalHttpClient()
a = Authority(None, c, oidc_authority_url=self.authority)
instance_discovery.assert_not_called()
oidc_discovery.assert_called_once_with(self.oidc_discovery_endpoint, c)
self.assertEqual(a.authorization_endpoint, 'https://contoso.com/authorize')
self.assertEqual(a.token_endpoint, 'https://contoso.com/token')
self.assertEqual(a.authorization_endpoint, self.authorization_endpoint)
self.assertEqual(a.token_endpoint, self.token_endpoint)

def test_application_obj_should_do_oidc_discovery_and_skip_instance_discovery(
self, oidc_discovery, instance_discovery):
self.setup_tenant_discovery(oidc_discovery)

app = msal.ClientApplication(
"id", authority=None, oidc_authority=self.authority)
instance_discovery.assert_not_called()
oidc_discovery.assert_called_once_with(
self.oidc_discovery_endpoint, app.http_client)
self.assertEqual(
app.authority.authorization_endpoint, 'https://contoso.com/authorize')
self.assertEqual(app.authority.token_endpoint, 'https://contoso.com/token')
app.authority.authorization_endpoint, self.authorization_endpoint)
self.assertEqual(app.authority.token_endpoint, self.token_endpoint)


class DstsAuthorityTestCase(OidcAuthorityTestCase):
Expand All @@ -144,14 +157,14 @@ class DstsAuthorityTestCase(OidcAuthorityTestCase):
"https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize")
token_endpoint = (
"https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token")
issuer = "https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common"

@patch("msal.authority._instance_discovery")
@patch("msal.authority.tenant_discovery", return_value={
"authorization_endpoint": authorization_endpoint,
"token_endpoint": token_endpoint,
}) # We need to create new patches (i.e. mocks) for non-inherited test cases
@patch("msal.authority.tenant_discovery") # Remove the hard-coded return_value
def test_application_obj_should_accept_dsts_url_as_an_authority(
self, oidc_discovery, instance_discovery):
self.setup_tenant_discovery(oidc_discovery)

app = msal.ClientApplication("id", authority=self.authority)
instance_discovery.assert_not_called()
oidc_discovery.assert_called_once_with(
Expand Down Expand Up @@ -274,3 +287,93 @@ def _test_turning_off_instance_discovery_should_skip_authority_validation_and_in
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_not_called()


class TestAuthorityIssuerValidation(unittest.TestCase):
"""Test cases for authority.has_valid_issuer method """

def setUp(self):
self.http_client = MinimalHttpClient()

def _create_authority_with_issuer(self, oidc_authority_url, issuer, tenant_discovery_mock):
tenant_discovery_mock.return_value = {
"authorization_endpoint": "https://example.com/oauth2/authorize",
"token_endpoint": "https://example.com/oauth2/token",
"issuer": issuer,
}
authority = Authority(
None,
self.http_client,
oidc_authority_url=oidc_authority_url
)
return authority

@patch("msal.authority.tenant_discovery")
def test_exact_match_issuer(self, tenant_discovery_mock):
"""Test when issuer exactly matches the OIDC authority URL"""
authority_url = "https://example.com/tenant"
authority = self._create_authority_with_issuer(authority_url, authority_url, tenant_discovery_mock)
self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it exactly matches the authority URL")

@patch("msal.authority.tenant_discovery")
def test_no_issuer(self, tenant_discovery_mock):
"""Test when no issuer is returned from OIDC discovery"""
authority_url = "https://example.com/tenant"
tenant_discovery_mock.return_value = {
"authorization_endpoint": "https://example.com/oauth2/authorize",
"token_endpoint": "https://example.com/oauth2/token",
# No issuer key
}
# Since initialization now checks for valid issuer, we expect it to raise ValueError
with self.assertRaises(ValueError) as context:
Authority(None, self.http_client, oidc_authority_url=authority_url)
self.assertIn("issuer", str(context.exception).lower())

@patch("msal.authority.tenant_discovery")
def test_microsoft_host_issuer(self, tenant_discovery_mock):
"""Test when issuer has a known Microsoft host"""
authority_url = "https://custom-domain.com/tenant"
issuer = f"https://{WORLD_WIDE}/tenant"
authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock)
self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it has a known Microsoft host")

@patch("msal.authority.tenant_discovery")
def test_same_scheme_and_host_different_path(self, tenant_discovery_mock):
"""Test when issuer has same scheme and host but different path"""
authority_url = "https://example.com/tenant"
issuer = "https://example.com/different/path"
authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock)
self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it has the same scheme and host")

@patch("msal.authority.tenant_discovery")
def test_ciam_authority_with_matching_tenant(self, tenant_discovery_mock):
"""Test CIAM authority with matching tenant in path"""
authority_url = "https://custom-domain.com/tenant_name"
issuer = f"https://tenant_name{_CIAM_DOMAIN_SUFFIX}"
authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock)
self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid for CIAM pattern with matching tenant")

@patch("msal.authority.tenant_discovery")
def test_ciam_authority_with_host_tenant(self, tenant_discovery_mock):
"""Test CIAM authority with tenant in hostname"""
tenant_name = "tenant_name"
authority_url = f"https://{tenant_name}{_CIAM_DOMAIN_SUFFIX}/custom/path"
issuer = f"https://{tenant_name}{_CIAM_DOMAIN_SUFFIX}"
authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock)
self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid for CIAM pattern with tenant in hostname")

@patch("msal.authority.tenant_discovery")
def test_invalid_issuer(self, tenant_discovery_mock):
"""Test when issuer is completely different from authority"""
authority_url = "https://example.com/tenant"
issuer = "https://malicious-site.com/tenant"
tenant_discovery_mock.return_value = {
"authorization_endpoint": "https://example.com/oauth2/authorize",
"token_endpoint": "https://example.com/oauth2/token",
"issuer": issuer,
}
# Since initialization now checks for valid issuer, we expect it to raise ValueError
with self.assertRaises(ValueError) as context:
Authority(None, self.http_client, oidc_authority_url=authority_url)
self.assertIn("issuer", str(context.exception).lower())
self.assertIn(issuer, str(context.exception))
self.assertIn(authority_url, str(context.exception))
Loading