Skip to content

Commit a3a39d8

Browse files
authored
feat(csharp/src/Drivers/Databricks): Make Cloud Fetch options configurable at the connection level (#2691)
- Currently, Cloud Fetch enablement is only configurable at the statement level, and this is not exposed to clients outside of the arrow-adbc repo.
- Make these options configurable at the driver/connection level.

Tested E2E by adding a log for the result set format and whether LZ4 compression is enabled:
- `{ "adbc.databricks.cloudfetch.enabled", "false" }`
  - `DatabricksConnection.NewReader: resultFormat=ARROW_BASED_SET, isLz4Compressed=True`
- `{ "adbc.databricks.cloudfetch.enabled", "false" }, { "adbc.databricks.cloudfetch.lz4.enabled", "false" },`
  - `DatabricksConnection.NewReader: resultFormat=ARROW_BASED_SET, isLz4Compressed=False`
- empty (no Cloud Fetch options set, so the defaults apply)
  - `DatabricksConnection.NewReader: resultFormat=URL_BASED_SET, isLz4Compressed=True`
- `{ "adbc.databricks.cloudfetch.lz4.enabled", "false" },`
  - `DatabricksConnection.NewReader: resultFormat=URL_BASED_SET, isLz4Compressed=False`

Also tested with `dotnet test --filter "FullyQualifiedName~CloudFetchE2ETest"`.
1 parent 6a60a13 commit a3a39d8

6 files changed

Lines changed: 128 additions & 30 deletions

File tree

csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,72 @@ internal class DatabricksConnection : SparkHttpConnection
3333
{
3434
private bool _applySSPWithQueries = false;
3535

36+
// CloudFetch configuration
37+
private const long DefaultMaxBytesPerFile = 20 * 1024 * 1024; // 20MB
38+
39+
private bool _useCloudFetch = true;
40+
private bool _canDecompressLz4 = true;
41+
private long _maxBytesPerFile = DefaultMaxBytesPerFile;
42+
3643
public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : base(properties)
3744
{
38-
if (Properties.TryGetValue(DatabricksParameters.ApplySSPWithQueries, out string? applySSPWithQueriesStr) &&
39-
bool.TryParse(applySSPWithQueriesStr, out bool applySSPWithQueriesValue))
45+
ValidateProperties();
46+
}
47+
48+
private void ValidateProperties()
49+
{
50+
if (Properties.TryGetValue(DatabricksParameters.ApplySSPWithQueries, out string? applySSPWithQueriesStr))
51+
{
52+
if (bool.TryParse(applySSPWithQueriesStr, out bool applySSPWithQueriesValue))
53+
{
54+
_applySSPWithQueries = applySSPWithQueriesValue;
55+
}
56+
else
57+
{
58+
throw new ArgumentException($"Parameter '{DatabricksParameters.ApplySSPWithQueries}' value '{applySSPWithQueriesStr}' could not be parsed. Valid values are 'true' and 'false'.");
59+
}
60+
}
61+
62+
// Parse CloudFetch options from connection properties
63+
if (Properties.TryGetValue(DatabricksParameters.UseCloudFetch, out string? useCloudFetchStr))
64+
{
65+
if (bool.TryParse(useCloudFetchStr, out bool useCloudFetchValue))
66+
{
67+
_useCloudFetch = useCloudFetchValue;
68+
}
69+
else
70+
{
71+
throw new ArgumentException($"Parameter '{DatabricksParameters.UseCloudFetch}' value '{useCloudFetchStr}' could not be parsed. Valid values are 'true' and 'false'.");
72+
}
73+
}
74+
75+
if (Properties.TryGetValue(DatabricksParameters.CanDecompressLz4, out string? canDecompressLz4Str))
76+
{
77+
if (bool.TryParse(canDecompressLz4Str, out bool canDecompressLz4Value))
78+
{
79+
_canDecompressLz4 = canDecompressLz4Value;
80+
}
81+
else
82+
{
83+
throw new ArgumentException($"Parameter '{DatabricksParameters.CanDecompressLz4}' value '{canDecompressLz4Str}' could not be parsed. Valid values are 'true' and 'false'.");
84+
}
85+
}
86+
87+
if (Properties.TryGetValue(DatabricksParameters.MaxBytesPerFile, out string? maxBytesPerFileStr))
4088
{
41-
_applySSPWithQueries = applySSPWithQueriesValue;
89+
if (!long.TryParse(maxBytesPerFileStr, out long maxBytesPerFileValue))
90+
{
91+
throw new ArgumentException($"Parameter '{DatabricksParameters.MaxBytesPerFile}' value '{maxBytesPerFileStr}' could not be parsed. Valid values are positive integers.");
92+
}
93+
94+
if (maxBytesPerFileValue <= 0)
95+
{
96+
throw new ArgumentOutOfRangeException(
97+
nameof(Properties),
98+
maxBytesPerFileValue,
99+
$"Parameter '{DatabricksParameters.MaxBytesPerFile}' value must be a positive integer.");
100+
}
101+
_maxBytesPerFile = maxBytesPerFileValue;
42102
}
43103
}
44104

@@ -47,6 +107,21 @@ public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : ba
47107
/// </summary>
48108
internal bool ApplySSPWithQueries => _applySSPWithQueries;
49109

110+
/// <summary>
111+
/// Gets whether CloudFetch is enabled.
112+
/// </summary>
113+
internal bool UseCloudFetch => _useCloudFetch;
114+
115+
/// <summary>
116+
/// Gets whether LZ4 decompression is enabled.
117+
/// </summary>
118+
internal bool CanDecompressLz4 => _canDecompressLz4;
119+
120+
/// <summary>
121+
/// Gets the maximum bytes per file for CloudFetch.
122+
/// </summary>
123+
internal long MaxBytesPerFile => _maxBytesPerFile;
124+
50125
internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, TGetResultSetMetadataResp? metadataResp = null)
51126
{
52127
// Get result format from metadata response if available

csharp/src/Drivers/Databricks/DatabricksParameters.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,24 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
2525
public class DatabricksParameters : SparkParameters
2626
{
2727
// CloudFetch configuration parameters
28+
/// <summary>
29+
/// Whether to use CloudFetch for retrieving results.
30+
/// Default value is true if not specified.
31+
/// </summary>
32+
public const string UseCloudFetch = "adbc.databricks.cloudfetch.enabled";
33+
34+
/// <summary>
35+
/// Whether the client can decompress LZ4 compressed results.
36+
/// Default value is true if not specified.
37+
/// </summary>
38+
public const string CanDecompressLz4 = "adbc.databricks.cloudfetch.lz4.enabled";
39+
40+
/// <summary>
41+
/// Maximum bytes per file for CloudFetch.
42+
/// Default value is 20MB if not specified.
43+
/// </summary>
44+
public const string MaxBytesPerFile = "adbc.databricks.cloudfetch.max_bytes_per_file";
45+
2846
/// <summary>
2947
/// Maximum number of retry attempts for CloudFetch downloads.
3048
/// Default value is 3 if not specified.

csharp/src/Drivers/Databricks/DatabricksStatement.cs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,17 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
2727
/// </summary>
2828
internal class DatabricksStatement : SparkStatement
2929
{
30-
// Default maximum bytes per file for CloudFetch
31-
private const long DefaultMaxBytesPerFile = 20 * 1024 * 1024; // 20MB
32-
33-
// CloudFetch configuration
34-
private bool useCloudFetch = true;
35-
private bool canDecompressLz4 = true;
36-
private long maxBytesPerFile = DefaultMaxBytesPerFile;
30+
private bool useCloudFetch;
31+
private bool canDecompressLz4;
32+
private long maxBytesPerFile;
3733

3834
public DatabricksStatement(DatabricksConnection connection)
3935
: base(connection)
4036
{
41-
37+
// Inherit CloudFetch settings from connection
38+
useCloudFetch = connection.UseCloudFetch;
39+
canDecompressLz4 = connection.CanDecompressLz4;
40+
maxBytesPerFile = connection.MaxBytesPerFile;
4241
}
4342

4443
protected override void SetStatementProperties(TExecuteStatementReq statement)
@@ -55,7 +54,7 @@ public override void SetOption(string key, string value)
5554
{
5655
switch (key)
5756
{
58-
case Options.UseCloudFetch:
57+
case DatabricksParameters.UseCloudFetch:
5958
if (bool.TryParse(value, out bool useCloudFetchValue))
6059
{
6160
this.useCloudFetch = useCloudFetchValue;
@@ -65,7 +64,7 @@ public override void SetOption(string key, string value)
6564
throw new ArgumentException($"Invalid value for {key}: {value}. Expected a boolean value.");
6665
}
6766
break;
68-
case Options.CanDecompressLz4:
67+
case DatabricksParameters.CanDecompressLz4:
6968
if (bool.TryParse(value, out bool canDecompressLz4Value))
7069
{
7170
this.canDecompressLz4 = canDecompressLz4Value;
@@ -75,7 +74,7 @@ public override void SetOption(string key, string value)
7574
throw new ArgumentException($"Invalid value for {key}: {value}. Expected a boolean value.");
7675
}
7776
break;
78-
case Options.MaxBytesPerFile:
77+
case DatabricksParameters.MaxBytesPerFile:
7978
if (long.TryParse(value, out long maxBytesPerFileValue))
8079
{
8180
this.maxBytesPerFile = maxBytesPerFileValue;
@@ -132,16 +131,5 @@ internal void SetMaxBytesPerFile(long maxBytesPerFile)
132131
{
133132
this.maxBytesPerFile = maxBytesPerFile;
134133
}
135-
136-
/// <summary>
137-
/// Provides the constant string key values to the <see cref="AdbcStatement.SetOption(string, string)" /> method.
138-
/// </summary>
139-
public sealed class Options : ApacheParameters
140-
{
141-
// CloudFetch options
142-
public const string UseCloudFetch = "adbc.databricks.cloudfetch.enabled";
143-
public const string CanDecompressLz4 = "adbc.databricks.cloudfetch.lz4.enabled";
144-
public const string MaxBytesPerFile = "adbc.databricks.cloudfetch.max_bytes_per_file";
145-
}
146134
}
147135
}

csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,25 @@ public async Task TestRealDatabricksCloudFetchLargeResultSet()
5050
await TestRealDatabricksCloudFetchLargeQuery("SELECT * FROM main.tpcds_sf10_delta.catalog_sales LIMIT 1000000", 1000000);
5151
}
5252

53-
private async Task TestRealDatabricksCloudFetchLargeQuery(string query, int rowCount)
53+
[Fact]
54+
public async Task TestRealDatabricksNoCloudFetchSmallResultSet()
55+
{
56+
await TestRealDatabricksCloudFetchLargeQuery("SELECT * FROM range(1000)", 1000, false);
57+
}
58+
59+
[Fact]
60+
public async Task TestRealDatabricksNoCloudFetchLargeResultSet()
61+
{
62+
await TestRealDatabricksCloudFetchLargeQuery("SELECT * FROM main.tpcds_sf10_delta.catalog_sales LIMIT 1000000", 1000000, false);
63+
}
64+
65+
private async Task TestRealDatabricksCloudFetchLargeQuery(string query, int rowCount, bool useCloudFetch = true)
5466
{
5567
// Create a statement with CloudFetch enabled
5668
var statement = Connection.CreateStatement();
57-
statement.SetOption(DatabricksStatement.Options.UseCloudFetch, "true");
58-
statement.SetOption(DatabricksStatement.Options.CanDecompressLz4, "true");
59-
statement.SetOption(DatabricksStatement.Options.MaxBytesPerFile, "10485760"); // 10MB
69+
statement.SetOption(DatabricksParameters.UseCloudFetch, useCloudFetch.ToString());
70+
statement.SetOption(DatabricksParameters.CanDecompressLz4, "true");
71+
statement.SetOption(DatabricksParameters.MaxBytesPerFile, "10485760"); // 10MB
6072

6173
// Execute the caller-supplied query, which is expected to return rowCount rows
6274
statement.SqlQuery = query;

csharp/test/Drivers/Databricks/DatabricksConnectionTest.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
using Apache.Arrow.Adbc.Drivers.Apache;
2323
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
2424
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
25+
using Apache.Arrow.Adbc.Drivers.Databricks;
2526
using Thrift.Transport;
2627
using Xunit;
2728
using Xunit.Abstractions;
@@ -295,6 +296,10 @@ public InvalidConnectionParametersTestData()
295296
Add(new([], typeof(ArgumentException)));
296297
Add(new(new() { [SparkParameters.Type] = " " }, typeof(ArgumentException)));
297298
Add(new(new() { [SparkParameters.Type] = "xxx" }, typeof(ArgumentException)));
299+
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.UseCloudFetch] = "notabool" }, typeof(ArgumentException)));
300+
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.CanDecompressLz4] = "notabool"}, typeof(ArgumentException)));
301+
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFile] = "notanumber" }, typeof(ArgumentException)));
302+
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFile] = "-100" }, typeof(ArgumentOutOfRangeException)));
298303
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] = "-1" }, typeof(ArgumentOutOfRangeException)));
299304
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] = IPEndPoint.MinPort.ToString(CultureInfo.InvariantCulture) }, typeof(ArgumentOutOfRangeException)));
300305
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] = (IPEndPoint.MaxPort + 1).ToString(CultureInfo.InvariantCulture) }, typeof(ArgumentOutOfRangeException)));

csharp/test/Drivers/Databricks/StatementTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public async Task LZ4DecompressionCapabilityTest(bool useCloudFetch, string conf
4444
using var statement = connection.CreateStatement();
4545

4646
// Set options for LZ4 decompression (enabled by default) and CloudFetch as specified
47-
statement.SetOption(DatabricksStatement.Options.UseCloudFetch, useCloudFetch.ToString().ToLower());
47+
statement.SetOption(DatabricksParameters.UseCloudFetch, useCloudFetch.ToString().ToLower());
4848
OutputHelper?.WriteLine($"CloudFetch is {(useCloudFetch ? "enabled" : "disabled")}");
4949
OutputHelper?.WriteLine("LZ4 decompression capability is enabled by default");
5050

0 commit comments

Comments
 (0)