Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.security.AuthorizerMapper;

import java.io.File;
import java.time.Clock;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -122,7 +124,8 @@ public DartWorkerRunner createWorkerRunner(
final DruidProcessingConfig processingConfig,
@Dart final ResourcePermissionMapper permissionMapper,
final MemoryIntrospector memoryIntrospector,
final AuthorizerMapper authorizerMapper
final AuthorizerMapper authorizerMapper,
final ServerConfig serverConfig
)
{
final ExecutorService exec = Execs.multiThreaded(memoryIntrospector.numTasksInJvm(), "dart-worker-%s");
Expand All @@ -134,7 +137,9 @@ public DartWorkerRunner createWorkerRunner(
discoveryProvider,
permissionMapper,
authorizerMapper,
baseTempDir
serverConfig,
baseTempDir,
Clock.systemUTC()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.msq.rpc.WorkerResource;
import org.apache.druid.query.QueryContext;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.DateTime;
Expand All @@ -52,13 +53,15 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand All @@ -76,8 +79,8 @@ public class DartWorkerRunner
/**
* Used to track the time since this runner has been idle.
*/
@GuardedBy("this")
private final Stopwatch sinceLastWorkerFinished = Stopwatch.createUnstarted();

/**
* Query ID -> Worker instance.
*/
Expand All @@ -88,23 +91,35 @@ public class DartWorkerRunner
private final DruidNodeDiscoveryProvider discoveryProvider;
private final ResourcePermissionMapper permissionMapper;
private final AuthorizerMapper authorizerMapper;
private final ServerConfig serverConfig;
private final File baseTempDir;
private final Clock clock;

/**
* Used to fence off new workers from starting after {@link #stop()} has been called.
*/
@GuardedBy("this")
private boolean stopped;

public DartWorkerRunner(
final DartWorkerContextFactory workerFactory,
final ListeningExecutorService workerExec,
final DruidNodeDiscoveryProvider discoveryProvider,
final ResourcePermissionMapper permissionMapper,
final AuthorizerMapper authorizerMapper,
final File baseTempDir
final ServerConfig serverConfig,
final File baseTempDir,
final Clock clock
)
{
this.workerFactory = workerFactory;
this.workerExec = workerExec;
this.discoveryProvider = discoveryProvider;
this.permissionMapper = permissionMapper;
this.authorizerMapper = authorizerMapper;
this.serverConfig = serverConfig;
this.baseTempDir = baseTempDir;
this.clock = clock;
}

/**
Expand All @@ -123,10 +138,20 @@ public Worker startWorker(
final boolean newHolder;

synchronized (this) {
if (stopped) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Cannot start query[%s] because this instance has been stopped", queryId);
}

if (!activeControllerHosts.contains(controllerHost)) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Received startWorker request for unknown controller[%s]", controllerHost);
.build(
"Received startWorker request for query[%s] with controller[%s]",
queryId,
controllerHost
);
}

final WorkerHolder existingHolder = workerMap.get(queryId);
Expand All @@ -137,7 +162,7 @@ public Worker startWorker(
final WorkerContext workerContext = workerFactory.build(queryId, controllerHost, baseTempDir, context);
final Worker worker = new WorkerImpl(null, workerContext);
final WorkerResource resource = new WorkerResource(worker, permissionMapper, authorizerMapper);
holder = new WorkerHolder(worker, workerContext, controllerHost, resource, DateTimes.nowUtc());
holder = new WorkerHolder(worker, workerContext, controllerHost, resource, DateTimes.utc(clock.millis()));
workerMap.put(queryId, holder);
this.notifyAll();
newHolder = true;
Expand Down Expand Up @@ -250,15 +275,47 @@ public void start()
@LifecycleStop
public void stop()
{
final List<WorkerHolder> runningWorkers;
synchronized (this) {
final Collection<WorkerHolder> holders = workerMap.values();

for (final WorkerHolder holder : holders) {
holder.runRef.cancel();
if (stopped) {
return;
}
stopped = true;
runningWorkers = new ArrayList<>(workerMap.values());
workerMap.clear();
}

for (final WorkerHolder holder : holders) {
holder.runRef.awaitStop();
if (runningWorkers.isEmpty()) {
return;
}

// Wait for workers to exit outside the lock.
final DateTime waitStart = DateTimes.utc(clock.millis());
final DateTime deadline = waitStart.plus(serverConfig.getGracefulShutdownTimeout());

log.info(
"Waiting until[%s] for queries[%s] to stop.",
deadline,
runningWorkers.stream().map(holder -> holder.workerContext.queryId()).collect(Collectors.joining(", "))
);

for (final WorkerHolder holder : runningWorkers) {
try {
final long timeout = deadline.getMillis() - clock.millis();
if (timeout <= 0 || !holder.runRef.awaitStop(timeout, TimeUnit.MILLISECONDS)) {
log.warn(
"Canceling work for query[%s] due to timeout during stop (waited [%,d] ms)",
holder.workerContext.queryId(),
clock.millis() - waitStart.getMillis()
);

holder.runRef.cancel();
holder.runRef.awaitStop();
}
}
catch (InterruptedException e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Cancel remaining workers when shutdown wait is interrupted

If the lifecycle stop thread is interrupted while waiting for a worker, awaitStop now throws and this catch exits without canceling the current or remaining runningWorkers. Since stopped is already true and workerMap has been cleared, a later stop() call returns immediately, leaving background worker threads running instead of forcing cancellation. The interruption path should cancel the copied workers before returning or rethrowing.

Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Reference to a single run of a particular worker.
Expand Down Expand Up @@ -147,11 +149,24 @@ public synchronized void cancel()
}
}

/**
* Wait for the worker run to finish, indefinitely.
*/
public void awaitStop() throws InterruptedException
{
awaitStop(-1, TimeUnit.MILLISECONDS);
}

/**
* Wait for the worker run to finish. Does not throw exceptions from the future, even if the worker
* ended exceptionally.
*
* @param timeout maximum time to wait; negative to wait forever
* @param timeUnit unit for timeout
*
* @return true if the worker stopped, false if the timeout elapsed (in which case the worker may still be running)
*/
public void awaitStop()
public boolean awaitStop(final long timeout, final TimeUnit timeUnit) throws InterruptedException
{
final ListenableFuture<?> future;
synchronized (this) {
Expand All @@ -163,13 +178,19 @@ public void awaitStop()
}

try {
future.get();
if (timeout < 0) {
future.get();
} else {
future.get(timeout, timeUnit);
}
return true;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
catch (TimeoutException e) {
return false;
}
catch (ExecutionException | CancellationException ignored) {
// Do nothing
// Error still counts as stopped.
return true;
}
}
}
Loading
Loading