Skip to content

Commit b904001

Browse files
committed
Add retry after handling in ADBC Spark driver
1 parent 9eee98d commit b904001

5 files changed

Lines changed: 542 additions & 1 deletion

File tree

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
using System;
19+
using System.Net;
20+
using System.Net.Http;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using System.IO;
24+
using System.Text;
25+
using Thrift;
26+
using Thrift.Protocol;
27+
using Thrift.Transport;
28+
using Apache.Hive.Service.Rpc.Thrift;
29+
30+
namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
31+
{
32+
/// <summary>
33+
/// HTTP handler that implements retry behavior for 503 responses with Retry-After headers.
34+
/// </summary>
35+
internal class RetryHttpHandler : DelegatingHandler
36+
{
37+
private readonly bool _retryEnabled;
38+
private readonly int _retryTimeoutSeconds;
39+
40+
/// <summary>
41+
/// Initializes a new instance of the <see cref="RetryHttpHandler"/> class.
42+
/// </summary>
43+
/// <param name="innerHandler">The inner handler to delegate to.</param>
44+
/// <param name="retryEnabled">Whether retry behavior is enabled.</param>
45+
/// <param name="retryTimeoutSeconds">Maximum total time in seconds to retry before failing.</param>
46+
public RetryHttpHandler(HttpMessageHandler innerHandler, bool retryEnabled, int retryTimeoutSeconds)
47+
: base(innerHandler)
48+
{
49+
_retryEnabled = retryEnabled;
50+
_retryTimeoutSeconds = retryTimeoutSeconds;
51+
}
52+
53+
/// <summary>
54+
/// Sends an HTTP request to the inner handler with retry logic for 503 responses.
55+
/// </summary>
56+
protected override async Task<HttpResponseMessage> SendAsync(
57+
HttpRequestMessage request,
58+
CancellationToken cancellationToken)
59+
{
60+
// If retry is disabled, just pass through to the inner handler
61+
if (!_retryEnabled)
62+
{
63+
return await base.SendAsync(request, cancellationToken);
64+
}
65+
66+
// Clone the request content if it's not null so we can reuse it for retries
67+
var requestContentClone = request.Content != null
68+
? await CloneHttpContentAsync(request.Content)
69+
: null;
70+
71+
HttpResponseMessage response;
72+
string? lastErrorMessage = null;
73+
DateTime startTime = DateTime.UtcNow;
74+
int totalRetrySeconds = 0;
75+
76+
do
77+
{
78+
// Set the content for each attempt (if needed)
79+
if (requestContentClone != null && request.Content == null)
80+
{
81+
request.Content = await CloneHttpContentAsync(requestContentClone);
82+
}
83+
84+
response = await base.SendAsync(request, cancellationToken);
85+
86+
// If it's not a 503 response, return immediately
87+
if (response.StatusCode != HttpStatusCode.ServiceUnavailable)
88+
{
89+
return response;
90+
}
91+
92+
// Check for Retry-After header
93+
if (!response.Headers.TryGetValues("Retry-After", out var retryAfterValues))
94+
{
95+
// No Retry-After header, so return the response as is
96+
return response;
97+
}
98+
99+
// Parse the Retry-After value
100+
string retryAfterValue = string.Join(",", retryAfterValues);
101+
if (!int.TryParse(retryAfterValue, out int retryAfterSeconds) || retryAfterSeconds <= 0)
102+
{
103+
// Invalid Retry-After value, return the response as is
104+
return response;
105+
}
106+
107+
// Extract error message from response if possible
108+
try
109+
{
110+
lastErrorMessage = await ExtractErrorMessageAsync(response);
111+
}
112+
catch
113+
{
114+
// If we can't extract the error message, just use a generic one
115+
lastErrorMessage = $"Service temporarily unavailable (HTTP 503). Retry after {retryAfterSeconds} seconds.";
116+
}
117+
118+
// Check if we've exceeded the timeout
119+
totalRetrySeconds += retryAfterSeconds;
120+
if (_retryTimeoutSeconds > 0 && totalRetrySeconds > _retryTimeoutSeconds)
121+
{
122+
// We've exceeded the timeout, so break out of the loop
123+
break;
124+
}
125+
126+
// Dispose the response before retrying
127+
response.Dispose();
128+
129+
// Wait for the specified retry time
130+
await Task.Delay(TimeSpan.FromSeconds(retryAfterSeconds), cancellationToken);
131+
132+
// Reset the request content for the next attempt
133+
request.Content = null;
134+
135+
} while (!cancellationToken.IsCancellationRequested);
136+
137+
// If we get here, we've either exceeded the timeout or been cancelled
138+
if (cancellationToken.IsCancellationRequested)
139+
{
140+
throw new OperationCanceledException("Request cancelled during retry wait", cancellationToken);
141+
}
142+
143+
// Create a custom exception with the SQL state code and last error message
144+
var exception = new AdbcException(
145+
lastErrorMessage ?? "Service temporarily unavailable and retry timeout exceeded",
146+
AdbcStatusCode.IOError);
147+
148+
// Add SQL state as part of the message since we can't set it directly
149+
throw new AdbcException(
150+
$"[SQLState: 08001] {exception.Message}",
151+
AdbcStatusCode.IOError);
152+
}
153+
154+
/// <summary>
155+
/// Clones an HttpContent object so it can be reused for retries.
156+
/// </summary>
157+
private static async Task<HttpContent> CloneHttpContentAsync(HttpContent content)
158+
{
159+
var ms = new MemoryStream();
160+
await content.CopyToAsync(ms);
161+
ms.Position = 0;
162+
163+
var clone = new StreamContent(ms);
164+
if (content.Headers != null)
165+
{
166+
foreach (var header in content.Headers)
167+
{
168+
clone.Headers.Add(header.Key, header.Value);
169+
}
170+
}
171+
return clone;
172+
}
173+
174+
/// <summary>
175+
/// Attempts to extract the error message from a Thrift TApplicationException in the response body.
176+
/// </summary>
177+
private static async Task<string?> ExtractErrorMessageAsync(HttpResponseMessage response)
178+
{
179+
if (response.Content == null)
180+
{
181+
return null;
182+
}
183+
184+
// Check if the content type is application/x-thrift
185+
if (response.Content.Headers.ContentType?.MediaType != "application/x-thrift")
186+
{
187+
// If it's not Thrift, just return the content as a string
188+
return await response.Content.ReadAsStringAsync();
189+
}
190+
191+
try
192+
{
193+
// For Thrift content, just return a generic message
194+
// We can't easily parse the Thrift message without access to the specific methods
195+
return await response.Content.ReadAsStringAsync();
196+
}
197+
catch
198+
{
199+
// If we can't read the content, return null
200+
return null;
201+
}
202+
}
203+
}
204+
}

csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ public SparkHttpConnection(IReadOnlyDictionary<string, string> properties) : bas
4444
{
4545
}
4646

47+
/// <summary>
48+
/// Gets a value indicating whether to retry requests that receive a 503 response with a Retry-After header.
49+
/// </summary>
50+
protected bool TemporarilyUnavailableRetry { get; private set; } = true;
51+
52+
/// <summary>
53+
/// Gets the maximum total time in seconds to retry 503 responses before failing.
54+
/// </summary>
55+
protected int TemporarilyUnavailableRetryTimeout { get; private set; } = 900;
56+
4757
protected override void ValidateAuthentication()
4858
{
4959
// Validate authentication parameters
@@ -136,6 +146,33 @@ protected override void ValidateOptions()
136146
? connectTimeoutMsValue
137147
: throw new ArgumentOutOfRangeException(SparkParameters.ConnectTimeoutMilliseconds, connectTimeoutMs, $"must be a value of 0 (infinite) or between 1 .. {int.MaxValue}. default is 30000 milliseconds.");
138148
}
149+
150+
// Parse retry configuration parameters
151+
Properties.TryGetValue(SparkParameters.TemporarilyUnavailableRetry, out string? tempUnavailableRetryStr);
152+
int tempUnavailableRetryValue = 1; // Default to enabled
153+
if (tempUnavailableRetryStr != null && !int.TryParse(tempUnavailableRetryStr, out tempUnavailableRetryValue))
154+
{
155+
throw new ArgumentOutOfRangeException(SparkParameters.TemporarilyUnavailableRetry, tempUnavailableRetryStr,
156+
$"must be a value of 0 (disabled) or 1 (enabled). Default is 1.");
157+
}
158+
TemporarilyUnavailableRetry = tempUnavailableRetryValue != 0;
159+
160+
Properties.TryGetValue(SparkParameters.TemporarilyUnavailableRetryTimeout, out string? tempUnavailableRetryTimeoutStr);
161+
if (tempUnavailableRetryTimeoutStr != null)
162+
{
163+
if (!int.TryParse(tempUnavailableRetryTimeoutStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out int tempUnavailableRetryTimeoutValue) ||
164+
tempUnavailableRetryTimeoutValue < 0)
165+
{
166+
throw new ArgumentOutOfRangeException(SparkParameters.TemporarilyUnavailableRetryTimeout, tempUnavailableRetryTimeoutStr,
167+
$"must be a value of 0 (retry indefinitely) or a positive integer representing seconds. Default is 900 seconds (15 minutes).");
168+
}
169+
TemporarilyUnavailableRetryTimeout = tempUnavailableRetryTimeoutValue;
170+
}
171+
else
172+
{
173+
TemporarilyUnavailableRetryTimeout = 900; // Default to 15 minutes
174+
}
175+
139176
TlsOptions = HiveServer2TlsImpl.GetHttpTlsOptions(Properties);
140177
}
141178

