3838import static org .junit .jupiter .api .Assertions .assertSame ;
3939import static org .junit .jupiter .api .Assertions .assertTrue ;
4040
41+ import com .google .api .core .ApiClock ;
4142import com .google .api .core .ApiFuture ;
4243import com .google .api .core .NanoClock ;
44+ import com .google .api .gax .core .FakeApiClock ;
45+ import com .google .api .gax .core .RecordingScheduler ;
4346import com .google .api .gax .retrying .FailingCallable .CustomException ;
4447import com .google .api .gax .rpc .testing .FakeCallContext ;
4548import java .time .Duration ;
5154import java .util .concurrent .ScheduledExecutorService ;
5255import java .util .concurrent .atomic .AtomicInteger ;
5356import java .util .concurrent .atomic .AtomicReference ;
57+ import org .junit .jupiter .api .BeforeEach ;
5458import org .junit .jupiter .api .Test ;
5559import org .mockito .Mockito ;
5660
5761class ScheduledRetryingExecutorTest extends AbstractRetryingExecutorTest {
5862
5963 // Number of test runs, essential for multithreaded tests.
6064 private static final int EXECUTIONS_COUNT = 5 ;
65+ private FakeApiClock fakeClock ;
66+
67+ @ BeforeEach
68+ void setUp () {
69+ fakeClock = new FakeApiClock (0L );
70+ scheduledExecutorService = RecordingScheduler .create (fakeClock );
71+ }
6172
6273 @ Override
6374 protected RetryingExecutorWithContext <String > getExecutor (RetryAlgorithm <String > retryAlgorithm ) {
@@ -67,9 +78,17 @@ protected RetryingExecutorWithContext<String> getExecutor(RetryAlgorithm<String>
6778 @ Override
6879 protected RetryAlgorithm <String > getAlgorithm (
6980 RetrySettings retrySettings , int apocalypseCountDown , RuntimeException apocalypseException ) {
81+ return getAlgorithm (retrySettings , apocalypseCountDown , apocalypseException , fakeClock );
82+ }
83+
84+ protected RetryAlgorithm <String > getAlgorithm (
85+ RetrySettings retrySettings ,
86+ int apocalypseCountDown ,
87+ RuntimeException apocalypseException ,
88+ ApiClock clock ) {
7089 return new RetryAlgorithm <>(
7190 new TestResultRetryAlgorithm <String >(apocalypseCountDown , apocalypseException ),
72- new ExponentialRetryAlgorithm (retrySettings , NanoClock . getDefaultClock () ));
91+ new ExponentialRetryAlgorithm (retrySettings , clock ));
7392 }
7493
7594 private RetryingExecutorWithContext <String > getRetryingExecutor (
@@ -81,54 +100,60 @@ private RetryingExecutorWithContext<String> getRetryingExecutor(
81100 void testSuccessWithFailuresPeekAttempt () throws Exception {
82101 RetrySettings retrySettings =
83102 FAST_RETRY_SETTINGS .toBuilder ()
84- .setTotalTimeoutDuration (java .time .Duration .ofMillis (1000L ))
103+ .setTotalTimeoutDuration (java .time .Duration .ofMillis (10000L ))
85104 .setMaxAttempts (100 )
86105 .build ();
87- for (int executionsCount = 0 ; executionsCount < EXECUTIONS_COUNT ; executionsCount ++) {
88-
89- FailingCallable callable = new FailingCallable (15 , "request" , "SUCCESS" , tracer );
90-
91- RetryingExecutorWithContext <String > executor =
92- getRetryingExecutor (getAlgorithm (retrySettings , 0 , null ), scheduledExecutorService );
93- RetryingFuture <String > future =
94- executor .createFuture (
95- callable ,
96- FakeCallContext .createDefault ().withTracer (tracer ).withRetrySettings (retrySettings ));
97- callable .setExternalFuture (future );
98-
99- assertNull (future .peekAttemptResult ());
100- assertSame (future .peekAttemptResult (), future .peekAttemptResult ());
101- assertFalse (future .getAttemptResult ().isDone ());
102- assertFalse (future .getAttemptResult ().isCancelled ());
103-
104- future .setAttemptFuture (executor .submit (future ));
105-
106- final AtomicInteger failedAttempts = new AtomicInteger (0 );
107- final AtomicReference <ApiFuture <String >> lastSeenAttempt = new AtomicReference <>();
108- await ()
109- .pollInterval (Duration .ofMillis (2 ))
110- .atMost (Duration .ofSeconds (5 ))
111- .until (
112- () -> {
113- ApiFuture <String > attemptResult = future .peekAttemptResult ();
114- if (attemptResult != null && attemptResult != lastSeenAttempt .get ()) {
115- lastSeenAttempt .set (attemptResult );
116- assertTrue (attemptResult .isDone ());
117- assertFalse (attemptResult .isCancelled ());
118- try {
119- attemptResult .get ();
120- } catch (ExecutionException e ) {
121- if (e .getCause () instanceof CustomException ) {
122- failedAttempts .incrementAndGet ();
106+ ScheduledExecutorService localExecutor = Executors .newSingleThreadScheduledExecutor ();
107+ try {
108+ for (int executionsCount = 0 ; executionsCount < EXECUTIONS_COUNT ; executionsCount ++) {
109+
110+ FailingCallable callable = new FailingCallable (15 , "request" , "SUCCESS" , tracer );
111+
112+ RetryingExecutorWithContext <String > executor =
113+ getRetryingExecutor (
114+ getAlgorithm (retrySettings , 0 , null , NanoClock .getDefaultClock ()), localExecutor );
115+ RetryingFuture <String > future =
116+ executor .createFuture (
117+ callable ,
118+ FakeCallContext .createDefault ().withTracer (tracer ).withRetrySettings (retrySettings ));
119+ callable .setExternalFuture (future );
120+
121+ assertNull (future .peekAttemptResult ());
122+ assertSame (future .peekAttemptResult (), future .peekAttemptResult ());
123+ assertFalse (future .getAttemptResult ().isDone ());
124+ assertFalse (future .getAttemptResult ().isCancelled ());
125+
126+ future .setAttemptFuture (executor .submit (future ));
127+
128+ final AtomicInteger failedAttempts = new AtomicInteger (0 );
129+ final AtomicReference <ApiFuture <String >> lastSeenAttempt = new AtomicReference <>();
130+ await ()
131+ .pollInterval (Duration .ofMillis (2 ))
132+ .atMost (Duration .ofSeconds (5 ))
133+ .until (
134+ () -> {
135+ ApiFuture <String > attemptResult = future .peekAttemptResult ();
136+ if (attemptResult != null && attemptResult != lastSeenAttempt .get ()) {
137+ lastSeenAttempt .set (attemptResult );
138+ assertTrue (attemptResult .isDone ());
139+ assertFalse (attemptResult .isCancelled ());
140+ try {
141+ attemptResult .get ();
142+ } catch (ExecutionException e ) {
143+ if (e .getCause () instanceof CustomException ) {
144+ failedAttempts .incrementAndGet ();
145+ }
123146 }
124147 }
125- }
126- return future .isDone ();
127- });
148+ return future .isDone ();
149+ });
128150
129- assertFutureSuccess (future );
130- assertEquals (15 , future .getAttemptSettings ().getAttemptCount ());
131- assertTrue (failedAttempts .get () > 0 );
151+ assertFutureSuccess (future );
152+ assertEquals (15 , future .getAttemptSettings ().getAttemptCount ());
153+ assertTrue (failedAttempts .get () > 0 );
154+ }
155+ } finally {
156+ localExecutor .shutdownNow ();
132157 }
133158 }
134159
@@ -259,36 +284,42 @@ void testCancelOuterFutureAfterStart() throws Exception {
259284 // tiny RRD value is small, but not impossible.
260285 .setJittered (false )
261286 .build ();
262- for (int executionsCount = 0 ; executionsCount < EXECUTIONS_COUNT ; executionsCount ++) {
263- FailingCallable callable = new FailingCallable (4 , "request" , "SUCCESS" , tracer );
264- RetryingExecutorWithContext <String > executor =
265- getRetryingExecutor (getAlgorithm (retrySettings , 0 , null ), scheduledExecutorService );
266- RetryingFuture <String > future =
267- executor .createFuture (
268- callable ,
269- FakeCallContext .createDefault ().withTracer (tracer ).withRetrySettings (retrySettings ));
270- callable .setExternalFuture (future );
271- future .setAttemptFuture (executor .submit (future ));
272-
273- await ()
274- .atMost (Duration .ofSeconds (5 ))
275- .until (
276- () ->
277- future .getAttemptSettings () != null
278- && future .getAttemptSettings ().getAttemptCount () > 0 );
279-
280- boolean res = future .cancel (false );
281- assertTrue (res );
282- assertFutureCancel (future );
283-
284- // Verify that the cancelled future is traced. Every attempt increases the number
285- // of cancellation attempts from the tracer.
286- Mockito .verify (tracer , Mockito .times (executionsCount + 1 )).attemptCancelled ();
287-
288- // Assert that future has at least been attempted once
289- // i.e. The future from executor.submit() has been run by the ScheduledExecutor
290- assertTrue (future .getAttemptSettings ().getAttemptCount () > 0 );
291- assertTrue (future .getAttemptSettings ().getAttemptCount () < 4 );
287+ ScheduledExecutorService localExecutor = Executors .newSingleThreadScheduledExecutor ();
288+ try {
289+ for (int executionsCount = 0 ; executionsCount < EXECUTIONS_COUNT ; executionsCount ++) {
290+ FailingCallable callable = new FailingCallable (4 , "request" , "SUCCESS" , tracer );
291+ RetryingExecutorWithContext <String > executor =
292+ getRetryingExecutor (
293+ getAlgorithm (retrySettings , 0 , null , NanoClock .getDefaultClock ()), localExecutor );
294+ RetryingFuture <String > future =
295+ executor .createFuture (
296+ callable ,
297+ FakeCallContext .createDefault ().withTracer (tracer ).withRetrySettings (retrySettings ));
298+ callable .setExternalFuture (future );
299+ future .setAttemptFuture (executor .submit (future ));
300+
301+ await ()
302+ .atMost (Duration .ofSeconds (5 ))
303+ .until (
304+ () ->
305+ future .getAttemptSettings () != null
306+ && future .getAttemptSettings ().getAttemptCount () > 0 );
307+
308+ boolean res = future .cancel (false );
309+ assertTrue (res );
310+ assertFutureCancel (future );
311+
312+ // Verify that the cancelled future is traced. Every attempt increases the number
313+ // of cancellation attempts from the tracer.
314+ Mockito .verify (tracer , Mockito .times (executionsCount + 1 )).attemptCancelled ();
315+
316+ // Assert that future has at least been attempted once
317+ // i.e. The future from executor.submit() has been run by the ScheduledExecutor
318+ assertTrue (future .getAttemptSettings ().getAttemptCount () > 0 );
319+ assertTrue (future .getAttemptSettings ().getAttemptCount () < 4 );
320+ }
321+ } finally {
322+ localExecutor .shutdownNow ();
292323 }
293324 }
294325
0 commit comments