From 9df7f2c42374d6d509607600eab824b42bb4bf23 Mon Sep 17 00:00:00 2001 From: Ruben Laguna Date: Tue, 12 May 2026 13:16:17 +0200 Subject: [PATCH 1/3] Skip list_secrets call when case_sensitive=True When case_sensitive=True, secret names are used as-is without any name translation, so there's no need to enumerate all secrets via list_secrets (which requires secretmanager.secrets.list permission). __getitem__ now calls _get_secret_value directly when case_sensitive=True. get_field_value likewise uses env_name directly when case_sensitive=True, avoiding access to _secret_name_map (and the underlying list_secrets call) even when a SecretVersion is specified. --- pydantic_settings/sources/providers/gcp.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/pydantic_settings/sources/providers/gcp.py b/pydantic_settings/sources/providers/gcp.py index 0885eb84..d9c69a94 100644 --- a/pydantic_settings/sources/providers/gcp.py +++ b/pydantic_settings/sources/providers/gcp.py @@ -111,8 +111,15 @@ def __getitem__(self, key: str) -> str | None: if key in self._loaded_secrets: return self._loaded_secrets[key] + if self._case_sensitive: + value = self._get_secret_value(key) + if value is None: + raise KeyError(key) + self._loaded_secrets[key] = value + return value + gcp_secret_name = self._secret_name_map.get(key) - if gcp_secret_name is None and not self._case_sensitive: + if gcp_secret_name is None: gcp_secret_name = self._secret_name_map.get(key.lower()) if gcp_secret_name: @@ -204,9 +211,12 @@ def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str, # of the same secret name to be retrieved independently and cached in the GoogleSecretManagerMapping if secret_version and isinstance(self.env_vars, GoogleSecretManagerMapping): for field_key, env_name, value_is_complex in self._extract_field_info(field, field_name): - gcp_secret_name = self.env_vars._secret_name_map.get(env_name) - if gcp_secret_name is None and not self.case_sensitive: - gcp_secret_name = self.env_vars._secret_name_map.get(env_name.lower()) + if self.case_sensitive: + gcp_secret_name: str | None = env_name + else: + gcp_secret_name = self.env_vars._secret_name_map.get(env_name) + if gcp_secret_name is None: + gcp_secret_name = self.env_vars._secret_name_map.get(env_name.lower()) if gcp_secret_name: env_val = self.env_vars._get_secret_value(gcp_secret_name, secret_version) From 75201c5f1b7d04f1220214648d78069f57552f04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20Laguna?= Date: Thu, 14 May 2026 15:04:23 +0200 Subject: [PATCH 2/3] Distinguish NotFound from PermissionDenied in case_sensitive GCP lookup - Add _is_not_found_error() helper using google.api_core.exceptions.NotFound - Add _get_secret_value_or_raise() which raises KeyError on NotFound but returns None for other errors (e.g. PermissionDenied) - Use _get_secret_value_or_raise() in __getitem__ for case_sensitive=True, avoiding the need for list_secrets to check key existence - Update test fixtures to raise NotFound for missing secrets and PermissionDenied for access-denied scenarios --- pydantic_settings/sources/providers/gcp.py | 26 +++++++++++++++++----- tests/test_source_gcp_secret_manager.py | 5 +++-- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/pydantic_settings/sources/providers/gcp.py b/pydantic_settings/sources/providers/gcp.py index d9c69a94..0ec2cb20 100644 --- a/pydantic_settings/sources/providers/gcp.py +++ b/pydantic_settings/sources/providers/gcp.py @@ -40,6 +40,15 @@ def import_gcp_secret_manager() -> None: ) from e +def _is_not_found_error(exc: Exception) -> bool: + try: + from google.api_core.exceptions import NotFound + + return isinstance(exc, NotFound) + except ImportError: + return False + + class GoogleSecretManagerMapping(Mapping[str, str | None]): _loaded_secrets: dict[str, str | None] _secret_client: SecretManagerServiceClient @@ -107,16 +116,23 @@ def _get_secret_value(self, gcp_secret_name: str, version: str = 'latest') -> st except Exception: return None + def _get_secret_value_or_raise(self, gcp_secret_name: str) -> str | None: + try: + return self._secret_client.access_secret_version( + name=self._secret_version_path(gcp_secret_name) + ).payload.data.decode('UTF-8') + except Exception as e: + if _is_not_found_error(e): + raise KeyError(gcp_secret_name) from e + return None + def __getitem__(self, key: str) -> str | None: if key in self._loaded_secrets: return self._loaded_secrets[key] if self._case_sensitive: - value = self._get_secret_value(key) - if value is None: - raise KeyError(key) - self._loaded_secrets[key] = value - return value + self._loaded_secrets[key] = self._get_secret_value_or_raise(key) + return self._loaded_secrets[key] gcp_secret_name = self._secret_name_map.get(key) if gcp_secret_name is None: diff --git a/tests/test_source_gcp_secret_manager.py b/tests/test_source_gcp_secret_manager.py index e7cabbc6..315ebffb 100644 --- a/tests/test_source_gcp_secret_manager.py +++ b/tests/test_source_gcp_secret_manager.py @@ -16,6 +16,7 @@ try: gcp_secret_manager = True import_gcp_secret_manager() + from google.api_core.exceptions import NotFound, PermissionDenied from google.cloud.secretmanager import SecretManagerServiceClient except ImportError: gcp_secret_manager = False @@ -76,7 +77,7 @@ def mock_access_secret_version(name: str): resp = mocker.Mock() resp.payload.data.decode.return_value = secret_values[name] return resp - raise Exception(f'Secret not found or access denied: {name}') + raise NotFound(f'Secret not found: {name}') client.access_secret_version = mocker.Mock(side_effect=mock_access_secret_version) @@ -130,7 +131,7 @@ def test_secret_manager_mapping_secret_names(self, secret_manager_mapping): def test_secret_manager_mapping_getitem_access_error(self, secret_manager_mapping, mocker): secret_manager_mapping._secret_client.access_secret_version = mocker.Mock( - side_effect=Exception('Access denied') + side_effect=PermissionDenied('Access denied') ) assert secret_manager_mapping['test-secret'] is None From 5be53f997fe39d1a31805c994023eabfdbefb730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20Laguna?= Date: Sat, 16 May 2026 14:43:37 +0200 Subject: [PATCH 3/3] Add tests for case_sensitive GCP secret lookup behavior - Verify list_secrets is not called when case_sensitive=True - Verify NotFound raises KeyError - Verify PermissionDenied returns None --- tests/test_source_gcp_secret_manager.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/test_source_gcp_secret_manager.py b/tests/test_source_gcp_secret_manager.py index 315ebffb..3aa96b46 100644 --- a/tests/test_source_gcp_secret_manager.py +++ b/tests/test_source_gcp_secret_manager.py @@ -136,6 +136,29 @@ def test_secret_manager_mapping_getitem_access_error(self, secret_manager_mappin assert secret_manager_mapping['test-secret'] is None + def test_case_sensitive_getitem_skips_list_secrets(self, mock_secret_client): + """With case_sensitive=True, fetching a secret must not call list_secrets.""" + mapping = GoogleSecretManagerMapping(mock_secret_client, project_id='test-project', case_sensitive=True) + + assert mapping['test-secret'] == 'test-value' + mock_secret_client.list_secrets.assert_not_called() + + def test_case_sensitive_getitem_not_found_raises_keyerror(self, mock_secret_client): + """With case_sensitive=True, a missing secret (NotFound) must raise KeyError.""" + mapping = GoogleSecretManagerMapping(mock_secret_client, project_id='test-project', case_sensitive=True) + + with pytest.raises(KeyError): + _ = mapping['nonexistent-secret'] + mock_secret_client.list_secrets.assert_not_called() + + def test_case_sensitive_getitem_permission_denied_returns_none(self, mock_secret_client, mocker): + """With case_sensitive=True, a PermissionDenied error must return None (not raise KeyError).""" + mock_secret_client.access_secret_version = mocker.Mock(side_effect=PermissionDenied('Access denied')) + mapping = GoogleSecretManagerMapping(mock_secret_client, project_id='test-project', case_sensitive=True) + + assert mapping['test-secret'] is None + mock_secret_client.list_secrets.assert_not_called() + def test_secret_manager_mapping_iter(self, secret_manager_mapping): assert list(secret_manager_mapping) == ['test-secret']