From 1ee223173b667d00f96ae4903a8a003b01c70248 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 1 Jun 2026 12:53:34 -0700 Subject: [PATCH] feat: Wait for gracefulShutdownTimeout in DartWorkerRunner#stop. This patch adjusts DartWorkerRunner to wait for gracefulShutdownTimeout before canceling workers. The default gracefulShutdownTimeout is changed from 0s to 30s to match the documentation, and also, it is believed, to match actual behavior. --- .../msq/dart/guice/DartWorkerModule.java | 9 +- .../msq/dart/worker/DartWorkerRunner.java | 77 +++++++++++-- .../apache/druid/msq/exec/WorkerRunRef.java | 31 +++++- .../msq/dart/worker/DartWorkerRunnerTest.java | 104 +++++++++++++++++- .../druid/msq/exec/WorkerRunRefTest.java | 4 +- .../server/initialization/ServerConfig.java | 2 +- 6 files changed, 205 insertions(+), 22 deletions(-) diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java index 5ee9aa1f6ee1..8a3b9a3d0c1b 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java @@ -64,9 +64,11 @@ import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.security.AuthorizerMapper; import java.io.File; +import java.time.Clock; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -122,7 +124,8 @@ public DartWorkerRunner createWorkerRunner( final DruidProcessingConfig processingConfig, @Dart final ResourcePermissionMapper permissionMapper, final MemoryIntrospector memoryIntrospector, - final AuthorizerMapper authorizerMapper + final AuthorizerMapper authorizerMapper, + final ServerConfig serverConfig ) { final ExecutorService exec = Execs.multiThreaded(memoryIntrospector.numTasksInJvm(), "dart-worker-%s"); @@ -134,7 +137,9 @@ public DartWorkerRunner createWorkerRunner( discoveryProvider, permissionMapper, authorizerMapper, - baseTempDir + serverConfig, + baseTempDir, + Clock.systemUTC() ); } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java index 205d19ac6b1f..392fd60a916d 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java @@ -44,6 +44,7 @@ import org.apache.druid.msq.rpc.ResourcePermissionMapper; import org.apache.druid.msq.rpc.WorkerResource; import org.apache.druid.query.QueryContext; +import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.utils.CloseableUtils; import org.joda.time.DateTime; @@ -52,6 +53,7 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.time.Clock; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -59,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -76,8 +79,8 @@ public class DartWorkerRunner /** * Used to track the time since this runner has been idle. */ + @GuardedBy("this") private final Stopwatch sinceLastWorkerFinished = Stopwatch.createUnstarted(); - /** * Query ID -> Worker instance. */ @@ -88,7 +91,15 @@ public class DartWorkerRunner private final DruidNodeDiscoveryProvider discoveryProvider; private final ResourcePermissionMapper permissionMapper; private final AuthorizerMapper authorizerMapper; + private final ServerConfig serverConfig; private final File baseTempDir; + private final Clock clock; + + /** + * Used to fence off new workers from starting after {@link #stop()} has been called. + */ + @GuardedBy("this") + private boolean stopped; public DartWorkerRunner( final DartWorkerContextFactory workerFactory, @@ -96,7 +107,9 @@ public DartWorkerRunner( final DruidNodeDiscoveryProvider discoveryProvider, final ResourcePermissionMapper permissionMapper, final AuthorizerMapper authorizerMapper, - final File baseTempDir + final ServerConfig serverConfig, + final File baseTempDir, + final Clock clock ) { this.workerFactory = workerFactory; @@ -104,7 +117,9 @@ public DartWorkerRunner( this.discoveryProvider = discoveryProvider; this.permissionMapper = permissionMapper; this.authorizerMapper = authorizerMapper; + this.serverConfig = serverConfig; this.baseTempDir = baseTempDir; + this.clock = clock; } /** @@ -123,10 +138,20 @@ public Worker startWorker( final boolean newHolder; synchronized (this) { + if (stopped) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Cannot start query[%s] because this instance has been stopped", queryId); + } + if (!activeControllerHosts.contains(controllerHost)) { throw DruidException.forPersona(DruidException.Persona.OPERATOR) .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Received startWorker request for unknown controller[%s]", controllerHost); + .build( + "Received startWorker request for query[%s] with controller[%s]", + queryId, + controllerHost + ); } final WorkerHolder existingHolder = workerMap.get(queryId); @@ -137,7 +162,7 @@ public Worker startWorker( final WorkerContext workerContext = workerFactory.build(queryId, controllerHost, baseTempDir, context); final Worker worker = new WorkerImpl(null, workerContext); final WorkerResource resource = new WorkerResource(worker, permissionMapper, authorizerMapper); - holder = new WorkerHolder(worker, workerContext, controllerHost, resource, DateTimes.nowUtc()); + holder = new WorkerHolder(worker, workerContext, controllerHost, resource, DateTimes.utc(clock.millis())); workerMap.put(queryId, holder); this.notifyAll(); newHolder = true; @@ -250,15 +275,47 @@ public void start() @LifecycleStop public void stop() { + final List runningWorkers; synchronized (this) { - final Collection holders = workerMap.values(); - - for (final WorkerHolder holder : holders) { - holder.runRef.cancel(); + if (stopped) { + return; } + stopped = true; + runningWorkers = new ArrayList<>(workerMap.values()); + workerMap.clear(); + } - for (final WorkerHolder holder : holders) { - holder.runRef.awaitStop(); + if (runningWorkers.isEmpty()) { + return; + } + + // Wait for workers to exit outside the lock. + final DateTime waitStart = DateTimes.utc(clock.millis()); + final DateTime deadline = waitStart.plus(serverConfig.getGracefulShutdownTimeout()); + + log.info( + "Waiting until[%s] for queries[%s] to stop.", + deadline, + runningWorkers.stream().map(holder -> holder.workerContext.queryId()).collect(Collectors.joining(", ")) + ); + + for (final WorkerHolder holder : runningWorkers) { + try { + final long timeout = deadline.getMillis() - clock.millis(); + if (timeout <= 0 || !holder.runRef.awaitStop(timeout, TimeUnit.MILLISECONDS)) { + log.warn( + "Canceling work for query[%s] due to timeout during stop (waited [%,d] ms)", + holder.workerContext.queryId(), + clock.millis() - waitStart.getMillis() + ); + + holder.runRef.cancel(); + holder.runRef.awaitStop(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java index adf851bb5959..d08e10b1593d 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java @@ -30,6 +30,8 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Reference to a single run of a particular worker. @@ -147,11 +149,24 @@ public synchronized void cancel() } } + /** + * Wait for the worker run to finish, indefinitely. + */ + public void awaitStop() throws InterruptedException + { + awaitStop(-1, TimeUnit.MILLISECONDS); + } + /** * Wait for the worker run to finish. Does not throw exceptions from the future, even if the worker * ended exceptionally. + * + * @param timeout maximum time to wait; negative to wait forever + * @param timeUnit unit for timeout + * + * @return true if the worker stopped, false if the timeout elapsed (in which case the worker may still be running) */ - public void awaitStop() + public boolean awaitStop(final long timeout, final TimeUnit timeUnit) throws InterruptedException { final ListenableFuture future; synchronized (this) { @@ -163,13 +178,19 @@ public void awaitStop() } try { - future.get(); + if (timeout < 0) { + future.get(); + } else { + future.get(timeout, timeUnit); + } + return true; } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + catch (TimeoutException e) { + return false; } catch (ExecutionException | CancellationException ignored) { - // Do nothing + // Error still counts as stopped. + return true; } } } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java index 7bf3c53ca96a..a92b30d7ebca 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java @@ -36,9 +36,11 @@ import org.apache.druid.msq.exec.WorkerImpl; import org.apache.druid.query.QueryContext; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; +import org.joda.time.Period; import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -55,6 +57,11 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.util.Collections; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -77,6 +84,13 @@ public class DartWorkerRunnerTest private ListeningExecutorService workerExec; private DartWorkerRunner workerRunner; + private TestClock clock; + + /** + * Graceful shutdown timeout returned by the {@link ServerConfig} given to {@link #workerRunner}. Defaults to + * {@link Period#ZERO} so most tests cancel workers immediately on {@link DartWorkerRunner#stop()}. + */ + private Period gracefulShutdownTimeout; private AutoCloseable mockCloser; @TempDir @@ -104,13 +118,24 @@ public class DartWorkerRunnerTest public void setUp() { mockCloser = MockitoAnnotations.openMocks(this); + clock = new TestClock(0L); + gracefulShutdownTimeout = Period.ZERO; workerRunner = new DartWorkerRunner( workerFactory, workerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(MAX_WORKERS, "worker-exec-%s")), discoveryProvider, new DartResourcePermissionMapper(), authorizerMapper, - temporaryFolder.toFile() + new ServerConfig() + { + @Override + public Period getGracefulShutdownTimeout() + { + return gracefulShutdownTimeout; + } + }, + temporaryFolder.toFile(), + clock ); // "discoveryProvider" provides "discovery". @@ -191,7 +216,7 @@ public void test_startWorker_controllerNotActive() MatcherAssert.assertThat( e, ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( - "Received startWorker request for unknown controller")) + "Received startWorker request for query[abc] with controller[" + CONTROLLER_SERVER_HOST + "]")) ); } @@ -284,4 +309,79 @@ public void test_startWorker_thenStopRunner() throws InterruptedException workerRunner.awaitQuerySet(Set::isEmpty); Assertions.assertEquals(0, workerRunner.getWorkersResponse().getWorkers().size()); } + + @Test + @Timeout(value = 10, unit = TimeUnit.SECONDS) + public void test_stop_cancelsWorkerAfterTimeoutElapses() throws InterruptedException + { + gracefulShutdownTimeout = Period.seconds(30); + discoveryListener.getValue().nodesAdded(Collections.singletonList(CONTROLLER_DISCOVERY_NODE)); + workerRunner.startWorker(QUERY_ID, CONTROLLER_SERVER_HOST, QueryContext.empty()); + Assertions.assertEquals(1, workerRunner.getWorkersResponse().getWorkers().size()); + + // Advance the clock 40 seconds (past the 30-second timeout) after it is first read. This causes the worker runner + // to cancel the worker on stop(). + clock.advanceOnNextRead(Duration.ofSeconds(40)); + workerRunner.stop(); + + workerRunner.awaitQuerySet(Set::isEmpty); + Assertions.assertEquals(0, workerRunner.getWorkersResponse().getWorkers().size()); + } + + /** + * A manually-advanced {@link Clock} for tests. + */ + private static class TestClock extends Clock + { + private final ZoneId zone; + private long nowMillis; + private long advanceOnNextReadMillis; + + TestClock(final long nowMillis) + { + this(nowMillis, ZoneOffset.UTC); + } + + private TestClock(final long nowMillis, final ZoneId zone) + { + this.nowMillis = nowMillis; + this.zone = zone; + } + + /** + * Arrange for the clock to jump forward by the given duration immediately after the next read. The triggering + * read returns the pre-jump time; subsequent reads return the post-jump time. + */ + public void advanceOnNextRead(final Duration duration) + { + advanceOnNextReadMillis = duration.toMillis(); + } + + @Override + public long millis() + { + final long current = nowMillis; + nowMillis += advanceOnNextReadMillis; + advanceOnNextReadMillis = 0; + return current; + } + + @Override + public Instant instant() + { + return Instant.ofEpochMilli(millis()); + } + + @Override + public ZoneId getZone() + { + return zone; + } + + @Override + public Clock withZone(final ZoneId zone) + { + return new TestClock(nowMillis, zone); + } + } } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java index 6f12c6bed196..5a8825150b75 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java @@ -120,7 +120,7 @@ public void stop() } @Test - public void testSuccessfulCompletionOfWorker() + public void testSuccessfulCompletionOfWorker() throws InterruptedException { final Worker worker = new TestWorker("test-worker") { @@ -156,7 +156,7 @@ public void stop() } @Test - public void testUnsuccessfulCompletionOfWorker() + public void testUnsuccessfulCompletionOfWorker() throws InterruptedException { final RuntimeException expectedException = new RuntimeException("Worker failed"); final Worker worker = new TestWorker("test-worker") diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index a7206ca9cf7e..b667d05e0197 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -167,7 +167,7 @@ public ServerConfig(boolean enableQueryRequestsQueuing) @JsonProperty @NotNull - private Period gracefulShutdownTimeout = Period.ZERO; + private Period gracefulShutdownTimeout = Period.seconds(30); @JsonProperty @NotNull