Skip to content

Commit 30626e5

Browse files
authored
feat(csharp): Add retry-after behavior for 503 responses in Spark ADBC driver (#2664)
## Description This PR implements retry-after behavior for the Spark ADBC driver when receiving 503 responses with Retry-After headers. This is particularly useful for Databricks clusters that may return 503 responses when a cluster is starting up or experiencing temporary unavailability. ## Changes - Added new configuration parameters: - `adbc.spark.temporarily_unavailable_retry` (default: 1 - enabled) - `adbc.spark.temporarily_unavailable_retry_timeout` (default: 900 seconds) - Created a `RetryHttpHandler` class that wraps the existing `HttpClientHandler` to handle 503 responses - Modified `SparkHttpConnection` to use the new retry handler - Added comprehensive unit tests for the retry behavior ## Implementation Details When a 503 response with a Retry-After header is received: 1. The handler will wait for the number of seconds specified in the header 2. It will then retry the request 3. If another 503 response is received, it will continue retrying 4. If the total retry time exceeds the configured timeout, it will fail with an appropriate error message ## Testing Added unit tests to verify: - Retry behavior for 503 responses with Retry-After headers - Timeout behavior when retry time exceeds the configured limit - Handling of invalid or missing Retry-After headers - Disabling retry behavior via configuration - Parameter validationv
1 parent b89c9d0 commit 30626e5

7 files changed

Lines changed: 533 additions & 2 deletions

File tree

csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,17 @@ protected override void ValidateOptions()
135135
? connectTimeoutMsValue
136136
: throw new ArgumentOutOfRangeException(SparkParameters.ConnectTimeoutMilliseconds, connectTimeoutMs, $"must be a value of 0 (infinite) or between 1 .. {int.MaxValue}. default is 30000 milliseconds.");
137137
}
138+
138139
TlsOptions = HiveServer2TlsImpl.GetHttpTlsOptions(Properties);
139140
}
140141

141142
internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, TGetResultSetMetadataResp? metadataResp = null) => new HiveServer2Reader(statement, schema, dataTypeConversion: statement.Connection.DataTypeConversion);
142143

144+
protected virtual HttpMessageHandler CreateHttpHandler()
145+
{
146+
return HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions);
147+
}
148+
143149
protected override TTransport CreateTransport()
144150
{
145151
// Assumption: parameters have already been validated.
@@ -160,8 +166,7 @@ protected override TTransport CreateTransport()
160166
Uri baseAddress = GetBaseAddress(uri, hostName, path, port, SparkParameters.HostName, TlsOptions.IsTlsEnabled);
161167
AuthenticationHeaderValue? authenticationHeaderValue = GetAuthenticationHeaderValue(authTypeValue, token, username, password, access_token);
162168

163-
HttpClientHandler httpClientHandler = HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions);
164-
HttpClient httpClient = new(httpClientHandler);
169+
HttpClient httpClient = new(CreateHttpHandler());
165170
httpClient.BaseAddress = baseAddress;
166171
httpClient.DefaultRequestHeaders.Authorization = authenticationHeaderValue;
167172
httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(s_userAgent);

csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using System.Collections.Generic;
2020
using System.Diagnostics;
2121
using System.Linq;
22+
using System.Net.Http;
2223
using System.Threading;
2324
using System.Threading.Tasks;
2425
using Apache.Arrow.Adbc.Drivers.Apache;
@@ -39,6 +40,8 @@ internal class DatabricksConnection : SparkHttpConnection
3940
private bool _useCloudFetch = true;
4041
private bool _canDecompressLz4 = true;
4142
private long _maxBytesPerFile = DefaultMaxBytesPerFile;
43+
private const bool DefaultRetryOnUnavailable= true;
44+
private const int DefaultTemporarilyUnavailableRetryTimeout = 500;
4245

4346
public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : base(properties)
4447
{
@@ -122,6 +125,26 @@ private void ValidateProperties()
122125
/// </summary>
123126
internal long MaxBytesPerFile => _maxBytesPerFile;
124127

128+
/// <summary>
129+
/// Gets a value indicating whether to retry requests that receive a 503 response with a Retry-After header.
130+
/// </summary>
131+
protected bool TemporarilyUnavailableRetry { get; private set; } = DefaultRetryOnUnavailable;
132+
133+
/// <summary>
134+
/// Gets the maximum total time in seconds to retry 503 responses before failing.
135+
/// </summary>
136+
protected int TemporarilyUnavailableRetryTimeout { get; private set; } = DefaultTemporarilyUnavailableRetryTimeout;
137+
138+
protected override HttpMessageHandler CreateHttpHandler()
139+
{
140+
var baseHandler = base.CreateHttpHandler();
141+
if (TemporarilyUnavailableRetry)
142+
{
143+
return new RetryHttpHandler(baseHandler, TemporarilyUnavailableRetryTimeout);
144+
}
145+
return baseHandler;
146+
}
147+
125148
internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, TGetResultSetMetadataResp? metadataResp = null)
126149
{
127150
// Get result format from metadata response if available
@@ -259,6 +282,34 @@ private string EscapeSqlString(string value)
259282
return "`" + value.Replace("`", "``") + "`";
260283
}
261284

