Skip to content

Commit 8f9b936

Browse files
committed
fix: improve driver-side timeout logging with pool/channel diagnostics (4.x)
Fixes: DRIVER-540 When a DriverTimeoutException fires, the exception message now includes diagnostic information captured at timeout time: - Which nodes had in-flight requests at the moment of timeout - Per-node channel in-flight count - Per-node pool in-flight, available IDs, and orphaned IDs This helps distinguish between two failure modes: - Pool contention: low available IDs / high pool in-flight — requests queuing in the driver before reaching the server - Server-side slowness: high channel in-flight, normal available IDs — requests were sent but the server did not respond in time - Orphaned stream ID stalls: high orphaned IDs from previous timeouts Changes applied to all four request handler types: - CqlRequestHandler (iterates inFlightCallbacks list) - GraphRequestHandler (iterates inFlightCallbacks list) - CqlPrepareHandler (single initialCallback) - ContinuousRequestHandlerBase (global timeout + per-page timeout) No logic changes; the new diagnostics are appended to the existing timeout message string. No changes to DriverTimeoutException API. Existing test assertions all use substring matching and continue to pass.
1 parent 52b0f4a commit 8f9b936

4 files changed

Lines changed: 149 additions & 20 deletions

File tree

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
6363
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
6464
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
65+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
6566
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6667
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
6768
import com.datastax.oss.driver.internal.core.util.Loggers;
@@ -92,6 +93,7 @@
9293
import java.util.List;
9394
import java.util.Map;
9495
import java.util.Queue;
96+
import java.util.StringJoiner;
9597
import java.util.concurrent.CancellationException;
9698
import java.util.concurrent.CompletableFuture;
9799
import java.util.concurrent.CompletionStage;
@@ -390,13 +392,41 @@ private Timeout scheduleGlobalTimeout() {
390392
}
391393
LOG.trace("[{}] Scheduling global timeout for pages in {}", logPrefix, globalTimeout);
392394
return timer.newTimeout(
393-
timeout ->
394-
abortGlobalRequestOrChosenCallback(
395-
new DriverTimeoutException("Query timed out after " + globalTimeout)),
395+
timeout -> {
396+
String message = buildGlobalTimeoutMessage(globalTimeout);
397+
abortGlobalRequestOrChosenCallback(new DriverTimeoutException(message));
398+
},
396399
globalTimeout.toNanos(),
397400
TimeUnit.NANOSECONDS);
398401
}
399402

