|
29 | 29 | import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock; |
30 | 30 | import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger; |
31 | 31 | import org.apache.pinot.core.query.request.context.QueryContext; |
32 | | -import org.apache.pinot.spi.accounting.ThreadResourceSnapshot; |
33 | 32 | import org.apache.pinot.spi.exception.QueryErrorCode; |
34 | 33 | import org.apache.pinot.spi.exception.QueryErrorMessage; |
35 | 34 | import org.apache.pinot.spi.exception.QueryException; |
@@ -80,49 +79,54 @@ protected BaseResultsBlock getNextBlock() { |
80 | 79 | /// Avoids all concurrency overhead: no ExecutorService submission, no Phaser, no BlockingQueue, no atomics. |
81 | 80 | /// Respects the query deadline: if the timeout is exceeded before a segment operator is invoked, a timeout |
82 | 81 | /// results block is returned immediately rather than blocking indefinitely on a stalled operator. |
| 82 | + /// |
| 83 | + /// Note: we do NOT separately accumulate _totalWorkerThreadCpuTimeNs / _totalWorkerThreadMemAllocatedBytes here. |
| 84 | + /// This method runs on the calling (main) thread, which InstanceResponseOperator already measures via its own |
| 85 | + /// ThreadResourceSnapshot as mainThreadCpuTimeNs. Double-counting the same thread's CPU time would cause |
| 86 | + /// calSystemActivitiesCpuTimeNs to produce a negative value (clamped to 0), breaking resource usage stats. |
83 | 87 | @SuppressWarnings("unchecked") |
84 | 88 | private BaseResultsBlock getNextBlockSingleThread() { |
85 | | - ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot(); |
86 | 89 | T mergedBlock = null; |
87 | 90 | long endTimeMs = _queryContext.getEndTimeMs(); |
88 | | - try { |
89 | | - for (int i = 0; i < _numOperators; i++) { |
90 | | - // Check timeout before invoking each segment operator so we respect the query deadline |
91 | | - // even if a segment operator blocks for a long time (mirrors mergeResults() timeout logic). |
92 | | - if (System.currentTimeMillis() >= endTimeMs) { |
93 | | - return attachExecutionStats(getTimeoutResultsBlock(i)); |
| 91 | + for (int i = 0; i < _numOperators; i++) { |
| 92 | + // Check timeout before invoking each segment operator so we respect the query deadline |
| 93 | + // even if a segment operator blocks for a long time (mirrors mergeResults() timeout logic). |
| 94 | + if (System.currentTimeMillis() >= endTimeMs) { |
| 95 | + return attachExecutionStats(getTimeoutResultsBlock(i)); |
| 96 | + } |
| 97 | + Operator operator = _operators.get(i); |
| 98 | + T resultsBlock; |
| 99 | + try { |
| 100 | + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { |
| 101 | + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); |
94 | 102 | } |
95 | | - Operator operator = _operators.get(i); |
96 | | - T resultsBlock; |
| 103 | + resultsBlock = (T) operator.nextBlock(); |
| 104 | + } catch (RuntimeException e) { |
| 105 | + // wrapOperatorException either returns the exception (for EarlyTerminationException) or throws a new |
| 106 | + // QueryException. Either way we catch it here and convert to an error block, mirroring the multi-thread path |
| 107 | + // where onProcessSegmentsException handles all operator exceptions. |
97 | 108 | try { |
98 | | - if (operator instanceof AcquireReleaseColumnsSegmentOperator) { |
99 | | - ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); |
100 | | - } |
101 | | - resultsBlock = (T) operator.nextBlock(); |
102 | | - } catch (RuntimeException e) { |
103 | | - return createExceptionResultsBlockAndAttachExecutionStats(wrapOperatorException(operator, e), |
104 | | - "processing segments"); |
105 | | - } finally { |
106 | | - if (operator instanceof AcquireReleaseColumnsSegmentOperator) { |
107 | | - ((AcquireReleaseColumnsSegmentOperator) operator).release(); |
108 | | - } |
109 | | - } |
110 | | - if (resultsBlock.getErrorMessages() != null) { |
111 | | - // Propagate segment-level error immediately |
112 | | - return attachExecutionStats(resultsBlock); |
113 | | - } |
114 | | - if (mergedBlock == null) { |
115 | | - mergedBlock = resultsBlock; |
116 | | - } else { |
117 | | - _resultsBlockMerger.mergeResultsBlocks(mergedBlock, resultsBlock); |
| 109 | + throw wrapOperatorException(operator, e); |
| 110 | + } catch (Exception wrapped) { |
| 111 | + return createExceptionResultsBlockAndAttachExecutionStats(wrapped, "processing segments"); |
118 | 112 | } |
119 | | - if (_resultsBlockMerger.isQuerySatisfied(mergedBlock)) { |
120 | | - break; |
| 113 | + } finally { |
| 114 | + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { |
| 115 | + ((AcquireReleaseColumnsSegmentOperator) operator).release(); |
121 | 116 | } |
122 | 117 | } |
123 | | - } finally { |
124 | | - _totalWorkerThreadCpuTimeNs.addAndGet(resourceSnapshot.getCpuTimeNs()); |
125 | | - _totalWorkerThreadMemAllocatedBytes.addAndGet(resourceSnapshot.getAllocatedBytes()); |
| 118 | + if (resultsBlock.getErrorMessages() != null) { |
| 119 | + // Propagate segment-level error immediately |
| 120 | + return attachExecutionStats(resultsBlock); |
| 121 | + } |
| 122 | + if (mergedBlock == null) { |
| 123 | + mergedBlock = resultsBlock; |
| 124 | + } else { |
| 125 | + _resultsBlockMerger.mergeResultsBlocks(mergedBlock, resultsBlock); |
| 126 | + } |
| 127 | + if (_resultsBlockMerger.isQuerySatisfied(mergedBlock)) { |
| 128 | + break; |
| 129 | + } |
126 | 130 | } |
127 | 131 | return checkTerminateExceptionAndAttachExecutionStats(mergedBlock); |
128 | 132 | } |
|
0 commit comments