Skip to content

Commit 441eb42

Browse files
nikagradkropachev
authored andcommitted
fix: enrich DriverTimeoutException with per-node pool/channel diagnostics (DRIVER-540)
Add NodeDiagnostics public inner class to DriverTimeoutException with fields for in-flight counts and pool capacity, and generate a diagnostic suffix in the exception message at timeout time. Refactor all four request-handler timeout paths (CqlRequestHandler, CqlPrepareHandler, GraphRequestHandler, ContinuousRequestHandlerBase) to build List<NodeDiagnostics> instead of a raw message string. Update IT assertions that matched exact message strings to use hasMessageStartingWith() to accommodate the new suffix.
1 parent bd9b26a commit 441eb42

9 files changed

Lines changed: 334 additions & 28 deletions

File tree

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package com.datastax.dse.driver.internal.core.cql.continuous;
1919

20+
import static com.datastax.oss.driver.api.core.DriverTimeoutException.UNAVAILABLE;
21+
2022
import com.datastax.dse.driver.api.core.DseProtocolVersion;
2123
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
2224
import com.datastax.dse.driver.internal.core.DseProtocolFeature;
@@ -26,6 +28,7 @@
2628
import com.datastax.oss.driver.api.core.AllNodesFailedException;
2729
import com.datastax.oss.driver.api.core.CqlIdentifier;
2830
import com.datastax.oss.driver.api.core.DriverTimeoutException;
31+
import com.datastax.oss.driver.api.core.DriverTimeoutException.NodeDiagnostics;
2932
import com.datastax.oss.driver.api.core.NodeUnavailableException;
3033
import com.datastax.oss.driver.api.core.ProtocolVersion;
3134
import com.datastax.oss.driver.api.core.RequestThrottlingException;
@@ -62,6 +65,7 @@
6265
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
6366
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
6467
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
68+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
6569
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6670
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
6771
import com.datastax.oss.driver.internal.core.util.Loggers;
@@ -390,13 +394,32 @@ private Timeout scheduleGlobalTimeout() {
390394
}
391395
LOG.trace("[{}] Scheduling global timeout for pages in {}", logPrefix, globalTimeout);
392396
return timer.newTimeout(
393-
timeout ->
394-
abortGlobalRequestOrChosenCallback(
395-
new DriverTimeoutException("Query timed out after " + globalTimeout)),
397+
timeout -> {
398+
NodeDiagnostics diagnostics = buildNodeDiagnostics();
399+
abortGlobalRequestOrChosenCallback(
400+
new DriverTimeoutException("Query timed out after " + globalTimeout, diagnostics));
401+
},
396402
globalTimeout.toNanos(),
397403
TimeUnit.NANOSECONDS);
398404
}
399405

