-
Notifications
You must be signed in to change notification settings - Fork 140
Expand file tree
/
Copy pathunified_http_client.py
More file actions
367 lines (304 loc) · 13.3 KB
/
unified_http_client.py
File metadata and controls
367 lines (304 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
import logging
import ssl
import urllib.parse
import urllib.request
from contextlib import contextmanager
from typing import Dict, Any, Optional, Generator
import urllib3
from urllib3 import PoolManager, ProxyManager
from urllib3.util import make_headers
from urllib3.exceptions import MaxRetryError
# Compatibility import for different urllib3 versions
try:
# If urllib3~=2.0 is installed
from urllib3 import BaseHTTPResponse
except ImportError:
# If urllib3~=1.0 is installed
from urllib3 import HTTPResponse as BaseHTTPResponse
from databricks.sql.auth.retry import DatabricksRetryPolicy, CommandType
from databricks.sql.exc import RequestError
from databricks.sql.common.http import HttpMethod
from databricks.sql.common.http_utils import (
detect_and_parse_proxy,
)
logger = logging.getLogger(__name__)
def _extract_http_status_from_max_retry_error(e: MaxRetryError) -> Optional[int]:
    """
    Extract the HTTP status code from a MaxRetryError, if one is present.

    urllib3 attaches the failing response to a MaxRetryError in different
    places depending on the failure scenario:
      - e.reason.response.status: most common case when retries are exhausted
      - e.response.status: alternate structure in some scenarios

    Args:
        e: MaxRetryError exception from urllib3

    Returns:
        HTTP status code as int if found, None otherwise
    """
    # Preferred location: e.reason.response.status
    reason = getattr(e, "reason", None)
    if reason is not None:
        reason_response = getattr(reason, "response", None)
        if reason_response is not None:
            status = getattr(reason_response, "status", None)
            if status is not None:
                return status

    # Fallback location: e.response.status
    response = getattr(e, "response", None)
    if response is not None and hasattr(response, "status"):
        return response.status

    return None
class UnifiedHttpClient:
    """
    Unified HTTP client for all Databricks SQL connector HTTP operations.

    This client uses urllib3 for robust HTTP communication with retry policies,
    connection pooling, SSL support, and proxy support. It replaces the various
    singleton HTTP clients and direct requests usage throughout the codebase.

    The client supports per-request proxy decisions, automatically routing requests
    through proxy or direct connections based on system proxy bypass rules and
    the target hostname of each request.
    """

    def __init__(self, client_context):
        """
        Initialize the unified HTTP client.

        Args:
            client_context: ClientContext instance containing HTTP configuration
        """
        self.config = client_context
        # Since the unified http client is used for all requests, we need to have
        # proxy and direct pool managers for per-request proxy decisions.
        self._direct_pool_manager = None
        self._proxy_pool_manager = None
        self._retry_policy = None
        self._proxy_uri = None
        self._proxy_auth = None
        self._setup_pool_managers()

    def _setup_pool_managers(self):
        """Set up both direct and proxy pool managers for per-request proxy decisions."""
        # SSL context setup
        ssl_context = None
        if self.config.ssl_options:
            ssl_context = ssl.create_default_context()

            # Configure SSL verification
            if not self.config.ssl_options.tls_verify:
                # Verification fully disabled: no hostname match, no certificate check.
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_NONE
            elif not self.config.ssl_options.tls_verify_hostname:
                # Certificate chain is still validated; only the hostname match is skipped.
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_REQUIRED

            # Load custom CA file if specified
            if self.config.ssl_options.tls_trusted_ca_file:
                ssl_context.load_verify_locations(
                    self.config.ssl_options.tls_trusted_ca_file
                )

            # Load client certificate (mutual TLS) only when both the cert file
            # and its key file are configured.
            if (
                self.config.ssl_options.tls_client_cert_file
                and self.config.ssl_options.tls_client_cert_key_file
            ):
                ssl_context.load_cert_chain(
                    self.config.ssl_options.tls_client_cert_file,
                    self.config.ssl_options.tls_client_cert_key_file,
                    self.config.ssl_options.tls_client_cert_key_password,
                )

        # Create retry policy shared by both pool managers.
        self._retry_policy = DatabricksRetryPolicy(
            delay_min=self.config.retry_delay_min,
            delay_max=self.config.retry_delay_max,
            stop_after_attempts_count=self.config.retry_stop_after_attempts_count,
            stop_after_attempts_duration=self.config.retry_stop_after_attempts_duration,
            delay_default=self.config.retry_delay_default,
            force_dangerous_codes=self.config.retry_dangerous_codes,
        )

        # Initialize the required attributes that DatabricksRetryPolicy expects
        # but doesn't initialize in its constructor.
        self._retry_policy._command_type = None
        self._retry_policy._retry_start_time = None

        # Common pool manager kwargs; a timeout of None lets urllib3 use its default.
        pool_kwargs = {
            "num_pools": self.config.pool_connections,
            "maxsize": self.config.pool_maxsize,
            "retries": self._retry_policy,
            "timeout": urllib3.Timeout(
                connect=self.config.socket_timeout, read=self.config.socket_timeout
            )
            if self.config.socket_timeout
            else None,
            "ssl_context": ssl_context,
        }

        # Always create a direct pool manager
        self._direct_pool_manager = PoolManager(**pool_kwargs)

        # Detect system proxy configuration.
        # We use 'https' as default scheme since most requests will be HTTPS.
        # NOTE(review): if config.hostname lacks a scheme, urlparse puts the host
        # in .path and parsed_url.hostname comes back None — presumably hostname
        # always includes a scheme; confirm with callers.
        parsed_url = urllib.parse.urlparse(self.config.hostname)
        self.scheme = parsed_url.scheme or "https"
        self.host = parsed_url.hostname

        # Check if system has proxy configured for our scheme
        try:
            # Use shared proxy detection logic, skipping bypass since we handle that per-request
            proxy_url, proxy_auth = detect_and_parse_proxy(
                self.scheme,
                self.host,
                skip_bypass=True,
                proxy_auth_method=self.config.proxy_auth_method,
            )

            if proxy_url:
                # Store proxy configuration for per-request decisions
                self._proxy_uri = proxy_url
                self._proxy_auth = proxy_auth

                # Create proxy pool manager
                self._proxy_pool_manager = ProxyManager(
                    proxy_url, proxy_headers=proxy_auth, **pool_kwargs
                )
                logger.debug("Initialized with proxy support: %s", proxy_url)
            else:
                self._proxy_pool_manager = None
                logger.debug("No system proxy detected, using direct connections only")
        except Exception as e:
            # If proxy detection fails, fall back to direct connections only
            logger.debug("Error detecting system proxy configuration: %s", e)
            self._proxy_pool_manager = None

    def _should_use_proxy(self, target_host: str) -> bool:
        """
        Determine if a request to the target host should use proxy.

        Args:
            target_host: The hostname of the target URL

        Returns:
            True if proxy should be used, False for direct connection
        """
        # If no proxy is configured, always use direct connection
        if not self._proxy_pool_manager or not self._proxy_uri:
            return False

        # Check system proxy bypass rules for this specific host
        try:
            # proxy_bypass returns True if the host should BYPASS the proxy.
            # We want the opposite - True if we should USE the proxy.
            return not urllib.request.proxy_bypass(target_host)
        except Exception as e:
            # If proxy_bypass fails, default to using proxy (safer choice)
            logger.debug("Error checking proxy bypass for host %s: %s", target_host, e)
            return True

    def _get_pool_manager_for_url(self, url: str) -> urllib3.PoolManager:
        """
        Get the appropriate pool manager for the given URL.

        Args:
            url: The target URL

        Returns:
            PoolManager instance (either direct or proxy)
        """
        parsed_url = urllib.parse.urlparse(url)
        target_host = parsed_url.hostname
        if target_host and self._should_use_proxy(target_host):
            logger.debug("Using proxy for request to %s", target_host)
            return self._proxy_pool_manager
        else:
            logger.debug("Using direct connection for request to %s", target_host)
            return self._direct_pool_manager

    def _prepare_headers(
        self, headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, str]:
        """Prepare headers for the request, including User-Agent.

        Caller-supplied headers take precedence over the default User-Agent.
        """
        request_headers = {}
        if self.config.user_agent:
            request_headers["User-Agent"] = self.config.user_agent
        if headers:
            request_headers.update(headers)
        return request_headers

    def _prepare_retry_policy(self):
        """Set up the retry policy for the current request."""
        if isinstance(self._retry_policy, DatabricksRetryPolicy):
            # Set command type for HTTP requests to OTHER (not database commands)
            self._retry_policy.command_type = CommandType.OTHER
            # Start the retry timer for duration-based retry limits
            self._retry_policy.start_retry_timer()

    @contextmanager
    def request_context(
        self,
        method: HttpMethod,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        **kwargs,
    ) -> Generator[BaseHTTPResponse, None, None]:
        """
        Context manager for making HTTP requests with proper resource cleanup.

        The response is closed on exit; urllib3 errors are translated into
        RequestError (with the HTTP status code in the context when available).

        Args:
            method: HTTP method (HttpMethod.GET, HttpMethod.POST, HttpMethod.PUT, HttpMethod.DELETE)
            url: URL to request
            headers: Optional headers dict
            **kwargs: Additional arguments passed to urllib3 request

        Yields:
            BaseHTTPResponse: The HTTP response object

        Raises:
            RequestError: if the request fails (retries exhausted or any other error)
        """
        logger.debug(
            "Making %s request to %s", method, urllib.parse.urlparse(url).netloc
        )

        request_headers = self._prepare_headers(headers)

        # Prepare retry policy for this request
        self._prepare_retry_policy()

        # Select appropriate pool manager based on target URL
        pool_manager = self._get_pool_manager_for_url(url)

        response = None
        try:
            response = pool_manager.request(
                method=method.value, url=url, headers=request_headers, **kwargs
            )
            # NOTE(review): because the yield sits inside this try, an exception
            # raised by the caller's `with` body is thrown back in here and
            # re-raised wrapped as RequestError — confirm that wrapping of
            # consumer-side exceptions is intended.
            yield response
        except MaxRetryError as e:
            logger.error("HTTP request failed after retries: %s", e)
            # Extract HTTP status code from MaxRetryError if available
            http_code = _extract_http_status_from_max_retry_error(e)
            context = {}
            if http_code is not None:
                context["http-code"] = http_code
                logger.error("HTTP request failed with status code: %d", http_code)
            raise RequestError(f"HTTP request failed: {e}", context=context)
        except Exception as e:
            logger.error("HTTP request error: %s", e)
            raise RequestError(f"HTTP request error: {e}")
        finally:
            if response:
                response.close()

    def request(
        self,
        method: HttpMethod,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        **kwargs,
    ) -> BaseHTTPResponse:
        """
        Make an HTTP request.

        Args:
            method: HTTP method (HttpMethod.GET, HttpMethod.POST, HttpMethod.PUT, HttpMethod.DELETE, etc.)
            url: URL to request
            headers: Optional headers dict
            **kwargs: Additional arguments passed to urllib3 request

        Returns:
            BaseHTTPResponse: The HTTP response object with data and metadata pre-loaded
        """
        with self.request_context(method, url, headers=headers, **kwargs) as response:
            # Read the response data to ensure it's available after context exit.
            # Note: status and headers remain accessible after close(); calling
            # response.read() loads and caches the response data so it remains
            # accessible after the response is closed.
            response.read()
            return response

    def using_proxy(self) -> bool:
        """Check if proxy support is available (not whether it's being used for a specific request)."""
        return self._proxy_pool_manager is not None

    @property
    def proxy_uri(self) -> Optional[str]:
        """Get the configured proxy URI, if any."""
        return self._proxy_uri

    def close(self):
        """Close the underlying connection pools.

        Safe to call more than once: managers are cleared then set to None.
        """
        if self._direct_pool_manager:
            self._direct_pool_manager.clear()
            self._direct_pool_manager = None
        if self._proxy_pool_manager:
            self._proxy_pool_manager.clear()
            self._proxy_pool_manager = None

    def __enter__(self):
        # Support `with UnifiedHttpClient(...) as client:` usage.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always release connection pools on context exit.
        self.close()