Skip to content

Commit 38daf16

Browse files
committed
fix: enrich DriverTimeoutException with per-node pool/channel diagnostics (DRIVER-540)
Add NodeDiagnostics public inner class to DriverTimeoutException with fields for in-flight counts and pool capacity, and generate a diagnostic suffix in the exception message at timeout time. Refactor all four request-handler timeout paths (CqlRequestHandler, CqlPrepareHandler, GraphRequestHandler, ContinuousRequestHandlerBase) to build List<NodeDiagnostics> instead of a raw message string. Update IT assertions that matched exact message strings to use hasMessageStartingWith() to accommodate the new suffix.
1 parent bd9b26a commit 38daf16

7 files changed

Lines changed: 330 additions & 24 deletions

File tree

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.datastax.oss.driver.api.core.AllNodesFailedException;
2727
import com.datastax.oss.driver.api.core.CqlIdentifier;
2828
import com.datastax.oss.driver.api.core.DriverTimeoutException;
29+
import com.datastax.oss.driver.api.core.DriverTimeoutException.NodeDiagnostics;
2930
import com.datastax.oss.driver.api.core.NodeUnavailableException;
3031
import com.datastax.oss.driver.api.core.ProtocolVersion;
3132
import com.datastax.oss.driver.api.core.RequestThrottlingException;
@@ -62,6 +63,7 @@
6263
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
6364
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
6465
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
66+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
6567
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6668
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
6769
import com.datastax.oss.driver.internal.core.util.Loggers;
@@ -89,6 +91,8 @@
8991
import java.time.Duration;
9092
import java.util.AbstractMap;
9193
import java.util.ArrayDeque;
94+
import java.util.ArrayList;
95+
import java.util.Collections;
9296
import java.util.List;
9397
import java.util.Map;
9498
import java.util.Queue;
@@ -390,13 +394,36 @@ private Timeout scheduleGlobalTimeout() {
390394
}
391395
LOG.trace("[{}] Scheduling global timeout for pages in {}", logPrefix, globalTimeout);
392396
return timer.newTimeout(
393-
timeout ->
394-
abortGlobalRequestOrChosenCallback(
395-
new DriverTimeoutException("Query timed out after " + globalTimeout)),
397+
timeout -> {
398+
List<NodeDiagnostics> diagnostics = buildNodeDiagnostics();
399+
abortGlobalRequestOrChosenCallback(
400+
new DriverTimeoutException("Query timed out after " + globalTimeout, diagnostics));
401+
},
396402
globalTimeout.toNanos(),
397403
TimeUnit.NANOSECONDS);
398404
}
399405

