Skip to content

Commit 7cb8c0b

Browse files
sudhiremmadiSudhir Emmadi
andauthored
feat(csharp/src/Drivers): Enable setting BatchSizeStopCondition, MaxMessageSize & MaxFrameSize (#3684)
Co-authored-by: Sudhir Emmadi <emmadisudhir@microsoft.com>
1 parent 87c49e9 commit 7cb8c0b

15 files changed

Lines changed: 67 additions & 23 deletions

csharp/src/Drivers/Apache/ApacheParameters.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class ApacheParameters
2424
{
2525
public const string PollTimeMilliseconds = "adbc.apache.statement.polltime_ms";
2626
public const string BatchSize = "adbc.apache.statement.batch_size";
27+
public const string BatchSizeStopCondition = "adbc.apache.statement.batch_size_stop_condition";
2728
public const string QueryTimeoutSeconds = "adbc.apache.statement.query_timeout_s";
2829

2930
/// <summary>

csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
using Apache.Arrow.Ipc;
3636
using Apache.Arrow.Types;
3737
using Apache.Hive.Service.Rpc.Thrift;
38+
using Thrift;
3839
using Thrift.Protocol;
3940
using Thrift.Transport;
4041

@@ -44,6 +45,7 @@ internal abstract class HiveServer2Connection : TracingConnection
4445
{
4546
internal const bool InfoVendorSql = true;
4647
internal const long BatchSizeDefault = 50000;
48+
internal const bool EnableBatchSizeStopConditionDefault = false;
4749
internal const int PollTimeMillisecondsDefault = 500;
4850
internal static readonly string s_assemblyName = ApacheUtility.GetAssemblyName(typeof(HiveServer2Connection));
4951
internal static readonly string s_assemblyVersion = ApacheUtility.GetAssemblyVersion(typeof(HiveServer2Connection));
@@ -1717,5 +1719,23 @@ private static void ThrowErrorResponse(TStatus status, AdbcStatusCode adbcStatus
17171719
throw new HiveServer2Exception(status.ErrorMessage, adbcStatusCode)
17181720
.SetSqlState(status.SqlState)
17191721
.SetNativeError(status.ErrorCode);
1722+
1723+
/// <summary>
/// Builds the Thrift <see cref="TConfiguration"/> used when creating transports,
/// applying the optional size limits supplied through the connection properties.
/// </summary>
/// <remarks>
/// The properties <see cref="ThriftTransportSizeConstants.MaxMessageSize"/> and
/// <see cref="ThriftTransportSizeConstants.MaxFrameSize"/> are each applied only when
/// present and parseable as a positive 32-bit integer. A missing, malformed or
/// non-positive value is ignored and the corresponding value of a newly constructed
/// <see cref="TConfiguration"/> is left untouched.
/// </remarks>
/// <returns>A new <see cref="TConfiguration"/> instance.</returns>
protected TConfiguration GetTconfiguration()
{
    var thriftConfig = new TConfiguration();

    if (TryGetPositiveInt32(ThriftTransportSizeConstants.MaxMessageSize, out int maxMessageSizeValue))
    {
        thriftConfig.MaxMessageSize = maxMessageSizeValue;
    }

    if (TryGetPositiveInt32(ThriftTransportSizeConstants.MaxFrameSize, out int maxFrameSizeValue))
    {
        thriftConfig.MaxFrameSize = maxFrameSizeValue;
    }

    return thriftConfig;

    // Looks up a connection property and parses it as a strictly positive integer.
    // Parsing uses the invariant culture because the value is machine-readable
    // configuration, not user-facing text, so it must not vary with the thread culture.
    bool TryGetPositiveInt32(string key, out int value)
    {
        value = 0;
        return Properties.TryGetValue(key, out string? rawValue)
            && int.TryParse(
                rawValue,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out value)
            && value > 0;
    }
}
17201740
}
17211741
}

csharp/src/Drivers/Apache/Hive2/HiveServer2ExtendedConnection.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, IRe
6666
statement,
6767
schema,
6868
response,
69-
dataTypeConversion: statement.Connection.DataTypeConversion,
70-
enableBatchSizeStopCondition: false);
69+
dataTypeConversion: statement.Connection.DataTypeConversion);
7170

7271
internal override void SetPrecisionScaleAndTypeName(
7372
short colType,

csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ protected override TTransport CreateTransport()
150150
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
151151
httpClient.DefaultRequestHeaders.ExpectContinue = false;
152152

153-
TConfiguration config = new();
153+
TConfiguration config = GetTconfiguration();
154154
THttpTransport transport = new(httpClient, config)
155155
{
156156
// This value can only be set before the first call/request. So if a new value for query timeout

csharp/src/Drivers/Apache/Hive2/HiveServer2Parameters.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ public static class StandardTlsOptions
6565
public const string DisableServerCertificateValidation = "adbc.standard_options.tls.disable_server_certificate_validation";
6666
}
6767

68+
/// <summary>
/// Connection property keys for the Thrift client transport size limits.
/// HiveServer2Connection.GetTconfiguration reads these keys and applies a value
/// only when it parses as a positive integer; otherwise the value is ignored.
/// </summary>
public static class ThriftTransportSizeConstants
69+
{
70+
/// <summary>Property key whose value is assigned to <c>TConfiguration.MaxMessageSize</c>.</summary>
public const string MaxMessageSize = "adbc.apache.thrift.client.max.message.size";
71+
/// <summary>Property key whose value is assigned to <c>TConfiguration.MaxFrameSize</c>.</summary>
public const string MaxFrameSize = "adbc.apache.thrift.client.max.frame.size";
72+
}
73+
6874
public static class HttpProxyOptions
6975
{
7076
/// <summary>

csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ internal class HiveServer2Reader : TracingReader
6060
private bool _disposed;
6161
private bool _hasNoMoreData = false;
6262
private readonly DataTypeConversion _dataTypeConversion;
63-
// Flag to enable/disable stopping reading based on batch size condition
64-
private readonly bool _enableBatchSizeStopCondition;
6563
private static readonly IReadOnlyDictionary<ArrowTypeId, Func<StringArray, IArrowType, IArrowArray>> s_arrowStringConverters =
6664
new Dictionary<ArrowTypeId, Func<StringArray, IArrowType, IArrowArray>>()
6765
{
@@ -79,14 +77,12 @@ public HiveServer2Reader(
7977
IHiveServer2Statement statement,
8078
Schema schema,
8179
IResponse response,
82-
DataTypeConversion dataTypeConversion,
83-
bool enableBatchSizeStopCondition = true) : base(statement)
80+
DataTypeConversion dataTypeConversion) : base(statement)
8481
{
8582
_statement = statement;
8683
Schema = schema;
8784
_response = response;
8885
_dataTypeConversion = dataTypeConversion;
89-
_enableBatchSizeStopCondition = enableBatchSizeStopCondition;
9086
}
9187

9288
public override Schema Schema { get; }
@@ -114,7 +110,7 @@ public HiveServer2Reader(
114110
int rowCount = GetRowCount(response.Results, columnCount);
115111
activity?.AddEvent(SemanticConventions.Messaging.Batch.Response, [new(SemanticConventions.Db.Response.ReturnedRows, rowCount)]);
116112

117-
if ((_enableBatchSizeStopCondition && _statement.BatchSize > 0 && rowCount < _statement.BatchSize) || rowCount == 0)
113+
if ((_statement.EnableBatchSizeStopCondition && _statement.BatchSize > 0 && rowCount < _statement.BatchSize) || rowCount == 0)
118114
{
119115
// This is the last batch
120116
_hasNoMoreData = true;

csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
using System.Threading;
2424
using System.Threading.Tasks;
2525
using Apache.Hive.Service.Rpc.Thrift;
26+
using Thrift;
2627
using Thrift.Protocol;
2728
using Thrift.Transport;
2829
using Thrift.Transport.Client;
@@ -108,6 +109,8 @@ protected override TTransport CreateTransport()
108109
bool connectClient = false;
109110
int portValue = int.Parse(port!);
110111

112+
TConfiguration thriftConfig = GetTconfiguration();
113+
111114
// TLS setup
112115
TTransport baseTransport;
113116
if (TlsOptions.IsTlsEnabled)
@@ -120,16 +123,16 @@ protected override TTransport CreateTransport()
120123

121124
if (IPAddress.TryParse(hostName!, out var ipAddress))
122125
{
123-
baseTransport = new TTlsSocketTransport(ipAddress, portValue, config: new(), 0, trustedCert, certValidator);
126+
baseTransport = new TTlsSocketTransport(ipAddress, portValue, config: thriftConfig, 0, trustedCert, certValidator);
124127
}
125128
else
126129
{
127-
baseTransport = new TTlsSocketTransport(hostName!, portValue, config: new(), 0, trustedCert, certValidator);
130+
baseTransport = new TTlsSocketTransport(hostName!, portValue, config: thriftConfig, 0, trustedCert, certValidator);
128131
}
129132
}
130133
else
131134
{
132-
baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: new());
135+
baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: thriftConfig);
133136
}
134137

135138
TBufferedTransport bufferedTransport = new TBufferedTransport(baseTransport);
@@ -148,7 +151,7 @@ protected override TTransport CreateTransport()
148151
}
149152

150153
PlainSaslMechanism saslMechanism = new(username, password);
151-
TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: new());
154+
TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig);
152155
return new TFramedTransport(saslTransport);
153156

154157
default:

csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,12 @@ public override void SetOption(string key, string value)
271271
case ApacheParameters.BatchSize:
272272
UpdateBatchSizeIfValid(key, value);
273273
break;
274+
case ApacheParameters.BatchSizeStopCondition:
275+
if (ApacheUtility.BooleanIsValid(key, value, out bool enableBatchSizeStopCondition))
276+
{
277+
EnableBatchSizeStopCondition = enableBatchSizeStopCondition;
278+
}
279+
break;
274280
case ApacheParameters.QueryTimeoutSeconds:
275281
if (ApacheUtility.QueryTimeoutIsValid(key, value, out int queryTimeoutSeconds))
276282
{
@@ -352,6 +358,8 @@ protected async Task<IResponse> ExecuteStatementAsync(CancellationToken cancella
352358

353359
public virtual long BatchSize { get; protected set; } = HiveServer2Connection.BatchSizeDefault;
354360

361+
/// <summary>
/// Gets a value indicating whether the reader may stop fetching once a batch
/// returns fewer rows than <see cref="BatchSize"/> (checked only when
/// <see cref="BatchSize"/> is greater than zero). A batch with zero rows ends
/// reading regardless of this flag.
/// </summary>
/// <remarks>
/// Initialized from <see cref="HiveServer2Connection.EnableBatchSizeStopConditionDefault"/>
/// and updated through <c>SetOption</c> with the key
/// <see cref="ApacheParameters.BatchSizeStopCondition"/>.
/// </remarks>
public bool EnableBatchSizeStopCondition { get; protected set; } = HiveServer2Connection.EnableBatchSizeStopConditionDefault;
362+
355363
public int QueryTimeoutSeconds
356364
{
357365
// Coordinate updates with the connection

csharp/src/Drivers/Apache/Hive2/IHiveServer2Statement.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ internal interface IHiveServer2Statement : ITracingStatement
5454
/// </summary>
5555
long BatchSize { get; }
5656

57+
/// <summary>
58+
/// Gets a value indicating whether the reader stops fetching once a batch returns fewer rows than <see cref="BatchSize"/>.
59+
/// </summary>
60+
bool EnableBatchSizeStopCondition { get; }
61+
5762
/// <summary>
5863
/// Gets the connection associated with this statement.
5964
/// </summary>

csharp/src/Drivers/Apache/Hive2/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ but can also be passed in the call to `AdbcDatabase.Connect`.
6565
| `adbc.proxy_options.proxy_uid` | Username for proxy authentication. Only feature-complete in Spark driver. Required when proxy_auth is True | |
6666
| `adbc.proxy_options.proxy_pwd` | Password for proxy authentication. Only feature-complete in Spark driver. Required when proxy_auth is True | |
6767
| `adbc.telemetry.trace_parent` | The [trace parent](https://www.w3.org/TR/trace-context/#traceparent-header) identifier for an existing [trace context](https://www.w3.org/TR/trace-context/) \(span/activity\) in a tracing system. This option is most likely to be set using `Statement.SetOption` to set the trace parent for driver interaction with a specific `Statement`. However, it can also be set using `Driver.Open`, `Database.Connect` or `Connection.SetOption` to set the trace parent for all interactions with the driver on that specific `Connection`. | |
68+
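| `adbc.apache.statement.batch_size_stop_condition` | When `True` and the batch size is greater than zero, the reader treats a batch that returns fewer rows than the configured batch size as the last batch and stops fetching. When `False`, reading stops only when a batch returns zero rows. | `False` |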
6869

6970
## Timeout Configuration
7071

0 commit comments

Comments
 (0)