Skip to content

Commit 26d96b1

Browse files
authored
[To dev/1.3] Fix SessionPool client leak on reconnection and query failures, and preserve server error messages (#45)
* [To dev/1.3] Fix SessionPool client leak on reconnection and query failures, and preserve server error messages * fix ByteBuffer build error ,use Array.Reverse()
1 parent 69f2337 commit 26d96b1

File tree

9 files changed

+815
-95
lines changed

9 files changed

+815
-95
lines changed

docs/SessionPool_Exception_Handling.md

Lines changed: 487 additions & 0 deletions
Large diffs are not rendered by default.

src/Apache.IoTDB.Data/IoTDBDataReader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ internal IoTDBDataReader(IoTDBCommand IoTDBCommand, SessionDataSet dataSet, bool
5656
_command = IoTDBCommand;
5757
_closeConnection = closeConnection;
5858
_fieldCount = dataSet.ColumnNames.Count;
59-
_hasRows = dataSet.RowCount > 0;
60-
_recordsAffected = dataSet.RowCount;
59+
_hasRows = dataSet.CurrentBatchRowCount() > 0;
60+
_recordsAffected = -1; // Total row count is unknown; use -1 per ADO.NET convention
6161
_closed = _closeConnection;
6262
_metas = dataSet.ColumnNames;
6363
_dataSet = dataSet;

src/Apache.IoTDB/ConcurrentClientQueue.cs

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace Apache.IoTDB
2828
public class ConcurrentClientQueue
2929
{
3030
public ConcurrentQueue<Client> ClientQueue { get; }
31+
internal IPoolDiagnosticReporter DiagnosticReporter { get; set; }
3132

3233
public ConcurrentClientQueue(List<Client> clients)
3334
{
@@ -42,55 +43,62 @@ public ConcurrentClientQueue()
4243
public void Return(Client client)
4344
{
4445
Monitor.Enter(ClientQueue);
45-
ClientQueue.Enqueue(client);
46-
Monitor.PulseAll(ClientQueue); // wake up all threads waiting on the queue, refresh the waiting time
47-
Monitor.Exit(ClientQueue);
48-
Thread.Sleep(0);
49-
}
50-
int _ref = 0;
51-
public void AddRef()
52-
{
53-
lock (this)
46+
try
5447
{
55-
_ref++;
48+
ClientQueue.Enqueue(client);
49+
Monitor.PulseAll(ClientQueue); // wake up all threads waiting on the queue, refresh the waiting time
5650
}
57-
}
58-
public int GetRef()
59-
{
60-
return _ref;
61-
}
62-
public void RemoveRef()
63-
{
64-
lock (this)
51+
finally
6552
{
66-
_ref--;
53+
Monitor.Exit(ClientQueue);
6754
}
55+
Thread.Sleep(0);
6856
}
57+
private int _ref = 0;
58+
public void AddRef() => Interlocked.Increment(ref _ref);
59+
public int GetRef() => Volatile.Read(ref _ref);
60+
public void RemoveRef() => Interlocked.Decrement(ref _ref);
6961
public int Timeout { get; set; } = 10;
7062
public Client Take()
7163
{
7264
Client client = null;
7365
Monitor.Enter(ClientQueue);
74-
while (true)
66+
try
7567
{
76-
bool timeout = false;
77-
if (ClientQueue.IsEmpty)
68+
while (true)
7869
{
79-
timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
80-
}
81-
ClientQueue.TryDequeue(out client);
70+
bool timeout = false;
71+
if (ClientQueue.IsEmpty)
72+
{
73+
timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
74+
}
75+
ClientQueue.TryDequeue(out client);
8276

83-
if (client != null || timeout)
84-
{
85-
break;
77+
if (client != null || timeout)
78+
{
79+
break;
80+
}
8681
}
8782
}
88-
Monitor.Exit(ClientQueue);
83+
finally
84+
{
85+
Monitor.Exit(ClientQueue);
86+
}
8987
if (client == null)
9088
{
91-
throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
89+
var reasonPhrase = $"Connection pool is empty and wait time out({Timeout}s)";
90+
if (DiagnosticReporter != null)
91+
{
92+
throw DiagnosticReporter.BuildDepletionException(reasonPhrase);
93+
}
94+
throw new TimeoutException(reasonPhrase);
9295
}
9396
return client;
9497
}
9598
}
99+
100+
internal interface IPoolDiagnosticReporter
101+
{
102+
SessionPoolDepletedException BuildDepletionException(string reasonPhrase);
103+
}
96104
}

src/Apache.IoTDB/DataStructure/ByteBuffer.cs

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919

2020
using System;
21-
using System.Linq;
2221
using System.Text;
2322

2423
namespace Apache.IoTDB.DataStructure
@@ -70,9 +69,12 @@ public bool GetBool()
7069
public int GetInt()
7170
{
7271
var intBuff = _buffer[_readPos..(_readPos + 4)];
73-
if (_isLittleEndian) intBuff = intBuff.Reverse().ToArray();
72+
if (_isLittleEndian)
73+
{
74+
Array.Reverse(intBuff);
75+
}
7476
#if NET461_OR_GREATER || NETSTANDARD2_0
75-
var intValue = BitConverter.ToInt32(intBuff,0);
77+
var intValue = BitConverter.ToInt32(intBuff, 0);
7678
#else
7779
var intValue = BitConverter.ToInt32(intBuff);
7880
#endif
@@ -84,10 +86,12 @@ public int GetInt()
8486
public long GetLong()
8587
{
8688
var longBuff = _buffer[_readPos..(_readPos + 8)];
87-
88-
if (_isLittleEndian) longBuff = longBuff.Reverse().ToArray();
89+
if (_isLittleEndian)
90+
{
91+
Array.Reverse(longBuff);
92+
}
8993
#if NET461_OR_GREATER || NETSTANDARD2_0
90-
var longValue = BitConverter.ToInt64(longBuff,0);
94+
var longValue = BitConverter.ToInt64(longBuff, 0);
9195
#else
9296
var longValue = BitConverter.ToInt64(longBuff);
9397
#endif
@@ -99,10 +103,12 @@ public long GetLong()
99103
public float GetFloat()
100104
{
101105
var floatBuff = _buffer[_readPos..(_readPos + 4)];
102-
103-
if (_isLittleEndian) floatBuff = floatBuff.Reverse().ToArray();
106+
if (_isLittleEndian)
107+
{
108+
Array.Reverse(floatBuff);
109+
}
104110
#if NET461_OR_GREATER || NETSTANDARD2_0
105-
var floatValue = BitConverter.ToSingle(floatBuff,0);
111+
var floatValue = BitConverter.ToSingle(floatBuff, 0);
106112
#else
107113
var floatValue = BitConverter.ToSingle(floatBuff);
108114
#endif
@@ -113,10 +119,12 @@ public float GetFloat()
113119
public double GetDouble()
114120
{
115121
var doubleBuff = _buffer[_readPos..(_readPos + 8)];
116-
117-
if (_isLittleEndian) doubleBuff = doubleBuff.Reverse().ToArray();
122+
if (_isLittleEndian)
123+
{
124+
Array.Reverse(doubleBuff);
125+
}
118126
#if NET461_OR_GREATER || NETSTANDARD2_0
119-
var doubleValue = BitConverter.ToDouble(doubleBuff,0);
127+
var doubleValue = BitConverter.ToDouble(doubleBuff, 0);
120128
#else
121129
var doubleValue = BitConverter.ToDouble(doubleBuff);
122130
#endif
@@ -162,8 +170,10 @@ private void ExtendBuffer(int spaceNeed)
162170
public void AddBool(bool value)
163171
{
164172
var boolBuffer = BitConverter.GetBytes(value);
165-
166-
if (_isLittleEndian) boolBuffer = boolBuffer.Reverse().ToArray();
173+
if (_isLittleEndian)
174+
{
175+
Array.Reverse(boolBuffer);
176+
}
167177

168178
ExtendBuffer(boolBuffer.Length);
169179
boolBuffer.CopyTo(_buffer, _writePos);
@@ -173,8 +183,10 @@ public void AddBool(bool value)
173183
public void AddInt(int value)
174184
{
175185
var intBuff = BitConverter.GetBytes(value);
176-
177-
if (_isLittleEndian) intBuff = intBuff.Reverse().ToArray();
186+
if (_isLittleEndian)
187+
{
188+
Array.Reverse(intBuff);
189+
}
178190

179191
ExtendBuffer(intBuff.Length);
180192
intBuff.CopyTo(_buffer, _writePos);
@@ -184,8 +196,10 @@ public void AddInt(int value)
184196
public void AddLong(long value)
185197
{
186198
var longBuff = BitConverter.GetBytes(value);
187-
188-
if (_isLittleEndian) longBuff = longBuff.Reverse().ToArray();
199+
if (_isLittleEndian)
200+
{
201+
Array.Reverse(longBuff);
202+
}
189203

190204
ExtendBuffer(longBuff.Length);
191205
longBuff.CopyTo(_buffer, _writePos);
@@ -195,8 +209,10 @@ public void AddLong(long value)
195209
public void AddFloat(float value)
196210
{
197211
var floatBuff = BitConverter.GetBytes(value);
198-
199-
if (_isLittleEndian) floatBuff = floatBuff.Reverse().ToArray();
212+
if (_isLittleEndian)
213+
{
214+
Array.Reverse(floatBuff);
215+
}
200216

201217
ExtendBuffer(floatBuff.Length);
202218
floatBuff.CopyTo(_buffer, _writePos);
@@ -206,8 +222,10 @@ public void AddFloat(float value)
206222
public void AddDouble(double value)
207223
{
208224
var doubleBuff = BitConverter.GetBytes(value);
209-
210-
if (_isLittleEndian) doubleBuff = doubleBuff.Reverse().ToArray();
225+
if (_isLittleEndian)
226+
{
227+
Array.Reverse(doubleBuff);
228+
}
211229

212230
ExtendBuffer(doubleBuff.Length);
213231
doubleBuff.CopyTo(_buffer, _writePos);
@@ -237,8 +255,10 @@ public void AddBinary(byte[] value)
237255
public void AddChar(char value)
238256
{
239257
var charBuf = BitConverter.GetBytes(value);
240-
241-
if (_isLittleEndian) charBuf = charBuf.Reverse().ToArray();
258+
if (_isLittleEndian)
259+
{
260+
Array.Reverse(charBuf);
261+
}
242262

243263
ExtendBuffer(charBuf.Length);
244264
charBuf.CopyTo(_buffer, _writePos);

src/Apache.IoTDB/DataStructure/SessionDataSet.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,22 @@ public class SessionDataSet : System.IDisposable
5252
private int Flag => 0x80;
5353
private int DefaultTimeout => 10000;
5454
public int FetchSize { get; set; }
55+
56+
/// <summary>
57+
/// Gets the number of rows in the current fetched batch.
58+
/// Note: This is NOT the total row count of the query result. Use HasNext() to check for more data.
59+
/// </summary>
60+
/// <returns>The number of rows in the current batch.</returns>
61+
public int CurrentBatchRowCount() => _currentBatchRowCount;
62+
63+
/// <summary>
64+
/// Gets the number of rows in the current fetched batch.
65+
/// </summary>
66+
/// <returns>The number of rows in the current batch.</returns>
67+
[Obsolete("Use CurrentBatchRowCount() instead. This property returns batch size, not total row count.")]
5568
public int RowCount { get; set; }
69+
70+
private int _currentBatchRowCount;
5671
public SessionDataSet(string sql, TSExecuteStatementResp resp, Client client, ConcurrentClientQueue clientQueue, long statementId)
5772
{
5873
_clientQueue = clientQueue;
@@ -74,7 +89,8 @@ public SessionDataSet(string sql, TSExecuteStatementResp resp, Client client, Co
7489
// some internal variable
7590
_hasCatchedResult = false;
7691
_rowIndex = 0;
77-
RowCount = _queryDataset.Time.Length / sizeof(long);
92+
_currentBatchRowCount = _queryDataset.Time.Length / sizeof(long);
93+
RowCount = _currentBatchRowCount;
7894

7995
_columnNames = resp.Columns;
8096
_columnTypeLst = resp.DataTypeList;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
using System.Threading;
21+
22+
namespace Apache.IoTDB
23+
{
24+
/// <summary>
25+
/// Encapsulates real-time health statistics for connection pool monitoring.
26+
/// Thread-safe implementation for concurrent access patterns.
27+
/// </summary>
28+
internal class PoolHealthMetrics
29+
{
30+
private int _reconnectionFailureTally;
31+
private readonly int _configuredMaxSize;
32+
33+
public PoolHealthMetrics(int configuredMaxSize)
34+
{
35+
_configuredMaxSize = configuredMaxSize;
36+
}
37+
38+
public void IncrementReconnectionFailures()
39+
{
40+
Interlocked.Increment(ref _reconnectionFailureTally);
41+
}
42+
43+
public void ResetAllCounters()
44+
{
45+
Interlocked.Exchange(ref _reconnectionFailureTally, 0);
46+
}
47+
48+
public int GetReconnectionFailureTally() => Volatile.Read(ref _reconnectionFailureTally);
49+
50+
public int GetConfiguredMaxSize() => _configuredMaxSize;
51+
}
52+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
using System;
21+
using Thrift;
22+
23+
namespace Apache.IoTDB
24+
{
25+
/// <summary>
26+
/// Exception thrown when all reconnection attempts to the server have failed.
27+
/// This exception is used internally to distinguish reconnection failures from other errors.
28+
/// </summary>
29+
internal class ReconnectionFailedException : TException
30+
{
31+
internal ReconnectionFailedException(string message)
32+
: base(message, null)
33+
{
34+
}
35+
36+
internal ReconnectionFailedException(string message, Exception innerException)
37+
: base(message, innerException)
38+
{
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)