|
| 1 | +package io.temporal.internal.worker; |
| 2 | + |
| 3 | +import com.uber.m3.tally.NoopScope; |
| 4 | +import io.temporal.worker.tuning.PollerBehaviorSimpleMaximum; |
| 5 | +import org.checkerframework.checker.nullness.qual.NonNull; |
| 6 | +import org.junit.Test; |
| 7 | +import org.junit.runner.RunWith; |
| 8 | +import org.junit.runners.Parameterized; |
| 9 | + |
| 10 | +import java.util.concurrent.CompletableFuture; |
| 11 | +import java.util.concurrent.CountDownLatch; |
| 12 | +import java.util.concurrent.TimeUnit; |
| 13 | +import java.util.concurrent.atomic.AtomicReference; |
| 14 | + |
| 15 | +import static org.junit.Assert.*; |
| 16 | + |
| 17 | +/** |
| 18 | + * Tests that an in-flight poll survives shutdown when graceful poll shutdown is enabled, and is |
| 19 | + * killed promptly when it is not. |
| 20 | + */ |
| 21 | +@RunWith(Parameterized.class) |
| 22 | +public class GracefulPollShutdownTest { |
| 23 | + |
| 24 | + @Parameterized.Parameter public boolean graceful; |
| 25 | + |
| 26 | + @Parameterized.Parameters(name = "graceful={0}") |
| 27 | + public static Object[] data() { |
| 28 | + return new Object[] {true, false}; |
| 29 | + } |
| 30 | + |
| 31 | + @Test(timeout = 10_000) |
| 32 | + public void inflightPollSurvivesShutdownOnlyWhenGraceful() throws Exception { |
| 33 | + NamespaceCapabilities capabilities = new NamespaceCapabilities(); |
| 34 | + capabilities.setGracefulPollShutdown(graceful); |
| 35 | + |
| 36 | + AtomicReference<String> processedTask = new AtomicReference<>(); |
| 37 | + CountDownLatch taskProcessedLatch = new CountDownLatch(1); |
| 38 | + ShutdownableTaskExecutor<String> taskExecutor = |
| 39 | + new ShutdownableTaskExecutor<String>() { |
| 40 | + @Override |
| 41 | + public void process(@NonNull String task) { |
| 42 | + processedTask.set(task); |
| 43 | + taskProcessedLatch.countDown(); |
| 44 | + } |
| 45 | + |
| 46 | + @Override |
| 47 | + public boolean isShutdown() { |
| 48 | + return false; |
| 49 | + } |
| 50 | + |
| 51 | + @Override |
| 52 | + public boolean isTerminated() { |
| 53 | + return false; |
| 54 | + } |
| 55 | + |
| 56 | + @Override |
| 57 | + public CompletableFuture<Void> shutdown( |
| 58 | + ShutdownManager shutdownManager, boolean interruptTasks) { |
| 59 | + return CompletableFuture.completedFuture(null); |
| 60 | + } |
| 61 | + |
| 62 | + @Override |
| 63 | + public void awaitTermination(long timeout, TimeUnit unit) {} |
| 64 | + }; |
| 65 | + |
| 66 | + // -- poll task: first call returns immediately, second blocks until released -- |
| 67 | + CountDownLatch secondPollStarted = new CountDownLatch(1); |
| 68 | + CountDownLatch releasePoll = new CountDownLatch(1); |
| 69 | + |
| 70 | + MultiThreadedPoller.PollTask<String> pollTask = |
| 71 | + new MultiThreadedPoller.PollTask<String>() { |
| 72 | + private int callCount = 0; |
| 73 | + |
| 74 | + @Override |
| 75 | + public synchronized String poll() { |
| 76 | + callCount++; |
| 77 | + if (callCount == 1) { |
| 78 | + return "task-1"; |
| 79 | + } else if (callCount == 2) { |
| 80 | + secondPollStarted.countDown(); |
| 81 | + try { |
| 82 | + releasePoll.await(); |
| 83 | + } catch (InterruptedException e) { |
| 84 | + Thread.currentThread().interrupt(); |
| 85 | + return null; |
| 86 | + } |
| 87 | + return "task-2"; |
| 88 | + } |
| 89 | + // Subsequent calls just block until interrupted (simulates long poll) |
| 90 | + try { |
| 91 | + Thread.sleep(Long.MAX_VALUE); |
| 92 | + } catch (InterruptedException e) { |
| 93 | + Thread.currentThread().interrupt(); |
| 94 | + } |
| 95 | + return null; |
| 96 | + } |
| 97 | + }; |
| 98 | + |
| 99 | + // -- create poller with 1 thread so polls are sequential -- |
| 100 | + MultiThreadedPoller<String> poller = |
| 101 | + new MultiThreadedPoller<>( |
| 102 | + "test-identity", |
| 103 | + pollTask, |
| 104 | + taskExecutor, |
| 105 | + PollerOptions.newBuilder() |
| 106 | + .setPollerBehavior(new PollerBehaviorSimpleMaximum(1)) |
| 107 | + .setPollThreadNamePrefix("test-poller") |
| 108 | + .build(), |
| 109 | + new NoopScope(), |
| 110 | + capabilities); |
| 111 | + |
| 112 | + poller.start(); |
| 113 | + |
| 114 | + // Wait for the first task to be processed (proves poller is running) |
| 115 | + assertTrue("first task should be processed", taskProcessedLatch.await(5, TimeUnit.SECONDS)); |
| 116 | + assertEquals("task-1", processedTask.get()); |
| 117 | + |
| 118 | + // Wait for the second poll to be in-flight |
| 119 | + assertTrue("second poll should start", secondPollStarted.await(5, TimeUnit.SECONDS)); |
| 120 | + |
| 121 | + // Trigger shutdown (don't interrupt tasks) |
| 122 | + ShutdownManager shutdownManager = new ShutdownManager(); |
| 123 | + CompletableFuture<Void> shutdownFuture = poller.shutdown(shutdownManager, false); |
| 124 | + |
| 125 | + if (graceful) { |
| 126 | + // In graceful mode the poller waits for the in-flight poll to complete. |
| 127 | + // The shutdown should NOT have completed yet since the poll is still blocked. |
| 128 | + assertFalse("shutdown should not complete while poll is in-flight", shutdownFuture.isDone()); |
| 129 | + |
| 130 | + // Simulate the server returning the poll response (as it would after ShutdownWorker RPC) |
| 131 | + releasePoll.countDown(); |
| 132 | + |
| 133 | + // Wait for shutdown to complete - the poll should return "task-2" and be processed |
| 134 | + shutdownFuture.get(5, TimeUnit.SECONDS); |
| 135 | + |
| 136 | + assertEquals("task-2", processedTask.get()); |
| 137 | + } else { |
| 138 | + // In legacy mode the poller forcefully interrupts in-flight polls. |
| 139 | + // Shutdown should complete quickly without releasing the blocked poll. |
| 140 | + shutdownFuture.get(5, TimeUnit.SECONDS); |
| 141 | + |
| 142 | + // The second task should NOT have been processed since the poll was killed. |
| 143 | + assertNotEquals( |
| 144 | + "task-2 should not be processed in legacy mode", "task-2", processedTask.get()); |
| 145 | + } |
| 146 | + |
| 147 | + shutdownManager.close(); |
| 148 | + } |
| 149 | +} |
0 commit comments