Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,17 @@ protected override void ValidateOptions()
? connectTimeoutMsValue
: throw new ArgumentOutOfRangeException(SparkParameters.ConnectTimeoutMilliseconds, connectTimeoutMs, $"must be a value of 0 (infinite) or between 1 .. {int.MaxValue}. default is 30000 milliseconds.");
}

TlsOptions = HiveServer2TlsImpl.GetHttpTlsOptions(Properties);
}

/// <summary>
/// Creates the result reader for a statement executed over this connection.
/// </summary>
/// <param name="statement">The statement whose results are read; its connection supplies the data type conversion setting.</param>
/// <param name="schema">The Arrow schema of the result set.</param>
/// <param name="metadataResp">Optional result set metadata; not used by this implementation.</param>
/// <returns>A <see cref="HiveServer2Reader"/> over the statement's results.</returns>
internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, TGetResultSetMetadataResp? metadataResp = null)
{
    return new HiveServer2Reader(
        statement,
        schema,
        dataTypeConversion: statement.Connection.DataTypeConversion);
}

/// <summary>
/// Creates the HTTP message handler used by the transport's <see cref="HttpClient"/>.
/// Derived connections can override this to wrap or replace the handler.
/// </summary>
/// <returns>A handler built from the current TLS options.</returns>
protected virtual HttpMessageHandler CreateHttpHandler() =>
    HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions);

protected override TTransport CreateTransport()
{
// Assumption: parameters have already been validated.
Expand All @@ -160,8 +166,7 @@ protected override TTransport CreateTransport()
Uri baseAddress = GetBaseAddress(uri, hostName, path, port, SparkParameters.HostName, TlsOptions.IsTlsEnabled);
AuthenticationHeaderValue? authenticationHeaderValue = GetAuthenticationHeaderValue(authTypeValue, token, username, password, access_token);

HttpClientHandler httpClientHandler = HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions);
HttpClient httpClient = new(httpClientHandler);
HttpClient httpClient = new(CreateHttpHandler());
httpClient.BaseAddress = baseAddress;
httpClient.DefaultRequestHeaders.Authorization = authenticationHeaderValue;
httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(s_userAgent);
Expand Down
51 changes: 51 additions & 0 deletions csharp/src/Drivers/Databricks/DatabricksConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache;
Expand All @@ -39,6 +40,8 @@ internal class DatabricksConnection : SparkHttpConnection
private bool _useCloudFetch = true;
private bool _canDecompressLz4 = true;
private long _maxBytesPerFile = DefaultMaxBytesPerFile;
// Retrying 503 responses that carry a Retry-After header is enabled unless the caller opts out.
private const bool DefaultRetryOnUnavailable = true;
// Default retry budget in seconds (15 minutes). This matches the default stated in the
// parameter documentation and in the option validation error message.
private const int DefaultTemporarilyUnavailableRetryTimeout = 900;

public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : base(properties)
{
Expand Down Expand Up @@ -122,6 +125,26 @@ private void ValidateProperties()
/// </summary>
internal long MaxBytesPerFile => _maxBytesPerFile;

/// <summary>
/// Gets a value indicating whether to retry requests that receive a 503 response with a Retry-After header.
/// Initialized to the built-in default and overwritten during option validation when the
/// corresponding connection property is supplied.
/// </summary>
protected bool TemporarilyUnavailableRetry { get; private set; } = DefaultRetryOnUnavailable;

/// <summary>
/// Gets the maximum total time in seconds to retry 503 responses before failing.
/// Option validation rejects negative values; a value of 0 means no upper bound is enforced.
/// </summary>
protected int TemporarilyUnavailableRetryTimeout { get; private set; } = DefaultTemporarilyUnavailableRetryTimeout;

/// <summary>
/// Builds the HTTP handler for this connection, wrapping the base handler in a
/// <see cref="RetryHttpHandler"/> when retrying of temporarily-unavailable responses is enabled.
/// </summary>
/// <returns>The base handler, optionally decorated with retry behavior.</returns>
protected override HttpMessageHandler CreateHttpHandler()
{
    HttpMessageHandler innerHandler = base.CreateHttpHandler();

    // Retry disabled: hand back the undecorated handler.
    if (!TemporarilyUnavailableRetry)
    {
        return innerHandler;
    }

    return new RetryHttpHandler(innerHandler, TemporarilyUnavailableRetryTimeout);
}

internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, TGetResultSetMetadataResp? metadataResp = null)
{
// Get result format from metadata response if available
Expand Down Expand Up @@ -259,6 +282,34 @@ private string EscapeSqlString(string value)
return "`" + value.Replace("`", "``") + "`";
}

