Skip to content

Commit ce8cec5

Browse files
Deep Patelclaude
authored andcommitted
Fix resource stats double-counting and exception handling in single-thread fast path
Two bugs in getNextBlockSingleThread(): 1. Resource stats double-counting: The method accumulated _totalWorkerThreadCpuTimeNs on the calling thread, but InstanceResponseOperator.getBaseBlock() already captures the same thread's CPU time as mainThreadCpuTimeNs. The calSystemActivitiesCpuTimeNs formula (wallClock - mainThread - workerThread/N) then subtracted the same work twice, producing a negative value clamped to 0. This broke testResourceUsageStats in the CPU/memory query killing integration tests. Fix: remove the ThreadResourceSnapshot tracking; the main thread's snapshot already accounts for all work done in the single-thread path. 2. Exception handling for operator failures: wrapOperatorException() throws (not returns) a new QueryException for most RuntimeExceptions. Calling it as an argument to createExceptionResultsBlock- AndAttachExecutionStats caused the thrown exception to escape the catch block uncaught, propagating to callers instead of being wrapped in an error block. This broke JsonExtractScalarTransformFunctionTest.mvWithNullsWithoutDefault. Fix: wrap in try/throw/catch to capture whatever wrapOperatorException throws or returns, then convert to an error block — mirroring the multi- thread path's onProcessSegmentsException handler. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 8bf535d commit ce8cec5

1 file changed

Lines changed: 39 additions & 35 deletions

File tree

pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
3030
import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
3131
import org.apache.pinot.core.query.request.context.QueryContext;
32-
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
3332
import org.apache.pinot.spi.exception.QueryErrorCode;
3433
import org.apache.pinot.spi.exception.QueryErrorMessage;
3534
import org.apache.pinot.spi.exception.QueryException;
@@ -80,49 +79,54 @@ protected BaseResultsBlock getNextBlock() {
8079
/// Avoids all concurrency overhead: no ExecutorService submission, no Phaser, no BlockingQueue, no atomics.
8180
/// Respects the query deadline: if the timeout is exceeded before a segment operator is invoked, a timeout
8281
/// results block is returned immediately rather than blocking indefinitely on a stalled operator.
82+
///
83+
/// Note: we do NOT separately accumulate _totalWorkerThreadCpuTimeNs / _totalWorkerThreadMemAllocatedBytes here.
84+
/// This method runs on the calling (main) thread, which InstanceResponseOperator already measures via its own
85+
/// ThreadResourceSnapshot as mainThreadCpuTimeNs. Double-counting the same thread's CPU time would cause
86+
/// calSystemActivitiesCpuTimeNs to produce a negative value (clamped to 0), breaking resource usage stats.
8387
@SuppressWarnings("unchecked")
8488
private BaseResultsBlock getNextBlockSingleThread() {
85-
ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
8689
T mergedBlock = null;
8790
long endTimeMs = _queryContext.getEndTimeMs();
88-
try {
89-
for (int i = 0; i < _numOperators; i++) {
90-
// Check timeout before invoking each segment operator so we respect the query deadline
91-
// even if a segment operator blocks for a long time (mirrors mergeResults() timeout logic).
92-
if (System.currentTimeMillis() >= endTimeMs) {
93-
return attachExecutionStats(getTimeoutResultsBlock(i));
91+
for (int i = 0; i < _numOperators; i++) {
92+
// Check timeout before invoking each segment operator so we respect the query deadline
93+
// even if a segment operator blocks for a long time (mirrors mergeResults() timeout logic).
94+
if (System.currentTimeMillis() >= endTimeMs) {
95+
return attachExecutionStats(getTimeoutResultsBlock(i));
96+
}
97+
Operator operator = _operators.get(i);
98+
T resultsBlock;
99+
try {
100+
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
101+
((AcquireReleaseColumnsSegmentOperator) operator).acquire();
94102
}
95-
Operator operator = _operators.get(i);
96-
T resultsBlock;
103+
resultsBlock = (T) operator.nextBlock();
104+
} catch (RuntimeException e) {
105+
// wrapOperatorException either returns the exception (for EarlyTerminationException) or throws a new
106+
// QueryException. Either way we catch it here and convert to an error block, mirroring the multi-thread path
107+
// where onProcessSegmentsException handles all operator exceptions.
97108
try {
98-
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
99-
((AcquireReleaseColumnsSegmentOperator) operator).acquire();
100-
}
101-
resultsBlock = (T) operator.nextBlock();
102-
} catch (RuntimeException e) {
103-
return createExceptionResultsBlockAndAttachExecutionStats(wrapOperatorException(operator, e),
104-
"processing segments");
105-
} finally {
106-
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
107-
((AcquireReleaseColumnsSegmentOperator) operator).release();
108-
}
109-
}
110-
if (resultsBlock.getErrorMessages() != null) {
111-
// Propagate segment-level error immediately
112-
return attachExecutionStats(resultsBlock);
113-
}
114-
if (mergedBlock == null) {
115-
mergedBlock = resultsBlock;
116-
} else {
117-
_resultsBlockMerger.mergeResultsBlocks(mergedBlock, resultsBlock);
109+
throw wrapOperatorException(operator, e);
110+
} catch (Exception wrapped) {
111+
return createExceptionResultsBlockAndAttachExecutionStats(wrapped, "processing segments");
118112
}
119-
if (_resultsBlockMerger.isQuerySatisfied(mergedBlock)) {
120-
break;
113+
} finally {
114+
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
115+
((AcquireReleaseColumnsSegmentOperator) operator).release();
121116
}
122117
}
123-
} finally {
124-
_totalWorkerThreadCpuTimeNs.addAndGet(resourceSnapshot.getCpuTimeNs());
125-
_totalWorkerThreadMemAllocatedBytes.addAndGet(resourceSnapshot.getAllocatedBytes());
118+
if (resultsBlock.getErrorMessages() != null) {
119+
// Propagate segment-level error immediately
120+
return attachExecutionStats(resultsBlock);
121+
}
122+
if (mergedBlock == null) {
123+
mergedBlock = resultsBlock;
124+
} else {
125+
_resultsBlockMerger.mergeResultsBlocks(mergedBlock, resultsBlock);
126+
}
127+
if (_resultsBlockMerger.isQuerySatisfied(mergedBlock)) {
128+
break;
129+
}
126130
}
127131
return checkTerminateExceptionAndAttachExecutionStats(mergedBlock);
128132
}

0 commit comments

Comments
 (0)