403+
private String buildGlobalTimeoutMessage(Duration timeoutDuration) {
404+
List<NodeResponseCallback> callbacks = inFlightCallbacks;
405+
if (callbacks.isEmpty()) {
406+
return "Query timed out after " + timeoutDuration;
407+
}
408+
StringJoiner nodesInfo = new StringJoiner(", ");
409+
for (NodeResponseCallback cb : callbacks) {
410+
int channelInFlight = cb.channel.getInFlight();
411+
ChannelPool pool = session.getPools().get(cb.node);
412+
if (pool != null) {
413+
nodesInfo.add(
414+
String.format(
415+
"%s [channel in-flight: %d, pool in-flight: %d, pool available ids: %d, pool orphaned ids: %d]",
416+
cb.node.getEndPoint(),
417+
channelInFlight,
418+
pool.getInFlight(),
419+
pool.getAvailableIds(),
420+
pool.getOrphanedIds()));
421+
} else {
422+
nodesInfo.add(
423+
String.format(
424+
"%s [channel in-flight: %d, pool: n/a]", cb.node.getEndPoint(), channelInFlight));
425+
}
426+
}
427+
return "Query timed out after " + timeoutDuration + " — nodes in flight: " + nodesInfo;
428+
}
429+
400430
/**
401431
* Cancels the continuous paging request.
402432
*
@@ -718,10 +748,26 @@ private void onPageTimeout(int expectedPage) {
718748
lock.lock();
719749
try {
720750
if (state == expectedPage) {
721-
abort(
722-
new DriverTimeoutException(
723-
String.format("Timed out waiting for page %d", expectedPage)),
724-
false);
751+
int channelInFlight = channel.getInFlight();
752+
ChannelPool pool = session.getPools().get(node);
753+
String message;
754+
if (pool != null) {
755+
message =
756+
String.format(
757+
"Timed out waiting for page %d — %s [channel in-flight: %d, pool in-flight: %d, pool available ids: %d, pool orphaned ids: %d]",
758+
expectedPage,
759+
node.getEndPoint(),
760+
channelInFlight,
761+
pool.getInFlight(),
762+
pool.getAvailableIds(),
763+
pool.getOrphanedIds());
764+
} else {
765+
message =
766+
String.format(
767+
"Timed out waiting for page %d — %s [channel in-flight: %d, pool: n/a]",
768+
expectedPage, node.getEndPoint(), channelInFlight);
769+
}
770+
abort(new DriverTimeoutException(message), false);
725771
} else {
726772
// Ignore timeout if the request has moved on in the interim.
727773
LOG.trace(

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
5757
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
5858
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
59+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
5960
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6061
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
6162
import com.datastax.oss.driver.internal.core.tracker.RequestLogger;
@@ -80,6 +81,7 @@
8081
import java.util.List;
8182
import java.util.Map;
8283
import java.util.Queue;
84+
import java.util.StringJoiner;
8385
import java.util.concurrent.CancellationException;
8486
import java.util.concurrent.CompletableFuture;
8587
import java.util.concurrent.CompletionStage;
@@ -209,12 +211,14 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
209211
if (timeoutDuration != null && timeoutDuration.toNanos() > 0) {
210212
try {
211213
return this.timer.newTimeout(
212-
(Timeout timeout1) ->
213-
setFinalError(
214-
initialStatement,
215-
new DriverTimeoutException("Query timed out after " + timeoutDuration),
216-
null,
217-
NO_SUCCESSFUL_EXECUTION),
214+
(Timeout timeout1) -> {
215+
String message = buildTimeoutMessage(timeoutDuration);
216+
setFinalError(
217+
initialStatement,
218+
new DriverTimeoutException(message),
219+
null,
220+
NO_SUCCESSFUL_EXECUTION);
221+
},
218222
timeoutDuration.toNanos(),
219223
TimeUnit.NANOSECONDS);
220224
} catch (IllegalStateException e) {
@@ -229,6 +233,33 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
229233
return null;
230234
}
231235

236+
private String buildTimeoutMessage(Duration timeoutDuration) {
237+
List<NodeResponseCallback> callbacks = inFlightCallbacks;
238+
if (callbacks.isEmpty()) {
239+
return "Query timed out after " + timeoutDuration;
240+
}
241+
StringJoiner nodesInfo = new StringJoiner(", ");
242+
for (NodeResponseCallback cb : callbacks) {
243+
int channelInFlight = cb.channel.getInFlight();
244+
ChannelPool pool = session.getPools().get(cb.node);
245+
if (pool != null) {
246+
nodesInfo.add(
247+
String.format(
248+
"%s [channel in-flight: %d, pool in-flight: %d, pool available ids: %d, pool orphaned ids: %d]",
249+
cb.node.getEndPoint(),
250+
channelInFlight,
251+
pool.getInFlight(),
252+
pool.getAvailableIds(),
253+
pool.getOrphanedIds()));
254+
} else {
255+
nodesInfo.add(
256+
String.format(
257+
"%s [channel in-flight: %d, pool: n/a]", cb.node.getEndPoint(), channelInFlight));
258+
}
259+
}
260+
return "Query timed out after " + timeoutDuration + " — nodes in flight: " + nodesInfo;
261+
}
262+
232263
/**
233264
* Sends the request to the next available node.
234265
*

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
4646
import com.datastax.oss.driver.internal.core.channel.ResponseCallback;
4747
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
48+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
4849
import com.datastax.oss.driver.internal.core.session.DefaultSession;
4950
import com.datastax.oss.driver.internal.core.util.Loggers;
5051
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
@@ -165,7 +166,8 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
165166
if (timeoutDuration.toNanos() > 0) {
166167
return this.timer.newTimeout(
167168
(Timeout timeout1) -> {
168-
setFinalError(new DriverTimeoutException("Query timed out after " + timeoutDuration));
169+
String message = buildTimeoutMessage(timeoutDuration);
170+
setFinalError(new DriverTimeoutException(message));
169171
if (initialCallback != null) {
170172
initialCallback.cancel();
171173
}
@@ -177,6 +179,29 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
177179
}
178180
}
179181

182+
private String buildTimeoutMessage(Duration timeoutDuration) {
183+
InitialPrepareCallback cb = initialCallback;
184+
if (cb == null) {
185+
return "Query timed out after " + timeoutDuration;
186+
}
187+
int channelInFlight = cb.channel.getInFlight();
188+
ChannelPool pool = session.getPools().get(cb.node);
189+
if (pool != null) {
190+
return String.format(
191+
"Query timed out after %s — %s [channel in-flight: %d, pool in-flight: %d, pool available ids: %d, pool orphaned ids: %d]",
192+
timeoutDuration,
193+
cb.node.getEndPoint(),
194+
channelInFlight,
195+
pool.getInFlight(),
196+
pool.getAvailableIds(),
197+
pool.getOrphanedIds());
198+
} else {
199+
return String.format(
200+
"Query timed out after %s — %s [channel in-flight: %d, pool: n/a]",
201+
timeoutDuration, cb.node.getEndPoint(), channelInFlight);
202+
}
203+
}
204+
180205
private void cancelTimeout() {
181206
if (this.scheduledTimeout != null) {
182207
this.scheduledTimeout.cancel();

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64;
6767
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
6868
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
69+
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
6970
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
7071
import com.datastax.oss.driver.internal.core.session.DefaultSession;
7172
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
@@ -97,6 +98,7 @@
9798
import java.util.List;
9899
import java.util.Map;
99100
import java.util.Queue;
101+
import java.util.StringJoiner;
100102
import java.util.concurrent.CancellationException;
101103
import java.util.concurrent.CompletableFuture;
102104
import java.util.concurrent.CompletionStage;
@@ -224,12 +226,10 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
224226
if (timeoutDuration.toNanos() > 0) {
225227
try {
226228
return this.timer.newTimeout(
227-
(Timeout timeout1) ->
228-
setFinalError(
229-
initialStatement,
230-
new DriverTimeoutException("Query timed out after " + timeoutDuration),
231-
null,
232-
-1),
229+
(Timeout timeout1) -> {
230+
String message = buildTimeoutMessage(timeoutDuration);
231+
setFinalError(initialStatement, new DriverTimeoutException(message), null, -1);
232+
},
233233
timeoutDuration.toNanos(),
234234
TimeUnit.NANOSECONDS);
235235
} catch (IllegalStateException e) {
@@ -244,6 +244,33 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
244244
return null;
245245
}
246246

247+
private String buildTimeoutMessage(Duration timeoutDuration) {
248+
List<NodeResponseCallback> callbacks = inFlightCallbacks;
249+
if (callbacks.isEmpty()) {
250+
return "Query timed out after " + timeoutDuration;
251+
}
252+
StringJoiner nodesInfo = new StringJoiner(", ");
253+
for (NodeResponseCallback cb : callbacks) {
254+
int channelInFlight = cb.channel.getInFlight();
255+
ChannelPool pool = session.getPools().get(cb.node);
256+
if (pool != null) {
257+
nodesInfo.add(
258+
String.format(
259+
"%s [channel in-flight: %d, pool in-flight: %d, pool available ids: %d, pool orphaned ids: %d]",
260+
cb.node.getEndPoint(),
261+
channelInFlight,
262+
pool.getInFlight(),
263+
pool.getAvailableIds(),
264+
pool.getOrphanedIds()));
265+
} else {
266+
nodesInfo.add(
267+
String.format(
268+
"%s [channel in-flight: %d, pool: n/a]", cb.node.getEndPoint(), channelInFlight));
269+
}
270+
}
271+
return "Query timed out after " + timeoutDuration + " — nodes in flight: " + nodesInfo;
272+
}
273+
247274
private Token getRoutingToken(Statement statement) {
248275
Token token = statement.getRoutingToken();
249276
if (token != null) {

0 commit comments

Comments
 (0)