Skip to content

Commit 0fd547c

Browse files
Deep Patelclaude
authored andcommitted
perf: skip concurrency overhead in BaseSingleBlockCombineOperator when numTasks=1
When a query runs with a single execution task (e.g. one segment or maxExecutionThreads=1), BaseSingleBlockCombineOperator still incurred the full multi-thread overhead: ExecutorService.submit(), Phaser registration/ deregistration, BlockingQueue.poll() with timeout, AtomicInteger, and AtomicReference. This adds a single-thread fast path in getNextBlock(): when _numTasks==1 and _resultsBlockMerger is non-null (i.e. the subclass uses the default merge strategy), segments are processed sequentially on the calling thread with none of that synchronization overhead. CPU time and memory are still tracked via ThreadResourceSnapshot. Subclasses that override mergeResults() with custom logic (e.g. SequentialSortedGroupByCombineOperator, which passes null for _resultsBlockMerger) are unaffected and continue using the standard path. Fixes #14617 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 2e80bff commit 0fd547c

1 file changed

Lines changed: 50 additions & 0 deletions

File tree

pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
3030
import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
3131
import org.apache.pinot.core.query.request.context.QueryContext;
32+
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
3233
import org.apache.pinot.spi.exception.QueryErrorCode;
3334
import org.apache.pinot.spi.exception.QueryErrorMessage;
3435
import org.apache.pinot.spi.exception.QueryException;
@@ -57,8 +58,14 @@ protected BaseSingleBlockCombineOperator(ResultsBlockMerger<T> resultsBlockMerge
5758
/// @inheritDoc
5859
///
5960
/// Handles exceptions here so that execution stats can be attached.
61+
/// When only a single task is needed and the subclass uses the default ResultsBlockMerger (not null), segments are
62+
/// processed directly on the calling thread to avoid the overhead of thread submission, Phaser synchronization,
63+
/// BlockingQueue polling, and atomic operations.
6064
@Override
6165
protected BaseResultsBlock getNextBlock() {
66+
if (_numTasks == 1 && _resultsBlockMerger != null) {
67+
return getNextBlockSingleThread();
68+
}
6269
try {
6370
startProcess();
6471
return checkTerminateExceptionAndAttachExecutionStats(mergeResults());
@@ -69,6 +76,49 @@ protected BaseResultsBlock getNextBlock() {
6976
}
7077
}
7178

79+
/// Processes all segments sequentially on the calling thread when only one task is needed.
80+
/// Avoids all concurrency overhead: no ExecutorService submission, no Phaser, no BlockingQueue, no atomics.
81+
@SuppressWarnings("unchecked")
82+
private BaseResultsBlock getNextBlockSingleThread() {
83+
ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
84+
T mergedBlock = null;
85+
try {
86+
for (int i = 0; i < _numOperators; i++) {
87+
Operator operator = _operators.get(i);
88+
T resultsBlock;
89+
try {
90+
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
91+
((AcquireReleaseColumnsSegmentOperator) operator).acquire();
92+
}
93+
resultsBlock = (T) operator.nextBlock();
94+
} catch (RuntimeException e) {
95+
return createExceptionResultsBlockAndAttachExecutionStats(wrapOperatorException(operator, e),
96+
"processing segments");
97+
} finally {
98+
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
99+
((AcquireReleaseColumnsSegmentOperator) operator).release();
100+
}
101+
}
102+
if (resultsBlock.getErrorMessages() != null) {
103+
// Propagate segment-level error immediately
104+
return attachExecutionStats(resultsBlock);
105+
}
106+
if (mergedBlock == null) {
107+
mergedBlock = resultsBlock;
108+
} else {
109+
_resultsBlockMerger.mergeResultsBlocks(mergedBlock, resultsBlock);
110+
}
111+
if (_resultsBlockMerger.isQuerySatisfied(mergedBlock)) {
112+
break;
113+
}
114+
}
115+
} finally {
116+
_totalWorkerThreadCpuTimeNs.addAndGet(resourceSnapshot.getCpuTimeNs());
117+
_totalWorkerThreadMemAllocatedBytes.addAndGet(resourceSnapshot.getAllocatedBytes());
118+
}
119+
return checkTerminateExceptionAndAttachExecutionStats(mergedBlock);
120+
}
121+
72122
@Override
73123
protected void processSegments() {
74124
int operatorId;

0 commit comments

Comments
 (0)