406+
@Nullable
407+
private NodeDiagnostics buildNodeDiagnostics() {
408+
List<NodeResponseCallback> callbacks = inFlightCallbacks;
409+
if (callbacks.isEmpty()) {
410+
return null;
411+
}
412+
NodeResponseCallback cb = callbacks.get(0);
413+
int channelInFlight = cb.channel.getInFlight();
414+
ChannelPool pool = session.getPools().get(cb.node);
415+
return NodeDiagnostics.of(
416+
cb.node.getEndPoint(),
417+
channelInFlight,
418+
pool != null ? pool.getInFlight() : UNAVAILABLE,
419+
pool != null ? pool.getAvailableIds() : UNAVAILABLE,
420+
pool != null ? pool.getOrphanedIds() : UNAVAILABLE);
421+
}
422+
400423
/**
401424
* Cancels the continuous paging request.
402425
*
@@ -718,9 +741,17 @@ private void onPageTimeout(int expectedPage) {
718741
lock.lock();
719742
try {
720743
if (state == expectedPage) {
744+
int channelInFlight = channel.getInFlight();
745+
ChannelPool pool = session.getPools().get(node);
721746
abort(
722747
new DriverTimeoutException(
723-
String.format("Timed out waiting for page %d", expectedPage)),
748+
"Timed out waiting for page " + expectedPage,
749+
NodeDiagnostics.of(
750+
node.getEndPoint(),
751+
channelInFlight,
752+
pool != null ? pool.getInFlight() : UNAVAILABLE,
753+
pool != null ? pool.getAvailableIds() : UNAVAILABLE,
754+
pool != null ? pool.getOrphanedIds() : UNAVAILABLE)),
724755
false);
725756
} else {
726757
// Ignore timeout if the request has moved on in the interim.

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package com.datastax.dse.driver.internal.core.graph;
1919

20+
import static com.datastax.oss.driver.api.core.DriverTimeoutException.UNAVAILABLE;
21+
2022
import com.datastax.dse.driver.api.core.graph.AsyncGraphResultSet;
2123
import com.datastax.dse.driver.api.core.graph.GraphNode;
2224
import com.datastax.dse.driver.api.core.graph.GraphStatement;
@@ -26,6 +28,7 @@
2628
import com.datastax.oss.driver.api.core.AllNodesFailedException;
2729
import com.datastax.oss.driver.api.core.DriverException;
2830
import com.datastax.oss.driver.api.core.DriverTimeoutException;
31+
import com.datastax.oss.driver.api.core.DriverTimeoutException.NodeDiagnostics;
2932
import com.datastax.oss.driver.api.core.NodeUnavailableException;
3033
import com.datastax.oss.driver.api.core.RequestThrottlingException;
3134
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
@@ -56,6 +59,7 @@
5659
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
5760
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
5861
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
62+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
5963
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6064
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
6165
import com.datastax.oss.driver.internal.core.tracker.RequestLogger;
@@ -68,6 +72,7 @@
6872
import com.datastax.oss.protocol.internal.response.result.Rows;
6973
import com.datastax.oss.protocol.internal.response.result.Void;
7074
import edu.umd.cs.findbugs.annotations.NonNull;
75+
import edu.umd.cs.findbugs.annotations.Nullable;
7176
import io.netty.handler.codec.EncoderException;
7277
import io.netty.util.Timeout;
7378
import io.netty.util.Timer;
@@ -209,12 +214,15 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
209214
if (timeoutDuration != null && timeoutDuration.toNanos() > 0) {
210215
try {
211216
return this.timer.newTimeout(
212-
(Timeout timeout1) ->
213-
setFinalError(
214-
initialStatement,
215-
new DriverTimeoutException("Query timed out after " + timeoutDuration),
216-
null,
217-
NO_SUCCESSFUL_EXECUTION),
217+
(Timeout timeout1) -> {
218+
NodeDiagnostics diagnostics = buildNodeDiagnostics();
219+
setFinalError(
220+
initialStatement,
221+
new DriverTimeoutException(
222+
"Query timed out after " + timeoutDuration, diagnostics),
223+
null,
224+
NO_SUCCESSFUL_EXECUTION);
225+
},
218226
timeoutDuration.toNanos(),
219227
TimeUnit.NANOSECONDS);
220228
} catch (IllegalStateException e) {
@@ -229,6 +237,23 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
229237
return null;
230238
}
231239

240+
@Nullable
241+
private NodeDiagnostics buildNodeDiagnostics() {
242+
List<NodeResponseCallback> callbacks = inFlightCallbacks;
243+
if (callbacks.isEmpty()) {
244+
return null;
245+
}
246+
NodeResponseCallback cb = callbacks.get(0);
247+
int channelInFlight = cb.channel.getInFlight();
248+
ChannelPool pool = session.getPools().get(cb.node);
249+
return NodeDiagnostics.of(
250+
cb.node.getEndPoint(),
251+
channelInFlight,
252+
pool != null ? pool.getInFlight() : UNAVAILABLE,
253+
pool != null ? pool.getAvailableIds() : UNAVAILABLE,
254+
pool != null ? pool.getOrphanedIds() : UNAVAILABLE);
255+
}
256+
232257
/**
233258
* Sends the request to the next available node.
234259
*

core/src/main/java/com/datastax/oss/driver/api/core/DriverTimeoutException.java

Lines changed: 205 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,222 @@
1818
package com.datastax.oss.driver.api.core;
1919

2020
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
21+
import com.datastax.oss.driver.api.core.metadata.EndPoint;
2122
import edu.umd.cs.findbugs.annotations.NonNull;
23+
import edu.umd.cs.findbugs.annotations.Nullable;
2224

23-
/** Thrown when a driver request timed out. */
25+
/**
26+
* Thrown when a driver request timed out.
27+
*
28+
* <p>When thrown from the request execution path the exception carries a per-node diagnostic
29+
* snapshot captured at the moment the timer fires (see {@link #getNodeDiagnostics()}). This
30+
* information is also embedded in the exception message for easy log-based diagnosis.
31+
*/
2432
public class DriverTimeoutException extends DriverException {
33+
34+
/**
35+
* Sentinel value used in {@link NodeDiagnostics} fields when the corresponding data was not
36+
* available at the time the exception was created (e.g. the pool had already been removed).
37+
*/
38+
public static final int UNAVAILABLE = -1;
39+
40+
/**
41+
* Per-node diagnostic snapshot captured at timeout time.
42+
*
43+
* <p>Fields:
44+
*
45+
* <ul>
46+
* <li>{@link #getChannelInFlight()}: requests currently awaiting a response on the specific
47+
* connection used for this request.
48+
* <li>{@link #getPoolInFlight()}: total in-flight across all connections to this host ({@link
49+
* #UNAVAILABLE} if the pool was already removed).
50+
* <li>{@link #getPoolAvailableIds()}: remaining stream IDs available to send new requests; a
51+
* low value indicates pool contention ({@link #UNAVAILABLE} if pool was already removed).
52+
* <li>{@link #getPoolOrphanedIds()}: stream IDs from previously timed-out or cancelled requests
53+
* that cannot be released yet; a high value indicates stale stream ID accumulation ({@link
54+
* #UNAVAILABLE} if pool was already removed).
55+
* </ul>
56+
*
57+
* <p><b>Diagnosing failure modes:</b>
58+
*
59+
* <ul>
60+
* <li>{@code poolAvailableIds} near zero → pool contention; requests queuing inside the driver
61+
* before reaching the server.
62+
* <li>{@code poolAvailableIds} normal + high {@code channelInFlight} → server is slow; requests
63+
* were sent but not answered within the timeout.
64+
* <li>High {@code poolOrphanedIds} → previous timeouts consumed stream IDs that the driver is
65+
* still waiting to reclaim.
66+
* </ul>
67+
*/
68+
public static final class NodeDiagnostics {
69+
70+
@NonNull private final EndPoint endPoint;
71+
private final int channelInFlight;
72+
private final int poolInFlight;
73+
private final int poolAvailableIds;
74+
private final int poolOrphanedIds;
75+
76+
/**
77+
* Creates a full diagnostic snapshot (pool was available at timeout time).
78+
*
79+
* @param endPoint the endpoint of the node.
80+
* @param channelInFlight in-flight count on the specific channel.
81+
* @param poolInFlight total in-flight across the pool for this host.
82+
* @param poolAvailableIds remaining stream IDs available in the pool.
83+
* @param poolOrphanedIds orphaned stream IDs in the pool.
84+
*/
85+
public NodeDiagnostics(
86+
@NonNull EndPoint endPoint,
87+
int channelInFlight,
88+
int poolInFlight,
89+
int poolAvailableIds,
90+
int poolOrphanedIds) {
91+
this.endPoint = endPoint;
92+
this.channelInFlight = channelInFlight;
93+
this.poolInFlight = poolInFlight;
94+
this.poolAvailableIds = poolAvailableIds;
95+
this.poolOrphanedIds = poolOrphanedIds;
96+
}
97+
98+
/**
99+
* Creates a partial diagnostic snapshot for when the pool was unavailable at timeout time. The
100+
* pool-related fields ({@link #getPoolInFlight()}, {@link #getPoolAvailableIds()}, {@link
101+
* #getPoolOrphanedIds()}) will be {@link DriverTimeoutException#UNAVAILABLE}.
102+
*
103+
* @param endPoint the endpoint of the node.
104+
* @param channelInFlight in-flight count on the specific channel.
105+
*/
106+
public NodeDiagnostics(@NonNull EndPoint endPoint, int channelInFlight) {
107+
this(endPoint, channelInFlight, UNAVAILABLE, UNAVAILABLE, UNAVAILABLE);
108+
}
109+
110+
/**
111+
* Creates a diagnostic snapshot using pre-computed pool stats. Pass {@link
112+
* DriverTimeoutException#UNAVAILABLE} for pool fields when the pool was not available at
113+
* timeout time.
114+
*
115+
* @param endPoint the endpoint of the node.
116+
* @param channelInFlight in-flight count on the specific channel.
117+
* @param poolInFlight total in-flight across the pool, or {@link
118+
* DriverTimeoutException#UNAVAILABLE}.
119+
* @param poolAvailableIds remaining stream IDs in the pool, or {@link
120+
* DriverTimeoutException#UNAVAILABLE}.
121+
* @param poolOrphanedIds orphaned stream IDs in the pool, or {@link
122+
* DriverTimeoutException#UNAVAILABLE}.
123+
*/
124+
@NonNull
125+
public static NodeDiagnostics of(
126+
@NonNull EndPoint endPoint,
127+
int channelInFlight,
128+
int poolInFlight,
129+
int poolAvailableIds,
130+
int poolOrphanedIds) {
131+
return new NodeDiagnostics(
132+
endPoint, channelInFlight, poolInFlight, poolAvailableIds, poolOrphanedIds);
133+
}
134+
135+
/** Returns the endpoint of the node that had in-flight requests at timeout time. */
136+
@NonNull
137+
public EndPoint getEndPoint() {
138+
return endPoint;
139+
}
140+
141+
/**
142+
* Returns the number of in-flight requests on the specific connection at timeout time, or
143+
* {@link DriverTimeoutException#UNAVAILABLE} if not available.
144+
*/
145+
public int getChannelInFlight() {
146+
return channelInFlight;
147+
}
148+
149+
/**
150+
* Returns the total number of in-flight requests across all connections to this host at timeout
151+
* time, or {@link DriverTimeoutException#UNAVAILABLE} if the pool was no longer available.
152+
*/
153+
public int getPoolInFlight() {
154+
return poolInFlight;
155+
}
156+
157+
/**
158+
* Returns the number of remaining stream IDs available in the pool at timeout time, or {@link
159+
* DriverTimeoutException#UNAVAILABLE} if the pool was no longer available. A low value
160+
* indicates pool contention.
161+
*/
162+
public int getPoolAvailableIds() {
163+
return poolAvailableIds;
164+
}
165+
166+
/**
167+
* Returns the number of orphaned stream IDs in the pool at timeout time, or {@link
168+
* DriverTimeoutException#UNAVAILABLE} if the pool was no longer available. A high value
169+
* indicates stale stream ID accumulation from previous timeouts.
170+
*/
171+
public int getPoolOrphanedIds() {
172+
return poolOrphanedIds;
173+
}
174+
175+
@Override
176+
public String toString() {
177+
if (poolInFlight == UNAVAILABLE) {
178+
return String.format("%s [channel in-flight: %d, pool: n/a]", endPoint, channelInFlight);
179+
}
180+
return String.format(
181+
"%s [channel in-flight: %d, pool in-flight: %d, pool available ids: %d, pool orphaned ids: %d]",
182+
endPoint, channelInFlight, poolInFlight, poolAvailableIds, poolOrphanedIds);
183+
}
184+
}
185+
186+
@Nullable private final NodeDiagnostics nodeDiagnostics;
187+
188+
/**
189+
* Creates an exception with a plain message and no node diagnostics. Used for cases where the
190+
* diagnostic data is unavailable (e.g. no nodes were in-flight at timeout time).
191+
*
192+
* @param message the exception message.
193+
*/
25194
public DriverTimeoutException(@NonNull String message) {
26-
this(message, null);
195+
this(message, (NodeDiagnostics) null);
27196
}
28197

29-
private DriverTimeoutException(String message, ExecutionInfo executionInfo) {
198+
/**
199+
* Creates an exception with per-node diagnostic context captured at timeout time. The message is
200+
* generated automatically from {@code baseMessage} and the diagnostic data.
201+
*
202+
* @param baseMessage the base timeout message (e.g. {@code "Query timed out after PT0.5S"}).
203+
* @param nodeDiagnostics per-node diagnostic snapshot; may be {@code null} if unavailable, in
204+
* which case no node information is appended to the message.
205+
*/
206+
public DriverTimeoutException(
207+
@NonNull String baseMessage, @Nullable NodeDiagnostics nodeDiagnostics) {
208+
this(buildMessage(baseMessage, nodeDiagnostics), nodeDiagnostics, null);
209+
}
210+
211+
private DriverTimeoutException(
212+
String message, @Nullable NodeDiagnostics nodeDiagnostics, ExecutionInfo executionInfo) {
30213
super(message, executionInfo, null, true);
214+
this.nodeDiagnostics = nodeDiagnostics;
215+
}
216+
217+
/**
218+
* Returns the per-node diagnostic snapshot captured at timeout time, or {@code null} if not
219+
* available.
220+
*/
221+
@Nullable
222+
public NodeDiagnostics getNodeDiagnostics() {
223+
return nodeDiagnostics;
31224
}
32225

33226
@NonNull
34227
@Override
35228
public DriverException copy() {
36-
return new DriverTimeoutException(getMessage(), getExecutionInfo());
229+
return new DriverTimeoutException(getMessage(), nodeDiagnostics, getExecutionInfo());
230+
}
231+
232+
private static String buildMessage(
233+
@NonNull String baseMessage, @Nullable NodeDiagnostics nodeDiagnostics) {
234+
if (nodeDiagnostics == null) {
235+
return baseMessage;
236+
}
237+
return baseMessage + " — node in flight: " + nodeDiagnostics;
37238
}
38239
}

0 commit comments

Comments
 (0)