/// <summary>
/// Validates the base connection options, then reads the optional settings that control
/// retrying of 503 (temporarily unavailable) responses.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when the retry flag is not a boolean, or when the retry timeout is not a
/// non-negative integer.
/// </exception>
protected override void ValidateOptions()
{
    base.ValidateOptions();

    if (Properties.TryGetValue(DatabricksParameters.TemporarilyUnavailableRetry, out string? tempUnavailableRetryStr))
    {
        if (!bool.TryParse(tempUnavailableRetryStr, out bool tempUnavailableRetryValue))
        {
            throw new ArgumentOutOfRangeException(DatabricksParameters.TemporarilyUnavailableRetry, tempUnavailableRetryStr,
                "must be a value of false (disabled) or true (enabled). Default is true.");
        }

        TemporarilyUnavailableRetry = tempUnavailableRetryValue;
    }

    if (Properties.TryGetValue(DatabricksParameters.TemporarilyUnavailableRetryTimeout, out string? tempUnavailableRetryTimeoutStr))
    {
        // Parse with the invariant culture so the accepted format does not depend on the
        // machine's regional settings.
        bool parsed = int.TryParse(
            tempUnavailableRetryTimeoutStr,
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out int tempUnavailableRetryTimeoutValue);

        if (!parsed || tempUnavailableRetryTimeoutValue < 0)
        {
            // The default is interpolated from the constant so the message cannot drift from the code.
            throw new ArgumentOutOfRangeException(DatabricksParameters.TemporarilyUnavailableRetryTimeout, tempUnavailableRetryTimeoutStr,
                $"must be a value of 0 (retry indefinitely) or a positive integer representing seconds. Default is {DefaultTemporarilyUnavailableRetryTimeout} seconds.");
        }

        TemporarilyUnavailableRetryTimeout = tempUnavailableRetryTimeoutValue;
    }
}

/// <summary>
/// Returns the result set metadata carried in the direct results of a GetSchemas response,
/// wrapped in an already-completed task.
/// </summary>
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response, CancellationToken cancellationToken = default)
{
    TGetResultSetMetadataResp metadata = response.DirectResults.ResultSetMetadata;
    return Task.FromResult(metadata);
}
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response, CancellationToken cancellationToken = default) =>
Expand Down
69 changes: 69 additions & 0 deletions csharp/src/Drivers/Databricks/DatabricksException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;

namespace Apache.Arrow.Adbc.Drivers.Databricks
{
/// <summary>
/// An <see cref="AdbcException"/> subclass whose SQL state and native error code
/// can be assigned after construction through fluent internal setters.
/// </summary>
public class DatabricksException : AdbcException
{
    // Backing storage for the overridden SqlState / NativeError properties.
    private string? _sqlState;
    private int _nativeError;

    /// <summary>Initializes an exception with no message.</summary>
    public DatabricksException()
    {
    }

    /// <summary>Initializes an exception with the given message.</summary>
    public DatabricksException(string message)
        : base(message)
    {
    }

    /// <summary>Initializes an exception with the given message and ADBC status code.</summary>
    public DatabricksException(string message, AdbcStatusCode statusCode)
        : base(message, statusCode)
    {
    }

