Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package com.datastax.dse.driver.internal.core.cql.continuous;

import static com.datastax.oss.driver.api.core.DriverTimeoutException.UNAVAILABLE;

import com.datastax.dse.driver.api.core.DseProtocolVersion;
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
import com.datastax.dse.driver.internal.core.DseProtocolFeature;
Expand All @@ -26,6 +28,7 @@
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.DriverTimeoutException.NodeDiagnostics;
import com.datastax.oss.driver.api.core.NodeUnavailableException;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
Expand Down Expand Up @@ -62,6 +65,7 @@
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
import com.datastax.oss.driver.internal.core.util.Loggers;
Expand Down Expand Up @@ -390,13 +394,32 @@ private Timeout scheduleGlobalTimeout() {
}
LOG.trace("[{}] Scheduling global timeout for pages in {}", logPrefix, globalTimeout);
return timer.newTimeout(
timeout ->
abortGlobalRequestOrChosenCallback(
new DriverTimeoutException("Query timed out after " + globalTimeout)),
timeout -> {
NodeDiagnostics diagnostics = buildNodeDiagnostics();
abortGlobalRequestOrChosenCallback(
new DriverTimeoutException("Query timed out after " + globalTimeout, diagnostics));
},
globalTimeout.toNanos(),
TimeUnit.NANOSECONDS);
}

@Nullable
private NodeDiagnostics buildNodeDiagnostics() {
List<NodeResponseCallback> callbacks = inFlightCallbacks;
if (callbacks.isEmpty()) {
return null;
}
NodeResponseCallback cb = callbacks.get(0);
int channelInFlight = cb.channel.getInFlight();
ChannelPool pool = session.getPools().get(cb.node);
return NodeDiagnostics.of(
cb.node.getEndPoint(),
channelInFlight,
pool != null ? pool.getInFlight() : UNAVAILABLE,
pool != null ? pool.getAvailableIds() : UNAVAILABLE,
pool != null ? pool.getOrphanedIds() : UNAVAILABLE);
}

