1414
1515"""Utilites for mutual TLS."""
1616
17+ import logging
1718from os import getenv
19+ import ssl
20+ from typing import Optional
1821
22+ from google .auth import environment_vars
1923from google .auth import exceptions
2024from google .auth .transport import _mtls_helper
2125
2226
27+ _LOGGER = logging .getLogger (__name__ )
28+
29+
2330def has_default_client_cert_source (include_context_aware = True ):
2431 """Check if default client SSL credentials exists on the device.
2532
@@ -60,7 +67,7 @@ def default_client_cert_source():
6067 client certificate bytes and private key bytes, both in PEM format.
6168
6269 Raises:
63- google.auth.exceptions.DefaultClientCertSourceError : If the default
70+ google.auth.exceptions.MutualTLSChannelError : If the default
6471 client SSL credentials don't exist or are malformed.
6572 """
6673 if not has_default_client_cert_source (include_context_aware = True ):
@@ -71,7 +78,12 @@ def default_client_cert_source():
7178 def callback ():
7279 try :
7380 _ , cert_bytes , key_bytes = _mtls_helper .get_client_cert_and_key ()
74- except (OSError , RuntimeError , ValueError ) as caught_exc :
81+ except (
82+ exceptions .ClientCertError ,
83+ OSError ,
84+ RuntimeError ,
85+ ValueError ,
86+ ) as caught_exc :
7587 new_exc = exceptions .MutualTLSChannelError (caught_exc )
7688 raise new_exc from caught_exc
7789
@@ -96,7 +108,7 @@ def default_client_encrypted_cert_source(cert_path, key_path):
96108 returns the cert_path, key_path and passphrase bytes.
97109
98110 Raises:
99- google.auth.exceptions.DefaultClientCertSourceError : If any problem
111+ google.auth.exceptions.MutualTLSChannelError : If any problem
100112 occurs when loading or saving the client certificate and key.
101113 """
102114 if not has_default_client_cert_source (include_context_aware = True ):
@@ -140,3 +152,163 @@ def should_use_client_cert():
140152 bool: indicating whether the client certificate should be used for mTLS.
141153 """
142154 return _mtls_helper .check_use_client_cert ()
155+
156+
157+ def load_client_cert_into_context (
158+ ctx : ssl .SSLContext ,
159+ cert_bytes : bytes ,
160+ key_bytes : bytes ,
161+ passphrase : Optional [bytes ] = None ,
162+ ) -> None :
163+ """Load a client certificate and key into an SSL context.
164+
165+ Args:
166+ ctx (ssl.SSLContext): The SSL context to load the certificate and key into.
167+ cert_bytes (bytes): The client certificate bytes in PEM format.
168+ key_bytes (bytes): The client private key bytes in PEM format.
169+ passphrase (Optional[bytes]): The passphrase for the client private key.
170+
171+ Raises:
172+ google.auth.exceptions.MutualTLSChannelError: If the SSL context is invalid,
173+ or if loading the certificate and key fails.
174+ """
175+ if ctx is None or not hasattr (ctx , "load_cert_chain" ):
176+ raise exceptions .MutualTLSChannelError (
177+ "Failed to load client certificate and key for mTLS. The provided context "
178+ "object is invalid or does not support loading certificate chains."
179+ )
180+
181+ try :
182+ with _mtls_helper .secure_cert_key_paths (
183+ cert_bytes , key_bytes , passphrase = passphrase
184+ ) as (
185+ cert_path ,
186+ key_path ,
187+ passphrase_val ,
188+ ):
189+ ctx .load_cert_chain (
190+ certfile = cert_path , keyfile = key_path , password = passphrase_val
191+ )
192+ except (
193+ ssl .SSLError ,
194+ OSError ,
195+ ValueError ,
196+ RuntimeError ,
197+ ) as caught_exc :
198+ new_exc = exceptions .MutualTLSChannelError (
199+ "Failed to load client certificate and key for mTLS."
200+ )
201+ raise new_exc from caught_exc
202+
203+
204+ def make_client_cert_ssl_context (
205+ cert_bytes : bytes ,
206+ key_bytes : bytes ,
207+ passphrase : Optional [bytes ] = None ,
208+ ) -> ssl .SSLContext :
209+ """Create a default SSL context loaded with the client certificate and key.
210+
211+ Args:
212+ cert_bytes (bytes): The client certificate bytes in PEM format.
213+ key_bytes (bytes): The client private key bytes in PEM format.
214+ passphrase (Optional[bytes]): The passphrase for the client private key.
215+
216+ Returns:
217+ ssl.SSLContext: The SSL context loaded with the client certificate and key.
218+
219+ Raises:
220+ google.auth.exceptions.MutualTLSChannelError: If loading the certificate and key fails.
221+ """
222+ ctx = ssl .create_default_context (ssl .Purpose .SERVER_AUTH )
223+ load_client_cert_into_context (ctx , cert_bytes , key_bytes , passphrase = passphrase )
224+ return ctx
225+
226+
227+ def load_default_client_cert (ctx : ssl .SSLContext ) -> bool :
228+ """Load the default client certificate and key into an SSL context if configured.
229+
230+ If client certificates are enabled and a default client certificate source is
231+ found, the certificate and key are loaded into the SSL context.
232+
233+ Args:
234+ ctx (ssl.SSLContext): The SSL context to load the default client certificate
235+ and key into.
236+
237+ Returns:
238+ bool: True if client certificates are enabled and the default client
239+ certificate was successfully loaded. False if client certificates
240+ are disabled or if no default certificate source is configured.
241+
242+ Raises:
243+ google.auth.exceptions.ClientCertError: If the default client certificate
244+ source exists but cannot be loaded or parsed.
245+ google.auth.exceptions.MutualTLSChannelError: If the default client certificate
246+ or key is malformed.
247+ """
248+ if not should_use_client_cert () or not has_default_client_cert_source ():
249+ return False
250+ (
251+ has_cert ,
252+ cert_bytes ,
253+ key_bytes ,
254+ passphrase ,
255+ ) = _mtls_helper .get_client_ssl_credentials ()
256+ if not has_cert :
257+ return False
258+ load_client_cert_into_context (ctx , cert_bytes , key_bytes , passphrase )
259+ return True
260+
261+
262+ def get_default_ssl_context () -> Optional [ssl .SSLContext ]:
263+ """Get a default SSL context loaded with the default client certificate.
264+
265+ Returns:
266+ ssl.SSLContext: An SSL context loaded with the default client
267+ certificate, or None if client certificates are not configured
268+ or available.
269+
270+ Raises:
271+ google.auth.exceptions.ClientCertError: If the default client certificate
272+ source exists but cannot be loaded or parsed.
273+ google.auth.exceptions.MutualTLSChannelError: If the default client certificate
274+ or key is malformed.
275+ """
276+ ctx = ssl .create_default_context (ssl .Purpose .SERVER_AUTH )
277+ return ctx if load_default_client_cert (ctx ) else None
278+
279+
280+ def should_use_mtls_endpoint (
281+ client_cert_available : Optional [bool ] = None ,
282+ ) -> bool :
283+ """Determine whether to use an mTLS endpoint.
284+
285+ This relies on the GOOGLE_API_USE_MTLS_ENDPOINT environment variable. If set to
286+ "always", returns True. If set to "never", returns False. If set to "auto"
287+ or unset, returns whether a client certificate is available.
288+
289+ Args:
290+ client_cert_available (bool): indicating if a client certificate
291+ is available. If None, this is determined by checking if client
292+ certificates are enabled and a default source is present.
293+
294+ Returns:
295+ bool: indicating if an mTLS endpoint should be used.
296+ """
297+ if client_cert_available is None :
298+ client_cert_available = should_use_client_cert ()
299+
300+ use_mtls_endpoint = getenv (environment_vars .GOOGLE_API_USE_MTLS_ENDPOINT , "auto" )
301+ use_mtls_endpoint = use_mtls_endpoint .lower ()
302+ if use_mtls_endpoint == "always" :
303+ return True
304+ if use_mtls_endpoint == "never" :
305+ return False
306+ if use_mtls_endpoint == "auto" :
307+ return client_cert_available
308+
309+ _LOGGER .warning (
310+ "Unsupported GOOGLE_API_USE_MTLS_ENDPOINT value %r. Accepted "
311+ "values: never, auto, always. Defaulting to auto." ,
312+ use_mtls_endpoint ,
313+ )
314+ return client_cert_available
0 commit comments