    /// <summary>Initializes an exception with the given message and inner exception.</summary>
    public DatabricksException(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    /// <summary>Initializes an exception with the given message, ADBC status code and inner exception.</summary>
    public DatabricksException(string message, AdbcStatusCode statusCode, Exception innerException)
        : base(message, statusCode, innerException)
    {
    }

    /// <summary>Gets the SQL state last assigned via <see cref="SetSqlState"/>, or null if none was set.</summary>
    public override string? SqlState => _sqlState;

    /// <summary>Gets the native error code last assigned via <see cref="SetNativeError"/>; 0 if none was set.</summary>
    public override int NativeError => _nativeError;

    /// <summary>Stores the SQL state and returns this instance so calls can be chained.</summary>
    internal DatabricksException SetSqlState(string sqlState)
    {
        _sqlState = sqlState;
        return this;
    }

    /// <summary>Stores the native error code and returns this instance so calls can be chained.</summary>
    internal DatabricksException SetNativeError(int nativeError)
    {
        _nativeError = nativeError;
        return this;
    }
}
}
10 changes: 10 additions & 0 deletions csharp/src/Drivers/Databricks/DatabricksParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ public class DatabricksParameters : SparkParameters
/// and value "true" will result in executing "set use_cached_result=true" on the server.
/// </summary>
public const string ServerSidePropertyPrefix = "adbc.databricks.SSP_";
/// <summary>
/// Controls whether to retry requests that receive a 503 response with a Retry-After header.
/// Default value is true (enabled). Set to false to disable retry behavior.
/// </summary>
public const string TemporarilyUnavailableRetry = "adbc.spark.temporarily_unavailable_retry";

/// <summary>
/// Maximum total time in seconds to retry 503 responses before failing.
/// Default value is 900 seconds (15 minutes). Set to 0 to retry indefinitely.
/// </summary>
public const string TemporarilyUnavailableRetryTimeout = "adbc.spark.temporarily_unavailable_retry_timeout";
}

/// <summary>
Expand Down
147 changes: 147 additions & 0 deletions csharp/src/Drivers/Databricks/RetryHttpHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.IO;

namespace Apache.Arrow.Adbc.Drivers.Databricks
{
/// <summary>
/// HTTP handler that implements retry behavior for 503 responses with Retry-After headers.
/// </summary>
internal class RetryHttpHandler : DelegatingHandler
{
    // Upper bound, in seconds, on the accumulated Retry-After delays. Values <= 0 disable the bound.
    private readonly int _retryTimeoutSeconds;

    /// <summary>
    /// Initializes a new instance of the <see cref="RetryHttpHandler"/> class.
    /// </summary>
    /// <param name="innerHandler">The inner handler to delegate to.</param>
    /// <param name="retryTimeoutSeconds">Maximum total time in seconds to retry before failing.</param>
    public RetryHttpHandler(HttpMessageHandler innerHandler, int retryTimeoutSeconds)
        : base(innerHandler)
    {
        _retryTimeoutSeconds = retryTimeoutSeconds;
    }

    /// <summary>
    /// Sends an HTTP request to the inner handler with retry logic for 503 responses.
    /// </summary>
    /// <param name="request">The request to send; its content is replaced with a fresh copy on each retry.</param>
    /// <param name="cancellationToken">Token observed while sending and while waiting between attempts.</param>
    /// <returns>
    /// The first response that is not a 503, or a 503 response that has no usable
    /// (positive integer) Retry-After header.
    /// </returns>
    /// <exception cref="DatabricksException">The accumulated Retry-After delays exceeded the configured timeout.</exception>
    /// <exception cref="OperationCanceledException">The operation was cancelled.</exception>
    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        // Keep a private, never-sent copy of the payload so every retry can be given fresh content.
        // Cloning also buffers the original content, so the first attempt can still send it.
        HttpContent? requestContentClone = request.Content != null
            ? await CloneHttpContentAsync(request.Content).ConfigureAwait(false)
            : null;

        string? lastErrorMessage = null;

        // long: summing server-supplied int values must not wrap around and defeat the timeout check.
        long totalRetrySeconds = 0;