285+
protected override void ValidateOptions()
286+
{
287+
base.ValidateOptions();
288+
289+
if (Properties.TryGetValue(DatabricksParameters.TemporarilyUnavailableRetry, out string? tempUnavailableRetryStr))
290+
{
291+
if (!bool.TryParse(tempUnavailableRetryStr, out bool tempUnavailableRetryValue))
292+
{
293+
throw new ArgumentOutOfRangeException(DatabricksParameters.TemporarilyUnavailableRetry, tempUnavailableRetryStr,
294+
$"must be a value of false (disabled) or true (enabled). Default is true.");
295+
}
296+
297+
TemporarilyUnavailableRetry = tempUnavailableRetryValue;
298+
}
299+
300+
301+
if(Properties.TryGetValue(DatabricksParameters.TemporarilyUnavailableRetryTimeout, out string? tempUnavailableRetryTimeoutStr))
302+
{
303+
if (!int.TryParse(tempUnavailableRetryTimeoutStr, out int tempUnavailableRetryTimeoutValue) ||
304+
tempUnavailableRetryTimeoutValue < 0)
305+
{
306+
throw new ArgumentOutOfRangeException(DatabricksParameters.TemporarilyUnavailableRetryTimeout, tempUnavailableRetryTimeoutStr,
307+
$"must be a value of 0 (retry indefinitely) or a positive integer representing seconds. Default is 900 seconds (15 minutes).");
308+
}
309+
TemporarilyUnavailableRetryTimeout = tempUnavailableRetryTimeoutValue;
310+
}
311+
}
312+
262313
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response, CancellationToken cancellationToken = default) =>
263314
Task.FromResult(response.DirectResults.ResultSetMetadata);
264315
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response, CancellationToken cancellationToken = default) =>
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
using System;
19+
20+
namespace Apache.Arrow.Adbc.Drivers.Databricks
21+
{
22+
public class DatabricksException : AdbcException
23+
{
24+
private string? _sqlState;
25+
private int _nativeError;
26+
27+
public DatabricksException()
28+
{
29+
}
30+
31+
public DatabricksException(string message) : base(message)
32+
{
33+
}
34+
35+
public DatabricksException(string message, AdbcStatusCode statusCode) : base(message, statusCode)
36+
{
37+
}
38+
39+
public DatabricksException(string message, Exception innerException) : base(message, innerException)
40+
{
41+
}
42+
43+
public DatabricksException(string message, AdbcStatusCode statusCode, Exception innerException) : base(message, statusCode, innerException)
44+
{
45+
}
46+
47+
public override string? SqlState
48+
{
49+
get { return _sqlState; }
50+
}
51+
52+
public override int NativeError
53+
{
54+
get { return _nativeError; }
55+
}
56+
57+
internal DatabricksException SetSqlState(string sqlState)
58+
{
59+
_sqlState = sqlState;
60+
return this;
61+
}
62+
63+
internal DatabricksException SetNativeError(int nativeError)
64+
{
65+
_nativeError = nativeError;
66+
return this;
67+
}
68+
}
69+
}

csharp/src/Drivers/Databricks/DatabricksParameters.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,16 @@ public class DatabricksParameters : SparkParameters
7575
/// and value "true" will result in executing "set use_cached_result=true" on the server.
7676
/// </summary>
7777
public const string ServerSidePropertyPrefix = "adbc.databricks.SSP_";
78+
/// Controls whether to retry requests that receive a 503 response with a Retry-After header.
79+
/// Default value is true (enabled). Set to false to disable retry behavior.
80+
/// </summary>
81+
public const string TemporarilyUnavailableRetry = "adbc.spark.temporarily_unavailable_retry";
82+
83+
/// <summary>
84+
/// Maximum total time in seconds to retry 503 responses before failing.
85+
/// Default value is 900 seconds (15 minutes). Set to 0 to retry indefinitely.
86+
/// </summary>
87+
public const string TemporarilyUnavailableRetryTimeout = "adbc.spark.temporarily_unavailable_retry_timeout";
7888
}
7989

