Skip to content

Commit 9b073c7

Browse files
authored
refactor(test): migrate remaining wait loops to Awaitility in GAX tests (#13484)
This PR migrates all remaining wait loops and polling blocks in GAX tests to use Awaitility. This is the second part of b/505479343, following the initial introduction of Awaitility in #13458. Specifically, this migrates: - Semaphore64Test: replaced mock thread wait loops with Awaitility state waiting. - ScheduledRetryingExecutorTest: migrated future attempt checking, attempt cancellation wait, and first attempt latch count down to Awaitility. - BatcherImplTest: converted GC loops and thread state polling blocks to Awaitility.
1 parent 3580407 commit 9b073c7

3 files changed

Lines changed: 99 additions & 96 deletions

File tree

sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java

Lines changed: 54 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer;
3434
import static com.google.common.truth.Truth.assertThat;
3535
import static com.google.common.truth.Truth.assertWithMessage;
36+
import static org.awaitility.Awaitility.await;
3637
import static org.junit.jupiter.api.Assertions.assertThrows;
3738
import static org.mockito.ArgumentMatchers.any;
3839
import static org.mockito.ArgumentMatchers.eq;
@@ -599,15 +600,14 @@ void testPushCurrentBatchRunnable() throws Exception {
599600
// Batcher present inside runnable should be GCed after following loop.
600601
batcher.close();
601602
batcher = null;
602-
for (int retry = 0; retry < 3; retry++) {
603-
System.gc();
604-
System.runFinalization();
605-
isExecutorCancelled = pushBatchRunnable.isCancelled();
606-
if (isExecutorCancelled) {
607-
break;
608-
}
609-
Thread.sleep(DELAY_TIME * (1L << retry));
610-
}
603+
await()
604+
.atMost(Duration.ofSeconds(5))
605+
.until(
606+
() -> {
607+
System.gc();
608+
System.runFinalization();
609+
return pushBatchRunnable.isCancelled();
610+
});
611611
// ScheduledFuture should be isCancelled now.
612612
assertThat(pushBatchRunnable.isCancelled()).isTrue();
613613
}
@@ -733,18 +733,14 @@ public void splitResponse(
733733
*/
734734
@Test
735735
void testUnclosedBatchersAreLogged() throws Exception {
736-
final long DELAY_TIME = 30L;
737-
int actualRemaining = 0;
738-
for (int retry = 0; retry < 3; retry++) {
739-
System.gc();
740-
System.runFinalization();
741-
actualRemaining = BatcherReference.cleanQueue();
742-
if (actualRemaining == 0) {
743-
break;
744-
}
745-
Thread.sleep(DELAY_TIME * (1L << retry));
746-
}
747-
assertThat(actualRemaining).isAtMost(0);
736+
await()
737+
.atMost(Duration.ofSeconds(5))
738+
.until(
739+
() -> {
740+
System.gc();
741+
System.runFinalization();
742+
return BatcherReference.cleanQueue() == 0;
743+
});
748744
underTest = createDefaultBatcherImpl(batchingSettings, null);
749745
Batcher<Integer, Integer> extraBatcher = createDefaultBatcherImpl(batchingSettings, null);
750746

@@ -771,20 +767,16 @@ public boolean isLoggable(LogRecord record) {
771767

772768
underTest = null;
773769
// That *should* have been the last reference. Try to reclaim it.
774-
boolean success = false;
775-
for (int retry = 0; retry < 3; retry++) {
776-
System.gc();
777-
System.runFinalization();
778-
int orphans = BatcherReference.cleanQueue();
779-
if (orphans == 1) {
780-
success = true;
781-
break;
782-
}
783-
// Validates that there are no other batcher instance present while GC cleanup.
784-
assertWithMessage("unexpected extra orphans").that(orphans).isEqualTo(0);
785-
Thread.sleep(DELAY_TIME * (1L << retry));
786-
}
787-
assertWithMessage("Batcher was not garbage collected").that(success).isTrue();
770+
await()
771+
.atMost(Duration.ofSeconds(5))
772+
.until(
773+
() -> {
774+
System.gc();
775+
System.runFinalization();
776+
int orphans = BatcherReference.cleanQueue();
777+
assertWithMessage("unexpected extra orphans").that(orphans).isAtMost(1);
778+
return orphans == 1;
779+
});
788780

789781
LogRecord lr;
790782
synchronized (records) {
@@ -807,18 +799,14 @@ public boolean isLoggable(LogRecord record) {
807799
@Test
808800
void testClosedBatchersAreNotLogged() throws Exception {
809801
// Clean out the existing instances
810-
final long DELAY_TIME = 30L;
811-
int actualRemaining = 0;
812-
for (int retry = 0; retry < 3; retry++) {
813-
System.gc();
814-
System.runFinalization();
815-
actualRemaining = BatcherReference.cleanQueue();
816-
if (actualRemaining == 0) {
817-
break;
818-
}
819-
Thread.sleep(DELAY_TIME * (1L << retry));
820-
}
821-
assertThat(actualRemaining).isAtMost(0);
802+
await()
803+
.atMost(Duration.ofSeconds(5))
804+
.until(
805+
() -> {
806+
System.gc();
807+
System.runFinalization();
808+
return BatcherReference.cleanQueue() == 0;
809+
});
822810

823811
// Capture logs
824812
final List<LogRecord> records = new ArrayList<>(1);
@@ -849,16 +837,19 @@ public boolean isLoggable(LogRecord record) {
849837
}
850838
}
851839
// Run GC a few times to give the batchers a chance to be collected
852-
for (int retry = 0; retry < 100; retry++) {
853-
System.gc();
854-
System.runFinalization();
855-
BatcherReference.cleanQueue();
856-
Thread.sleep(10);
857-
}
858-
859-
synchronized (records) {
860-
assertThat(records).isEmpty();
861-
}
840+
await()
841+
.pollInterval(Duration.ofMillis(10))
842+
.during(Duration.ofSeconds(1))
843+
.atMost(Duration.ofSeconds(5))
844+
.until(
845+
() -> {
846+
System.gc();
847+
System.runFinalization();
848+
BatcherReference.cleanQueue();
849+
synchronized (records) {
850+
return records.isEmpty();
851+
}
852+
});
862853
} finally {
863854
// reset logging
864855
batcherLogger.setFilter(oldFilter);
@@ -990,10 +981,12 @@ void testThrottlingBlocking() throws Exception {
990981
// resulting in a shorter total_throttled_time at the verification of throttledTime
991982
// at the end of the test.
992983
// https://github.com/googleapis/sdk-platform-java/issues/1193
993-
do {
994-
Thread.sleep(10);
995-
} while (batcherAddThreadHolder.isEmpty()
996-
|| batcherAddThreadHolder.get(0).getState() != Thread.State.WAITING);
984+
await()
985+
.atMost(Duration.ofSeconds(5))
986+
.until(
987+
() ->
988+
!batcherAddThreadHolder.isEmpty()
989+
&& batcherAddThreadHolder.get(0).getState() == Thread.State.WAITING);
997990

998991
long beforeGetCall = System.currentTimeMillis();
999992
executor.submit(

sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/Semaphore64Test.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929
*/
3030
package com.google.api.gax.batching;
3131

32+
import static org.awaitility.Awaitility.await;
3233
import static org.junit.jupiter.api.Assertions.assertEquals;
3334
import static org.junit.jupiter.api.Assertions.assertFalse;
3435
import static org.junit.jupiter.api.Assertions.assertThrows;
3536
import static org.junit.jupiter.api.Assertions.assertTrue;
3637
import static org.junit.jupiter.api.Assertions.fail;
3738

39+
import java.time.Duration;
3840
import java.util.LinkedList;
3941
import java.util.List;
4042
import java.util.concurrent.TimeUnit;
@@ -68,8 +70,7 @@ void testBlocking() throws InterruptedException {
6870
Thread t = new Thread(() -> semaphore.acquire(1));
6971
t.start();
7072

71-
Thread.sleep(50);
72-
assertTrue(t.isAlive());
73+
await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING);
7374

7475
semaphore.release(1);
7576
t.join();
@@ -95,8 +96,7 @@ void testReducePermitLimitBlocking() throws InterruptedException {
9596
Thread t = new Thread(() -> semaphore.acquire(1));
9697
t.start();
9798

98-
Thread.sleep(50);
99-
assertTrue(t.isAlive());
99+
await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING);
100100

101101
semaphore.release(1);
102102
t.join();
@@ -124,17 +124,15 @@ void testAcquirePartialBlocking() throws Exception {
124124
Thread t1 = new Thread(() -> semaphore.acquire(1));
125125
t1.start();
126126
// wait for thread to start
127-
Thread.sleep(100);
128-
assertTrue(t1.isAlive());
127+
await().atMost(Duration.ofSeconds(5)).until(() -> t1.getState() == Thread.State.WAITING);
129128
semaphore.release(6);
130129
t1.join();
131130

132131
// now there should be 4 permits available, acquiring 6 should block
133132
Thread t2 = new Thread(() -> semaphore.acquirePartial(6));
134133
t2.start();
135134
// wait fo thread to start
136-
Thread.sleep(100);
137-
assertTrue(t2.isAlive());
135+
await().atMost(Duration.ofSeconds(5)).until(() -> t2.getState() == Thread.State.WAITING);
138136
// limit should still be 5 and get limit should not block
139137
assertEquals(5, semaphore.getPermitLimit());
140138
}
@@ -158,8 +156,7 @@ void testIncreasePermitLimitBlocking() throws Exception {
158156
Thread t = new Thread(() -> semaphore.acquire(1));
159157
t.start();
160158

161-
Thread.sleep(50);
162-
assertTrue(t.isAlive());
159+
await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING);
163160

164161
semaphore.increasePermitLimit(1);
165162
t.join();
@@ -208,8 +205,7 @@ void testReleaseWontOverflowBlocking() throws Exception {
208205
semaphore.release(10);
209206
Thread t = new Thread(() -> semaphore.acquire(11));
210207
t.start();
211-
Thread.sleep(100);
212-
assertTrue(t.isAlive());
208+
await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING);
213209
}
214210

215211
@Test
@@ -239,7 +235,6 @@ void testPermitLimitUnderflowBlocking() throws Exception {
239235
semaphore.release(10);
240236
Thread t = new Thread(() -> semaphore.acquire(11));
241237
t.start();
242-
Thread.sleep(100);
243-
assertTrue(t.isAlive());
238+
await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING);
244239
}
245240
}

sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
package com.google.api.gax.retrying;
3131

3232
import static com.google.api.gax.retrying.FailingCallable.FAST_RETRY_SETTINGS;
33+
import static org.awaitility.Awaitility.await;
3334
import static org.junit.jupiter.api.Assertions.assertEquals;
3435
import static org.junit.jupiter.api.Assertions.assertFalse;
3536
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -41,12 +42,15 @@
4142
import com.google.api.core.NanoClock;
4243
import com.google.api.gax.retrying.FailingCallable.CustomException;
4344
import com.google.api.gax.rpc.testing.FakeCallContext;
45+
import java.time.Duration;
4446
import java.util.concurrent.CancellationException;
4547
import java.util.concurrent.ExecutionException;
4648
import java.util.concurrent.Executors;
4749
import java.util.concurrent.Future;
4850
import java.util.concurrent.RejectedExecutionException;
4951
import java.util.concurrent.ScheduledExecutorService;
52+
import java.util.concurrent.atomic.AtomicInteger;
53+
import java.util.concurrent.atomic.AtomicReference;
5054
import org.junit.jupiter.api.Test;
5155
import org.mockito.Mockito;
5256

@@ -99,26 +103,32 @@ void testSuccessWithFailuresPeekAttempt() throws Exception {
99103

100104
future.setAttemptFuture(executor.submit(future));
101105

102-
int failedAttempts = 0;
103-
while (!future.isDone()) {
104-
ApiFuture<String> attemptResult = future.peekAttemptResult();
105-
if (attemptResult != null) {
106-
assertTrue(attemptResult.isDone());
107-
assertFalse(attemptResult.isCancelled());
108-
try {
109-
attemptResult.get();
110-
} catch (ExecutionException e) {
111-
if (e.getCause() instanceof CustomException) {
112-
failedAttempts++;
113-
}
114-
}
115-
}
116-
Thread.sleep(0L, 100);
117-
}
106+
final AtomicInteger failedAttempts = new AtomicInteger(0);
107+
final AtomicReference<ApiFuture<String>> lastSeenAttempt = new AtomicReference<>();
108+
await()
109+
.pollInterval(Duration.ofMillis(2))
110+
.atMost(Duration.ofSeconds(5))
111+
.until(
112+
() -> {
113+
ApiFuture<String> attemptResult = future.peekAttemptResult();
114+
if (attemptResult != null && attemptResult != lastSeenAttempt.get()) {
115+
lastSeenAttempt.set(attemptResult);
116+
assertTrue(attemptResult.isDone());
117+
assertFalse(attemptResult.isCancelled());
118+
try {
119+
attemptResult.get();
120+
} catch (ExecutionException e) {
121+
if (e.getCause() instanceof CustomException) {
122+
failedAttempts.incrementAndGet();
123+
}
124+
}
125+
}
126+
return future.isDone();
127+
});
118128

119129
assertFutureSuccess(future);
120130
assertEquals(15, future.getAttemptSettings().getAttemptCount());
121-
assertTrue(failedAttempts > 0);
131+
assertTrue(failedAttempts.get() > 0);
122132
}
123133
}
124134

@@ -260,9 +270,12 @@ void testCancelOuterFutureAfterStart() throws Exception {
260270
callable.setExternalFuture(future);
261271
future.setAttemptFuture(executor.submit(future));
262272

263-
// The test sleeps a duration long enough to ensure that the future has been submitted for
264-
// execution
265-
Thread.sleep(150L);
273+
await()
274+
.atMost(Duration.ofSeconds(5))
275+
.until(
276+
() ->
277+
future.getAttemptSettings() != null
278+
&& future.getAttemptSettings().getAttemptCount() > 0);
266279

267280
boolean res = future.cancel(false);
268281
assertTrue(res);
@@ -302,7 +315,9 @@ void testCancelProxiedFutureAfterStart() throws Exception {
302315
callable.setExternalFuture(future);
303316
future.setAttemptFuture(executor.submit(future));
304317

305-
Thread.sleep(50L);
318+
await()
319+
.atMost(Duration.ofSeconds(5))
320+
.until(() -> callable.getFirstAttemptFinishedLatch().getCount() == 0);
306321

307322
// Note that shutdownNow() will not cancel internal FutureTasks automatically, which
308323
// may potentially cause another thread handing on RetryingFuture#get() call forever.

0 commit comments

Comments
 (0)