Skip to content

Commit c2d9d18

Browse files
committed
fix: enrich DriverTimeoutException with per-node pool/channel diagnostics (DRIVER-540)
Add NodeDiagnostics public inner class to DriverTimeoutException with fields for in-flight counts and pool capacity, and generate a diagnostic suffix in the exception message at timeout time. Refactor all four request-handler timeout paths (CqlRequestHandler, CqlPrepareHandler, GraphRequestHandler, ContinuousRequestHandlerBase) to build List<NodeDiagnostics> instead of a raw message string. Update IT assertions that matched exact message strings to use hasMessageStartingWith() to accommodate the new suffix.
1 parent bd9b26a commit c2d9d18

7 files changed

Lines changed: 339 additions & 24 deletions

File tree

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package com.datastax.dse.driver.internal.core.cql.continuous;
1919

20+
import static com.datastax.oss.driver.api.core.DriverTimeoutException.UNAVAILABLE;
21+
2022
import com.datastax.dse.driver.api.core.DseProtocolVersion;
2123
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
2224
import com.datastax.dse.driver.internal.core.DseProtocolFeature;
@@ -26,6 +28,7 @@
2628
import com.datastax.oss.driver.api.core.AllNodesFailedException;
2729
import com.datastax.oss.driver.api.core.CqlIdentifier;
2830
import com.datastax.oss.driver.api.core.DriverTimeoutException;
31+
import com.datastax.oss.driver.api.core.DriverTimeoutException.NodeDiagnostics;
2932
import com.datastax.oss.driver.api.core.NodeUnavailableException;
3033
import com.datastax.oss.driver.api.core.ProtocolVersion;
3134
import com.datastax.oss.driver.api.core.RequestThrottlingException;
@@ -62,6 +65,7 @@
6265
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
6366
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
6467
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
68+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
6569
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6670
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
6771
import com.datastax.oss.driver.internal.core.util.Loggers;
@@ -89,6 +93,8 @@
8993
import java.time.Duration;
9094
import java.util.AbstractMap;
9195
import java.util.ArrayDeque;
96+
import java.util.ArrayList;
97+
import java.util.Collections;
9298
import java.util.List;
9399
import java.util.Map;
94100
import java.util.Queue;
@@ -390,13 +396,32 @@ private Timeout scheduleGlobalTimeout() {
390396
}
391397
LOG.trace("[{}] Scheduling global timeout for pages in {}", logPrefix, globalTimeout);
392398
return timer.newTimeout(
393-
timeout ->
394-
abortGlobalRequestOrChosenCallback(
395-
new DriverTimeoutException("Query timed out after " + globalTimeout)),
399+
timeout -> {
400+
List<NodeDiagnostics> diagnostics = buildNodeDiagnostics();
401+
abortGlobalRequestOrChosenCallback(
402+
new DriverTimeoutException("Query timed out after " + globalTimeout, diagnostics));
403+
},
396404
globalTimeout.toNanos(),
397405
TimeUnit.NANOSECONDS);
398406
}
399407

408+
/**
 * Builds one {@link NodeDiagnostics} entry for each callback currently held in {@code
 * inFlightCallbacks}.
 *
 * <p>Each entry carries the node's endpoint, the in-flight count reported by the callback's own
 * channel, and three pool-level values (in-flight, available ids, orphaned ids) read from the
 * session's pool for that node. When the session has no pool registered for the node, the three
 * pool-level values are reported as {@code UNAVAILABLE} instead.
 *
 * <p>All values are read at call time; this method is invoked from the global timeout task just
 * before the {@link DriverTimeoutException} is created.
 *
 * @return a new list with one entry per in-flight callback, in iteration order; empty if there
 *     are no in-flight callbacks.
 */
