Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions pydantic_settings/sources/providers/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ def import_gcp_secret_manager() -> None:
) from e


def _is_not_found_error(exc: Exception) -> bool:
try:
from google.api_core.exceptions import NotFound

return isinstance(exc, NotFound)
except ImportError:
return False


class GoogleSecretManagerMapping(Mapping[str, str | None]):
_loaded_secrets: dict[str, str | None]
_secret_client: SecretManagerServiceClient
Expand Down Expand Up @@ -107,12 +116,26 @@ def _get_secret_value(self, gcp_secret_name: str, version: str = 'latest') -> st
except Exception:
return None

def _get_secret_value_or_raise(self, gcp_secret_name: str) -> str | None:
try:
return self._secret_client.access_secret_version(
name=self._secret_version_path(gcp_secret_name)
).payload.data.decode('UTF-8')
except Exception as e:
if _is_not_found_error(e):
raise KeyError(gcp_secret_name) from e
return None

def __getitem__(self, key: str) -> str | None:
if key in self._loaded_secrets:
return self._loaded_secrets[key]

if self._case_sensitive:
self._loaded_secrets[key] = self._get_secret_value_or_raise(key)
return self._loaded_secrets[key]

gcp_secret_name = self._secret_name_map.get(key)
if gcp_secret_name is None and not self._case_sensitive:
if gcp_secret_name is None:
gcp_secret_name = self._secret_name_map.get(key.lower())

if gcp_secret_name:
Expand Down Expand Up @@ -204,9 +227,12 @@ def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str,
# of the same secret name to be retrieved independently and cached in the GoogleSecretManagerMapping
if secret_version and isinstance(self.env_vars, GoogleSecretManagerMapping):
for field_key, env_name, value_is_complex in self._extract_field_info(field, field_name):
gcp_secret_name = self.env_vars._secret_name_map.get(env_name)
if gcp_secret_name is None and not self.case_sensitive:
gcp_secret_name = self.env_vars._secret_name_map.get(env_name.lower())
if self.case_sensitive:
gcp_secret_name: str | None = env_name
else:
gcp_secret_name = self.env_vars._secret_name_map.get(env_name)
if gcp_secret_name is None:
gcp_secret_name = self.env_vars._secret_name_map.get(env_name.lower())

if gcp_secret_name:
env_val = self.env_vars._get_secret_value(gcp_secret_name, secret_version)
Expand Down
28 changes: 26 additions & 2 deletions tests/test_source_gcp_secret_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
try:
gcp_secret_manager = True
import_gcp_secret_manager()
from google.api_core.exceptions import NotFound, PermissionDenied
from google.cloud.secretmanager import SecretManagerServiceClient
except ImportError:
gcp_secret_manager = False
Expand Down Expand Up @@ -76,7 +77,7 @@ def mock_access_secret_version(name: str):
resp = mocker.Mock()
resp.payload.data.decode.return_value = secret_values[name]
return resp
raise Exception(f'Secret not found or access denied: {name}')
raise NotFound(f'Secret not found: {name}')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it a breaking change?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, those are the actual exceptions that the gcp actually raises. The previous test was just raising the general Exception now it raises the same exception it will be raised by gcloud sdk


client.access_secret_version = mocker.Mock(side_effect=mock_access_secret_version)

Expand Down Expand Up @@ -130,11 +131,34 @@ def test_secret_manager_mapping_secret_names(self, secret_manager_mapping):

def test_secret_manager_mapping_getitem_access_error(self, secret_manager_mapping, mocker):
secret_manager_mapping._secret_client.access_secret_version = mocker.Mock(
side_effect=Exception('Access denied')
side_effect=PermissionDenied('Access denied')
)

assert secret_manager_mapping['test-secret'] is None

def test_case_sensitive_getitem_skips_list_secrets(self, mock_secret_client):
"""With case_sensitive=True, fetching a secret must not call list_secrets."""
mapping = GoogleSecretManagerMapping(mock_secret_client, project_id='test-project', case_sensitive=True)

assert mapping['test-secret'] == 'test-value'
mock_secret_client.list_secrets.assert_not_called()

def test_case_sensitive_getitem_not_found_raises_keyerror(self, mock_secret_client):
"""With case_sensitive=True, a missing secret (NotFound) must raise KeyError."""
mapping = GoogleSecretManagerMapping(mock_secret_client, project_id='test-project', case_sensitive=True)

with pytest.raises(KeyError):
_ = mapping['nonexistent-secret']
mock_secret_client.list_secrets.assert_not_called()

def test_case_sensitive_getitem_permission_denied_returns_none(self, mock_secret_client, mocker):
"""With case_sensitive=True, a PermissionDenied error must return None (not raise KeyError)."""
mock_secret_client.access_secret_version = mocker.Mock(side_effect=PermissionDenied('Access denied'))
mapping = GoogleSecretManagerMapping(mock_secret_client, project_id='test-project', case_sensitive=True)

assert mapping['test-secret'] is None
mock_secret_client.list_secrets.assert_not_called()

def test_secret_manager_mapping_iter(self, secret_manager_mapping):
assert list(secret_manager_mapping) == ['test-secret']

Expand Down
Loading