Skip to content

Commit 674a785

Browse files
feat: add timestamps to MaximumRequestCallbackWaitTimeExceededException
1 parent dcc2a68 commit 674a785

3 files changed

Lines changed: 82 additions & 4 deletions

File tree

java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,7 @@ private ApiFuture<AppendRowsResponse> appendInternal(
928928
requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
929929
++this.inflightRequests;
930930
this.inflightBytes += requestWrapper.messageSize;
931+
requestWrapper.placedInWaitingQueueTime = Instant.now();
931932
waitingRequestQueue.addLast(requestWrapper);
932933
healthCheckMetrics.updateWindowedQueuedRequestsMax(
933934
waitingRequestQueue.size() + inflightRequestQueue.size(), queuedRetryCount.get());
@@ -1151,10 +1152,11 @@ private void appendLoop() {
11511152
hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
11521153
// Check whether we should error out the current append loop.
11531154
if (inflightRequestQueue.size() > 0) {
1154-
Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
1155+
AppendRequestAndResponse firstRequest = inflightRequestQueue.getFirst();
1156+
Instant sendInstant = firstRequest.requestSendTimeStamp;
11551157
if (sendInstant != null) {
11561158
healthCheckMetrics.updateResponseWait(sendInstant);
1157-
throwIfWaitCallbackTooLong(sendInstant);
1159+
throwIfWaitCallbackTooLong(firstRequest);
11581160
}
11591161
}
11601162
healthCheckMetrics.periodicHealthCheck();
@@ -1187,6 +1189,9 @@ private void appendLoop() {
11871189
requestProfilerHook.endOperation(
11881190
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
11891191
waitForBackoffIfNecessary(requestWrapper);
1192+
if (requestWrapper.placedInInflightQueueTime == null) {
1193+
requestWrapper.placedInInflightQueueTime = Instant.now();
1194+
}
11901195
this.inflightRequestQueue.add(requestWrapper);
11911196
localQueue.addLast(requestWrapper);
11921197
healthCheckMetrics.updateRequestsSent(requestWrapper.messageSize);
@@ -1339,11 +1344,21 @@ private void cleanupConnectionAndRequests(boolean avoidBlocking) {
13391344
log.info("Append thread is done. Stream: " + streamName + " id: " + writerId);
13401345
}
13411346

1342-
private void throwIfWaitCallbackTooLong(Instant timeToCheck) {
1347+
private void throwIfWaitCallbackTooLong(AppendRequestAndResponse requestWrapper) {
1348+
Instant timeToCheck = requestWrapper.requestSendTimeStamp;
1349+
if (timeToCheck == null) {
1350+
return;
1351+
}
13431352
Duration milliSinceLastCallback = Duration.between(timeToCheck, Instant.now());
13441353
if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) {
13451354
throw new Exceptions.MaximumRequestCallbackWaitTimeExceededException(
1346-
milliSinceLastCallback, writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME);
1355+
milliSinceLastCallback,
1356+
writerId,
1357+
MAXIMUM_REQUEST_CALLBACK_WAIT_TIME,
1358+
requestWrapper.requestReceivedTime,
1359+
requestWrapper.placedInWaitingQueueTime,
1360+
requestWrapper.placedInInflightQueueTime,
1361+
requestWrapper.dispatchTimes);
13471362
}
13481363
}
13491364

@@ -1824,6 +1839,11 @@ static final class AppendRequestAndResponse {
18241839
// If a response is no longer expected this is set back to null.
18251840
Instant requestSendTimeStamp;
18261841

1842+
final Instant requestReceivedTime;
1843+
Instant placedInWaitingQueueTime;
1844+
Instant placedInInflightQueueTime;
1845+
final List<Instant> dispatchTimes = new ArrayList<>();
1846+
18271847
AppendRequestAndResponse(
18281848
AppendRowsRequest message,
18291849
StreamWriter streamWriter,
@@ -1852,10 +1872,12 @@ static final class AppendRequestAndResponse {
18521872
this.retryAlgorithm = null;
18531873
}
18541874
this.recordBatchRowCount = recordBatchRowCount;
1875+
this.requestReceivedTime = Instant.now();
18551876
}
18561877

18571878
void setRequestSendQueueTime() {
18581879
requestSendTimeStamp = Instant.now();
1880+
dispatchTimes.add(requestSendTimeStamp);
18591881
}
18601882
}
18611883

java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
package com.google.cloud.bigquery.storage.v1;
1717

1818
import com.google.api.gax.grpc.GrpcStatusCode;
19+
import com.google.common.collect.ImmutableList;
1920
import com.google.common.collect.ImmutableMap;
2021
import com.google.protobuf.Any;
2122
import com.google.protobuf.InvalidProtocolBufferException;
2223
import io.grpc.Status;
2324
import io.grpc.StatusRuntimeException;
2425
import io.grpc.protobuf.StatusProto;
2526
import java.time.Duration;
27+
import java.time.Instant;
28+
import java.util.List;
2629
import java.util.Map;
2730
import java.util.regex.Matcher;
2831
import java.util.regex.Pattern;
@@ -428,9 +431,25 @@ public static class MaximumRequestCallbackWaitTimeExceededException extends Runt
428431
private final Duration callbackWaitTime;
429432
private final String writerId;
430433
private final Duration callbackWaitTimeLimit;
434+
private final Instant requestReceivedTime;
435+
private final Instant placedInWaitingQueueTime;
436+
private final Instant placedInInflightQueueTime;
437+
private final ImmutableList<Instant> dispatchTimes;
431438

439+
@Deprecated
432440
public MaximumRequestCallbackWaitTimeExceededException(
433441
Duration callbackWaitTime, String writerId, Duration callbackWaitTimeLimit) {
442+
this(callbackWaitTime, writerId, callbackWaitTimeLimit, null, null, null, ImmutableList.of());
443+
}
444+
445+
public MaximumRequestCallbackWaitTimeExceededException(
446+
Duration callbackWaitTime,
447+
String writerId,
448+
Duration callbackWaitTimeLimit,
449+
@Nullable Instant requestReceivedTime,
450+
@Nullable Instant placedInWaitingQueueTime,
451+
@Nullable Instant placedInInflightQueueTime,
452+
@Nullable List<Instant> dispatchTimes) {
434453
super(
435454
String.format(
436455
"Request has waited in inflight queue for %sms for writer %s, "
@@ -439,6 +458,11 @@ public MaximumRequestCallbackWaitTimeExceededException(
439458
this.callbackWaitTime = callbackWaitTime;
440459
this.writerId = writerId;
441460
this.callbackWaitTimeLimit = callbackWaitTimeLimit;
461+
this.requestReceivedTime = requestReceivedTime;
462+
this.placedInWaitingQueueTime = placedInWaitingQueueTime;
463+
this.placedInInflightQueueTime = placedInInflightQueueTime;
464+
this.dispatchTimes =
465+
dispatchTimes == null ? ImmutableList.of() : ImmutableList.copyOf(dispatchTimes);
442466
}
443467

444468
public Duration getCallbackWaitTime() {
@@ -452,6 +476,25 @@ public String getWriterId() {
452476
public Duration getCallbackWaitTimeLimit() {
453477
return callbackWaitTimeLimit;
454478
}
479+
480+
@Nullable
481+
public Instant getRequestReceivedTime() {
482+
return requestReceivedTime;
483+
}
484+
485+
@Nullable
486+
public Instant getPlacedInWaitingQueueTime() {
487+
return placedInWaitingQueueTime;
488+
}
489+
490+
@Nullable
491+
public Instant getPlacedInInflightQueueTime() {
492+
return placedInInflightQueueTime;
493+
}
494+
495+
public ImmutableList<Instant> getDispatchTimes() {
496+
return dispatchTimes;
497+
}
455498
}
456499

457500
private Exceptions() {}

java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1442,6 +1442,19 @@ void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exceptio
14421442
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
14431443
assertThat(ex.getCause())
14441444
.isInstanceOf(Exceptions.MaximumRequestCallbackWaitTimeExceededException.class);
1445+
Exceptions.MaximumRequestCallbackWaitTimeExceededException mace =
1446+
(Exceptions.MaximumRequestCallbackWaitTimeExceededException) ex.getCause();
1447+
assertThat(mace.getRequestReceivedTime()).isNotNull();
1448+
assertThat(mace.getPlacedInWaitingQueueTime()).isNotNull();
1449+
assertThat(mace.getPlacedInInflightQueueTime()).isNotNull();
1450+
assertThat(mace.getDispatchTimes()).isNotNull();
1451+
assertThat(mace.getDispatchTimes()).isNotEmpty();
1452+
assertThat(mace.getRequestReceivedTime().isAfter(mace.getPlacedInWaitingQueueTime()))
1453+
.isFalse();
1454+
assertThat(mace.getPlacedInWaitingQueueTime().isAfter(mace.getPlacedInInflightQueueTime()))
1455+
.isFalse();
1456+
assertThat(mace.getPlacedInInflightQueueTime().isAfter(mace.getDispatchTimes().get(0)))
1457+
.isFalse();
14451458
} else {
14461459
assertThat(ex.getCause())
14471460
.hasMessageThat()

0 commit comments

Comments
 (0)