private List<NodeDiagnostics> buildNodeDiagnostics() {
409+
// Read the field once so that the sizing below and the iteration use the same list reference.
// NOTE(review): presumably inFlightCallbacks can be modified concurrently by other threads;
// confirm that its concrete type is safe to iterate here.
List<NodeResponseCallback> callbacks = inFlightCallbacks;
410+
List<NodeDiagnostics> result = new ArrayList<>(callbacks.size());
411+
for (NodeResponseCallback cb : callbacks) {
412+
// Channel-level count comes from the specific channel this callback was written to.
int channelInFlight = cb.channel.getInFlight();
413+
// The pool lookup may return null (no pool for this node in the session's map); the
// pool-level fields below fall back to UNAVAILABLE in that case.
ChannelPool pool = session.getPools().get(cb.node);
414+
result.add(
415+
NodeDiagnostics.of(
416+
cb.node.getEndPoint(),
417+
channelInFlight,
418+
pool != null ? pool.getInFlight() : UNAVAILABLE,
419+
pool != null ? pool.getAvailableIds() : UNAVAILABLE,
420+
pool != null ? pool.getOrphanedIds() : UNAVAILABLE));
421+
}
422+
return result;
423+
}
424+
400425
/**
401426
* Cancels the continuous paging request.
402427
*
@@ -718,9 +743,18 @@ private void onPageTimeout(int expectedPage) {
718743
lock.lock();
719744
try {
720745
if (state == expectedPage) {
746+
int channelInFlight = channel.getInFlight();
747+
ChannelPool pool = session.getPools().get(node);
721748
abort(
722749
new DriverTimeoutException(
723-
String.format("Timed out waiting for page %d", expectedPage)),
750+
"Timed out waiting for page " + expectedPage,
751+
Collections.singletonList(
752+
NodeDiagnostics.of(
753+
node.getEndPoint(),
754+
channelInFlight,
755+
pool != null ? pool.getInFlight() : UNAVAILABLE,
756+
pool != null ? pool.getAvailableIds() : UNAVAILABLE,
757+
pool != null ? pool.getOrphanedIds() : UNAVAILABLE))),
724758
false);
725759
} else {
726760
// Ignore timeout if the request has moved on in the interim.

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package com.datastax.dse.driver.internal.core.graph;
1919

20+
import static com.datastax.oss.driver.api.core.DriverTimeoutException.UNAVAILABLE;
21+
2022
import com.datastax.dse.driver.api.core.graph.AsyncGraphResultSet;
2123
import com.datastax.dse.driver.api.core.graph.GraphNode;
2224
import com.datastax.dse.driver.api.core.graph.GraphStatement;
@@ -26,6 +28,7 @@
2628
import com.datastax.oss.driver.api.core.AllNodesFailedException;
2729
import com.datastax.oss.driver.api.core.DriverException;
2830
import com.datastax.oss.driver.api.core.DriverTimeoutException;
31+
import com.datastax.oss.driver.api.core.DriverTimeoutException.NodeDiagnostics;
2932
import com.datastax.oss.driver.api.core.NodeUnavailableException;
3033
import com.datastax.oss.driver.api.core.RequestThrottlingException;
3134
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
@@ -56,6 +59,7 @@
5659
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
5760
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
5861
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
62+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
5963
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6064
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
6165
import com.datastax.oss.driver.internal.core.tracker.RequestLogger;
@@ -77,6 +81,7 @@
7781
import java.time.Duration;
7882
import java.util.AbstractMap;
7983
import java.util.ArrayDeque;
84+
import java.util.ArrayList;
8085
import java.util.List;
8186
import java.util.Map;
8287
import java.util.Queue;
@@ -209,12 +214,15 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
209214
if (timeoutDuration != null && timeoutDuration.toNanos() > 0) {
210215
try {
211216
return this.timer.newTimeout(
212-
(Timeout timeout1) ->
213-
setFinalError(
214-
initialStatement,
215-
new DriverTimeoutException("Query timed out after " + timeoutDuration),
216-
null,
217-
NO_SUCCESSFUL_EXECUTION),
217+
(Timeout timeout1) -> {
218+
List<NodeDiagnostics> diagnostics = buildNodeDiagnostics();
219+
setFinalError(
220+
initialStatement,
221+
new DriverTimeoutException(
222+
"Query timed out after " + timeoutDuration, diagnostics),
223+
null,
224+
NO_SUCCESSFUL_EXECUTION);
225+
},
218226
timeoutDuration.toNanos(),
219227
TimeUnit.NANOSECONDS);
220228
} catch (IllegalStateException e) {
@@ -229,6 +237,23 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
229237
return null;
230238
}
231239

240+
/**
 * Builds one {@link NodeDiagnostics} entry for each callback currently held in {@code
 * inFlightCallbacks}.
 *
 * <p>Each entry carries the node's endpoint, the in-flight count reported by the callback's own
 * channel, and three pool-level values (in-flight, available ids, orphaned ids) read from the
 * session's pool for that node. When the session has no pool registered for the node, the three
 * pool-level values are reported as {@code UNAVAILABLE} instead.
 *
 * <p>All values are read at call time; this method is invoked from the request timeout task
 * just before the {@link DriverTimeoutException} is created.
 *
 * @return a new list with one entry per in-flight callback, in iteration order; empty if there
 *     are no in-flight callbacks.
 */
private List<NodeDiagnostics> buildNodeDiagnostics() {
241+
// Read the field once so that the sizing below and the iteration use the same list reference.
// NOTE(review): presumably inFlightCallbacks can be modified concurrently by other threads;
// confirm that its concrete type is safe to iterate here.
List<NodeResponseCallback> callbacks = inFlightCallbacks;
242+
List<NodeDiagnostics> result = new ArrayList<>(callbacks.size());
243+
for (NodeResponseCallback cb : callbacks) {
244+
// Channel-level count comes from the specific channel this callback was written to.
int channelInFlight = cb.channel.getInFlight();
245+
// The pool lookup may return null (no pool for this node in the session's map); the
// pool-level fields below fall back to UNAVAILABLE in that case.
ChannelPool pool = session.getPools().get(cb.node);
246+
result.add(
247+
NodeDiagnostics.of(
248+
cb.node.getEndPoint(),
249+
channelInFlight,
250+
pool != null ? pool.getInFlight() : UNAVAILABLE,
251+
pool != null ? pool.getAvailableIds() : UNAVAILABLE,
252+
pool != null ? pool.getOrphanedIds() : UNAVAILABLE));
253+
}
254+
return result;
255+
}
256+
232257
/**
233258
* Sends the request to the next available node.
234259
*

0 commit comments

Comments
 (0)