Skip to content

Commit c200bae

Browse files
committed
Fix SessionPool client leak on reconnection and query failures, and preserve server error messages
- Add SessionPoolDepletedException with diagnostic properties for pool depletion scenarios - Add ReconnectionFailedException for type-safe reconnection failure detection - Fix client leak when reconnection succeeds but retry operation fails - Add PoolHealthMetrics for thread-safe health monitoring - Add try-finally protection for Monitor locks in ConcurrentClientQueue - Remove silent failure pattern - always log connection failures - Add CurrentBatchRowCount() method with Obsolete attribute on RowCount() - Improve database switch error handling with partial failure detection
1 parent 54545a5 commit c200bae

File tree

8 files changed

+771
-72
lines changed

8 files changed

+771
-72
lines changed

docs/SessionPool_Exception_Handling.md

Lines changed: 487 additions & 0 deletions
Large diffs are not rendered by default.

src/Apache.IoTDB.Data/IoTDBDataReader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ internal IoTDBDataReader(IoTDBCommand IoTDBCommand, SessionDataSet dataSet, bool
5656
_command = IoTDBCommand;
5757
_closeConnection = closeConnection;
5858
_fieldCount = dataSet.GetColumnNames().Count;
59-
_hasRows = dataSet.RowCount() > 0;
60-
_recordsAffected = dataSet.RowCount();
59+
_hasRows = dataSet.CurrentBatchRowCount() > 0;
60+
_recordsAffected = -1; // Total row count is unknown; use -1 per ADO.NET convention
6161

6262
_closed = _closeConnection;
6363
_metas = dataSet.GetColumnNames();

src/Apache.IoTDB/ConcurrentClientQueue.cs

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace Apache.IoTDB
2828
public class ConcurrentClientQueue
2929
{
3030
public ConcurrentQueue<Client> ClientQueue { get; }
31+
internal IPoolDiagnosticReporter DiagnosticReporter { get; set; }
3132

3233
public ConcurrentClientQueue(List<Client> clients)
3334
{
@@ -47,50 +48,51 @@ public void Return(Client client)
4748
Monitor.Exit(ClientQueue);
4849
Thread.Sleep(0);
4950
}
50-
int _ref = 0;
51-
public void AddRef()
52-
{
53-
lock (this)
54-
{
55-
_ref++;
56-
}
57-
}
58-
public int GetRef()
59-
{
60-
return _ref;
61-
}
62-
public void RemoveRef()
63-
{
64-
lock (this)
65-
{
66-
_ref--;
67-
}
68-
}
51+
private int _ref = 0;
52+
public void AddRef() => Interlocked.Increment(ref _ref);
53+
public int GetRef() => Volatile.Read(ref _ref);
54+
public void RemoveRef() => Interlocked.Decrement(ref _ref);
6955
public int Timeout { get; set; } = 10;
7056
public Client Take()
7157
{
7258
Client client = null;
7359
Monitor.Enter(ClientQueue);
74-
while (true)
60+
try
7561
{
76-
bool timeout = false;
77-
if (ClientQueue.IsEmpty)
62+
while (true)
7863
{
79-
timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
80-
}
81-
ClientQueue.TryDequeue(out client);
64+
bool timeout = false;
65+
if (ClientQueue.IsEmpty)
66+
{
67+
timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
68+
}
69+
ClientQueue.TryDequeue(out client);
8270

83-
if (client != null || timeout)
84-
{
85-
break;
71+
if (client != null || timeout)
72+
{
73+
break;
74+
}
8675
}
8776
}
88-
Monitor.Exit(ClientQueue);
77+
finally
78+
{
79+
Monitor.Exit(ClientQueue);
80+
}
8981
if (client == null)
9082
{
91-
throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
83+
var reasonPhrase = $"Connection pool is empty and wait time out({Timeout}s)";
84+
if (DiagnosticReporter != null)
85+
{
86+
throw DiagnosticReporter.BuildDepletionException(reasonPhrase);
87+
}
88+
throw new TimeoutException(reasonPhrase);
9289
}
9390
return client;
9491
}
9592
}
93+
94+
internal interface IPoolDiagnosticReporter
95+
{
96+
SessionPoolDepletedException BuildDepletionException(string reasonPhrase);
97+
}
9698
}

src/Apache.IoTDB/DataStructure/SessionDataSet.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,20 @@ public SessionDataSet(
108108
public IReadOnlyList<string> GetColumnNames() => _rpcDataSet._columnNameList;
109109
public IReadOnlyList<string> GetColumnTypes() => _rpcDataSet._columnTypeList;
110110

111+
/// <summary>
112+
/// Gets the number of rows in the current fetched batch (tsBlock).
113+
/// Note: This is NOT the total row count of the query result. Use HasNext() to check for more data.
114+
/// </summary>
115+
/// <returns>The number of rows in the current batch.</returns>
116+
public int CurrentBatchRowCount() => _rpcDataSet._tsBlockSize;
117+
118+
/// <summary>
119+
/// Gets the number of rows in the current fetched batch.
120+
/// </summary>
121+
/// <returns>The number of rows in the current batch.</returns>
122+
[Obsolete("Use CurrentBatchRowCount() instead. This method returns batch size, not total row count.")]
111123
public int RowCount() => _rpcDataSet._tsBlockSize;
124+
112125
public void ShowTableNames()
113126
{
114127
IReadOnlyList<string> columns = GetColumnNames();
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
using System.Threading;
21+
22+
namespace Apache.IoTDB
23+
{
24+
/// <summary>
25+
/// Encapsulates real-time health statistics for connection pool monitoring.
26+
/// Thread-safe implementation for concurrent access patterns.
27+
/// </summary>
28+
internal class PoolHealthMetrics
29+
{
30+
private int _reconnectionFailureTally;
31+
private readonly int _configuredMaxSize;
32+
33+
public PoolHealthMetrics(int configuredMaxSize)
34+
{
35+
_configuredMaxSize = configuredMaxSize;
36+
}
37+
38+
public void IncrementReconnectionFailures()
39+
{
40+
Interlocked.Increment(ref _reconnectionFailureTally);
41+
}
42+
43+
public void ResetAllCounters()
44+
{
45+
Interlocked.Exchange(ref _reconnectionFailureTally, 0);
46+
}
47+
48+
public int GetReconnectionFailureTally() => Volatile.Read(ref _reconnectionFailureTally);
49+
50+
public int GetConfiguredMaxSize() => _configuredMaxSize;
51+
}
52+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
using System;
21+
using Thrift;
22+
23+
namespace Apache.IoTDB
24+
{
25+
/// <summary>
26+
/// Exception thrown when all reconnection attempts to the server have failed.
27+
/// This exception is used internally to distinguish reconnection failures from other errors.
28+
/// </summary>
29+
internal class ReconnectionFailedException : TException
30+
{
31+
internal ReconnectionFailedException(string message)
32+
: base(message, null)
33+
{
34+
}
35+
36+
internal ReconnectionFailedException(string message, Exception innerException)
37+
: base(message, innerException)
38+
{
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)