diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java index 5ee9aa1f6ee1..8a3b9a3d0c1b 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java @@ -64,9 +64,11 @@ import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.security.AuthorizerMapper; import java.io.File; +import java.time.Clock; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -122,7 +124,8 @@ public DartWorkerRunner createWorkerRunner( final DruidProcessingConfig processingConfig, @Dart final ResourcePermissionMapper permissionMapper, final MemoryIntrospector memoryIntrospector, - final AuthorizerMapper authorizerMapper + final AuthorizerMapper authorizerMapper, + final ServerConfig serverConfig ) { final ExecutorService exec = Execs.multiThreaded(memoryIntrospector.numTasksInJvm(), "dart-worker-%s"); @@ -134,7 +137,9 @@ public DartWorkerRunner createWorkerRunner( discoveryProvider, permissionMapper, authorizerMapper, - baseTempDir + serverConfig, + baseTempDir, + Clock.systemUTC() ); } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java index 205d19ac6b1f..392fd60a916d 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java @@ -44,6 +44,7 @@ import org.apache.druid.msq.rpc.ResourcePermissionMapper; import org.apache.druid.msq.rpc.WorkerResource; import org.apache.druid.query.QueryContext; +import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.utils.CloseableUtils; import org.joda.time.DateTime; @@ -52,6 +53,7 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.time.Clock; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -59,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -76,8 +79,8 @@ public class DartWorkerRunner /** * Used to track the time since this runner has been idle. */ + @GuardedBy("this") private final Stopwatch sinceLastWorkerFinished = Stopwatch.createUnstarted(); - /** * Query ID -> Worker instance. */ @@ -88,7 +91,15 @@ public class DartWorkerRunner private final DruidNodeDiscoveryProvider discoveryProvider; private final ResourcePermissionMapper permissionMapper; private final AuthorizerMapper authorizerMapper; + private final ServerConfig serverConfig; private final File baseTempDir; + private final Clock clock; + + /** + * Used to fence off new workers from starting after {@link #stop()} has been called. + */ + @GuardedBy("this") + private boolean stopped; public DartWorkerRunner( final DartWorkerContextFactory workerFactory, @@ -96,7 +107,9 @@ public DartWorkerRunner( final DruidNodeDiscoveryProvider discoveryProvider, final ResourcePermissionMapper permissionMapper, final AuthorizerMapper authorizerMapper, - final File baseTempDir + final ServerConfig serverConfig, + final File baseTempDir, + final Clock clock ) { this.workerFactory = workerFactory; @@ -104,7 +117,9 @@ public DartWorkerRunner( this.discoveryProvider = discoveryProvider; this.permissionMapper = permissionMapper; this.authorizerMapper = authorizerMapper; + this.serverConfig = serverConfig; this.baseTempDir = baseTempDir; + this.clock = clock; } /** @@ -123,10 +138,20 @@ public Worker startWorker( final boolean newHolder; synchronized (this) { + if (stopped) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Cannot start query[%s] because this instance has been stopped", queryId); + } + if (!activeControllerHosts.contains(controllerHost)) { throw DruidException.forPersona(DruidException.Persona.OPERATOR) .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Received startWorker request for unknown controller[%s]", controllerHost); + .build( + "Received startWorker request for query[%s] with controller[%s]", + queryId, + controllerHost + ); } final WorkerHolder existingHolder = workerMap.get(queryId); @@ -137,7 +162,7 @@ public Worker startWorker( final WorkerContext workerContext = workerFactory.build(queryId, controllerHost, baseTempDir, context); final Worker worker = new WorkerImpl(null, workerContext); final WorkerResource resource = new WorkerResource(worker, permissionMapper, authorizerMapper); - holder = new WorkerHolder(worker, workerContext, controllerHost, resource, DateTimes.nowUtc()); + holder = new WorkerHolder(worker, workerContext, controllerHost, resource, DateTimes.utc(clock.millis())); workerMap.put(queryId, holder); this.notifyAll(); newHolder = true; @@ -250,15 +275,47 @@ public void start() @LifecycleStop public void stop() { + final List runningWorkers; synchronized (this) { - final Collection holders = workerMap.values(); - - for (final WorkerHolder holder : holders) { - holder.runRef.cancel(); + if (stopped) { + return; } + stopped = true; + runningWorkers = new ArrayList<>(workerMap.values()); + workerMap.clear(); + } - for (final WorkerHolder holder : holders) { - holder.runRef.awaitStop(); + if (runningWorkers.isEmpty()) { + return; + } + + // Wait for workers to exit outside the lock. + final DateTime waitStart = DateTimes.utc(clock.millis()); + final DateTime deadline = waitStart.plus(serverConfig.getGracefulShutdownTimeout()); + + log.info( + "Waiting until[%s] for queries[%s] to stop.", + deadline, + runningWorkers.stream().map(holder -> holder.workerContext.queryId()).collect(Collectors.joining(", ")) + ); + + for (final WorkerHolder holder : runningWorkers) { + try { + final long timeout = deadline.getMillis() - clock.millis(); + if (timeout <= 0 || !holder.runRef.awaitStop(timeout, TimeUnit.MILLISECONDS)) { + log.warn( + "Canceling work for query[%s] due to timeout during stop (waited [%,d] ms)", + holder.workerContext.queryId(), + clock.millis() - waitStart.getMillis() + ); + + holder.runRef.cancel(); + holder.runRef.awaitStop(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java index adf851bb5959..d08e10b1593d 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java @@ -30,6 +30,8 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Reference to a single run of a particular worker. @@ -147,11 +149,24 @@ public synchronized void cancel() } } + /** + * Wait for the worker run to finish, indefinitely. + */ + public void awaitStop() throws InterruptedException + { + awaitStop(-1, TimeUnit.MILLISECONDS); + } + /** * Wait for the worker run to finish. Does not throw exceptions from the future, even if the worker * ended exceptionally. + * + * @param timeout maximum time to wait; negative to wait forever + * @param timeUnit unit for timeout + * + * @return true if the worker stopped, false if the timeout elapsed (in which case the worker may still be running) */ - public void awaitStop() + public boolean awaitStop(final long timeout, final TimeUnit timeUnit) throws InterruptedException { final ListenableFuture future; synchronized (this) { @@ -163,13 +178,19 @@ public void awaitStop() } try { - future.get(); + if (timeout < 0) { + future.get(); + } else { + future.get(timeout, timeUnit); + } + return true; } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + catch (TimeoutException e) { + return false; } catch (ExecutionException | CancellationException ignored) { - // Do nothing + // Error still counts as stopped. + return true; } } } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java index 7bf3c53ca96a..a92b30d7ebca 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java @@ -36,9 +36,11 @@ import org.apache.druid.msq.exec.WorkerImpl; import org.apache.druid.query.QueryContext; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; +import org.joda.time.Period; import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -55,6 +57,11 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.util.Collections; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -77,6 +84,13 @@ public class DartWorkerRunnerTest private ListeningExecutorService workerExec; private DartWorkerRunner workerRunner; + private TestClock clock; + + /** + * Graceful shutdown timeout returned by the {@link ServerConfig} given to {@link #workerRunner}. Defaults to + * {@link Period#ZERO} so most tests cancel workers immediately on {@link DartWorkerRunner#stop()}. + */ + private Period gracefulShutdownTimeout; private AutoCloseable mockCloser; @TempDir @@ -104,13 +118,24 @@ public class DartWorkerRunnerTest public void setUp() { mockCloser = MockitoAnnotations.openMocks(this); + clock = new TestClock(0L); + gracefulShutdownTimeout = Period.ZERO; workerRunner = new DartWorkerRunner( workerFactory, workerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(MAX_WORKERS, "worker-exec-%s")), discoveryProvider, new DartResourcePermissionMapper(), authorizerMapper, - temporaryFolder.toFile() + new ServerConfig() + { + @Override + public Period getGracefulShutdownTimeout() + { + return gracefulShutdownTimeout; + } + }, + temporaryFolder.toFile(), + clock ); // "discoveryProvider" provides "discovery". @@ -191,7 +216,7 @@ public void test_startWorker_controllerNotActive() MatcherAssert.assertThat( e, ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( - "Received startWorker request for unknown controller")) + "Received startWorker request for query[abc] with controller[" + CONTROLLER_SERVER_HOST + "]")) ); } @@ -284,4 +309,79 @@ public void test_startWorker_thenStopRunner() throws InterruptedException workerRunner.awaitQuerySet(Set::isEmpty); Assertions.assertEquals(0, workerRunner.getWorkersResponse().getWorkers().size()); } + + @Test + @Timeout(value = 10, unit = TimeUnit.SECONDS) + public void test_stop_cancelsWorkerAfterTimeoutElapses() throws InterruptedException + { + gracefulShutdownTimeout = Period.seconds(30); + discoveryListener.getValue().nodesAdded(Collections.singletonList(CONTROLLER_DISCOVERY_NODE)); + workerRunner.startWorker(QUERY_ID, CONTROLLER_SERVER_HOST, QueryContext.empty()); + Assertions.assertEquals(1, workerRunner.getWorkersResponse().getWorkers().size()); + + // Advance the clock 40 seconds (past the 30-second timeout) after it is first read. This causes the worker runner + // to cancel the worker on stop(). + clock.advanceOnNextRead(Duration.ofSeconds(40)); + workerRunner.stop(); + + workerRunner.awaitQuerySet(Set::isEmpty); + Assertions.assertEquals(0, workerRunner.getWorkersResponse().getWorkers().size()); + } + + /** + * A manually-advanced {@link Clock} for tests. + */ + private static class TestClock extends Clock + { + private final ZoneId zone; + private long nowMillis; + private long advanceOnNextReadMillis; + + TestClock(final long nowMillis) + { + this(nowMillis, ZoneOffset.UTC); + } + + private TestClock(final long nowMillis, final ZoneId zone) + { + this.nowMillis = nowMillis; + this.zone = zone; + } + + /** + * Arrange for the clock to jump forward by the given duration immediately after the next read. The triggering + * read returns the pre-jump time; subsequent reads return the post-jump time. + */ + public void advanceOnNextRead(final Duration duration) + { + advanceOnNextReadMillis = duration.toMillis(); + } + + @Override + public long millis() + { + final long current = nowMillis; + nowMillis += advanceOnNextReadMillis; + advanceOnNextReadMillis = 0; + return current; + } + + @Override + public Instant instant() + { + return Instant.ofEpochMilli(millis()); + } + + @Override + public ZoneId getZone() + { + return zone; + } + + @Override + public Clock withZone(final ZoneId zone) + { + return new TestClock(nowMillis, zone); + } + } } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java index 6f12c6bed196..5a8825150b75 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java @@ -120,7 +120,7 @@ public void stop() } @Test - public void testSuccessfulCompletionOfWorker() + public void testSuccessfulCompletionOfWorker() throws InterruptedException { final Worker worker = new TestWorker("test-worker") { @@ -156,7 +156,7 @@ public void stop() } @Test - public void testUnsuccessfulCompletionOfWorker() + public void testUnsuccessfulCompletionOfWorker() throws InterruptedException { final RuntimeException expectedException = new RuntimeException("Worker failed"); final Worker worker = new TestWorker("test-worker") diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index a7206ca9cf7e..b667d05e0197 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -167,7 +167,7 @@ public ServerConfig(boolean enableQueryRequestsQueuing) @JsonProperty @NotNull - private Period gracefulShutdownTimeout = Period.ZERO; + private Period gracefulShutdownTimeout = Period.seconds(30); @JsonProperty @NotNull