        do
        {
            // Set the content for each attempt (only needed after a previous attempt cleared it).
            if (requestContentClone != null && request.Content == null)
            {
                request.Content = await CloneHttpContentAsync(requestContentClone).ConfigureAwait(false);
            }

            HttpResponseMessage response = await base.SendAsync(request, cancellationToken).ConfigureAwait(false);

            // If it's not a 503 response, return immediately.
            if (response.StatusCode != HttpStatusCode.ServiceUnavailable)
            {
                return response;
            }

            // No Retry-After header: return the response as is.
            if (!response.Headers.TryGetValues("Retry-After", out var retryAfterValues))
            {
                return response;
            }

            // Only the delay-in-seconds form is honored; anything else is returned to the caller unchanged.
            string retryAfterValue = string.Join(",", retryAfterValues);
            bool parsed = int.TryParse(
                retryAfterValue,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out int retryAfterSeconds);
            if (!parsed || retryAfterSeconds <= 0)
            {
                return response;
            }

            lastErrorMessage = $"Service temporarily unavailable (HTTP 503). Retry after {retryAfterSeconds} seconds.";

            // Dispose the response before retrying.
            response.Dispose();

            // Reset the request content so the next attempt gets a fresh copy.
            request.Content = null;

            // Stop once the requested waits add up to more than the configured budget.
            totalRetrySeconds += retryAfterSeconds;
            if (_retryTimeoutSeconds > 0 && totalRetrySeconds > _retryTimeoutSeconds)
            {
                break;
            }

            // Wait for the specified retry time.
            await Task.Delay(TimeSpan.FromSeconds(retryAfterSeconds), cancellationToken).ConfigureAwait(false);
        } while (!cancellationToken.IsCancellationRequested);

        // If we get here, we've either exceeded the timeout or been cancelled.
        if (cancellationToken.IsCancellationRequested)
        {
            throw new OperationCanceledException("Request cancelled during retry wait", cancellationToken);
        }

        throw new DatabricksException(
            lastErrorMessage ?? "Service temporarily unavailable and retry timeout exceeded",
            AdbcStatusCode.IOError);
    }

    /// <summary>
    /// Creates an independent in-memory copy of an <see cref="HttpContent"/> so a request body
    /// can be re-sent; content instances should not be reused across requests because they may be disposed.
    /// </summary>
    /// <param name="content">The content to copy. It is buffered in memory so it stays readable afterwards.</param>
    /// <returns>A new content object with the same bytes and content headers.</returns>
    private static async Task<HttpContent> CloneHttpContentAsync(HttpContent content)
    {
        // Buffer first: copying a non-buffered content would otherwise consume it, leaving
        // the source unusable for the send that follows.
        await content.LoadIntoBufferAsync().ConfigureAwait(false);
        byte[] payload = await content.ReadAsByteArrayAsync().ConfigureAwait(false);

        var clone = new ByteArrayContent(payload);
        foreach (var header in content.Headers)
        {
            // Copy verbatim; re-validating values that were already accepted on the source could throw.
            clone.Headers.TryAddWithoutValidation(header.Key, header.Value);
        }
        return clone;
    }
}
}
5 changes: 5 additions & 0 deletions csharp/test/Drivers/Databricks/DatabricksConnectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ public InvalidConnectionParametersTestData()
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.Token] = "abcdef", [AdbcOptions.Uri] = "http-//hostname.com" }, typeof(ArgumentException)));
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.Token] = "abcdef", [AdbcOptions.Uri] = "httpxxz://hostname.com:1234567890" }, typeof(ArgumentException)));
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.Token] = "abcdef", [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Uri] = "http://valid.hostname.com" }, typeof(ArgumentOutOfRangeException)));

// Tests for the new retry configuration parameters
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.TemporarilyUnavailableRetry] = "invalid" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.TemporarilyUnavailableRetryTimeout] = "invalid" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.TemporarilyUnavailableRetryTimeout] = "-1" }, typeof(ArgumentOutOfRangeException)));
}
}
}
Expand Down
Loading
Loading