From c87d5f8561ce4609bb7521d16100175d62be924b Mon Sep 17 00:00:00 2001 From: Paul Bourhis Date: Sat, 10 Jan 2026 14:22:17 +0100 Subject: [PATCH 1/7] Enhance OAUTH2 and OIDC authentication support with improved claims handling and configuration options --- docs/en_US/oauth2.rst | 158 ++++++- web/pgadmin/authenticate/__init__.py | 28 +- web/pgadmin/authenticate/oauth2.py | 307 ++++++++++--- web/pgadmin/browser/tests/__init__.py | 9 + .../browser/tests/test_oauth2_with_mocking.py | 414 +++++++++++++----- 5 files changed, 718 insertions(+), 198 deletions(-) diff --git a/docs/en_US/oauth2.rst b/docs/en_US/oauth2.rst index 916a3a63c61..c42b5f1b9d9 100644 --- a/docs/en_US/oauth2.rst +++ b/docs/en_US/oauth2.rst @@ -1,15 +1,40 @@ .. _oauth2: -***************************************** -`Enabling OAUTH2 Authentication`:index: -***************************************** +******************************************************* +`Enabling OAUTH2 and OIDC Authentication`:index: +******************************************************* -To enable OAUTH2 authentication for pgAdmin, you must configure the OAUTH2 -settings in the *config_local.py* or *config_system.py* file (see the -:ref:`config.py ` documentation) on the system where pgAdmin is -installed in Server mode. You can copy these settings from *config.py* file -and modify the values for the following parameters: +To enable OAUTH2 or OpenID Connect (OIDC) authentication for pgAdmin, you must +configure the OAUTH2 settings in the *config_local.py* or *config_system.py* +file (see the :ref:`config.py ` documentation) on the system where +pgAdmin is installed in Server mode. You can copy these settings from *config.py* +file and modify the values for the following parameters. + +OAuth2 vs OpenID Connect (OIDC) +================================ + +pgAdmin supports both OAuth2 and OIDC authentication protocols: + +**OAuth2** is an authorization framework that allows third-party applications to +obtain limited access to user accounts. When using OAuth2, pgAdmin must explicitly +call the provider's userinfo endpoint to retrieve user profile information. + +**OpenID Connect (OIDC)** is an identity layer built on top of OAuth2 that provides +standardized user authentication and profile information. When using OIDC, user +identity information is included directly in the ID token, which is more efficient +and secure. + +.. note:: + When **OAUTH2_SERVER_METADATA_URL** is configured, pgAdmin treats the provider + as an OIDC provider and will: + + - Use ID token claims for user identity (sub, email, preferred_username) + - Skip the userinfo endpoint call when ID token contains sufficient information + - Validate the ID token automatically using the provider's public keys + + This is the **recommended approach** for modern identity providers like + Microsoft Entra ID (Azure AD), Google, Keycloak, Auth0, and Okta. .. _AzureAD: https://learn.microsoft.com/en-us/security/zero-trust/develop/configure-tokens-group-claims-app-roles @@ -32,20 +57,17 @@ and modify the values for the following parameters: "OAUTH2_CLIENT_SECRET", "Oauth2 Client Secret" "OAUTH2_TOKEN_URL", "Oauth2 Access Token endpoint" "OAUTH2_AUTHORIZATION_URL", "Endpoint for user authorization" - "OAUTH2_SERVER_METADATA_URL", "Server metadata url for your OAuth2 provider" + "OAUTH2_SERVER_METADATA_URL", "**OIDC Discovery URL** (recommended for OIDC providers). When set, pgAdmin will use OIDC flow with automatic ID token validation and user claims from the ID token. Example: *https://login.microsoftonline.com/{tenant}/v2.0/.well-known/openid-configuration*. When using this parameter, OAUTH2_TOKEN_URL and OAUTH2_AUTHORIZATION_URL are optional as they will be discovered automatically." "OAUTH2_API_BASE_URL", "Oauth2 base URL endpoint to make requests simple, ex: *https://api.github.com/*" - "OAUTH2_USERINFO_ENDPOINT", "User Endpoint, ex: *user* (for github, or *user/emails* if the user's email address is private) and *userinfo* (for google)," - "OAUTH2_SCOPE", "Oauth scope, ex: 'openid email profile'. Note that an 'email' claim is required in the resulting profile." + "OAUTH2_USERINFO_ENDPOINT", "User Endpoint, ex: *user* (for github, or *user/emails* if the user's email address is private) and *userinfo* (for google). **For OIDC providers**, this is optional if the ID token contains sufficient claims (email, preferred_username, or sub)." + "OAUTH2_SCOPE", "Oauth scope, ex: 'openid email profile'. **For OIDC providers**, include 'openid' scope to receive an ID token." "OAUTH2_ICON", "The Font-awesome icon to be placed on the oauth2 button, ex: fa-github" "OAUTH2_BUTTON_COLOR", "Oauth2 button color" - "OAUTH2_USERNAME_CLAIM", "The claim which is used for the username. If the value is empty - the email is used as username, but if a value is provided, the claim has to exist. Ex: *oid* (for AzureAD), *email* (for Github)" + "OAUTH2_USERNAME_CLAIM", "The claim which is used for the username. If the value is empty, **for OIDC providers** pgAdmin will use: 1) email, 2) preferred_username, or 3) sub (in that order). **For OAuth2 providers** without OIDC, email is required. Ex: *oid* (for AzureAD), *email* (for Github), *preferred_username* (for Keycloak)" "OAUTH2_AUTO_CREATE_USER", "Set the value to *True* if you want to automatically create a pgAdmin user corresponding to a successfully authenticated Oauth2 user. Please note that password is not stored in the pgAdmin database." - "OAUTH2_ADDITIONAL_CLAIMS", "If a dictionary is provided, pgAdmin will check for a matching key and value on the userinfo endpoint - and in the Id Token. In case there is no match with the provided config, the user will receive an authorization error. - Useful for checking AzureAD_ *wids* or *groups*, GitLab_ *owner*, *maintainer* and *reporter* claims." + "OAUTH2_ADDITIONAL_CLAIMS", "If a dictionary is provided, pgAdmin will check for a matching key and value on the **ID token first** (for OIDC providers), then fall back to the userinfo endpoint response. In case there is no match with the provided config, the user will receive an authorization error. Useful for checking AzureAD_ *wids* or *groups*, GitLab_ *owner*, *maintainer* and *reporter* claims." "OAUTH2_SSL_CERT_VERIFICATION", "Set this variable to False to disable SSL certificate verification for OAuth2 provider. This may need to set False, in case of self-signed certificates." "OAUTH2_CHALLENGE_METHOD", "Enable PKCE workflow. PKCE method name, only *S256* is supported" @@ -83,3 +105,107 @@ Ref: https://oauth.net/2/pkce To enable PKCE workflow, set the configuration parameters OAUTH2_CHALLENGE_METHOD to *S256* and OAUTH2_RESPONSE_TYPE to *code*. Both parameters are mandatory to enable PKCE workflow. + +OIDC Configuration Examples +============================ + +Using OIDC with Discovery Metadata (Recommended) +------------------------------------------------- + +When using OIDC providers, configure the **OAUTH2_SERVER_METADATA_URL** parameter +to enable automatic discovery and ID token validation: + +.. code-block:: python + + OAUTH2_CONFIG = [{ + 'OAUTH2_NAME': 'my-oidc-provider', + 'OAUTH2_DISPLAY_NAME': 'My OIDC Provider', + 'OAUTH2_CLIENT_ID': 'your-client-id', + 'OAUTH2_CLIENT_SECRET': 'your-client-secret', + 'OAUTH2_SERVER_METADATA_URL': 'https://provider.example.com/.well-known/openid-configuration', + 'OAUTH2_SCOPE': 'openid email profile', + # OAUTH2_USERINFO_ENDPOINT is optional when using OIDC + # Token and authorization URLs are discovered automatically + }] + +With this configuration: + +- pgAdmin will use the OIDC discovery endpoint to automatically find token and authorization URLs +- User identity will be extracted from ID token claims (sub, email, preferred_username) +- The userinfo endpoint will only be called as a fallback if ID token lacks required claims +- ID token will be automatically validated using the provider's public keys + +Username Resolution for OIDC +----------------------------- + +When **OAUTH2_SERVER_METADATA_URL** is configured (OIDC mode), pgAdmin will +resolve the username in the following order: + +1. **OAUTH2_USERNAME_CLAIM** (if configured) - checks ID token first, then userinfo +2. **email** claim from ID token or userinfo endpoint +3. **preferred_username** claim from ID token (standard OIDC claim) +4. **sub** claim from ID token (always present in OIDC, used as last resort) + +Example with custom username claim: + +.. code-block:: python + + OAUTH2_CONFIG = [{ + # ... other config ... + 'OAUTH2_USERNAME_CLAIM': 'preferred_username', + # pgAdmin will use 'preferred_username' from ID token for the username + }] + +Example without custom claim (uses automatic fallback): + +.. code-block:: python + + OAUTH2_CONFIG = [{ + # ... other config ... + # No OAUTH2_USERNAME_CLAIM specified + # pgAdmin will try: email -> preferred_username -> sub + }] + +Additional Claims Authorization with OIDC +------------------------------------------ + +When using **OAUTH2_ADDITIONAL_CLAIMS** with OIDC providers, pgAdmin will: + +1. Check the ID token claims first (more secure, no additional network call) +2. Fall back to userinfo endpoint response if needed + +Example: + +.. code-block:: python + + OAUTH2_CONFIG = [{ + # ... other config ... + 'OAUTH2_ADDITIONAL_CLAIMS': { + 'groups': ['admin-group', 'pgadmin-users'], + 'roles': ['database-admin'] + }, + # pgAdmin will check these claims in ID token first, + # then userinfo endpoint if not found + }] + +Legacy OAuth2 Configuration (Without OIDC) +------------------------------------------- + +For providers that don't support OIDC discovery, configure all endpoints manually: + +.. code-block:: python + + OAUTH2_CONFIG = [{ + 'OAUTH2_NAME': 'github', + 'OAUTH2_DISPLAY_NAME': 'GitHub', + 'OAUTH2_CLIENT_ID': 'your-client-id', + 'OAUTH2_CLIENT_SECRET': 'your-client-secret', + 'OAUTH2_TOKEN_URL': 'https://github.com/login/oauth/access_token', + 'OAUTH2_AUTHORIZATION_URL': 'https://github.com/login/oauth/authorize', + 'OAUTH2_API_BASE_URL': 'https://api.github.com/', + 'OAUTH2_USERINFO_ENDPOINT': 'user', + 'OAUTH2_SCOPE': 'user:email', + # No OAUTH2_SERVER_METADATA_URL - pure OAuth2 mode + }] + +In this mode, user identity is retrieved only from the userinfo endpoint. diff --git a/web/pgadmin/authenticate/__init__.py b/web/pgadmin/authenticate/__init__.py index 84f5e9ccc21..7ed8b79029d 100644 --- a/web/pgadmin/authenticate/__init__.py +++ b/web/pgadmin/authenticate/__init__.py @@ -227,15 +227,27 @@ def as_dict(self): return res def update_auth_sources(self): - for auth_src in [KERBEROS, OAUTH2]: - if auth_src in self.auth_sources: - if 'internal_button' in request.form: + # Only mutate the ordered list of auth sources when a user explicitly + # selected an auth method on the login form. + # + # Without this guard, a plain internal login POST (email/password) can + # incorrectly drop INTERNAL/LDAP and try OAUTH2 first, which then fails + # because no oauth2 provider button was provided. + if request.method != 'POST': + return + + if 'internal_button' in request.form: + for auth_src in [KERBEROS, OAUTH2]: + if auth_src in self.auth_sources: self.auth_sources.remove(auth_src) - else: - if INTERNAL in self.auth_sources: - self.auth_sources.remove(INTERNAL) - if LDAP in self.auth_sources: - self.auth_sources.remove(LDAP) + return + + if 'oauth2_button' in request.form: + if INTERNAL in self.auth_sources: + self.auth_sources.remove(INTERNAL) + if LDAP in self.auth_sources: + self.auth_sources.remove(LDAP) + return def set_current_source(self, source): self.current_source = source diff --git a/web/pgadmin/authenticate/oauth2.py b/web/pgadmin/authenticate/oauth2.py index 076e4589ecf..5db2d3e72c1 100644 --- a/web/pgadmin/authenticate/oauth2.py +++ b/web/pgadmin/authenticate/oauth2.py @@ -12,7 +12,7 @@ import config from authlib.integrations.flask_client import OAuth -from flask import current_app, url_for, session, request,\ +from flask import current_app, url_for, session, request, \ redirect, Flask, flash from flask_babel import gettext from flask_security import login_user, current_user @@ -104,6 +104,10 @@ class OAuth2Authentication(BaseAuthentication): email_keys = ['mail', 'email'] def __init__(self): + # Selected provider name (set during authenticate()). + # Initializing avoids AttributeError in edge cases/tests. + self.oauth2_current_client = None + for oauth2_config in config.OAUTH2_CONFIG: OAuth2Authentication.oauth2_config[ @@ -147,11 +151,39 @@ def get_source_name(self): return OAUTH2 def get_friendly_name(self): - return self.oauth2_config[self.oauth2_current_client]['OAUTH2_NAME'] + provider = self.oauth2_config.get(self.oauth2_current_client) + if not provider: + return OAUTH2 + return provider.get('OAUTH2_NAME', OAUTH2) def validate(self, form): return True, None + def _is_oidc_provider(self): + """ + Determine if the current provider is configured as an OIDC provider. + Returns True if OAUTH2_SERVER_METADATA_URL is defined. + """ + provider = self.oauth2_config.get(self.oauth2_current_client) + if not provider: + return False + return 'OAUTH2_SERVER_METADATA_URL' in provider and \ + provider['OAUTH2_SERVER_METADATA_URL'] is not None + + def _get_id_token_claims(self): + """ + Extract and return claims from the ID token. + When using OIDC with server_metadata_url, Authlib automatically + parses the ID token and stores the claims in the 'userinfo' key + of the token response. + + Returns: + dict: ID token claims, or empty dict if not available + """ + if not self._is_oidc_provider(): + return {} + return session.get('oauth2_token', {}).get('userinfo', {}) + def get_profile_dict(self, profile): """ Returns the dictionary from profile @@ -165,6 +197,94 @@ def get_profile_dict(self, profile): else: return {} + def _resolve_username(self, id_token_claims, profile_dict): + """ + Resolve username from available claims with OIDC-aware fallback. + + Resolution order: + 1. If OAUTH2_USERNAME_CLAIM is configured, use that claim from + ID token first, then userinfo profile + 2. For OIDC providers, check in order: email, preferred_username, sub + 3. For non-OIDC providers, use email only + + Args: + id_token_claims (dict): Claims from ID token + profile_dict (dict): Claims from userinfo endpoint + + Returns: + tuple: (username, email) or (None, None) if resolution fails + """ + provider = self.oauth2_config.get(self.oauth2_current_client, {}) + username_claim = provider.get('OAUTH2_USERNAME_CLAIM') + + # Extract email from profile (backward compatibility) + email_key = [value for value in self.email_keys + if value in profile_dict.keys()] + email = profile_dict[email_key[0]] if email_key else None + + # If specific username claim is configured, look for it + if username_claim: + # Check ID token claims first + if username_claim in id_token_claims: + username = id_token_claims[username_claim] + current_app.logger.debug( + f'Found username claim "{username_claim}" ' + f'in ID token: {username}') + return username, email + # Fall back to userinfo profile + elif username_claim in profile_dict: + username = profile_dict[username_claim] + current_app.logger.debug( + f'Found username claim "{username_claim}" ' + f'in profile: {username}') + return username, email + else: + current_app.logger.error( + f'Required username claim "{username_claim}" ' + f'not found in ID token or profile') + return None, email + + # For OIDC providers, use standard claim hierarchy + if self._is_oidc_provider(): + # Priority 1: email (from ID token or profile) + if 'email' in id_token_claims: + username = id_token_claims['email'] + # Use as email if not found elsewhere + email = email or username + current_app.logger.debug( + f'Using email from ID token as username: {username}') + return username, email + elif email: + current_app.logger.debug( + f'Using email from profile as username: {email}') + return email, email + + # Priority 2: preferred_username + if 'preferred_username' in id_token_claims: + username = id_token_claims['preferred_username'] + current_app.logger.debug( + f'Using preferred_username from ID token: {username}') + return username, email + + # Priority 3: sub (always present in OIDC) + if 'sub' in id_token_claims: + username = id_token_claims['sub'] + current_app.logger.debug( + f'Using sub from ID token as last resort: {username}') + return username, email + + # Should not reach here for valid OIDC provider + current_app.logger.warning( + 'OIDC provider but no standard claims found in ID token') + + # For non-OIDC OAuth2 providers, email is required + if email: + current_app.logger.debug( + f'Using email as username for OAuth2 provider: {email}') + return email, email + + return None, None + def login(self, form): profile = self.get_user_profile() profile_dict = self.get_profile_dict(profile) @@ -172,46 +292,45 @@ def login(self, form): current_app.logger.debug(f"profile: {profile}") current_app.logger.debug(f"profile_dict: {profile_dict}") - if not profile_dict: + # Get ID token claims for OIDC providers + id_token_claims = self._get_id_token_claims() + current_app.logger.debug(f"id_token_claims: {id_token_claims}") + + # For OIDC providers, we must have either ID token claims or profile + if ( + self._is_oidc_provider() and + not id_token_claims and + not profile_dict + ): + error_msg = "No profile data found from OIDC provider." + current_app.logger.exception(error_msg) + return False, gettext(error_msg) + + # For non-OIDC providers, profile is required + if not self._is_oidc_provider() and not profile_dict: error_msg = "No profile data found." current_app.logger.exception(error_msg) return False, gettext(error_msg) - email_key = [ - value for value in self.email_keys - if value in profile_dict.keys() - ] - email = profile_dict[email_key[0]] if (len(email_key) > 0) else None + # Resolve username using OIDC-aware logic + username, email = self._resolve_username(id_token_claims, profile_dict) - username = email - username_claim = None - if 'OAUTH2_USERNAME_CLAIM' in self.oauth2_config[ - self.oauth2_current_client]: - username_claim = self.oauth2_config[ - self.oauth2_current_client - ]['OAUTH2_USERNAME_CLAIM'] - if username_claim is not None: - id_token = session['oauth2_token'].get('userinfo', {}) - if username_claim in profile: - username = profile[username_claim] - current_app.logger.debug('Found username claim in profile') - elif username_claim in id_token: - username = id_token[username_claim] - current_app.logger.debug('Found username claim in id_token') + if not username: + if self._is_oidc_provider(): + error_msg = ( + 'Could not extract username from OIDC claims. ' + 'Please ensure your OIDC provider returns standard ' + 'claims (email, preferred_username, or sub).' + ) else: - error_msg = "The claim '%s' is required to login into " \ - "pgAdmin. Please update your OAuth2 profile." % ( - username_claim) - current_app.logger.exception(error_msg) - return False, gettext(error_msg) - else: - if not email or email == '': - error_msg = "An email id or OAUTH2_USERNAME_CLAIM is" \ - " required to login into pgAdmin. Please update your" \ - " OAuth2 profile for email id or set" \ - " OAUTH2_USERNAME_CLAIM config parameter." - current_app.logger.exception(error_msg) - return False, gettext(error_msg) + error_msg = ( + 'An email id or OAUTH2_USERNAME_CLAIM is required to ' + 'login into pgAdmin. Please update your OAuth2 profile ' + 'for email id or set OAUTH2_USERNAME_CLAIM config ' + 'parameter.' + ) + current_app.logger.exception(error_msg) + return False, gettext(error_msg) additional_claims = None if 'OAUTH2_ADDITIONAL_CLAIMS' in self.oauth2_config[ @@ -221,27 +340,54 @@ def login(self, form): self.oauth2_current_client ]['OAUTH2_ADDITIONAL_CLAIMS'] - # checking oauth provider userinfo response - valid_profile, reason = self.__is_any_claim_valid(profile, - additional_claims) - current_app.logger.debug(f"profile claims: {profile}") - current_app.logger.debug(f"reason: {reason}") - - # checking oauth provider idtoken claims - id_token_claims = session.get('oauth2_token', {}).get('userinfo',{}) - valid_idtoken, reason = self.__is_any_claim_valid(id_token_claims, - additional_claims) - current_app.logger.debug(f"idtoken claims: {id_token_claims}") - current_app.logger.debug(f"reason: {reason}") - - if not valid_profile and not valid_idtoken: - return_msg = "The user is not authorized to login" \ - " based on your identity profile." \ - " Please contact your administrator." - audit_msg = f"The authenticated user {username} is not" \ - " authorized to access pgAdmin based on OAUTH2 config. " \ - f"Reason: additional claim required {additional_claims}, " \ - f"profile claims {profile}, idtoken cliams {id_token_claims}." + # For OIDC providers, check ID token claims first, then userinfo + # For non-OIDC providers, check userinfo only + if self._is_oidc_provider(): + valid_idtoken, reason = self.__is_any_claim_valid( + id_token_claims, additional_claims) + current_app.logger.debug( + f'ID token claims: {id_token_claims}' + ) + current_app.logger.debug( + f'ID token validation reason: {reason}' + ) + + # If ID token validation succeeds, we're done + if valid_idtoken: + valid_combined = True + else: + # Fall back to userinfo profile + valid_profile, reason = self.__is_any_claim_valid( + profile, additional_claims) + current_app.logger.debug( + f'Profile claims: {profile}' + ) + current_app.logger.debug( + f'Profile validation reason: {reason}' + ) + valid_combined = valid_profile + else: + # Non-OIDC: only check userinfo profile + valid_combined, reason = self.__is_any_claim_valid( + profile, additional_claims) + current_app.logger.debug( + f'Profile claims: {profile}' + ) + current_app.logger.debug( + f'Validation reason: {reason}' + ) + + if not valid_combined: + return_msg = ( + 'The user is not authorized to login based on your identity ' + 'profile. Please contact your administrator.' + ) + audit_msg = ( + f'The authenticated user {username} is not authorized to ' + 'access pgAdmin based on OAUTH2 config. ' + f'Reason: additional claim required {additional_claims}, ' + f'profile claims {profile}, ID token claims {id_token_claims}.' + ) current_app.logger.warning(audit_msg) return False, return_msg @@ -267,6 +413,47 @@ def get_user_profile(self): session['oauth2_logout_url'] = self.oauth2_config[ self.oauth2_current_client]['OAUTH2_LOGOUT_URL'] + # For OIDC providers with server_metadata_url, Authlib automatically + # parses the ID token and stores claims in the 'userinfo' key. + # We can skip the userinfo endpoint call if ID token has sufficient + # data. + if self._is_oidc_provider(): + id_token_claims = session.get('oauth2_token', {}).get( + 'userinfo', {} + ) + # Check if we have basic required claims in ID token + has_sufficient_claims = any([ + 'email' in id_token_claims, + 'preferred_username' in id_token_claims, + 'sub' in id_token_claims + ]) + + if has_sufficient_claims: + current_app.logger.debug( + 'OIDC provider: using ID token claims, ' + 'skipping userinfo endpoint') + # Return ID token claims as profile + return id_token_claims + else: + current_app.logger.debug( + 'OIDC provider: ID token lacks standard claims, ' + 'falling back to userinfo endpoint') + + # For non-OIDC providers or when ID token is insufficient, + # call the userinfo endpoint + if 'OAUTH2_USERINFO_ENDPOINT' not in self.oauth2_config[ + self.oauth2_current_client]: + if self._is_oidc_provider(): + # OIDC provider should have provided claims in ID token + current_app.logger.warning( + 'OIDC provider has no userinfo endpoint configured ' + 'and ID token lacks standard claims') + else: + current_app.logger.error( + 'OAUTH2_USERINFO_ENDPOINT not configured for ' + 'non-OIDC provider') + return {} + resp = self.oauth2_clients[self.oauth2_current_client].get( self.oauth2_config[ self.oauth2_current_client]['OAUTH2_USERINFO_ENDPOINT'], @@ -276,7 +463,11 @@ def get_user_profile(self): return resp.json() def authenticate(self, form): - self.oauth2_current_client = request.form['oauth2_button'] + # Prefer the explicit oauth2 button value. + # Avoid raising BadRequestKeyError when oauth2 isn't selected. + self.oauth2_current_client = request.form.get('oauth2_button') + if not self.oauth2_current_client: + return False, gettext('No OAuth2 provider selected.') redirect_url = url_for(OAUTH2_AUTHORIZE, _external=True) if self.oauth2_current_client not in self.oauth2_clients: diff --git a/web/pgadmin/browser/tests/__init__.py b/web/pgadmin/browser/tests/__init__.py index f33b2742b78..00e23a7b16c 100644 --- a/web/pgadmin/browser/tests/__init__.py +++ b/web/pgadmin/browser/tests/__init__.py @@ -11,5 +11,14 @@ class BrowserGenerateTestCase(BaseTestGenerator): + # This is a smoke/placeholder test to ensure the browser tests package is + # discovered by the regression runner. It should not require a live server + # connection. + def setUp(self): + return + + def tearDown(self): + return + def runTest(self): return diff --git a/web/pgadmin/browser/tests/test_oauth2_with_mocking.py b/web/pgadmin/browser/tests/test_oauth2_with_mocking.py index 26337486318..0bddb2530b3 100644 --- a/web/pgadmin/browser/tests/test_oauth2_with_mocking.py +++ b/web/pgadmin/browser/tests/test_oauth2_with_mocking.py @@ -12,8 +12,8 @@ from regression.python_test_utils import test_utils as utils from pgadmin.authenticate.registry import AuthSourceRegistry from unittest.mock import patch, MagicMock -from pgadmin.authenticate import AuthSourceManager from pgadmin.utils.constants import OAUTH2, INTERNAL +from flask import current_app, redirect class Oauth2LoginMockTestCase(BaseTestGenerator): @@ -24,38 +24,83 @@ class Oauth2LoginMockTestCase(BaseTestGenerator): scenarios = [ ('Oauth2 External Authentication', dict( - auth_source=['oauth2'], oauth2_provider='github', - flag=1 + kind='external_redirect', + profile={}, + id_token_claims=None, )), ('Oauth2 Authentication', dict( - auth_source=['oauth2'], oauth2_provider='github', - flag=2 + kind='login_success', + profile={'email': 'oauth2@gmail.com'}, + id_token_claims=None, )), ('Oauth2 Additional Claims Authentication', dict( - auth_source=['oauth2'], oauth2_provider='auth-with-additional-claim-check', - flag=3 + kind='login_success', + profile={'email': 'oauth2@gmail.com', 'wids': ['789']}, + id_token_claims=None, )), ('Oauth2 PKCE Support', dict( - auth_source=['oauth2'], oauth2_provider='keycloak-pkce', - flag=4 + kind='pkce', + profile={}, + id_token_claims=None, + )), + ('OIDC Uses ID Token Claims', dict( + oauth2_provider='oidc-basic', + kind='login_success', + profile={}, + id_token_claims={'email': 'oidc@example.com', 'sub': 'abc'}, + )), + ('OIDC Falls Back To Profile Email', dict( + oauth2_provider='oidc-basic', + kind='login_success', + profile={'email': 'fallback@example.com'}, + id_token_claims={'sub': 'abc'}, + )), + ('OIDC Username Claim Precedence', dict( + oauth2_provider='oidc-username-claim', + kind='login_success', + profile={'email': 'email@example.com'}, + id_token_claims={'preferred_username': 'preferred-user'}, + )), + ('OIDC Additional Claims Via ID Token', dict( + oauth2_provider='oidc-additional-claims', + kind='login_success', + profile={'email': 'claims@example.com'}, + id_token_claims={'groups': ['group-a']}, + )), + ('OIDC Additional Claims Rejected', dict( + oauth2_provider='oidc-additional-claims', + kind='login_failure', + profile={'email': 'claims@example.com'}, + id_token_claims={'groups': ['group-b']}, + )), + ('OIDC get_user_profile Skips Userinfo', dict( + oauth2_provider='oidc-basic', + kind='oidc_get_user_profile_skip', + profile={}, + id_token_claims=None, + )), + ('OIDC get_user_profile Calls Userinfo', dict( + oauth2_provider='oidc-basic', + kind='oidc_get_user_profile_call', + profile={}, + id_token_claims=None, )), ] @classmethod def setUpClass(cls): - """ - We need to logout the test client as we are testing - OAuth2 login scenarios. - """ + """Logout the test client as we are testing OAuth2 login scenarios.""" cls.tester.logout() def setUp(self): - app_config.AUTHENTICATION_SOURCES = self.auth_source + app_config.AUTHENTICATION_SOURCES = [OAUTH2] self.app.PGADMIN_EXTERNAL_AUTH_SOURCE = OAUTH2 + # Ensure OAuth2 users can be created during tests. + app_config.OAUTH2_AUTO_CREATE_USER = True app_config.OAUTH2_CONFIG = [ { 'OAUTH2_NAME': 'github', @@ -111,6 +156,49 @@ def setUp(self): 'OAUTH2_BUTTON_COLOR': '#3253a8', 'OAUTH2_CHALLENGE_METHOD': 'S256', 'OAUTH2_RESPONSE_TYPE': 'code', + }, + { + 'OAUTH2_NAME': 'oidc-basic', + 'OAUTH2_DISPLAY_NAME': 'OIDC Basic', + 'OAUTH2_CLIENT_ID': 'testclientid', + 'OAUTH2_CLIENT_SECRET': 'testclientsec', + 'OAUTH2_TOKEN_URL': 'https://oidc.example/token', + 'OAUTH2_AUTHORIZATION_URL': 'https://oidc.example/auth', + 'OAUTH2_API_BASE_URL': 'https://oidc.example/', + 'OAUTH2_USERINFO_ENDPOINT': 'userinfo', + 'OAUTH2_SCOPE': 'openid email profile', + 'OAUTH2_SERVER_METADATA_URL': + 'https://oidc.example/.well-known/openid-configuration', + }, + { + 'OAUTH2_NAME': 'oidc-username-claim', + 'OAUTH2_DISPLAY_NAME': 'OIDC Username Claim', + 'OAUTH2_CLIENT_ID': 'testclientid', + 'OAUTH2_CLIENT_SECRET': 'testclientsec', + 'OAUTH2_TOKEN_URL': 'https://oidc.example/token', + 'OAUTH2_AUTHORIZATION_URL': 'https://oidc.example/auth', + 'OAUTH2_API_BASE_URL': 'https://oidc.example/', + 'OAUTH2_USERINFO_ENDPOINT': 'userinfo', + 'OAUTH2_SCOPE': 'openid email profile', + 'OAUTH2_SERVER_METADATA_URL': + 'https://oidc.example/.well-known/openid-configuration', + 'OAUTH2_USERNAME_CLAIM': 'preferred_username', + }, + { + 'OAUTH2_NAME': 'oidc-additional-claims', + 'OAUTH2_DISPLAY_NAME': 'OIDC Additional Claims', + 'OAUTH2_CLIENT_ID': 'testclientid', + 'OAUTH2_CLIENT_SECRET': 'testclientsec', + 'OAUTH2_TOKEN_URL': 'https://oidc.example/token', + 'OAUTH2_AUTHORIZATION_URL': 'https://oidc.example/auth', + 'OAUTH2_API_BASE_URL': 'https://oidc.example/', + 'OAUTH2_USERINFO_ENDPOINT': 'userinfo', + 'OAUTH2_SCOPE': 'openid email profile', + 'OAUTH2_SERVER_METADATA_URL': + 'https://oidc.example/.well-known/openid-configuration', + 'OAUTH2_ADDITIONAL_CLAIMS': { + 'groups': ['group-a'] + } } ] @@ -121,96 +209,151 @@ def runTest(self): "Can not run Oauth2 Authentication in the Desktop mode." ) - if self.flag == 1: - self.test_external_authentication() - elif self.flag == 2: - self.test_oauth2_authentication() - elif self.flag == 3: - self.test_oauth2_authentication_with_additional_claims_success() - elif self.flag == 4: - self.test_oauth2_authentication_with_pkce() - - def test_external_authentication(self): - """ - Ensure that the user should be redirected - to the external url for the authentication. - """ - - AuthSourceManager.update_auth_sources = MagicMock() + self._reset_oauth2_state() - try: - self.tester.login( + if self.kind == 'external_redirect': + self._test_external_authentication(self.oauth2_provider) + elif self.kind == 'pkce': + self.test_oauth2_authentication_with_pkce() + elif self.kind == 'login_success': + self._test_oauth2_login_success( + self.oauth2_provider, self.profile, self.id_token_claims + ) + elif self.kind == 'login_failure': + self._test_oauth2_login_failure( + self.oauth2_provider, self.profile, self.id_token_claims + ) + elif self.kind == 'oidc_get_user_profile_skip': + self._test_oidc_get_user_profile_skip_userinfo( + self.oauth2_provider + ) + elif self.kind == 'oidc_get_user_profile_call': + self._test_oidc_get_user_profile_calls_userinfo( + self.oauth2_provider + ) + else: + self.fail(f'Unknown test kind: {self.kind}') + + def _reset_oauth2_state(self): + """Reset singleton caches so each subTest gets a clean OAuth2 state.""" + # Clear AuthSourceRegistry singleton instances. + AuthSourceRegistry._objects = dict() + + # Clear per-app cache of instantiated auth sources. + with self.app.app_context(): + cached = getattr(current_app, '_pgadmin_auth_sources', None) + if isinstance(cached, dict): + cached.clear() + else: + setattr(current_app, '_pgadmin_auth_sources', {}) + + # Clear OAuth2Authentication class-level caches. + from pgadmin.authenticate.oauth2 import OAuth2Authentication + OAuth2Authentication.oauth2_clients = {} + OAuth2Authentication.oauth2_config = {} + + def _assert_oauth2_session_logged_in(self): + with self.tester.session_transaction() as sess: + asm = sess.get('auth_source_manager') + self.assertIsNotNone(asm) + self.assertEqual(asm.get('current_source'), OAUTH2) + + def _assert_oauth2_session_not_logged_in(self): + with self.tester.session_transaction() as sess: + asm = sess.get('auth_source_manager') + self.assertTrue(asm is None or asm == {}) + + def _test_external_authentication(self, provider): + """Ensure the user is redirected to an external URL.""" + from pgadmin.authenticate.oauth2 import OAuth2Authentication + + def _fake_authenticate(self, _form): + self.oauth2_current_client = provider + return False, redirect('https://example.com/') + + with patch.object( + OAuth2Authentication, 'authenticate', new=_fake_authenticate + ): + try: + self.tester.login( + email=None, password=None, + _follow_redirects=True, + headers=None, + extra_form_data=dict(oauth2_button=provider) + ) + except Exception as e: + self.assertEqual( + 'Following external redirects is not supported.', + str(e) + ) + + def _test_oauth2_login_success( + self, provider, profile, id_token_claims=None + ): + from pgadmin.authenticate.oauth2 import OAuth2Authentication + + def _fake_authenticate(self, _form): + self.oauth2_current_client = provider + # Important: AuthSourceManager may be constructed with a dict + # form for oauth2_button flows, so avoid returning a username. + return True, None + + def _fake_get_user_profile(self): + if id_token_claims is not None: + from flask import session + session['oauth2_token'] = { + 'access_token': 'test-access-token', + 'userinfo': id_token_claims + } + return profile + + with patch.object( + OAuth2Authentication, 'authenticate', new=_fake_authenticate + ), patch.object( + OAuth2Authentication, 'get_user_profile', + new=_fake_get_user_profile + ): + res = self.tester.login( email=None, password=None, _follow_redirects=True, headers=None, - extra_form_data=dict(oauth2_button=self.oauth2_provider) + extra_form_data=dict(oauth2_button=provider) ) - except Exception as e: - self.assertEqual('Following external' - ' redirects is not supported.', str(e)) - - def test_oauth2_authentication(self): - """ - Ensure that when the client sends an correct authorization token, - they receive a 200 OK response and the user principal is extracted and - passed on to the routed method. - """ - - profile = self.mock_user_profile() - - # Mock Oauth2 Authenticate - AuthSourceRegistry._registry[OAUTH2].authenticate = MagicMock( - return_value=[True, '']) - - AuthSourceManager.update_auth_sources = MagicMock() - - # Create AuthSourceManager object - auth_obj = AuthSourceManager({}, [OAUTH2]) - auth_source = AuthSourceRegistry.get(OAUTH2) - auth_obj.set_source(auth_source) - auth_obj.set_current_source(auth_source.get_source_name()) - - # Check the login with Oauth2 - res = self.tester.login(email=None, password=None, - _follow_redirects=True, - headers=None, - extra_form_data=dict( - oauth2_button=self.oauth2_provider) - ) - - respdata = 'Gravatar image for %s' % profile['email'] - self.assertTrue(respdata in res.data.decode('utf8')) - - def test_oauth2_authentication_with_additional_claims_success(self): - """ - Ensure that when an oauth2 config has a dict OAUTH2_ADDITIONAL_CLAIMS, - any match of the OAUTH2_ADDITIONAL_CLAIMS dict will allow user login. - """ - - profile = self.mock_user_profile_with_additional_claims() - - # Mock Oauth2 Authenticate - AuthSourceRegistry._registry[OAUTH2].authenticate = MagicMock( - return_value=[True, '']) - - AuthSourceManager.update_auth_sources = MagicMock() - - # Create AuthSourceManager object - auth_obj = AuthSourceManager({}, [OAUTH2]) - auth_source = AuthSourceRegistry.get(OAUTH2) - auth_obj.set_source(auth_source) - auth_obj.set_current_source(auth_source.get_source_name()) - - # Check the login with Oauth2 - res = self.tester.login(email=None, password=None, - _follow_redirects=True, - headers=None, - extra_form_data=dict( - oauth2_button=self.oauth2_provider) - ) - - respdata = 'Gravatar image for %s' % profile['email'] - self.assertTrue(respdata in res.data.decode('utf8')) + self.assertEqual(res.status_code, 200) + self._assert_oauth2_session_logged_in() + + def _test_oauth2_login_failure( + self, provider, profile, id_token_claims=None + ): + from pgadmin.authenticate.oauth2 import OAuth2Authentication + + def _fake_authenticate(self, _form): + self.oauth2_current_client = provider + return True, None + + def _fake_get_user_profile(self): + if id_token_claims is not None: + from flask import session + session['oauth2_token'] = { + 'access_token': 'test-access-token', + 'userinfo': id_token_claims + } + return profile + + with patch.object( + OAuth2Authentication, 'authenticate', new=_fake_authenticate + ), patch.object( + OAuth2Authentication, 'get_user_profile', + new=_fake_get_user_profile + ): + res = self.tester.login( + email=None, password=None, + _follow_redirects=True, + headers=None, + extra_form_data=dict(oauth2_button=provider) + ) + self.assertEqual(res.status_code, 200) + self._assert_oauth2_session_not_logged_in() def test_oauth2_authentication_with_pkce(self): """ @@ -219,13 +362,21 @@ def test_oauth2_authentication_with_pkce(self): the default client_kwargs is correctly included. """ - with patch('pgadmin.authenticate.oauth2.OAuth.register') as \ - mock_register: + with patch( + 'pgadmin.authenticate.oauth2.OAuth.register' + ) as mock_register: from pgadmin.authenticate.oauth2 import OAuth2Authentication OAuth2Authentication() - args, kwargs = mock_register.call_args + pkce_call = None + for _args, _kwargs in mock_register.call_args_list: + if _kwargs.get('name') == 'keycloak-pkce': + pkce_call = (_args, _kwargs) + break + + self.assertIsNotNone(pkce_call) + _, kwargs = pkce_call client_kwargs = kwargs.get('client_kwargs', {}) # Check that PKCE and default client_kwargs are included @@ -236,21 +387,52 @@ def test_oauth2_authentication_with_pkce(self): self.assertEqual( client_kwargs.get('scope'), 'openid email profile') self.assertEqual( - client_kwargs.get('verify'), 'true') - - def mock_user_profile_with_additional_claims(self): - profile = {'email': 'oauth2@gmail.com', 'wids': ['789']} - - AuthSourceRegistry._registry[OAUTH2].get_user_profile = MagicMock( - return_value=profile) - return profile - - def mock_user_profile(self): - profile = {'email': 'oauth2@gmail.com'} + client_kwargs.get('verify'), True) + + def _test_oidc_get_user_profile_skip_userinfo(self, provider): + from pgadmin.authenticate.oauth2 import OAuth2Authentication + + with self.app.test_request_context('/'): + oauth = OAuth2Authentication() + oauth.oauth2_current_client = provider + + client = MagicMock() + client.authorize_access_token = MagicMock(return_value={ + 'access_token': 't', + 'userinfo': {'email': 'oidc-skip@example.com', 'sub': 'abc'} + }) + client.get = MagicMock(side_effect=AssertionError( + 'userinfo endpoint should not be called')) + + OAuth2Authentication.oauth2_clients[provider] = client + profile = oauth.get_user_profile() + self.assertEqual(profile.get('email'), 'oidc-skip@example.com') + client.get.assert_not_called() + + def _test_oidc_get_user_profile_calls_userinfo(self, provider): + from pgadmin.authenticate.oauth2 import OAuth2Authentication + + with self.app.test_request_context('/'): + oauth = OAuth2Authentication() + oauth.oauth2_current_client = provider + + client = MagicMock() + client.authorize_access_token = MagicMock(return_value={ + 'access_token': 't', + 'userinfo': {} + }) + + resp = MagicMock() + resp.raise_for_status = MagicMock() + resp.json = MagicMock( + return_value={'email': 'userinfo@example.com'} + ) + client.get = MagicMock(return_value=resp) - AuthSourceRegistry._registry[OAUTH2].get_user_profile = MagicMock( - return_value=profile) - return profile + OAuth2Authentication.oauth2_clients[provider] = client + profile = oauth.get_user_profile() + self.assertEqual(profile.get('email'), 'userinfo@example.com') + client.get.assert_called_once() def tearDown(self): self.tester.logout() From 631c8de6cb6614f53e278bfba5af534f1d981307 Mon Sep 17 00:00:00 2001 From: Paul Bourhis Date: Sat, 10 Jan 2026 15:13:30 +0100 Subject: [PATCH 2/7] Change logging level from exception to error for OIDC profile data issues --- web/pgadmin/authenticate/oauth2.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/web/pgadmin/authenticate/oauth2.py b/web/pgadmin/authenticate/oauth2.py index 5db2d3e72c1..8e67c593ff7 100644 --- a/web/pgadmin/authenticate/oauth2.py +++ b/web/pgadmin/authenticate/oauth2.py @@ -303,13 +303,13 @@ def login(self, form): not profile_dict ): error_msg = "No profile data found from OIDC provider." - current_app.logger.exception(error_msg) + current_app.logger.error(error_msg) return False, gettext(error_msg) # For non-OIDC providers, profile is required if not self._is_oidc_provider() and not profile_dict: error_msg = "No profile data found." - current_app.logger.exception(error_msg) + current_app.logger.error(error_msg) return False, gettext(error_msg) # Resolve username using OIDC-aware logic @@ -329,7 +329,7 @@ def login(self, form): 'for email id or set OAUTH2_USERNAME_CLAIM config ' 'parameter.' ) - current_app.logger.exception(error_msg) + current_app.logger.error(error_msg) return False, gettext(error_msg) additional_claims = None From cb6a27349305316fe90652aad3c2f4e83df89a80 Mon Sep 17 00:00:00 2001 From: Paul Bourhis Date: Sat, 10 Jan 2026 15:34:05 +0100 Subject: [PATCH 3/7] Refactor debug logging in OAuth2 authentication to improve clarity and consistency --- web/pgadmin/authenticate/oauth2.py | 54 ++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/web/pgadmin/authenticate/oauth2.py b/web/pgadmin/authenticate/oauth2.py index 8e67c593ff7..d612c369e18 100644 --- a/web/pgadmin/authenticate/oauth2.py +++ b/web/pgadmin/authenticate/oauth2.py @@ -229,14 +229,14 @@ def _resolve_username(self, id_token_claims, profile_dict): username = id_token_claims[username_claim] current_app.logger.debug( f'Found username claim "{username_claim}" ' - f'in ID token: {username}') + 'in ID token') return username, email # Fall back to userinfo profile elif username_claim in profile_dict: username = profile_dict[username_claim] current_app.logger.debug( f'Found username claim "{username_claim}" ' - f'in profile: {username}') + 'in profile') return username, email else: current_app.logger.error( @@ -252,25 +252,25 @@ def _resolve_username(self, id_token_claims, profile_dict): # Use as email if not found elsewhere email = email or username current_app.logger.debug( - f'Using email from ID token as username: {username}') + 'Using email from ID token as username') return username, email elif email: current_app.logger.debug( - f'Using email from profile as username: {email}') + 'Using email from profile as username') return email, email # Priority 2: preferred_username if 'preferred_username' in id_token_claims: username = id_token_claims['preferred_username'] current_app.logger.debug( - f'Using preferred_username from ID token: {username}') + 'Using preferred_username from ID token') return username, email # Priority 3: sub (always present in OIDC) if 'sub' in id_token_claims: username = id_token_claims['sub'] current_app.logger.debug( - f'Using sub from ID token as last resort: {username}') + 'Using sub from ID token as last resort') return username, email # Should not reach here for valid OIDC provider @@ -280,7 +280,7 @@ def _resolve_username(self, id_token_claims, profile_dict): # For non-OIDC OAuth2 providers, email is required if email: current_app.logger.debug( - f'Using email as username for OAuth2 provider: {email}') + 'Using email as username for OAuth2 provider') return email, email return None, None @@ -289,12 +289,23 @@ def login(self, form): profile = self.get_user_profile() profile_dict = self.get_profile_dict(profile) - current_app.logger.debug(f"profile: {profile}") - current_app.logger.debug(f"profile_dict: {profile_dict}") + profile_dict_keys = [] + if isinstance(profile_dict, dict): + profile_dict_keys = sorted(profile_dict.keys()) + current_app.logger.debug( + f'profile_dict keys: {profile_dict_keys}' + if profile_dict_keys else 'profile_dict empty' + ) # Get ID token claims for OIDC providers id_token_claims = self._get_id_token_claims() - current_app.logger.debug(f"id_token_claims: {id_token_claims}") + id_token_claims_keys = [] + if isinstance(id_token_claims, dict): + id_token_claims_keys = sorted(id_token_claims.keys()) + current_app.logger.debug( + f'id_token_claims keys: {id_token_claims_keys}' + if id_token_claims_keys else 'id_token_claims empty' + ) # For OIDC providers, we must have either ID token claims or profile if ( @@ -346,7 +357,7 @@ def login(self, form): valid_idtoken, reason = self.__is_any_claim_valid( id_token_claims, additional_claims) current_app.logger.debug( - f'ID token claims: {id_token_claims}' + f'ID token claim keys: {id_token_claims_keys}' ) current_app.logger.debug( f'ID token validation reason: {reason}' @@ -358,9 +369,9 @@ def login(self, form): else: # Fall back to userinfo profile valid_profile, reason = self.__is_any_claim_valid( - profile, additional_claims) + profile_dict, additional_claims) current_app.logger.debug( - f'Profile claims: {profile}' + f'Profile claim keys: {profile_dict_keys}' ) current_app.logger.debug( f'Profile validation reason: {reason}' @@ -369,9 +380,9 @@ def login(self, form): else: # Non-OIDC: only check userinfo profile valid_combined, reason = self.__is_any_claim_valid( - profile, additional_claims) + profile_dict, additional_claims) current_app.logger.debug( - f'Profile claims: {profile}' + f'Profile claim keys: {profile_dict_keys}' ) current_app.logger.debug( f'Validation reason: {reason}' @@ -382,11 +393,20 @@ def login(self, form): 'The user is not authorized to login based on your identity ' 'profile. Please contact your administrator.' ) + + additional_claim_names = [] + if isinstance(additional_claims, dict): + additional_claim_names = sorted(additional_claims.keys()) + audit_msg = ( f'The authenticated user {username} is not authorized to ' 'access pgAdmin based on OAUTH2 config. ' - f'Reason: additional claim required {additional_claims}, ' - f'profile claims {profile}, ID token claims {id_token_claims}.' + 'Reason: additional claims required. ' + f'additional_claim_names={additional_claim_names}, ' + f'profile_len={len(profile_dict)}, ' + f'profile_keys={profile_dict_keys}, ' + f'id_token_len={len(id_token_claims)}, ' + f'id_token_keys={id_token_claims_keys}.' ) current_app.logger.warning(audit_msg) return False, return_msg From 99272d528649f2a9d6f45ded9c277be44cc9feba Mon Sep 17 00:00:00 2001 From: Paul Bourhis Date: Sat, 10 Jan 2026 16:20:57 +0100 Subject: [PATCH 4/7] Add error handling for missing OAuth2 provider and enhance claims processing logic --- web/pgadmin/authenticate/oauth2.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/web/pgadmin/authenticate/oauth2.py b/web/pgadmin/authenticate/oauth2.py index d612c369e18..d63dbaeaf04 100644 --- a/web/pgadmin/authenticate/oauth2.py +++ b/web/pgadmin/authenticate/oauth2.py @@ -286,6 +286,11 @@ def _resolve_username(self, id_token_claims, profile_dict): return None, None def login(self, form): + if not self.oauth2_current_client: + error_msg = 'No OAuth2 provider available.' + current_app.logger.error(error_msg) + return False, gettext(error_msg) + profile = self.get_user_profile() profile_dict = self.get_profile_dict(profile) @@ -449,6 +454,27 @@ def get_user_profile(self): ]) if has_sufficient_claims: + provider = self.oauth2_config.get( + self.oauth2_current_client, {} + ) + username_claim = provider.get('OAUTH2_USERNAME_CLAIM') + additional_claims = provider.get('OAUTH2_ADDITIONAL_CLAIMS') + + # If custom username claim or additional authorization + # claims are configured, they may exist only in userinfo; + # don't skip userinfo unless ID token has them. + needs_userinfo = False + if username_claim and username_claim not in id_token_claims: + needs_userinfo = True + if isinstance(additional_claims, dict) and additional_claims: + missing_authz_keys = [ + k for k in additional_claims.keys() + if k not in id_token_claims + ] + if missing_authz_keys: + needs_userinfo = True + + if has_sufficient_claims and not needs_userinfo: current_app.logger.debug( 'OIDC provider: using ID token claims, ' 'skipping userinfo endpoint') From 48748184224e3193519ee9c557cbd7512220a00d Mon Sep 17 00:00:00 2001 From: Paul Bourhis Date: Sat, 10 Jan 2026 18:39:24 +0100 Subject: [PATCH 5/7] Enhance OIDC ID token handling by implementing JWT parsing and updating tests to mock claims extraction --- web/pgadmin/authenticate/oauth2.py | 46 +++++++++++++------ .../browser/tests/test_oauth2_with_mocking.py | 28 +++++++++-- 2 files changed, 55 insertions(+), 19 deletions(-) diff --git a/web/pgadmin/authenticate/oauth2.py b/web/pgadmin/authenticate/oauth2.py index d63dbaeaf04..18cbdfb8f8b 100644 --- a/web/pgadmin/authenticate/oauth2.py +++ b/web/pgadmin/authenticate/oauth2.py @@ -172,17 +172,36 @@ def _is_oidc_provider(self): def _get_id_token_claims(self): """ - Extract and return claims from the ID token. - When using OIDC with server_metadata_url, Authlib automatically - parses the ID token and stores the claims in the 'userinfo' key - of the token response. + Extract and return claims from the ID token JWT. + + Uses Authlib's parse_id_token() method which handles JWT decoding + and validation according to the OIDC specification. Returns: - dict: ID token claims, or empty dict if not available + dict: ID token claims, or empty dict if not available or + parsing fails """ if not self._is_oidc_provider(): return {} - return session.get('oauth2_token', {}).get('userinfo', {}) + + token = session.get('oauth2_token') + if not token: + return {} + + client = self.oauth2_clients.get(self.oauth2_current_client) + if not client: + return {} + + try: + claims = client.parse_id_token(token) + if isinstance(claims, dict): + return claims + except Exception as e: + current_app.logger.warning( + f'Failed to parse ID token via authlib: {e}' + ) + + return {} def get_profile_dict(self, profile): """ @@ -438,14 +457,11 @@ def get_user_profile(self): session['oauth2_logout_url'] = self.oauth2_config[ self.oauth2_current_client]['OAUTH2_LOGOUT_URL'] - # For OIDC providers with server_metadata_url, Authlib automatically - # parses the ID token and stores claims in the 'userinfo' key. - # We can skip the userinfo endpoint call if ID token has sufficient - # data. + # For OIDC providers, parse the ID token JWT to extract claims. + # We can skip the userinfo endpoint call if the ID token has + # sufficient claims for authentication and authorization. if self._is_oidc_provider(): - id_token_claims = session.get('oauth2_token', {}).get( - 'userinfo', {} - ) + id_token_claims = self._get_id_token_claims() # Check if we have basic required claims in ID token has_sufficient_claims = any([ 'email' in id_token_claims, @@ -476,13 +492,13 @@ def get_user_profile(self): if has_sufficient_claims and not needs_userinfo: current_app.logger.debug( - 'OIDC provider: using ID token claims, ' + 'OIDC provider: using parsed ID token JWT claims, ' 'skipping userinfo endpoint') # Return ID token claims as profile return id_token_claims else: current_app.logger.debug( - 'OIDC provider: ID token lacks standard claims, ' + 'OIDC provider: ID token JWT lacks standard claims, ' 'falling back to userinfo endpoint') # For non-OIDC providers or when ID token is insufficient, diff --git a/web/pgadmin/browser/tests/test_oauth2_with_mocking.py b/web/pgadmin/browser/tests/test_oauth2_with_mocking.py index 0bddb2530b3..51dc23049f6 100644 --- a/web/pgadmin/browser/tests/test_oauth2_with_mocking.py +++ b/web/pgadmin/browser/tests/test_oauth2_with_mocking.py @@ -303,8 +303,15 @@ def _fake_get_user_profile(self): from flask import session session['oauth2_token'] = { 'access_token': 'test-access-token', - 'userinfo': id_token_claims + 'id_token': 'mock.jwt.token', + 'token_type': 'Bearer' } + # Mock parse_id_token to return the claims + client = OAuth2Authentication.oauth2_clients.get(provider) + if client: + client.parse_id_token = MagicMock( + return_value=id_token_claims + ) return profile with patch.object( @@ -336,8 +343,15 @@ def _fake_get_user_profile(self): from flask import session session['oauth2_token'] = { 'access_token': 'test-access-token', - 'userinfo': id_token_claims + 'id_token': 'mock.jwt.token', + 'token_type': 'Bearer' } + # Mock parse_id_token to return the claims + client = OAuth2Authentication.oauth2_clients.get(provider) + if client: + client.parse_id_token = MagicMock( + return_value=id_token_claims + ) return profile with patch.object( @@ -396,11 +410,15 @@ def _test_oidc_get_user_profile_skip_userinfo(self, provider): oauth = OAuth2Authentication() oauth.oauth2_current_client = provider + claims = {'email': 'oidc-skip@example.com', 'sub': 'abc'} + client = MagicMock() client.authorize_access_token = MagicMock(return_value={ 'access_token': 't', - 'userinfo': {'email': 'oidc-skip@example.com', 'sub': 'abc'} + 'id_token': 'mock.jwt.token', + 'token_type': 'Bearer' }) + client.parse_id_token = MagicMock(return_value=claims) client.get = MagicMock(side_effect=AssertionError( 'userinfo endpoint should not be called')) @@ -419,8 +437,10 @@ def _test_oidc_get_user_profile_calls_userinfo(self, provider): client = MagicMock() client.authorize_access_token = MagicMock(return_value={ 'access_token': 't', - 'userinfo': {} + 'token_type': 'Bearer' }) + # parse_id_token returns empty dict when no id_token present + client.parse_id_token = MagicMock(return_value={}) resp = MagicMock() resp.raise_for_status = MagicMock() From df0f20fb801e72998ec8525415ea32f770c3215d Mon Sep 17 00:00:00 2001 From: Paul Bourhis Date: Mon, 12 Jan 2026 15:16:08 +0100 Subject: [PATCH 6/7] Refactor ID token claims extraction for OIDC providers and update tests to mock userinfo handling --- web/pgadmin/authenticate/oauth2.py | 31 +++++++++---------- .../browser/tests/test_oauth2_with_mocking.py | 27 +++++----------- 2 files changed, 23 insertions(+), 35 deletions(-) diff --git a/web/pgadmin/authenticate/oauth2.py b/web/pgadmin/authenticate/oauth2.py index 18cbdfb8f8b..427afdf0054 100644 --- a/web/pgadmin/authenticate/oauth2.py +++ b/web/pgadmin/authenticate/oauth2.py @@ -172,10 +172,14 @@ def _is_oidc_provider(self): def _get_id_token_claims(self): """ - Extract and return claims from the ID token JWT. + Extract and return ID token claims for OIDC providers. - Uses Authlib's parse_id_token() method which handles JWT decoding - and validation according to the OIDC specification. + In pgAdmin's Authlib integration, the token response returned by + authorize_access_token() may include a decoded claims dict under the + 'userinfo' key (e.g. populated from the ID token). + + If those claims are not present, this returns an empty dict and the + caller should fall back to the configured userinfo endpoint. Returns: dict: ID token claims, or empty dict if not available or @@ -185,21 +189,12 @@ def _get_id_token_claims(self): return {} token = session.get('oauth2_token') - if not token: - return {} - - client = self.oauth2_clients.get(self.oauth2_current_client) - if not client: + if not isinstance(token, dict): return {} - try: - claims = client.parse_id_token(token) - if isinstance(claims, dict): - return claims - except Exception as e: - current_app.logger.warning( - f'Failed to parse ID token via authlib: {e}' - ) + claims = token.get('userinfo') + if isinstance(claims, dict): + return claims return {} @@ -469,6 +464,10 @@ def get_user_profile(self): 'sub' in id_token_claims ]) + # Default to requiring the userinfo endpoint unless we can prove + # the ID token claims are sufficient for our configured needs. + needs_userinfo = True + if has_sufficient_claims: provider = self.oauth2_config.get( self.oauth2_current_client, {} diff --git a/web/pgadmin/browser/tests/test_oauth2_with_mocking.py b/web/pgadmin/browser/tests/test_oauth2_with_mocking.py index 51dc23049f6..0f157ab770b 100644 --- a/web/pgadmin/browser/tests/test_oauth2_with_mocking.py +++ b/web/pgadmin/browser/tests/test_oauth2_with_mocking.py @@ -304,14 +304,9 @@ def _fake_get_user_profile(self): session['oauth2_token'] = { 'access_token': 'test-access-token', 'id_token': 'mock.jwt.token', - 'token_type': 'Bearer' + 'token_type': 'Bearer', + 'userinfo': id_token_claims } - # Mock parse_id_token to return the claims - client = OAuth2Authentication.oauth2_clients.get(provider) - if client: - client.parse_id_token = MagicMock( - return_value=id_token_claims - ) return profile with patch.object( @@ -344,14 +339,9 @@ def _fake_get_user_profile(self): session['oauth2_token'] = { 'access_token': 'test-access-token', 'id_token': 'mock.jwt.token', - 'token_type': 'Bearer' + 'token_type': 'Bearer', + 'userinfo': id_token_claims } - # Mock parse_id_token to return the claims - client = OAuth2Authentication.oauth2_clients.get(provider) - if client: - client.parse_id_token = MagicMock( - return_value=id_token_claims - ) return profile with patch.object( @@ -416,9 +406,9 @@ def _test_oidc_get_user_profile_skip_userinfo(self, provider): client.authorize_access_token = MagicMock(return_value={ 'access_token': 't', 'id_token': 'mock.jwt.token', - 'token_type': 'Bearer' + 'token_type': 'Bearer', + 'userinfo': claims }) - client.parse_id_token = MagicMock(return_value=claims) client.get = MagicMock(side_effect=AssertionError( 'userinfo endpoint should not be called')) @@ -437,10 +427,9 @@ def _test_oidc_get_user_profile_calls_userinfo(self, provider): client = MagicMock() client.authorize_access_token = MagicMock(return_value={ 'access_token': 't', - 'token_type': 'Bearer' + 'token_type': 'Bearer', + 'userinfo': {} }) - # parse_id_token returns empty dict when no id_token present - client.parse_id_token = MagicMock(return_value={}) resp = MagicMock() resp.raise_for_status = MagicMock() From fccb31eb3afdb32e4d42eda2c4cca2b8c9373d06 Mon Sep 17 00:00:00 2001 From: Paul Bourhis Date: Mon, 12 Jan 2026 17:48:38 +0100 Subject: [PATCH 7/7] Refactor OAuth2 configuration to use get method for optional URLs --- web/pgadmin/authenticate/oauth2.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/web/pgadmin/authenticate/oauth2.py b/web/pgadmin/authenticate/oauth2.py index 427afdf0054..828314cca03 100644 --- a/web/pgadmin/authenticate/oauth2.py +++ b/web/pgadmin/authenticate/oauth2.py @@ -139,9 +139,9 @@ def __init__(self): name=oauth2_config['OAUTH2_NAME'], client_id=oauth2_config['OAUTH2_CLIENT_ID'], client_secret=oauth2_config['OAUTH2_CLIENT_SECRET'], - access_token_url=oauth2_config['OAUTH2_TOKEN_URL'], - authorize_url=oauth2_config['OAUTH2_AUTHORIZATION_URL'], - api_base_url=oauth2_config['OAUTH2_API_BASE_URL'], + access_token_url=oauth2_config.get('OAUTH2_TOKEN_URL'), + authorize_url=oauth2_config.get('OAUTH2_AUTHORIZATION_URL'), + api_base_url=oauth2_config.get('OAUTH2_API_BASE_URL'), client_kwargs=client_kwargs, server_metadata_url=oauth2_config.get( 'OAUTH2_SERVER_METADATA_URL', None)