Skip to content

Commit 8ed26d8

Browse files
committed
fix: enrich timeout diagnostics
1 parent 4c22d90 commit 8ed26d8

12 files changed

Lines changed: 980 additions & 44 deletions

driver-core/src/main/java/com/datastax/driver/core/ArrayBackedResultSet.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,9 @@ public Message.Request request() {
411411
@Override
412412
public void register(RequestHandler handler) {}
413413

414+
// No-op implementation: the read timeout supplied to this callback is ignored.
@Override
415+
public void registerReadTimeoutMillis(long readTimeoutMillis) {}
416+
414417
@Override
415418
public void onSet(
416419
Connection connection,

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,9 @@ ResponseHandler write(
962962
throws ConnectionException, BusyConnectionException {
963963

964964
ResponseHandler handler = new ResponseHandler(this, statementReadTimeoutMillis, callback);
965+
if (callback instanceof RequestHandler.Callback) {
966+
((RequestHandler.Callback) callback).registerReadTimeoutMillis(handler.readTimeoutMillis);
967+
}
965968
dispatcher.add(handler);
966969

967970
Message.Request request = callback.request().setStreamId(handler.streamId);
@@ -1062,6 +1065,97 @@ public ProtocolFeatureStore getProtocolFeatureStore() {
10621065
return protocolFeatureStore;
10631066
}
10641067

1068+
/**
 * Builds a timeout exception for this connection using the factory-level read timeout.
 *
 * <p>Convenience overload: delegates to the five-argument variant with a {@code null} message,
 * {@code factory.getReadTimeoutMillis()} as the configured timeout and {@link
 * OperationTimedOutException#UNAVAILABLE} as the speculative execution index.
 *
 * @param elapsedTimeoutNanos elapsed time in nanoseconds (converted to milliseconds by the
 *     delegate).
 * @param retryCount retry count, forwarded unchanged to the exception.
 * @return the newly created exception.
 */
OperationTimedOutException newTimeoutException(long elapsedTimeoutNanos, int retryCount) {
1069+
return newTimeoutException(
1070+
null,
1071+
factory.getReadTimeoutMillis(),
1072+
elapsedTimeoutNanos,
1073+
retryCount,
1074+
OperationTimedOutException.UNAVAILABLE);
1075+
}
1076+
1077+
/**
 * Builds a timeout exception for this connection without a custom message.
 *
 * <p>Delegates to the five-argument variant, passing {@code null} as the message.
 *
 * @param configuredTimeoutMs the timeout that was in effect, in milliseconds.
 * @param elapsedTimeoutNanos elapsed time in nanoseconds (converted to milliseconds by the
 *     delegate).
 * @param retryCount retry count, forwarded unchanged to the exception.
 * @param speculativeExecutionIndex speculative execution index, forwarded unchanged to the
 *     exception.
 * @return the newly created exception.
 */
OperationTimedOutException newTimeoutException(
1078+
long configuredTimeoutMs,
1079+
long elapsedTimeoutNanos,
1080+
int retryCount,
1081+
int speculativeExecutionIndex) {
1082+
return newTimeoutException(
1083+
null, configuredTimeoutMs, elapsedTimeoutNanos, retryCount, speculativeExecutionIndex);
1084+
}
1085+
1086+
/**
 * Builds an {@link OperationTimedOutException} enriched with a snapshot of connection, shard and
 * pool counters read at the time of the call.
 *
 * <p>Every diagnostic that cannot be determined (no owning pool, no sharding information) is
 * reported as {@link OperationTimedOutException#UNAVAILABLE}. The counters are read one after
 * another without any locking visible here, so they are presumably a best-effort snapshot rather
 * than a mutually consistent view.
 *
 * @param message custom message; when {@code null} the exception constructor without a message
 *     argument is used.
 * @param configuredTimeoutMs the timeout that was in effect, in milliseconds.
 * @param elapsedTimeoutNanos elapsed time in nanoseconds; converted to milliseconds before being
 *     passed to the exception.
 * @param retryCount retry count, forwarded unchanged to the exception.
 * @param speculativeExecutionIndex speculative execution index, forwarded unchanged to the
 *     exception.
 * @return the newly created exception.
 */
OperationTimedOutException newTimeoutException(
1087+
String message,
1088+
long configuredTimeoutMs,
1089+
long elapsedTimeoutNanos,
1090+
int retryCount,
1091+
int speculativeExecutionIndex) {
1092+
// null when this connection's owner is not a HostConnectionPool.
HostConnectionPool pool = ownerPool();
1093+
ShardingInfo.ConnectionShardingInfo connectionShardingInfo =
1094+
protocolFeatureStore == null ? null : protocolFeatureStore.getConnectionShardingInfo();
1095+
// Prefer the sharding info attached to this connection; otherwise fall back to the pool's host.
ShardingInfo hostShardingInfo =
1096+
connectionShardingInfo != null
1097+
? connectionShardingInfo.shardingInfo
1098+
: pool != null ? pool.host.getShardingInfo() : null;
1099+
// The shard id is only reported when connection-level sharding info is present.
int connectionShardId =
1100+
connectionShardingInfo != null ? shardId() : OperationTimedOutException.UNAVAILABLE;
1101+
int hostShardsCount =
1102+
hostShardingInfo != null
1103+
? hostShardingInfo.getShardsCount()
1104+
: OperationTimedOutException.UNAVAILABLE;
1105+
int poolPendingBorrows =
1106+
pool != null ? pool.pendingBorrowCount.get() : OperationTimedOutException.UNAVAILABLE;
1107+
// Per-shard figures need both an owning pool and a known shard id.
int poolPendingBorrowsForShard =
1108+
pool != null && connectionShardId != OperationTimedOutException.UNAVAILABLE
1109+
? pool.pendingBorrowCountForShard(connectionShardId)
1110+
: OperationTimedOutException.UNAVAILABLE;
1111+
int poolTotalInFlight =
1112+
pool != null ? pool.totalInFlight.get() : OperationTimedOutException.UNAVAILABLE;
1113+
int poolOpenConnectionsForShard =
1114+
pool != null && connectionShardId != OperationTimedOutException.UNAVAILABLE
1115+
? pool.openConnectionCountForShard(connectionShardId)
1116+
: OperationTimedOutException.UNAVAILABLE;
1117+
int poolMaxConnectionsPerShard =
1118+
pool != null ? pool.maxConnectionsPerShard() : OperationTimedOutException.UNAVAILABLE;
1119+
// The exception takes milliseconds; this conversion truncates sub-millisecond remainders.
long elapsedTimeoutMs = TimeUnit.NANOSECONDS.toMillis(elapsedTimeoutNanos);
1120+
1121+
// Two constructors are used: without a message argument here, with one below.
if (message == null) {
1122+
return new OperationTimedOutException(
1123+
endPoint,
1124+
configuredTimeoutMs,
1125+
elapsedTimeoutMs,
1126+
retryCount,
1127+
speculativeExecutionIndex,
1128+
inFlight.get(),
1129+
connectionShardId,
1130+
hostShardsCount,
1131+
poolPendingBorrows,
1132+
poolPendingBorrowsForShard,
1133+
poolTotalInFlight,
1134+
poolOpenConnectionsForShard,
1135+
poolMaxConnectionsPerShard);
1136+
}
1137+
return new OperationTimedOutException(
1138+
endPoint,
1139+
message,
1140+
configuredTimeoutMs,
1141+
elapsedTimeoutMs,
1142+
retryCount,
1143+
speculativeExecutionIndex,
1144+
inFlight.get(),
1145+
connectionShardId,
1146+
hostShardsCount,
1147+
poolPendingBorrows,
1148+
poolPendingBorrowsForShard,
1149+
poolTotalInFlight,
1150+
poolOpenConnectionsForShard,
1151+
poolMaxConnectionsPerShard);
1152+
}
1153+
1154+
/**
 * Returns the current owner of this connection as a {@link HostConnectionPool}.
 *
 * @return the value held by {@code ownerRef} when it is a {@code HostConnectionPool}; {@code
 *     null} when there is no owner or the owner is of another type.
 */
private HostConnectionPool ownerPool() {
1155+
Owner owner = ownerRef.get();
1156+
return owner instanceof HostConnectionPool ? (HostConnectionPool) owner : null;
1157+
}
1158+
10651159
/**
10661160
* If the connection is part of a pool, return it to the pool. The connection should generally not
10671161
* be reused after that.
@@ -1814,6 +1908,11 @@ public void register(RequestHandler handler) {
18141908
// noop, we don't care about the handler here so far
18151909
}
18161910

1911+
// Ignores the read timeout it is given; this callback does not record the value.
@Override
1912+
public void registerReadTimeoutMillis(long readTimeoutMillis) {
1913+
// noop, direct connection writes keep the legacy timeout exception shape
1914+
}
1915+
18171916
@Override
18181917
public Message.Request request() {
18191918
return request;

driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class DefaultResultSetFuture extends AbstractFuture<ResultSet>
4141
private final ProtocolVersion protocolVersion;
4242
private final Message.Request request;
4343
private volatile RequestHandler handler;
44+
private volatile long readTimeoutMillis = OperationTimedOutException.UNAVAILABLE;
4445

4546
DefaultResultSetFuture(
4647
SessionManager session, ProtocolVersion protocolVersion, Message.Request request) {
@@ -54,6 +55,11 @@ public void register(RequestHandler handler) {
5455
this.handler = handler;
5556
}
5657

58+
/**
 * Records the read timeout applied to the request.
 *
 * <p>The value is stored in the volatile {@code readTimeoutMillis} field, which this class's
 * {@code onTimeout} passes to {@code connection.newTimeoutException(...)} as the configured
 * timeout. Until this method is called the field holds {@link
 * OperationTimedOutException#UNAVAILABLE}.
 *
 * @param readTimeoutMillis the read timeout, in milliseconds.
 */
@Override
59+
public void registerReadTimeoutMillis(long readTimeoutMillis) {
60+
this.readTimeoutMillis = readTimeoutMillis;
61+
}
62+
5763
@Override
5864
public Message.Request request() {
5965
return request;
@@ -293,7 +299,9 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) {
293299
// RequestHandler).
294300
// So just set an exception for the final result, which should be handled correctly by said
295301
// internal call.
296-
setException(new OperationTimedOutException(connection.endPoint));
302+
setException(
303+
connection.newTimeoutException(
304+
readTimeoutMillis, latency, retryCount, OperationTimedOutException.UNAVAILABLE));
297305
return true;
298306
}
299307

driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.datastax.driver.core.exceptions.AuthenticationException;
3030
import com.datastax.driver.core.exceptions.BusyPoolException;
3131
import com.datastax.driver.core.exceptions.ConnectionException;
32+
import com.datastax.driver.core.exceptions.OperationTimedOutException;
3233
import com.datastax.driver.core.exceptions.UnsupportedProtocolVersionException;
3334
import com.datastax.driver.core.utils.MoreFutures;
3435
import com.google.common.annotations.VisibleForTesting;
@@ -90,6 +91,7 @@ class HostConnectionPool implements Connection.Owner {
9091
@VisibleForTesting Set<Connection>[] trash;
9192

9293
private Queue<PendingBorrow>[] pendingBorrows;
94+
private AtomicInteger[] pendingBorrowCountByShard;
9395
final AtomicInteger pendingBorrowCount = new AtomicInteger();
9496

9597
private AtomicInteger[] scheduledForCreation;
@@ -208,6 +210,29 @@ public void tempBlockAdvShardAwareness(long millis) {
208210
Math.max(System.currentTimeMillis() + millis, advShardAwarenessBlockedUntil);
209211
}
210212

213+
/**
 * Returns the current value of the pending-borrow counter for one shard.
 *
 * @param shardId index into {@code pendingBorrowCountByShard}.
 * @return the counter value, or {@link OperationTimedOutException#UNAVAILABLE} when the per-shard
 *     counter array has not been allocated yet or {@code shardId} is outside its bounds.
 */
int pendingBorrowCountForShard(int shardId) {
214+
// Guard against an unallocated array and out-of-range ids instead of throwing.
if (pendingBorrowCountByShard == null
215+
|| shardId < 0
216+
|| shardId >= pendingBorrowCountByShard.length) {
217+
return OperationTimedOutException.UNAVAILABLE;
218+
}
219+
return pendingBorrowCountByShard[shardId].get();
220+
}
221+
222+
/**
 * Returns the number of connections currently held in the list for one shard.
 *
 * <p>NOTE(review): this reports {@code connections[shardId].size()}, not the separate {@code
 * open[shardId]} counter the pool also maintains; confirm that the list size is the intended
 * "open connections" figure.
 *
 * @param shardId index into {@code connections}.
 * @return the list size, or {@link OperationTimedOutException#UNAVAILABLE} when {@code
 *     connections} is {@code null} or {@code shardId} is outside its bounds.
 */
int openConnectionCountForShard(int shardId) {
223+
if (connections == null || shardId < 0 || shardId >= connections.length) {
224+
return OperationTimedOutException.UNAVAILABLE;
225+
}
226+
return connections[shardId].size();
227+
}
228+
229+
/**
 * Returns the pool's {@code maxConnectionsPerShard} field.
 *
 * @return the field value, or {@link OperationTimedOutException#UNAVAILABLE} when {@code
 *     connections} is still {@code null} (presumably meaning the pool has not been initialised;
 *     confirm against the initialisation code).
 */
int maxConnectionsPerShard() {
230+
if (connections == null) {
231+
return OperationTimedOutException.UNAVAILABLE;
232+
}
233+
return maxConnectionsPerShard;
234+
}
235+
211236
private final ConnectionTasksSharedState connectionTasksSharedState =
212237
new ConnectionTasksSharedState();
213238

@@ -306,11 +331,13 @@ ListenableFuture<Void> initAsyncWithConnection(Connection reusedConnection) {
306331
open = new AtomicInteger[shardsCount];
307332
trash = new Set[shardsCount];
308333
pendingBorrows = new Queue[shardsCount];
334+
pendingBorrowCountByShard = new AtomicInteger[shardsCount];
309335
for (int i = 0; i < shardsCount; ++i) {
310336
this.connections[i] = new CopyOnWriteArrayList<Connection>();
311337
scheduledForCreation[i] = new AtomicInteger();
312338
open[i] = new AtomicInteger();
313339
trash[i] = new CopyOnWriteArraySet<Connection>();
340+
pendingBorrowCountByShard[i] = new AtomicInteger();
314341
pendingBorrows[i] = new ConcurrentLinkedQueue<PendingBorrow>();
315342
}
316343

@@ -626,6 +653,7 @@ private ListenableFuture<Connection> enqueue(
626653
}
627654

628655
PendingBorrow pendingBorrow = new PendingBorrow(timeout, unit, timeoutsExecutor);
656+
pendingBorrowCountByShard[shardId].incrementAndGet();
629657
pendingBorrows[shardId].add(pendingBorrow);
630658

631659
// If we raced with shutdown, make sure the future will be completed. This has no effect if it
@@ -690,6 +718,7 @@ private void dequeue(final Connection connection) {
690718
connection.inFlight.decrementAndGet();
691719
} else {
692720
pendingBorrowCount.decrementAndGet();
721+
pendingBorrowCountByShard[connection.shardId()].decrementAndGet();
693722
// Ensure that the keyspace set on the connection is the one set on the pool state, in the
694723
// general case it will be.
695724
ListenableFuture<Connection> setKeyspaceFuture =

driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,8 @@ void onSet(
319319
long latency);
320320

321321
void register(RequestHandler handler);
322+
323+
void registerReadTimeoutMillis(long readTimeoutMillis);
322324
}
323325

324326
/**
@@ -943,11 +945,19 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) {
943945
queryStateRef.get());
944946
return false;
945947
}
948+
long configuredTimeoutMs =
949+
connectionHandler != null
950+
? connectionHandler.readTimeoutMillis
951+
: OperationTimedOutException.UNAVAILABLE;
952+
OperationTimedOutException timeoutException =
953+
connection.newTimeoutException(
954+
"Timed out waiting for response to PREPARE message",
955+
configuredTimeoutMs,
956+
latency,
957+
retryCount,
958+
position);
946959
connection.release();
947-
logError(
948-
connection.endPoint,
949-
new OperationTimedOutException(
950-
connection.endPoint, "Timed out waiting for response to PREPARE message"));
960+
logError(connection.endPoint, timeoutException);
951961
retry(false, null);
952962
return true;
953963
}
@@ -1006,25 +1016,13 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) {
10061016
}
10071017

10081018
Host queriedHost = current;
1009-
1010-
HostConnectionPool pool = queriedHost == null ? null : manager.pools.get(queriedHost);
10111019
long configuredTimeoutMs =
10121020
connectionHandler != null
10131021
? connectionHandler.readTimeoutMillis
10141022
: OperationTimedOutException.UNAVAILABLE;
1015-
int connInFlight = connection.inFlight.get();
1016-
int poolPendingBorrows =
1017-
pool != null ? pool.pendingBorrowCount.get() : OperationTimedOutException.UNAVAILABLE;
1018-
int poolTotalInFlight =
1019-
pool != null ? pool.totalInFlight.get() : OperationTimedOutException.UNAVAILABLE;
10201023

10211024
OperationTimedOutException timeoutException =
1022-
new OperationTimedOutException(
1023-
connection.endPoint,
1024-
configuredTimeoutMs,
1025-
connInFlight,
1026-
poolPendingBorrows,
1027-
poolTotalInFlight);
1025+
connection.newTimeoutException(configuredTimeoutMs, latency, retryCount, position);
10281026

10291027
try {
10301028
connection.release();

0 commit comments

Comments
 (0)