Skip to content

Commit 0d1d8f6

Browse files
authored
test(datastore): Retry QuerySplitter's getSplits for flaky calls (#13390)
Fixes: #11981
1 parent 62bbaa1 commit 0d1d8f6

4 files changed

Lines changed: 208 additions & 12 deletions

File tree

java-datastore/datastore-v1-proto-client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@
107107
</exclusion>
108108
</exclusions>
109109
</dependency>
110+
<dependency>
111+
<groupId>com.google.api</groupId>
112+
<artifactId>gax</artifactId>
113+
<scope>test</scope>
114+
</dependency>
110115
</dependencies>
111116

112117
<build>

java-datastore/datastore-v1-proto-client/src/test/java/com/google/datastore/v1/client/it/ITDatastoreProtoClientTest.java

Lines changed: 99 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@
1818
import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
1919
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
2020

21+
import com.google.api.core.ApiClock;
22+
import com.google.api.core.ApiFuture;
23+
import com.google.api.core.NanoClock;
24+
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
25+
import com.google.api.gax.retrying.DirectRetryingExecutor;
26+
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
27+
import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext;
28+
import com.google.api.gax.retrying.RetryAlgorithm;
29+
import com.google.api.gax.retrying.RetrySettings;
30+
import com.google.api.gax.retrying.RetryingFuture;
2131
import com.google.common.truth.Truth;
2232
import com.google.datastore.v1.Filter;
2333
import com.google.datastore.v1.KindExpression;
@@ -27,9 +37,13 @@
2737
import com.google.datastore.v1.client.Datastore;
2838
import com.google.datastore.v1.client.DatastoreException;
2939
import com.google.datastore.v1.client.DatastoreHelper;
40+
import com.google.rpc.Code;
3041
import java.io.IOException;
3142
import java.security.GeneralSecurityException;
43+
import java.time.Duration;
3244
import java.util.List;
45+
import java.util.concurrent.Callable;
46+
import java.util.concurrent.ExecutionException;
3347
import org.junit.Before;
3448
import org.junit.Test;
3549

@@ -48,7 +62,7 @@ public void setUp() throws GeneralSecurityException, IOException {
4862
}
4963