8090
/// <summary>
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
using System;
19+
using System.Net;
20+
using System.Net.Http;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using System.IO;
24+
25+
namespace Apache.Arrow.Adbc.Drivers.Databricks
26+
{
27+
/// <summary>
28+
/// HTTP handler that implements retry behavior for 503 responses with Retry-After headers.
29+
/// </summary>
30+
internal class RetryHttpHandler : DelegatingHandler
31+
{
32+
private readonly int _retryTimeoutSeconds;
33+
34+
/// <summary>
35+
/// Initializes a new instance of the <see cref="RetryHttpHandler"/> class.
36+
/// </summary>
37+
/// <param name="innerHandler">The inner handler to delegate to.</param>
38+
/// <param name="retryEnabled">Whether retry behavior is enabled.</param>
39+
/// <param name="retryTimeoutSeconds">Maximum total time in seconds to retry before failing.</param>
40+
public RetryHttpHandler(HttpMessageHandler innerHandler, int retryTimeoutSeconds)
41+
: base(innerHandler)
42+
{
43+
_retryTimeoutSeconds = retryTimeoutSeconds;
44+
}
45+
46+
/// <summary>
47+
/// Sends an HTTP request to the inner handler with retry logic for 503 responses.
48+
/// </summary>
49+
protected override async Task<HttpResponseMessage> SendAsync(
50+
HttpRequestMessage request,
51+
CancellationToken cancellationToken)
52+
{
53+
// Clone the request content if it's not null so we can reuse it for retries
54+
var requestContentClone = request.Content != null
55+
? await CloneHttpContentAsync(request.Content)
56+
: null;
57+
58+
HttpResponseMessage response;
59+
string? lastErrorMessage = null;
60+
DateTime startTime = DateTime.UtcNow;
61+
int totalRetrySeconds = 0;
62+
63+
do
64+
{
65+
// Set the content for each attempt (if needed)
66+
if (requestContentClone != null && request.Content == null)
67+
{
68+
request.Content = await CloneHttpContentAsync(requestContentClone);
69+
}
70+
71+
response = await base.SendAsync(request, cancellationToken);
72+
73+
// If it's not a 503 response, return immediately
74+
if (response.StatusCode != HttpStatusCode.ServiceUnavailable)
75+
{
76+
return response;
77+
}
78+
79+
// Check for Retry-After header
80+
if (!response.Headers.TryGetValues("Retry-After", out var retryAfterValues))
81+
{
82+
// No Retry-After header, so return the response as is
83+
return response;
84+
}
85+
86+
// Parse the Retry-After value
87+
string retryAfterValue = string.Join(",", retryAfterValues);
88+
if (!int.TryParse(retryAfterValue, out int retryAfterSeconds) || retryAfterSeconds <= 0)
89+
{
90+
// Invalid Retry-After value, return the response as is
91+
return response;
92+
}
93+
94+
lastErrorMessage = $"Service temporarily unavailable (HTTP 503). Retry after {retryAfterSeconds} seconds.";
95+
96+
// Dispose the response before retrying
97+
response.Dispose();
98+
99+
// Reset the request content for the next attempt
100+
request.Content = null;
101+
102+
// Check if we've exceeded the timeout
103+
totalRetrySeconds += retryAfterSeconds;
104+
if (_retryTimeoutSeconds > 0 && totalRetrySeconds > _retryTimeoutSeconds)
105+
{
106+
// We've exceeded the timeout, so break out of the loop
107+
break;
108+
}
109+
110+
// Wait for the specified retry time
111+
await Task.Delay(TimeSpan.FromSeconds(retryAfterSeconds), cancellationToken);
112+
} while (!cancellationToken.IsCancellationRequested);
113+
114+
// If we get here, we've either exceeded the timeout or been cancelled
115+
if (cancellationToken.IsCancellationRequested)
116+
{
117+
throw new OperationCanceledException("Request cancelled during retry wait", cancellationToken);
118+
}
119+
120+
throw new DatabricksException(
121+
lastErrorMessage ?? "Service temporarily unavailable and retry timeout exceeded",
122+
AdbcStatusCode.IOError);
123+
}
124+
125+
/// <summary>
126+
/// Clones an HttpContent object so it can be reused for retries.
127+
/// per .net guidance, we should not reuse the http content across multiple
128+
/// request, as it maybe disposed.
129+
/// </summary>
130+
private static async Task<HttpContent> CloneHttpContentAsync(HttpContent content)
131+
{
132+
var ms = new MemoryStream();
133+
await content.CopyToAsync(ms);
134+
ms.Position = 0;
135+
136+
var clone = new StreamContent(ms);
137+
if (content.Headers != null)
138+
{
139+
foreach (var header in content.Headers)
140+
{
141+
clone.Headers.Add(header.Key, header.Value);
142+
}
143+
}
144+
return clone;
145+
}
146+
}
147+
}

csharp/test/Drivers/Databricks/DatabricksConnectionTest.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,11 @@ public InvalidConnectionParametersTestData()
307307
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.Token] = "abcdef", [AdbcOptions.Uri] = "http-//hostname.com" }, typeof(ArgumentException)));
308308
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.Token] = "abcdef", [AdbcOptions.Uri] = "httpxxz://hostname.com:1234567890" }, typeof(ArgumentException)));
309309
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.Token] = "abcdef", [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Uri] = "http://valid.hostname.com" }, typeof(ArgumentOutOfRangeException)));
310+
311+
// Tests for the new retry configuration parameters
312+
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.TemporarilyUnavailableRetry] = "invalid" }, typeof(ArgumentOutOfRangeException)));
313+
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.TemporarilyUnavailableRetryTimeout] = "invalid" }, typeof(ArgumentOutOfRangeException)));
314+
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.TemporarilyUnavailableRetryTimeout] = "-1" }, typeof(ArgumentOutOfRangeException)));
310315
}
311316
}
312317
}

0 commit comments

Comments
 (0)