/**
* Cancels the continuous paging request.
*
Expand Down Expand Up @@ -718,9 +741,17 @@ private void onPageTimeout(int expectedPage) {
lock.lock();
try {
if (state == expectedPage) {
int channelInFlight = channel.getInFlight();
ChannelPool pool = session.getPools().get(node);
abort(
new DriverTimeoutException(
String.format("Timed out waiting for page %d", expectedPage)),
"Timed out waiting for page " + expectedPage,
NodeDiagnostics.of(
node.getEndPoint(),
channelInFlight,
pool != null ? pool.getInFlight() : UNAVAILABLE,
pool != null ? pool.getAvailableIds() : UNAVAILABLE,
pool != null ? pool.getOrphanedIds() : UNAVAILABLE)),
false);
} else {
// Ignore timeout if the request has moved on in the interim.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package com.datastax.dse.driver.internal.core.graph;

import static com.datastax.oss.driver.api.core.DriverTimeoutException.UNAVAILABLE;

import com.datastax.dse.driver.api.core.graph.AsyncGraphResultSet;
import com.datastax.dse.driver.api.core.graph.GraphNode;
import com.datastax.dse.driver.api.core.graph.GraphStatement;
Expand All @@ -26,6 +28,7 @@
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.DriverTimeoutException.NodeDiagnostics;
import com.datastax.oss.driver.api.core.NodeUnavailableException;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
Expand Down Expand Up @@ -56,6 +59,7 @@
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
import com.datastax.oss.driver.internal.core.tracker.RequestLogger;
Expand All @@ -68,6 +72,7 @@
import com.datastax.oss.protocol.internal.response.result.Rows;
import com.datastax.oss.protocol.internal.response.result.Void;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Timeout;
import io.netty.util.Timer;
Expand Down Expand Up @@ -209,12 +214,15 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
if (timeoutDuration != null && timeoutDuration.toNanos() > 0) {
try {
return this.timer.newTimeout(
(Timeout timeout1) ->
setFinalError(
initialStatement,
new DriverTimeoutException("Query timed out after " + timeoutDuration),
null,
NO_SUCCESSFUL_EXECUTION),
(Timeout timeout1) -> {
NodeDiagnostics diagnostics = buildNodeDiagnostics();
setFinalError(
initialStatement,
new DriverTimeoutException(
"Query timed out after " + timeoutDuration, diagnostics),
null,
NO_SUCCESSFUL_EXECUTION);
},
timeoutDuration.toNanos(),
TimeUnit.NANOSECONDS);
} catch (IllegalStateException e) {
Expand All @@ -229,6 +237,23 @@ private Timeout scheduleTimeout(Duration timeoutDuration) {
return null;
}

@Nullable
private NodeDiagnostics buildNodeDiagnostics() {
List<NodeResponseCallback> callbacks = inFlightCallbacks;
if (callbacks.isEmpty()) {
return null;
}
NodeResponseCallback cb = callbacks.get(0);
int channelInFlight = cb.channel.getInFlight();
ChannelPool pool = session.getPools().get(cb.node);
return NodeDiagnostics.of(
cb.node.getEndPoint(),
channelInFlight,
pool != null ? pool.getInFlight() : UNAVAILABLE,
pool != null ? pool.getAvailableIds() : UNAVAILABLE,
pool != null ? pool.getOrphanedIds() : UNAVAILABLE);
}

/**
* Sends the request to the next available node.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,222 @@
package com.datastax.oss.driver.api.core;

import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;

/** Thrown when a driver request timed out. */
/**
* Thrown when a driver request timed out.
*
* <p>When thrown from the request execution path the exception carries a per-node diagnostic
* snapshot captured at the moment the timer fires (see {@link #getNodeDiagnostics()}). This
* information is also embedded in the exception message for easy log-based diagnosis.
*/
public class DriverTimeoutException extends DriverException {

/**
* Sentinel value used in {@link NodeDiagnostics} fields when the corresponding data was not
* available at the time the exception was created (e.g. the pool had already been removed).
*/
public static final int UNAVAILABLE = -1;

/**
* Per-node diagnostic snapshot captured at timeout time.
*
* <p>Fields:
*
* <ul>
* <li>{@link #getChannelInFlight()}: requests currently awaiting a response on the specific
* connection used for this request.
* <li>{@link #getPoolInFlight()}: total in-flight across all connections to this host ({@link
* #UNAVAILABLE} if the pool was already removed).
* <li>{@link #getPoolAvailableIds()}: remaining stream IDs available to send new requests; a
* low value indicates pool contention ({@link #UNAVAILABLE} if pool was already removed).
* <li>{@link #getPoolOrphanedIds()}: stream IDs from previously timed-out or cancelled requests
* that cannot be released yet; a high value indicates stale stream ID accumulation ({@link
* #UNAVAILABLE} if pool was already removed).
* </ul>
*
* <p><b>Diagnosing failure modes:</b>
*
* <ul>
* <li>{@code poolAvailableIds} near zero → pool contention; requests queuing inside the driver
* before reaching the server.
* <li>{@code poolAvailableIds} normal + high {@code channelInFlight} → server is slow; requests
* were sent but not answered within the timeout.
* <li>High {@code poolOrphanedIds} → previous timeouts consumed stream IDs that the driver is
* still waiting to reclaim.
* </ul>
*/
public static final class NodeDiagnostics {

@NonNull private final EndPoint endPoint;
private final int channelInFlight;
private final int poolInFlight;
private final int poolAvailableIds;
private final int poolOrphanedIds;

/**
* Creates a full diagnostic snapshot (pool was available at timeout time).
*
* @param endPoint the endpoint of the node.
* @param channelInFlight in-flight count on the specific channel.
* @param poolInFlight total in-flight across the pool for this host.
* @param poolAvailableIds remaining stream IDs available in the pool.
* @param poolOrphanedIds orphaned stream IDs in the pool.
*/
public NodeDiagnostics(
@NonNull EndPoint endPoint,
int channelInFlight,
int poolInFlight,
int poolAvailableIds,
int poolOrphanedIds) {
this.endPoint = endPoint;
this.channelInFlight = channelInFlight;
this.poolInFlight = poolInFlight;
this.poolAvailableIds = poolAvailableIds;
this.poolOrphanedIds = poolOrphanedIds;
}

/**
* Creates a partial diagnostic snapshot for when the pool was unavailable at timeout time. The
* pool-related fields ({@link #getPoolInFlight()}, {@link #getPoolAvailableIds()}, {@link
* #getPoolOrphanedIds()}) will be {@link DriverTimeoutException#UNAVAILABLE}.
*
* @param endPoint the endpoint of the node.
* @param channelInFlight in-flight count on the specific channel.
*/
public NodeDiagnostics(@NonNull EndPoint endPoint, int channelInFlight) {
this(endPoint, channelInFlight, UNAVAILABLE, UNAVAILABLE, UNAVAILABLE);
}

/**
* Creates a diagnostic snapshot using pre-computed pool stats. Pass {@link
* DriverTimeoutException#UNAVAILABLE} for pool fields when the pool was not available at
* timeout time.
*
* @param endPoint the endpoint of the node.
* @param channelInFlight in-flight count on the specific channel.
* @param poolInFlight total in-flight across the pool, or {@link
* DriverTimeoutException#UNAVAILABLE}.
* @param poolAvailableIds remaining stream IDs in the pool, or {@link
* DriverTimeoutException#UNAVAILABLE}.
* @param poolOrphanedIds orphaned stream IDs in the pool, or {@link
* DriverTimeoutException#UNAVAILABLE}.
*/
@NonNull
public static NodeDiagnostics of(
@NonNull EndPoint endPoint,
int channelInFlight,
int poolInFlight,
int poolAvailableIds,
int poolOrphanedIds) {
return new NodeDiagnostics(
endPoint, channelInFlight, poolInFlight, poolAvailableIds, poolOrphanedIds);
}

/** Returns the endpoint of the node that had in-flight requests at timeout time. */
@NonNull
public EndPoint getEndPoint() {
return endPoint;
}

/**
* Returns the number of in-flight requests on the specific connection at timeout time, or
* {@link DriverTimeoutException#UNAVAILABLE} if not available.
*/
public int getChannelInFlight() {
return channelInFlight;
}

/**
* Returns the total number of in-flight requests across all connections to this host at timeout
* time, or {@link DriverTimeoutException#UNAVAILABLE} if the pool was no longer available.
*/
public int getPoolInFlight() {
return poolInFlight;
}

/**
* Returns the number of remaining stream IDs available in the pool at timeout time, or {@link
* DriverTimeoutException#UNAVAILABLE} if the pool was no longer available. A low value
* indicates pool contention.
*/
public int getPoolAvailableIds() {
return poolAvailableIds;
}

/**
* Returns the number of orphaned stream IDs in the pool at timeout time, or {@link
* DriverTimeoutException#UNAVAILABLE} if the pool was no longer available. A high value
* indicates stale stream ID accumulation from previous timeouts.
*/
public int getPoolOrphanedIds() {
return poolOrphanedIds;
}

@Override
public String toString() {
if (poolInFlight == UNAVAILABLE) {
return String.format("%s [channel in-flight: %d, pool: n/a]", endPoint, channelInFlight);
}
return String.format(
"%s [channel in-flight: %d, pool in-flight: %d, pool available ids: %d, pool orphaned ids: %d]",
endPoint, channelInFlight, poolInFlight, poolAvailableIds, poolOrphanedIds);
}
}

@Nullable private final NodeDiagnostics nodeDiagnostics;

/**
* Creates an exception with a plain message and no node diagnostics. Used for cases where the
* diagnostic data is unavailable (e.g. no nodes were in-flight at timeout time).
*
* @param message the exception message.
*/
public DriverTimeoutException(@NonNull String message) {
this(message, null);
this(message, (NodeDiagnostics) null);
}

private DriverTimeoutException(String message, ExecutionInfo executionInfo) {
/**
* Creates an exception with per-node diagnostic context captured at timeout time. The message is
* generated automatically from {@code baseMessage} and the diagnostic data.
*
* @param baseMessage the base timeout message (e.g. {@code "Query timed out after PT0.5S"}).
* @param nodeDiagnostics per-node diagnostic snapshot; may be {@code null} if unavailable, in
* which case no node information is appended to the message.
*/
public DriverTimeoutException(
@NonNull String baseMessage, @Nullable NodeDiagnostics nodeDiagnostics) {
this(buildMessage(baseMessage, nodeDiagnostics), nodeDiagnostics, null);
}

private DriverTimeoutException(
String message, @Nullable NodeDiagnostics nodeDiagnostics, ExecutionInfo executionInfo) {
super(message, executionInfo, null, true);
this.nodeDiagnostics = nodeDiagnostics;
}
Comment thread
nikagra marked this conversation as resolved.

/**
* Returns the per-node diagnostic snapshot captured at timeout time, or {@code null} if not
* available.
*/
@Nullable
public NodeDiagnostics getNodeDiagnostics() {
return nodeDiagnostics;
}

@NonNull
@Override
public DriverException copy() {
return new DriverTimeoutException(getMessage(), getExecutionInfo());
return new DriverTimeoutException(getMessage(), nodeDiagnostics, getExecutionInfo());
}

private static String buildMessage(
@NonNull String baseMessage, @Nullable NodeDiagnostics nodeDiagnostics) {
if (nodeDiagnostics == null) {
return baseMessage;
}
return baseMessage + " — node in flight: " + nodeDiagnostics;
}
Comment thread
nikagra marked this conversation as resolved.
}
Loading
Loading