@@ -162,7 +199,14 @@ protected override TTransport CreateTransport()
162199
AuthenticationHeaderValue? authenticationHeaderValue = GetAuthenticationHeaderValue(authTypeValue, token, username, password, access_token);
163200

164201
HttpClientHandler httpClientHandler = HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions);
165-
HttpClient httpClient = new(httpClientHandler);
202+
203+
// Create a RetryHttpHandler that wraps the HttpClientHandler to handle 503 responses
204+
var retryHandler = new RetryHttpHandler(
205+
httpClientHandler,
206+
TemporarilyUnavailableRetry,
207+
TemporarilyUnavailableRetryTimeout);
208+
209+
HttpClient httpClient = new(retryHandler);
166210
httpClient.BaseAddress = baseAddress;
167211
httpClient.DefaultRequestHeaders.Authorization = authenticationHeaderValue;
168212
httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(s_userAgent);

csharp/src/Drivers/Apache/Spark/SparkParameters.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ public static class SparkParameters
5252
/// Default value is 5 minutes if not specified.
5353
/// </summary>
5454
public const string CloudFetchTimeoutMinutes = "adbc.spark.cloudfetch.timeout_minutes";
55+
56+
/// <summary>
57+
/// Controls whether to retry requests that receive a 503 response with a Retry-After header.
58+
/// Default value is 1 (enabled). Set to 0 to disable retry behavior.
59+
/// </summary>
60+
public const string TemporarilyUnavailableRetry = "adbc.spark.temporarily_unavailable_retry";
61+
62+
/// <summary>
63+
/// Maximum total time in seconds to retry 503 responses before failing.
64+
/// Default value is 900 seconds (15 minutes). Set to 0 to retry indefinitely.
65+
/// </summary>
66+
public const string TemporarilyUnavailableRetryTimeout = "adbc.spark.temporarily_unavailable_retry_timeout";
5567
}
5668

5769
public static class SparkAuthTypeConstants

0 commit comments

Comments
 (0)