Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment.join;

import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.segment.ColumnSelectorFactory;
Expand Down Expand Up @@ -78,10 +79,11 @@ public void setValueMatcher(@Nullable ValueMatcher valueMatcher)
private void advanceToMatch()
{
if (valueMatcher != null) {
while (!isDone() && !valueMatcher.matches(false)) {
while (!isDoneOrInterrupted() && !valueMatcher.matches(false)) {
baseCursor.advance();
}
}
BaseQuery.checkInterrupted();
}

@Override
Expand All @@ -100,8 +102,6 @@ public Filter getPostJoinFilter()
public void advance()
{
baseCursor.advance();
// Relies on baseCursor.advance() call inside this for BaseQuery.checkInterrupted() checks -- unlike other cursors
// which call advanceInterruptibly() and hence have to explicitly provision for interrupts.
advanceToMatch();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

package org.apache.druid.server;

import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DirectQueryProcessingPool;
Expand All @@ -44,6 +48,7 @@

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.StreamSupport;

/**
Expand Down Expand Up @@ -111,12 +116,56 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final

// Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where
// it is already supported.
return FluentQueryRunner
final QueryRunner<T> decorated = FluentQueryRunner
.create(scheduler.wrapQueryRunner(baseRunner), queryRunnerFactory.getToolchest())
.applyPreMergeDecoration()
.mergeResults(true)
.applyPostMergeDecoration()
.emitCPUTimeMetric(emitter, cpuAccumulator);

// Register a cancellable future so that QueryScheduler.cancelQuery() can interrupt this thread.
// LocalQuerySegmentWalker runs queries on the calling (Jetty) thread, so no future is created by the
// parallel-merge path in CachingClusteredClient. Without this, broker-side join queries are uncancellable.
return (queryPlus, responseContext) -> {
final AtomicReference<Thread> runnerThreadRef = new AtomicReference<>();
final SettableFuture<Object> queryFuture = SettableFuture.create();
queryFuture.addListener(
() -> {
if (queryFuture.isCancelled()) {
final Thread t = runnerThreadRef.get();
if (t != null) {
t.interrupt();
}
}
},
Execs.directExecutor()
);
scheduler.registerQueryFuture(queryPlus.getQuery(), queryFuture);
return Sequences.wrap(
decorated.run(queryPlus, responseContext),
new SequenceWrapper()
{
@Override
public void before()
{
runnerThreadRef.set(Thread.currentThread());
// Handle the race where cancel arrived before sequence iteration began.
if (queryFuture.isCancelled()) {
Thread.currentThread().interrupt();
}
}

@Override
public void after(boolean isDone, Throwable thrown)
{
// Clear the thread ref before completing the future to prevent a late-arriving cancel
// from interrupting a subsequent request on the same Jetty thread.
runnerThreadRef.set(null);
queryFuture.set(null);
}
}
);
};
}

@Override
Expand Down
Loading
Loading