Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit 07d7818

Browse files
chore: Correct lint and imports, plus add testcases for exceptions check
Signed-off-by: Radhika Agrawal <agrawalradhika@google.com>
1 parent 0d45640 commit 07d7818

2 files changed

Lines changed: 72 additions & 69 deletions

File tree

google/auth/aio/transport/mtls.py

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,18 @@
1313
# limitations under the License.
1414

1515
"""
16-
Helper functions for mTLS in asyncio.
16+
Helper functions for mTLS in async.
1717
"""
1818

19-
import asyncio
20-
import contextlib
2119
import logging
22-
import os
23-
from os import environ, getenv, path
24-
import ssl
25-
import tempfile
26-
from typing import Optional
20+
from os import getenv, path
2721

28-
from google.auth import exceptions
22+
import google.auth.transport._mtls_helper
2923

3024
CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
3125
_LOGGER = logging.getLogger(__name__)
3226

27+
3328
def _check_config_path(config_path):
3429
"""Checks for config file path. If it exists, returns the absolute path with user expansion;
3530
otherwise returns None.
@@ -53,16 +48,10 @@ def has_default_client_cert_source():
5348
Returns:
5449
bool: indicating if the default client cert source exists.
5550
"""
56-
if (
57-
_check_config_path(CERTIFICATE_CONFIGURATION_DEFAULT_PATH)
58-
is not None
59-
):
51+
if _check_config_path(CERTIFICATE_CONFIGURATION_DEFAULT_PATH) is not None:
6052
return True
6153
cert_config_path = getenv("GOOGLE_API_CERTIFICATE_CONFIG")
62-
if (
63-
cert_config_path
64-
and _check_config_path(cert_config_path) is not None
65-
):
54+
if cert_config_path and _check_config_path(cert_config_path) is not None:
6655
return True
6756
return False
6857