5064
@Test
51-
public void testQuerySplitterWithDefaultDb() throws DatastoreException {
65+
public void testQuerySplitterWithDefaultDb() throws Exception {
5266
Filter propertyFilter =
5367
makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build();
5468
Query query =
@@ -59,8 +73,7 @@ public void testQuerySplitterWithDefaultDb() throws DatastoreException {
5973

6074
PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).build();
6175

62-
List<Query> splits =
63-
DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE);
76+
List<Query> splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);
6477
Truth.assertThat(splits).isNotEmpty();
6578
splits.forEach(
6679
split -> {
@@ -70,7 +83,7 @@ public void testQuerySplitterWithDefaultDb() throws DatastoreException {
7083
}
7184

7285
@Test
73-
public void testQuerySplitterWithDb() throws DatastoreException {
86+
public void testQuerySplitterWithDb() throws Exception {
7487
Filter propertyFilter =
7588
makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build();
7689
Query query =
@@ -81,8 +94,7 @@ public void testQuerySplitterWithDb() throws DatastoreException {
8194

8295
PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).setDatabaseId("test-db").build();
8396

84-
List<Query> splits =
85-
DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE);
97+
List<Query> splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);
8698

8799
Truth.assertThat(splits).isNotEmpty();
88100
splits.forEach(
@@ -91,4 +103,85 @@ public void testQuerySplitterWithDb() throws DatastoreException {
91103
Truth.assertThat(split.getFilter()).isEqualTo(propertyFilter);
92104
});
93105
}
106+
107+
/**
108+
* A generic helper method that executes a {@link Callable} with retries using the GAX retrying
109+
* framework.
110+
*
111+
* <p>It configures a {@link DirectRetryingExecutor} with the provided {@link RetrySettings} and
112+
* the custom {@link ResultRetryAlgorithmWithContext}.
113+
*
* <p>If the execution fails, the {@link ExecutionException} produced by the future is unwrapped
* and its cause is rethrown, so callers see the original exception type.
*
* @param <V> the result type produced by the callable
114+
* @param callable the action to execute
115+
* @param retrySettings the retry configuration (backoff, max attempts, timeouts)
116+
* @param resultRetryAlgorithm the algorithm to determine if a failed attempt should be retried
117+
* @return the result of the callable execution
118+
* @throws Exception the unwrapped cause if the execution fails (e.g. after all retry attempts),
*     the {@link ExecutionException} itself if its cause is neither an {@link Exception} nor an
*     {@link Error}, or {@link InterruptedException} if waiting for the result is interrupted.
119+
*/
120+
private static <V> V runWithRetry(
121+
Callable<V> callable,
122+
RetrySettings retrySettings,
123+
ResultRetryAlgorithmWithContext<V> resultRetryAlgorithm)
124+
throws Exception {
125+
ApiClock clock = NanoClock.getDefaultClock();
126+
// We must wrap the result algorithm and timed algorithm into a RetryAlgorithm
127+
// as required by DirectRetryingExecutor.
128+
RetryAlgorithm<V> retryAlgorithm =
129+
new RetryAlgorithm<>(
130+
resultRetryAlgorithm, new ExponentialRetryAlgorithm(retrySettings, clock));
131+
132+
DirectRetryingExecutor<V> executor = new DirectRetryingExecutor<>(retryAlgorithm);
133+
RetryingFuture<V> future = executor.createFuture(callable);
134+
135+
// Hand the retrying future to the executor; the returned future is what we wait on below.
ApiFuture<V> submittedFuture = executor.submit(future);
136+
137+
try {
138+
return submittedFuture.get();
139+
} catch (ExecutionException e) {
140+
Throwable cause = e.getCause();
141+
// submittedFuture.get() wraps any exception thrown during execution in an ExecutionException.
142+
// We unwrap and rethrow the actual cause (Exception or Error) directly so that test failures
143+
// report the root cause (e.g., DatastoreException or AssertionError) instead of the wrapper.
144+
if (cause instanceof Exception) {
145+
throw (Exception) cause;
146+
}
147+
if (cause instanceof Error) {
148+
throw (Error) cause;
149+
}
150+
// The cause is null or a Throwable that is neither Exception nor Error: keep the wrapper.
throw e;
151+
} catch (InterruptedException e) {
152+
// Restore the interrupted status before rethrowing, as per Java concurrency best practices.
153+
Thread.currentThread().interrupt();
154+
throw e;
155+
}
156+
}
157+
158+
// This low-level Datastore client (proto-over-HTTP) does not have built-in retry logic
159+
// (unlike the high-level google-cloud-datastore gRPC client). We must explicitly retry
160+
// here to handle transient backend errors (such as Code.INTERNAL auth issues).
161+
// We reuse GAX retrying utilities here in the test to implement this backoff/retry.
162+
private static List<Query> getSplitsWithRetry(
163+
Query query, PartitionId partition, int numSplits, Datastore datastore) throws Exception {
164+
// Fail fast configuration to avoid long wait times during test failures
// At most 3 attempts; the retry delay starts at 200 ms, grows by 1.5x and is capped at 500 ms,
// with a 2 s total timeout.
// NOTE(review): presumably the 2 s total timeout also counts time spent inside the call itself,
// so a slow first attempt could leave no room for a retry - confirm against GAX RetrySettings.
165+
RetrySettings retrySettings =
166+
RetrySettings.newBuilder()
167+
.setMaxAttempts(3)
168+
.setInitialRetryDelayDuration(Duration.ofMillis(200))
169+
.setRetryDelayMultiplier(1.5)
170+
.setMaxRetryDelayDuration(Duration.ofMillis(500))
171+
.setTotalTimeoutDuration(Duration.ofSeconds(2))
172+
.build();
173+
return runWithRetry(
174+
() -> DatastoreHelper.getQuerySplitter().getSplits(query, partition, numSplits, datastore),
175+
retrySettings,
176+
new BasicResultRetryAlgorithm<List<Query>>() {
177+
@Override
178+
public boolean shouldRetry(Throwable prevThrowable, List<Query> prevResult) {
179+
// Only a DatastoreException carrying Code.INTERNAL is considered retryable;
// prevResult is not inspected.
if (prevThrowable instanceof DatastoreException) {
180+
DatastoreException de = (DatastoreException) prevThrowable;
181+
return de.getCode() == Code.INTERNAL;
182+
}
183+
// Any other throwable (or none at all) is not retried.
return false;
184+
}
185+
});
186+
}
94187
}

