Skip to content

Commit 87e0a76

Browse files
fix: data race in ConnectionWorker.
RACE DESCRIPTION: Requests are added to inflightRequestQueue before actually being sent. A race occurs when a response arrives on a separate thread, triggering a retry process that clears inflightRequestQueue and nullifies requestSendTimeStamp. Simultaneously, the appendLoop thread might try to send a request from the localQueue, setting requestSendTimeStamp to the current time, leading to conflicting writes. FIX DESCRIPTION: The fix is to ensure only the appendLoop thread modifies requestSendTimeStamp. This thread confinement prevents conflicts with other threads, like the response thread. We can stop nullifying requestSendTimeStamp because appendLoop will always update it to the current time just before sending. The check for exceeding response timeouts happens earlier in the appendLoop, so this change does not cause issues.
1 parent ce4969b commit 87e0a76

1 file changed

Lines changed: 1 addition & 3 deletions

File tree

  • java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1

java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -878,7 +878,6 @@ private void appendLoop() {
878878
// Consider the backend latency as completed for the current request.
879879
requestProfilerHook.endOperation(
880880
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
881-
requestWrapper.requestSendTimeStamp = null;
882881
requestProfilerHook.startOperation(
883882
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
884883
waitingRequestQueue.addFirst(requestWrapper);
@@ -1454,7 +1453,6 @@ private void doneCallback(Throwable finalStatus) {
14541453
private AppendRequestAndResponse pollInflightRequestQueue(boolean pollLast) {
14551454
AppendRequestAndResponse requestWrapper =
14561455
pollLast ? inflightRequestQueue.pollLast() : inflightRequestQueue.poll();
1457-
requestWrapper.requestSendTimeStamp = null;
14581456
--this.inflightRequests;
14591457
this.inflightBytes -= requestWrapper.messageSize;
14601458
this.inflightReduced.signal();
@@ -1501,7 +1499,7 @@ static final class AppendRequestAndResponse {
15011499
long recordBatchRowCount = -1;
15021500

15031501
// Time at which request was last sent over the network.
1504-
// If a response is no longer expected this is set back to null.
1502+
// This is set ONLY by the appendLoop thread.
15051503
Instant requestSendTimeStamp;
15061504

15071505
AppendRequestAndResponse(

0 commit comments

Comments
 (0)