@@ -95,7 +84,9 @@ def get_client_ssl_credentials(
9584
"""
9685

9786
# Attempt to retrieve X.509 Workload cert and key.
98-
cert, key = google.auth.transport._mtls_helper._get_workload_cert_and_key(certificate_config_path)
87+
cert, key = google.auth.transport._mtls_helper._get_workload_cert_and_key(
88+
certificate_config_path
89+
)
9990
if cert and key:
10091
return True, cert, key, None
10192

@@ -128,4 +119,3 @@ def get_client_cert_and_key(client_cert_callback=None):
128119

129120
has_cert, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False)
130121
return has_cert, cert, key
131-
Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,88 +1,101 @@
1-
import os
2-
import pytest
1+
# Copyright 2024 Google LLC
2+
# Licensed under the Apache License, Version 2.0...
3+
34
from unittest import mock
5+
6+
import pytest
7+
48
from google.auth import exceptions
5-
# Assuming the provided code is in a file named google/auth/transport/aio/mtls_helper.py
6-
from google.auth.transport.aio import mtls_helper
9+
from google.auth.aio.transport import mtls
710

811
CERT_DATA = b"client-cert"
912
KEY_DATA = b"client-key"
1013

11-
class TestMTLSHelper:
1214

13-
@mock.patch("os.path.expanduser")
14-
@mock.patch("os.path.exists")
15+
class TestMTLS:
16+
@mock.patch("google.auth.aio.transport.mtls.path.expanduser")
17+
@mock.patch("google.auth.aio.transport.mtls.path.exists")
1518
def test__check_config_path_exists(self, mock_exists, mock_expand):
1619
mock_expand.side_effect = lambda x: x.replace("~", "/home/user")
1720
mock_exists.return_value = True
18-
19-
path = "/home/user/config.json"
20-
result = mtls_helper._check_config_path("~/config.json")
21-
22-
assert result == path
23-
mock_exists.assert_called_with(path)
24-
25-
@mock.patch("os.path.exists", return_value=False)
21+
22+
input_path = "~/config.json"
23+
expected_path = "/home/user/config.json"
24+
result = mtls._check_config_path(input_path)
25+
26+
assert result == expected_path
27+
mock_exists.assert_called_with(expected_path)
28+
29+
@mock.patch("google.auth.aio.transport.mtls.path.exists", return_value=False)
2630
def test__check_config_path_not_found(self, mock_exists):
27-
result = mtls_helper._check_config_path("nonexistent.json")
31+
result = mtls._check_config_path("nonexistent.json")
2832
assert result is None
2933

30-
@mock.patch("google.auth.transport.aio.mtls_helper._check_config_path")
31-
@mock.patch("os.getenv")
32-
def test_has_default_client_cert_source_default_path(self, mock_getenv, mock_check):
33-
# Case 1: Default config path exists
34-
mock_check.side_effect = lambda x: x if x == mtls_helper.CERTIFICATE_CONFIGURATION_DEFAULT_PATH else None
35-
36-
assert mtls_helper.has_default_client_cert_source() is True
37-
38-
@mock.patch("google.auth.transport.aio.mtls_helper._check_config_path")
39-
@mock.patch("os.getenv")
34+
@mock.patch("google.auth.aio.transport.mtls._check_config_path")
35+
@mock.patch("google.auth.aio.transport.mtls.getenv")
4036
def test_has_default_client_cert_source_env_var(self, mock_getenv, mock_check):
41-
# Case 2: Default path doesn't exist, but env var path does
37+
# Mocking so the default path fails but the env var path succeeds
4238
custom_path = "/custom/path.json"
43-
mock_check.side_effect = lambda x: x if x == custom_path else None
39+
mock_check.side_effect = lambda x: custom_path if x == custom_path else None
4440
mock_getenv.return_value = custom_path
45-
46-
assert mtls_helper.has_default_client_cert_source() is True
4741

48-
@mock.patch("google.auth.transport.aio.mtls_helper._check_config_path", return_value=None)
49-
def test_has_default_client_cert_source_none(self, mock_check):
50-
assert mtls_helper.has_default_client_cert_source() is False
42+
assert mtls.has_default_client_cert_source() is True
5143

5244
@mock.patch("google.auth.transport._mtls_helper._get_workload_cert_and_key")
5345
def test_get_client_ssl_credentials_success(self, mock_workload):
5446
mock_workload.return_value = (CERT_DATA, KEY_DATA)
55-
56-
success, cert, key, passphrase = mtls_helper.get_client_ssl_credentials()
57-
47+
48+
success, cert, key, passphrase = mtls.get_client_ssl_credentials()
49+
5850
assert success is True
5951
assert cert == CERT_DATA
6052
assert key == KEY_DATA
6153
assert passphrase is None
6254

63-
@mock.patch("google.auth.transport._mtls_helper._get_workload_cert_and_key", return_value=(None, None))
64-
def test_get_client_ssl_credentials_fail(self, mock_workload):
65-
success, cert, key, passphrase = mtls_helper.get_client_ssl_credentials()
66-
assert success is False
67-
assert cert is None
68-
6955
def test_get_client_cert_and_key_callback(self):
70-
# Callback should take priority
56+
# The callback should be tried first and return immediately
7157
callback = mock.Mock(return_value=(CERT_DATA, KEY_DATA))
72-
73-
success, cert, key = mtls_helper.get_client_cert_and_key(callback)
74-
58+
59+
success, cert, key = mtls.get_client_cert_and_key(callback)
60+
7561
assert success is True
7662
assert cert == CERT_DATA
7763
assert key == KEY_DATA
7864
callback.assert_called_once()
7965

80-
@mock.patch("google.auth.transport.aio.mtls_helper.get_client_ssl_credentials")
66+
@mock.patch("google.auth.aio.transport.mtls.get_client_ssl_credentials")
8167
def test_get_client_cert_and_key_default(self, mock_get_ssl):
68+
# If no callback, it should call get_client_ssl_credentials
8269
mock_get_ssl.return_value = (True, CERT_DATA, KEY_DATA, None)
83-
84-
success, cert, key = mtls_helper.get_client_cert_and_key(None)
85-
70+
71+
success, cert, key = mtls.get_client_cert_and_key(None)
72+
8673
assert success is True
8774
assert cert == CERT_DATA
8875
assert key == KEY_DATA
76+
mock_get_ssl.assert_called_with(generate_encrypted_key=False)
77+
78+
@mock.patch("google.auth.transport._mtls_helper._get_workload_cert_and_key")
79+
def test_get_client_ssl_credentials_error(self, mock_workload):
80+
"""Tests that ClientCertError is propagated correctly."""
81+
# Setup the mock to raise the specific google-auth exception
82+
mock_workload.side_effect = exceptions.ClientCertError(
83+
"Failed to read metadata"
84+
)
85+
86+
# Verify that calling our function raises the same exception
87+
with pytest.raises(exceptions.ClientCertError, match="Failed to read metadata"):
88+
mtls.get_client_ssl_credentials()
89+
90+
@mock.patch("google.auth.aio.transport.mtls.get_client_ssl_credentials")
91+
def test_get_client_cert_and_key_exception_propagation(self, mock_get_ssl):
92+
"""Tests that get_client_cert_and_key propagates errors from its internal calls."""
93+
mock_get_ssl.side_effect = exceptions.ClientCertError(
94+
"Underlying credentials failed"
95+
)
96+
97+
with pytest.raises(
98+
exceptions.ClientCertError, match="Underlying credentials failed"
99+
):
100+
# Pass None for callback so it attempts to call get_client_ssl_credentials
101+
mtls.get_client_cert_and_key(client_cert_callback=None)

0 commit comments

Comments
 (0)