java-datastore/google-cloud-datastore-utils/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@
8484
</exclusion>
8585
</exclusions>
8686
</dependency>
87+
<dependency>
88+
<groupId>com.google.api</groupId>
89+
<artifactId>gax</artifactId>
90+
<scope>test</scope>
91+
</dependency>
8792
</dependencies>
8893

8994
<build>

java-datastore/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/it/ITDatastoreProtoClientTest.java

Lines changed: 99 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,28 @@
1818
import static com.google.datastore.utils.DatastoreHelper.makeFilter;
1919
import static com.google.datastore.utils.DatastoreHelper.makeValue;
2020

21+
import com.google.api.core.ApiClock;
22+
import com.google.api.core.ApiFuture;
23+
import com.google.api.core.NanoClock;
24+
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
25+
import com.google.api.gax.retrying.DirectRetryingExecutor;
26+
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
27+
import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext;
28+
import com.google.api.gax.retrying.RetryAlgorithm;
29+
import com.google.api.gax.retrying.RetrySettings;
30+
import com.google.api.gax.retrying.RetryingFuture;
2131
import com.google.common.truth.Truth;
2232
import com.google.datastore.utils.Datastore;
2333
import com.google.datastore.utils.DatastoreException;
2434
import com.google.datastore.utils.DatastoreHelper;
2535
import com.google.datastore.v1.*;
36+
import com.google.rpc.Code;
2637
import java.io.IOException;
2738
import java.security.GeneralSecurityException;
39+
import java.time.Duration;
2840
import java.util.List;
41+
import java.util.concurrent.Callable;
42+
import java.util.concurrent.ExecutionException;
2943
import org.junit.Before;
3044
import org.junit.Test;
3145

@@ -44,7 +58,7 @@ public void setUp() throws GeneralSecurityException, IOException {
4458
}
4559

