diff --git a/pydantic_settings/sources/providers/gcp.py b/pydantic_settings/sources/providers/gcp.py index 0885eb84..0ec2cb20 100644 --- a/pydantic_settings/sources/providers/gcp.py +++ b/pydantic_settings/sources/providers/gcp.py @@ -40,6 +40,15 @@ def import_gcp_secret_manager() -> None: ) from e +def _is_not_found_error(exc: Exception) -> bool: + try: + from google.api_core.exceptions import NotFound + + return isinstance(exc, NotFound) + except ImportError: + return False + + class GoogleSecretManagerMapping(Mapping[str, str | None]): _loaded_secrets: dict[str, str | None] _secret_client: SecretManagerServiceClient @@ -107,12 +116,26 @@ def _get_secret_value(self, gcp_secret_name: str, version: str = 'latest') -> st except Exception: return None + def _get_secret_value_or_raise(self, gcp_secret_name: str) -> str | None: + try: + return self._secret_client.access_secret_version( + name=self._secret_version_path(gcp_secret_name) + ).payload.data.decode('UTF-8') + except Exception as e: + if _is_not_found_error(e): + raise KeyError(gcp_secret_name) from e + return None + def __getitem__(self, key: str) -> str | None: if key in self._loaded_secrets: return self._loaded_secrets[key] + if self._case_sensitive: + self._loaded_secrets[key] = self._get_secret_value_or_raise(key) + return self._loaded_secrets[key] + gcp_secret_name = self._secret_name_map.get(key) - if gcp_secret_name is None and not self._case_sensitive: + if gcp_secret_name is None: gcp_secret_name = self._secret_name_map.get(key.lower()) if gcp_secret_name: @@ -204,9 +227,12 @@ def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str, # of the same secret name to be retrieved independently and cached in the GoogleSecretManagerMapping if secret_version and isinstance(self.env_vars, GoogleSecretManagerMapping): for field_key, env_name, value_is_complex in self._extract_field_info(field, field_name): - gcp_secret_name = self.env_vars._secret_name_map.get(env_name) - if gcp_secret_name is None and not self.case_sensitive: - gcp_secret_name = self.env_vars._secret_name_map.get(env_name.lower()) + if self.case_sensitive: + gcp_secret_name: str | None = env_name + else: + gcp_secret_name = self.env_vars._secret_name_map.get(env_name) + if gcp_secret_name is None: + gcp_secret_name = self.env_vars._secret_name_map.get(env_name.lower()) if gcp_secret_name: env_val = self.env_vars._get_secret_value(gcp_secret_name, secret_version) diff --git a/tests/test_source_gcp_secret_manager.py b/tests/test_source_gcp_secret_manager.py index e7cabbc6..3aa96b46 100644 --- a/tests/test_source_gcp_secret_manager.py +++ b/tests/test_source_gcp_secret_manager.py @@ -16,6 +16,7 @@ try: gcp_secret_manager = True import_gcp_secret_manager() + from google.api_core.exceptions import NotFound, PermissionDenied from google.cloud.secretmanager import SecretManagerServiceClient except ImportError: gcp_secret_manager = False @@ -76,7 +77,7 @@ def mock_access_secret_version(name: str): resp = mocker.Mock() resp.payload.data.decode.return_value = secret_values[name] return resp - raise Exception(f'Secret not found or access denied: {name}') + raise NotFound(f'Secret not found: {name}') client.access_secret_version = mocker.Mock(side_effect=mock_access_secret_version) @@ -130,11 +131,34 @@ def test_secret_manager_mapping_secret_names(self, secret_manager_mapping): def test_secret_manager_mapping_getitem_access_error(self, secret_manager_mapping, mocker): secret_manager_mapping._secret_client.access_secret_version = mocker.Mock( - side_effect=Exception('Access denied') + side_effect=PermissionDenied('Access denied') ) assert secret_manager_mapping['test-secret'] is None + def test_case_sensitive_getitem_skips_list_secrets(self, mock_secret_client): + """With case_sensitive=True, fetching a secret must not call list_secrets.""" + mapping = GoogleSecretManagerMapping(mock_secret_client, project_id='test-project', case_sensitive=True) + + assert mapping['test-secret'] == 'test-value' + mock_secret_client.list_secrets.assert_not_called() + + def test_case_sensitive_getitem_not_found_raises_keyerror(self, mock_secret_client): + """With case_sensitive=True, a missing secret (NotFound) must raise KeyError.""" + mapping = GoogleSecretManagerMapping(mock_secret_client, project_id='test-project', case_sensitive=True) + + with pytest.raises(KeyError): + _ = mapping['nonexistent-secret'] + mock_secret_client.list_secrets.assert_not_called() + + def test_case_sensitive_getitem_permission_denied_returns_none(self, mock_secret_client, mocker): + """With case_sensitive=True, a PermissionDenied error must return None (not raise KeyError).""" + mock_secret_client.access_secret_version = mocker.Mock(side_effect=PermissionDenied('Access denied')) + mapping = GoogleSecretManagerMapping(mock_secret_client, project_id='test-project', case_sensitive=True) + + assert mapping['test-secret'] is None + mock_secret_client.list_secrets.assert_not_called() + def test_secret_manager_mapping_iter(self, secret_manager_mapping): assert list(secret_manager_mapping) == ['test-secret']