406+
private List<NodeDiagnostics> buildNodeDiagnostics() {
407+
List<NodeResponseCallback> callbacks = inFlightCallbacks;
408+
List<NodeDiagnostics> result = new ArrayList<>(callbacks.size());
409+
for (NodeResponseCallback cb : callbacks) {
410+
int channelInFlight = cb.channel.getInFlight();
411+
ChannelPool pool = session.getPools().get(cb.node);
412+
if (pool != null) {
413+
result.add(
414+
new NodeDiagnostics(
415+
cb.node.getEndPoint(),
416+
channelInFlight,
417+
pool.getInFlight(),
418+
pool.getAvailableIds(),
419+
pool.getOrphanedIds()));
420+
} else {
421+
result.add(new NodeDiagnostics(cb.node.getEndPoint(), channelInFlight));
422+
}
423+
}
424+
return result;
425+
}
426+
400427
/**
401428
* Cancels the continuous paging request.
402429
*
@@ -718,9 +745,24 @@ private void onPageTimeout(int expectedPage) {
718745
lock.lock();
719746
try {
720747
if (state == expectedPage) {
748+
int channelInFlight = channel.getInFlight();
749+
ChannelPool pool = session.getPools().get(node);
750+
NodeDiagnostics nodeDiagnostics;
751+
if (pool != null) {
752+
nodeDiagnostics =
753+
new NodeDiagnostics(
754+
node.getEndPoint(),
755+
channelInFlight,
756+
pool.getInFlight(),
757+
pool.getAvailableIds(),
758+
pool.getOrphanedIds());
759+
} else {
760+
nodeDiagnostics = new NodeDiagnostics(node.getEndPoint(), channelInFlight);
761+
}
721762
abort(
722763
new DriverTimeoutException(
723-
String.format("Timed out waiting for page %d", expectedPage)),
764+
"Timed out waiting for page " + expectedPage,
765+
Collections.singletonList(nodeDiagnostics)),
724766
false);
725767
} else {
726768
// Ignore timeout if the request has moved on in the interim.

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.datastax.oss.driver.api.core.AllNodesFailedException;
2727
import com.datastax.oss.driver.api.core.DriverException;
2828
import com.datastax.oss.driver.api.core.DriverTimeoutException;
29+
import com.datastax.oss.driver.api.core.DriverTimeoutException.NodeDiagnostics;
2930
import com.datastax.oss.driver.api.core.NodeUnavailableException;
3031
import com.datastax.oss.driver.api.core.RequestThrottlingException;
3132
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
@@ -56,6 +57,7 @@
5657
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
5758
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
5859
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
60+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
5961
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6062
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
6163
import com.datastax.oss.driver.internal.core.tracker.RequestLogger;
@@ -77,6 +79,7 @@
7779
import java.time.Duration;
7880
import java.util.AbstractMap;
7981
import java.util.ArrayDeque;
82+
import java.util.ArrayList;
8083
import java.util.List;
8184
import java.util.Map;
8285
import java.util.Queue;
@@ -209,12 +212,15 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
209212
if (timeoutDuration != null && timeoutDuration.toNanos() > 0) {
210213
try {
211214
return this.timer.newTimeout(
212-
(Timeout timeout1) ->
213-
setFinalError(
214-
initialStatement,
215-
new DriverTimeoutException("Query timed out after " + timeoutDuration),
216-
null,
217-
NO_SUCCESSFUL_EXECUTION),
215+
(Timeout timeout1) -> {
216+
List<NodeDiagnostics> diagnostics = buildNodeDiagnostics();
217+
setFinalError(
218+
initialStatement,
219+
new DriverTimeoutException(
220+
"Query timed out after " + timeoutDuration, diagnostics),
221+
null,
222+
NO_SUCCESSFUL_EXECUTION);
223+
},
218224
timeoutDuration.toNanos(),
219225
TimeUnit.NANOSECONDS);
220226
} catch (IllegalStateException e) {
@@ -229,6 +235,27 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
229235
return null;
230236
}
231237

238+
private List<NodeDiagnostics> buildNodeDiagnostics() {
239+
List<NodeResponseCallback> callbacks = inFlightCallbacks;
240+
List<NodeDiagnostics> result = new ArrayList<>(callbacks.size());
241+
for (NodeResponseCallback cb : callbacks) {
242+
int channelInFlight = cb.channel.getInFlight();
243+
ChannelPool pool = session.getPools().get(cb.node);
244+
if (pool != null) {
245+
result.add(
246+
new NodeDiagnostics(
247+
cb.node.getEndPoint(),
248+
channelInFlight,
249+
pool.getInFlight(),
250+
pool.getAvailableIds(),
251+
pool.getOrphanedIds()));
252+
} else {
253+
result.add(new NodeDiagnostics(cb.node.getEndPoint(), channelInFlight));
254+
}
255+
}
256+
return result;
257+
}
258+
232259
/**
233260
* Sends the request to the next available node.
234261
*

core/src/main/java/com/datastax/oss/driver/api/core/DriverTimeoutException.java

Lines changed: 186 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,203 @@
1818
package com.datastax.oss.driver.api.core;
1919

2020
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
21+
import com.datastax.oss.driver.api.core.metadata.EndPoint;
2122
import edu.umd.cs.findbugs.annotations.NonNull;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.StringJoiner;
2226

23-
/** Thrown when a driver request timed out. */
27+
/**
28+
* Thrown when a driver request timed out.
29+
*
30+
* <p>When thrown from the request execution path the exception carries per-node diagnostic
31+
* information captured at the moment the timer fires (see {@link #getNodeDiagnostics()}). This
32+
* information is also embedded in the exception message for easy log-based diagnosis.
33+
*/
2434
public class DriverTimeoutException extends DriverException {
35+
36+
/**
37+
* Sentinel value used in {@link NodeDiagnostics} fields when the corresponding data was not
38+
* available at the time the exception was created (e.g. the pool had already been removed).
39+
*/
40+
public static final int UNAVAILABLE = -1;
41+
42+
/**
43+
* Per-node diagnostic snapshot captured at timeout time.
44+
*
45+
* <p>Fields:
46+
*
47+
* <ul>
48+
* <li>{@link #channelInFlight}: requests currently awaiting a response on the specific
49+
* connection used for this request.
50+
* <li>{@link #poolInFlight}: total in-flight across all connections to this host ({@link
51+
* #UNAVAILABLE} if the pool was already removed).
52+
* <li>{@link #poolAvailableIds}: remaining stream IDs available to send new requests; a low
53+
* value indicates pool contention ({@link #UNAVAILABLE} if pool was already removed).
54+
* <li>{@link #poolOrphanedIds}: stream IDs from previously timed-out or cancelled requests that
55+
* cannot be released yet; a high value indicates stale stream ID accumulation ({@link
56+
* #UNAVAILABLE} if pool was already removed).
57+
* </ul>
58+
*
59+
* <p><b>Diagnosing failure modes:</b>
60+
*
61+
* <ul>
62+
* <li>{@code poolAvailableIds} near zero → pool contention; requests queuing inside the driver
63+
* before reaching the server.
64+
* <li>{@code poolAvailableIds} normal + high {@code channelInFlight} → server is slow; requests
65+
* were sent but not answered within the timeout.
66+
* <li>High {@code poolOrphanedIds} → previous timeouts consumed stream IDs that the driver is
67+
* still waiting to reclaim.
68+
* </ul>
69+
*/
70+
public static final class NodeDiagnostics {
71+
72+
@NonNull private final EndPoint endPoint;
73+
private final int channelInFlight;
74+
private final int poolInFlight;
75+
private final int poolAvailableIds;
76+
private final int poolOrphanedIds;
77+
78+
/**
79+
* Creates a full diagnostic snapshot (pool was available at timeout time).
80+
*
81+
* @param endPoint the endpoint of the node.
82+
* @param channelInFlight in-flight count on the specific channel.
83+
* @param poolInFlight total in-flight across the pool for this host.
84+
* @param poolAvailableIds remaining stream IDs available in the pool.
85+
* @param poolOrphanedIds orphaned stream IDs in the pool.
86+
*/
87+
public NodeDiagnostics(
88+
@NonNull EndPoint endPoint,
89+
int channelInFlight,
90+
int poolInFlight,
91+
int poolAvailableIds,
92+
int poolOrphanedIds) {
93+
this.endPoint = endPoint;
94+
this.channelInFlight = channelInFlight;
95+
this.poolInFlight = poolInFlight;
96+
this.poolAvailableIds = poolAvailableIds;
97+
this.poolOrphanedIds = poolOrphanedIds;
98+
}
99+
100+
/**
101+
* Creates a partial diagnostic snapshot for when the pool was unavailable at timeout time. The
102+
* pool-related fields ({@link #getPoolInFlight()}, {@link #getPoolAvailableIds()}, {@link
103+
* #getPoolOrphanedIds()}) will be {@link DriverTimeoutException#UNAVAILABLE}.
104+
*
105+
* @param endPoint the endpoint of the node.
106+
* @param channelInFlight in-flight count on the specific channel.
107+
*/
108+
public NodeDiagnostics(@NonNull EndPoint endPoint, int channelInFlight) {
109+
this(endPoint, channelInFlight, UNAVAILABLE, UNAVAILABLE, UNAVAILABLE);
110+
}
111+
112+
/** Returns the endpoint of the node that had in-flight requests at timeout time. */
113+
@NonNull
114+
public EndPoint getEndPoint() {
115+
return endPoint;
116+
}
117+
118+
/**
119+
* Returns the number of in-flight requests on the specific connection at timeout time, or
120+
* {@link DriverTimeoutException#UNAVAILABLE} if not available.
121+
*/
122+
public int getChannelInFlight() {
123+
return channelInFlight;
124+
}
125+
126+
/**
127+
* Returns the total number of in-flight requests across all connections to this host at timeout
128+
* time, or {@link DriverTimeoutException#UNAVAILABLE} if the pool was no longer available.
129+
*/
130+
public int getPoolInFlight() {
131+
return poolInFlight;
132+
}
133+
134+
/**
135+
* Returns the number of remaining stream IDs available in the pool at timeout time, or {@link
136+
* DriverTimeoutException#UNAVAILABLE} if the pool was no longer available. A low value
137+
* indicates pool contention.
138+
*/
139+
public int getPoolAvailableIds() {
140+
return poolAvailableIds;
141+
}
142+
143+
/**
144+
* Returns the number of orphaned stream IDs in the pool at timeout time, or {@link
145+
* DriverTimeoutException#UNAVAILABLE} if the pool was no longer available. A high value
146+
* indicates stale stream ID accumulation from previous timeouts.
147+
*/
148+
public int getPoolOrphanedIds() {
149+
return poolOrphanedIds;
150+
}
151+
152+
@Override
153+
public String toString() {
154+
if (poolInFlight == UNAVAILABLE) {
155+
return String.format("%s [channel in-flight: %d, pool: n/a]", endPoint, channelInFlight);
156+
}
157+
return String.format(
158+
"%s [channel in-flight: %d, pool in-flight: %d, pool available ids: %d, pool orphaned ids: %d]",
159+
endPoint, channelInFlight, poolInFlight, poolAvailableIds, poolOrphanedIds);
160+
}
161+
}
162+
163+
@NonNull private final List<NodeDiagnostics> nodeDiagnostics;
164+
165+
/**
166+
* Creates an exception with a plain message and no node diagnostics. Used for cases where the
167+
* diagnostic data is unavailable (e.g. no nodes were in-flight at timeout time).
168+
*
169+
* @param message the exception message.
170+
*/
25171
public DriverTimeoutException(@NonNull String message) {
26-
this(message, null);
172+
this(message, Collections.emptyList(), null);
173+
}
174+
175+
/**
176+
* Creates an exception with per-node diagnostic context captured at timeout time. The message is
177+
* generated automatically from {@code baseMessage} and the diagnostic data.
178+
*
179+
* @param baseMessage the base timeout message (e.g. {@code "Query timed out after PT0.5S"}).
180+
* @param nodeDiagnostics per-node diagnostic snapshots; must not be null but may be empty, in
181+
* which case no node information is appended to the message.
182+
*/
183+
public DriverTimeoutException(
184+
@NonNull String baseMessage, @NonNull List<NodeDiagnostics> nodeDiagnostics) {
185+
this(buildMessage(baseMessage, nodeDiagnostics), nodeDiagnostics, null);
27186
}
28187

29-
private DriverTimeoutException(String message, ExecutionInfo executionInfo) {
188+
private DriverTimeoutException(
189+
String message, @NonNull List<NodeDiagnostics> nodeDiagnostics, ExecutionInfo executionInfo) {
30190
super(message, executionInfo, null, true);
191+
this.nodeDiagnostics = Collections.unmodifiableList(nodeDiagnostics);
192+
}
193+
194+
/**
195+
* Returns the per-node diagnostic snapshots captured at timeout time, or an empty list if not
196+
* available.
197+
*/
198+
@NonNull
199+
public List<NodeDiagnostics> getNodeDiagnostics() {
200+
return nodeDiagnostics;
31201
}
32202

33203
@NonNull
34204
@Override
35205
public DriverException copy() {
36-
return new DriverTimeoutException(getMessage(), getExecutionInfo());
206+
return new DriverTimeoutException(getMessage(), nodeDiagnostics, getExecutionInfo());
207+
}
208+
209+
private static String buildMessage(
210+
@NonNull String baseMessage, @NonNull List<NodeDiagnostics> nodeDiagnostics) {
211+
if (nodeDiagnostics.isEmpty()) {
212+
return baseMessage;
213+
}
214+
StringJoiner nodesInfo = new StringJoiner(", ");
215+
for (NodeDiagnostics nd : nodeDiagnostics) {
216+
nodesInfo.add(nd.toString());
217+
}
218+
return baseMessage + " — nodes in flight: " + nodesInfo;
37219
}
38220
}

0 commit comments

Comments
 (0)