4660
@Test
47-
public void testQuerySplitterWithDefaultDb() throws DatastoreException {
61+
public void testQuerySplitterWithDefaultDb() throws Exception {
4862
Filter propertyFilter =
4963
makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build();
5064
Query query =
@@ -55,8 +69,7 @@ public void testQuerySplitterWithDefaultDb() throws DatastoreException {
5569

5670
PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).build();
5771

58-
List<Query> splits =
59-
DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE);
72+
List<Query> splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);
6073
Truth.assertThat(splits).isNotEmpty();
6174
splits.forEach(
6275
split -> {
@@ -66,7 +79,7 @@ public void testQuerySplitterWithDefaultDb() throws DatastoreException {
6679
}
6780

6881
@Test
69-
public void testQuerySplitterWithDb() throws DatastoreException {
82+
public void testQuerySplitterWithDb() throws Exception {
7083
Filter propertyFilter =
7184
makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build();
7285
Query query =
@@ -77,8 +90,7 @@ public void testQuerySplitterWithDb() throws DatastoreException {
7790

7891
PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).setDatabaseId("test-db").build();
7992

80-
List<Query> splits =
81-
DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE);
93+
List<Query> splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);
8294

8395
Truth.assertThat(splits).isNotEmpty();
8496
splits.forEach(
@@ -87,4 +99,85 @@ public void testQuerySplitterWithDb() throws DatastoreException {
8799
Truth.assertThat(split.getFilter()).isEqualTo(propertyFilter);
88100
});
89101
}
102+
103+
/**
104+
* A generic helper method that executes a {@link Callable} with retries using the GAX retrying
105+
* framework.
106+
*
107+
* <p>It configures a {@link DirectRetryingExecutor} with the provided {@link RetrySettings} and
108+
* the custom {@link ResultRetryAlgorithmWithContext}.
109+
*
* <p>If the execution fails, the {@link ExecutionException} produced by the future is unwrapped
* and its cause is rethrown, so callers see the original exception type.
*
* @param <V> the result type produced by the callable
110+
* @param callable the action to execute
111+
* @param retrySettings the retry configuration (backoff, max attempts, timeouts)
112+
* @param resultRetryAlgorithm the algorithm to determine if a failed attempt should be retried
113+
* @return the result of the callable execution
114+
* @throws Exception the unwrapped cause if the execution fails (e.g. after all retry attempts),
*     the {@link ExecutionException} itself if its cause is neither an {@link Exception} nor an
*     {@link Error}, or {@link InterruptedException} if waiting for the result is interrupted.
115+
*/
116+
private static <V> V runWithRetry(
117+
Callable<V> callable,
118+
RetrySettings retrySettings,
119+
ResultRetryAlgorithmWithContext<V> resultRetryAlgorithm)
120+
throws Exception {
121+
ApiClock clock = NanoClock.getDefaultClock();
122+
// We must wrap the result algorithm and timed algorithm into a RetryAlgorithm
123+
// as required by DirectRetryingExecutor.
124+
RetryAlgorithm<V> retryAlgorithm =
125+
new RetryAlgorithm<>(
126+
resultRetryAlgorithm, new ExponentialRetryAlgorithm(retrySettings, clock));
127+
128+
DirectRetryingExecutor<V> executor = new DirectRetryingExecutor<>(retryAlgorithm);
129+
RetryingFuture<V> future = executor.createFuture(callable);
130+
131+
// Hand the retrying future to the executor; the returned future is what we wait on below.
ApiFuture<V> submittedFuture = executor.submit(future);
132+
133+
try {
134+
return submittedFuture.get();
135+
} catch (ExecutionException e) {
136+
Throwable cause = e.getCause();
137+
// submittedFuture.get() wraps any exception thrown during execution in an ExecutionException.
138+
// We unwrap and rethrow the actual cause (Exception or Error) directly so that test failures
139+
// report the root cause (e.g., DatastoreException or AssertionError) instead of the wrapper.
140+
if (cause instanceof Exception) {
141+
throw (Exception) cause;
142+
}
143+
if (cause instanceof Error) {
144+
throw (Error) cause;
145+
}
146+
// The cause is null or a Throwable that is neither Exception nor Error: keep the wrapper.
throw e;
147+
} catch (InterruptedException e) {
148+
// Restore the interrupted status before rethrowing, as per Java concurrency best practices.
149+
Thread.currentThread().interrupt();
150+
throw e;
151+
}
152+
}
153+
154+
// This low-level Datastore client (proto-over-HTTP) does not have built-in retry logic
155+
// (unlike the high-level google-cloud-datastore gRPC client). We must explicitly retry
156+
// here to handle transient backend errors (such as Code.INTERNAL auth issues).
157+
// We reuse GAX retrying utilities here in the test to implement this backoff/retry.
158+
private static List<Query> getSplitsWithRetry(
159+
Query query, PartitionId partition, int numSplits, Datastore datastore) throws Exception {
160+
// Fail fast configuration to avoid long wait times during test failures
// At most 3 attempts; the retry delay starts at 200 ms, grows by 1.5x and is capped at 500 ms,
// with a 2 s total timeout.
// NOTE(review): presumably the 2 s total timeout also counts time spent inside the call itself,
// so a slow first attempt could leave no room for a retry - confirm against GAX RetrySettings.
161+
RetrySettings retrySettings =
162+
RetrySettings.newBuilder()
163+
.setMaxAttempts(3)
164+
.setInitialRetryDelayDuration(Duration.ofMillis(200))
165+
.setRetryDelayMultiplier(1.5)
166+
.setMaxRetryDelayDuration(Duration.ofMillis(500))
167+
.setTotalTimeoutDuration(Duration.ofSeconds(2))
168+
.build();
169+
return runWithRetry(
170+
() -> DatastoreHelper.getQuerySplitter().getSplits(query, partition, numSplits, datastore),
171+
retrySettings,
172+
new BasicResultRetryAlgorithm<List<Query>>() {
173+
@Override
174+
public boolean shouldRetry(Throwable prevThrowable, List<Query> prevResult) {
175+
// Only a DatastoreException carrying Code.INTERNAL is considered retryable;
// prevResult is not inspected.
if (prevThrowable instanceof DatastoreException) {
176+
DatastoreException de = (DatastoreException) prevThrowable;
177+
return de.getCode() == Code.INTERNAL;
178+
}
179+
// Any other throwable (or none at all) is not retried.
return false;
180+
}
181+
});
182+
}
90183
}

0 commit comments

Comments
 (0)