Skip to content

Commit 5b0e6bf

Browse files
committed
[To dev/1.3] Fix SessionPool client leak on reconnection and query failures, and preserve server error messages
1 parent 69f2337 commit 5b0e6bf

File tree

8 files changed

+771
-71
lines changed

8 files changed

+771
-71
lines changed

docs/SessionPool_Exception_Handling.md

Lines changed: 487 additions & 0 deletions
Large diffs are not rendered by default.

src/Apache.IoTDB.Data/IoTDBDataReader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ internal IoTDBDataReader(IoTDBCommand IoTDBCommand, SessionDataSet dataSet, bool
5656
_command = IoTDBCommand;
5757
_closeConnection = closeConnection;
5858
_fieldCount = dataSet.ColumnNames.Count;
59-
_hasRows = dataSet.RowCount > 0;
60-
_recordsAffected = dataSet.RowCount;
59+
_hasRows = dataSet.CurrentBatchRowCount() > 0;
60+
_recordsAffected = -1; // Total row count is unknown; use -1 per ADO.NET convention
6161
_closed = _closeConnection;
6262
_metas = dataSet.ColumnNames;
6363
_dataSet = dataSet;

src/Apache.IoTDB/ConcurrentClientQueue.cs

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace Apache.IoTDB
2828
public class ConcurrentClientQueue
2929
{
3030
public ConcurrentQueue<Client> ClientQueue { get; }
31+
internal IPoolDiagnosticReporter DiagnosticReporter { get; set; }
3132

3233
public ConcurrentClientQueue(List<Client> clients)
3334
{
@@ -42,55 +43,62 @@ public ConcurrentClientQueue()
4243
public void Return(Client client)
4344
{
4445
Monitor.Enter(ClientQueue);
45-
ClientQueue.Enqueue(client);
46-
Monitor.PulseAll(ClientQueue); // wake up all threads waiting on the queue, refresh the waiting time
47-
Monitor.Exit(ClientQueue);
48-
Thread.Sleep(0);
49-
}
50-
int _ref = 0;
51-
public void AddRef()
52-
{
53-
lock (this)
46+
try
5447
{
55-
_ref++;
48+
ClientQueue.Enqueue(client);
49+
Monitor.PulseAll(ClientQueue); // wake up all threads waiting on the queue, refresh the waiting time
5650
}
57-
}
58-
public int GetRef()
59-
{
60-
return _ref;
61-
}
62-
public void RemoveRef()
63-
{
64-
lock (this)
51+
finally
6552
{
66-
_ref--;
53+
Monitor.Exit(ClientQueue);
6754
}
55+
Thread.Sleep(0);
6856
}
57+
private int _ref = 0;
58+
public void AddRef() => Interlocked.Increment(ref _ref);
59+
public int GetRef() => Volatile.Read(ref _ref);
60+
public void RemoveRef() => Interlocked.Decrement(ref _ref);
6961
public int Timeout { get; set; } = 10;
7062
public Client Take()
7163
{
7264
Client client = null;
7365
Monitor.Enter(ClientQueue);
74-
while (true)
66+
try
7567
{
76-
bool timeout = false;
77-
if (ClientQueue.IsEmpty)
68+
while (true)
7869
{
79-
timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
80-
}
81-
ClientQueue.TryDequeue(out client);
70+
bool timeout = false;
71+
if (ClientQueue.IsEmpty)
72+
{
73+
timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
74+
}
75+
ClientQueue.TryDequeue(out client);
8276

83-
if (client != null || timeout)
84-
{
85-
break;
77+
if (client != null || timeout)
78+
{
79+
break;
80+
}
8681
}
8782
}
88-
Monitor.Exit(ClientQueue);
83+
finally
84+
{
85+
Monitor.Exit(ClientQueue);
86+
}
8987
if (client == null)
9088
{
91-
throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
89+
var reasonPhrase = $"Connection pool is empty and wait time out({Timeout}s)";
90+
if (DiagnosticReporter != null)
91+
{
92+
throw DiagnosticReporter.BuildDepletionException(reasonPhrase);
93+
}
94+
throw new TimeoutException(reasonPhrase);
9295
}
9396
return client;
9497
}
9598
}
99+
100+
internal interface IPoolDiagnosticReporter
101+
{
102+
SessionPoolDepletedException BuildDepletionException(string reasonPhrase);
103+
}
96104
}

src/Apache.IoTDB/DataStructure/SessionDataSet.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,22 @@ public class SessionDataSet : System.IDisposable
5252
private int Flag => 0x80;
5353
private int DefaultTimeout => 10000;
5454
public int FetchSize { get; set; }
55+
56+
/// <summary>
57+
/// Gets the number of rows in the current fetched batch.
58+
/// Note: This is NOT the total row count of the query result. Use HasNext() to check for more data.
59+
/// </summary>
60+
/// <returns>The number of rows in the current batch.</returns>
61+
public int CurrentBatchRowCount() => _currentBatchRowCount;
62+
63+
/// <summary>
64+
/// Gets the number of rows in the current fetched batch.
65+
/// </summary>
66+
/// <returns>The number of rows in the current batch.</returns>
67+
[Obsolete("Use CurrentBatchRowCount() instead. This property returns batch size, not total row count.")]
5568
public int RowCount { get; set; }
69+
70+
private int _currentBatchRowCount;
5671
public SessionDataSet(string sql, TSExecuteStatementResp resp, Client client, ConcurrentClientQueue clientQueue, long statementId)
5772
{
5873
_clientQueue = clientQueue;
@@ -74,7 +89,8 @@ public SessionDataSet(string sql, TSExecuteStatementResp resp, Client client, Co
7489
// some internal variable
7590
_hasCatchedResult = false;
7691
_rowIndex = 0;
77-
RowCount = _queryDataset.Time.Length / sizeof(long);
92+
_currentBatchRowCount = _queryDataset.Time.Length / sizeof(long);
93+
RowCount = _currentBatchRowCount;
7894

7995
_columnNames = resp.Columns;
8096
_columnTypeLst = resp.DataTypeList;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
using System.Threading;
21+
22+
namespace Apache.IoTDB
23+
{
24+
/// <summary>
25+
/// Encapsulates real-time health statistics for connection pool monitoring.
26+
/// Thread-safe implementation for concurrent access patterns.
27+
/// </summary>
28+
internal class PoolHealthMetrics
29+
{
30+
private int _reconnectionFailureTally;
31+
private readonly int _configuredMaxSize;
32+
33+
public PoolHealthMetrics(int configuredMaxSize)
34+
{
35+
_configuredMaxSize = configuredMaxSize;
36+
}
37+
38+
public void IncrementReconnectionFailures()
39+
{
40+
Interlocked.Increment(ref _reconnectionFailureTally);
41+
}
42+
43+
public void ResetAllCounters()
44+
{
45+
Interlocked.Exchange(ref _reconnectionFailureTally, 0);
46+
}
47+
48+
public int GetReconnectionFailureTally() => Volatile.Read(ref _reconnectionFailureTally);
49+
50+
public int GetConfiguredMaxSize() => _configuredMaxSize;
51+
}
52+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
using System;
21+
using Thrift;
22+
23+
namespace Apache.IoTDB
24+
{
25+
/// <summary>
26+
/// Exception thrown when all reconnection attempts to the server have failed.
27+
/// This exception is used internally to distinguish reconnection failures from other errors.
28+
/// </summary>
29+
internal class ReconnectionFailedException : TException
30+
{
31+
internal ReconnectionFailedException(string message)
32+
: base(message, null)
33+
{
34+
}
35+
36+
internal ReconnectionFailedException(string message, Exception innerException)
37+
: base(message, innerException)
38+
{
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)