|
19 | 19 |
|
20 | 20 | import java.util.Arrays; |
21 | 21 | import java.util.List; |
22 | | -import java.util.concurrent.*; |
| 22 | +import java.util.concurrent.CountDownLatch; |
| 23 | +import java.util.concurrent.ExecutionException; |
| 24 | +import java.util.concurrent.Future; |
| 25 | +import java.util.concurrent.TimeUnit; |
| 26 | +import java.util.concurrent.TimeoutException; |
| 27 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 28 | +import java.util.concurrent.atomic.AtomicReference; |
23 | 29 | import java.util.function.Consumer; |
24 | 30 | import java.util.stream.Collectors; |
25 | 31 |
|
26 | 32 | import org.apache.commons.lang3.AbstractLangTest; |
27 | 33 | import org.apache.commons.lang3.exception.UncheckedInterruptedException; |
28 | | -import org.junit.jupiter.api.RepeatedTest; |
29 | 34 | import org.junit.jupiter.api.Test; |
30 | 35 |
|
31 | 36 | import static org.junit.jupiter.api.Assertions.*; |
@@ -119,44 +124,62 @@ void testOnFuture() { |
119 | 124 | assertEquals("Z", UncheckedFuture.on(new TestFuture<>("Z")).get()); |
120 | 125 | } |
121 | 126 |
|
122 | | - @RepeatedTest(10) |
| 127 | + |
| 128 | + @Test |
123 | 129 | void interruptFlagIsPreservedOnGet() throws Exception { |
124 | | - assertInterruptPreserved(future -> future.get()); |
| 130 | + assertInterruptPreserved(UncheckedFuture::get); |
125 | 131 | } |
126 | 132 |
|
127 | | - @RepeatedTest(10) |
| 133 | + @Test |
128 | 134 | void interruptFlagIsPreservedOnGetWithTimeout() throws Exception { |
129 | | - assertInterruptPreserved(future -> future.get(2, TimeUnit.SECONDS)); |
| 135 | + assertInterruptPreserved(uf -> uf.get(1, TimeUnit.DAYS)); |
130 | 136 | } |
131 | 137 |
|
132 | | - private static void assertInterruptPreserved( |
133 | | - Consumer<UncheckedFuture<Integer>> futureCall) throws Exception { |
134 | | - |
135 | | - ExecutorService executor = Executors.newFixedThreadPool(2); |
136 | | - try { |
137 | | - CountDownLatch future2IsAboutToWait = new CountDownLatch(1); |
138 | | - Future<Integer> future1 = executor.submit(() -> { |
139 | | - Thread.sleep(10_000); |
140 | | - return 42; |
141 | | - }); |
142 | | - Future<Integer> future2 = executor.submit(() -> { |
143 | | - future2IsAboutToWait.countDown(); |
144 | | - try { |
145 | | - futureCall.accept(UncheckedFuture.on(future1)); |
146 | | - return 1; |
147 | | - } catch (RuntimeException e) { |
148 | | - assertTrue(Thread.currentThread().isInterrupted()); |
149 | | - return 2; |
150 | | - } |
151 | | - }); |
152 | | - |
153 | | - assertTrue(future2IsAboutToWait.await(2, TimeUnit.SECONDS)); |
154 | | - executor.shutdownNow(); |
155 | | - assertEquals(2, future2.get(2, TimeUnit.SECONDS)); |
156 | | - } finally { |
157 | | - executor.shutdownNow(); |
158 | | - executor.awaitTermination(10, TimeUnit.SECONDS); |
159 | | - } |
| 138 | + private static void assertInterruptPreserved(Consumer<UncheckedFuture<Integer>> call) throws Exception { |
| 139 | + final CountDownLatch enteredGet = new CountDownLatch(1); |
| 140 | + Future<Integer> blockingFuture = new AbstractFutureProxy<Integer>(ConcurrentUtils.constantFuture(42)) { |
| 141 | + private final CountDownLatch neverRelease = new CountDownLatch(1); |
| 142 | + |
| 143 | + @Override |
| 144 | + public Integer get() throws InterruptedException { |
| 145 | + enteredGet.countDown(); |
| 146 | + neverRelease.await(); |
| 147 | + throw new AssertionError("We should not get here"); |
| 148 | + } |
| 149 | + |
| 150 | + @Override |
| 151 | + public Integer get(long timeout, TimeUnit unit) throws InterruptedException { |
| 152 | + enteredGet.countDown(); |
| 153 | + neverRelease.await(); |
| 154 | + throw new AssertionError("We should not get here"); |
| 155 | + } |
| 156 | + |
| 157 | + @Override |
| 158 | + public boolean isDone() { |
| 159 | + return false; |
| 160 | + } |
| 161 | + |
| 162 | + }; |
| 163 | + final UncheckedFuture<Integer> uf = UncheckedFuture.on(blockingFuture); |
| 164 | + final AtomicReference<Throwable> thrown = new AtomicReference<>(); |
| 165 | + final AtomicBoolean interruptObserved = new AtomicBoolean(false); |
| 166 | + Thread worker = new Thread(() -> { |
| 167 | + try { |
| 168 | + call.accept(uf); |
| 169 | + thrown.set(new AssertionError("We should not get here")); |
| 170 | + } catch (Throwable e) { |
| 171 | + interruptObserved.set(Thread.currentThread().isInterrupted()); |
| 172 | + thrown.set(e); |
| 173 | + } |
| 174 | + }, "unchecked-future-test-worker"); |
| 175 | + worker.start(); |
| 176 | + assertTrue(enteredGet.await(2, TimeUnit.SECONDS), "Worker did not enter Future.get() in time"); |
| 177 | + worker.interrupt(); |
| 178 | + worker.join(); |
| 179 | + Throwable t = thrown.get(); |
| 180 | + assertInstanceOf(UncheckedInterruptedException.class, t, "Unexpected exception: " + t); |
| 181 | + assertInstanceOf(InterruptedException.class, t.getCause(), "Cause should be InterruptedException"); |
| 182 | + assertTrue(interruptObserved.get(), "Interrupt flag was not restored by the wrapper"); |
160 | 183 | } |
161 | 184 |
|
162 | 185 | } |
0 commit comments