Skip to content

perf: skip concurrency overhead in BaseSingleBlockCombineOperator when numTasks=1#18146

Open
deeppatel710 wants to merge 4 commits intoapache:masterfrom
deeppatel710:perf/single-thread-combine-operator
Open

perf: skip concurrency overhead in BaseSingleBlockCombineOperator when numTasks=1#18146
deeppatel710 wants to merge 4 commits intoapache:masterfrom
deeppatel710:perf/single-thread-combine-operator

Conversation

@deeppatel710
Copy link
Copy Markdown
Contributor

Summary

Fixes #14617

When a query runs with a single execution task (one segment, or maxExecutionThreads=1), BaseSingleBlockCombineOperator still incurred the full multi-thread overhead:

  • ExecutorService.submit() — thread pool task submission
  • Phaser — register/deregister synchronization
  • BlockingQueue.poll() — with timeout waiting
  • AtomicInteger / AtomicReference — volatile reads/writes on the hot path

None of this is necessary when _numTasks == 1.

Change

Added a getNextBlockSingleThread() fast path in BaseSingleBlockCombineOperator.getNextBlock(): when _numTasks == 1 and _resultsBlockMerger is non-null, all segments are processed sequentially on the
calling thread with no synchronization overhead. CPU time and memory allocation are still tracked via ThreadResourceSnapshot.

Subclasses that override mergeResults() with custom logic (e.g. SequentialSortedGroupByCombineOperator, which passes null for _resultsBlockMerger) are unaffected and fall through to the standard
multi-thread path.

Test plan

  • All existing combine operator tests pass (125 tests)
  • CombineSlowOperatorsTest, CombineErrorOperatorsTest, SelectionCombineOperatorTest, SortedGroupByCombineOperatorsTest, CombinePlanNodeTest — all green

Copy link
Copy Markdown
Contributor

@xiangfu0 xiangfu0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found a critical correctness issue with the fast path implementation:

Timeout Handling Missing: The new getNextBlockSingleThread() method lacks timeout protection that exists in the original mergeResults() method. The original method checks _queryContext.getEndTimeMs() and returns a timeout results block if the deadline is exceeded. The fast path has no such protection, which means:

  1. A hanging segment operator could block indefinitely
  2. No respect for query timeout deadline
  3. Potential resource exhaustion if a segment operator stalls

The original mergeResults() path handles this via the _blockingQueue.poll(waitTimeMs, TimeUnit.MILLISECONDS) with explicit timeout checking. The fast path should implement similar timeout protection.

}
}

/// Processes all segments sequentially on the calling thread when only one task is needed.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing timeout protection. This fast path should respect _queryContext.getEndTimeMs() like the original mergeResults() method does. A hanging segment operator could block indefinitely here without timeout checking.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Added a System.currentTimeMillis() >= endTimeMs check at the top of each loop iteration before invoking operator.nextBlock(). If the deadline is exceeded, getTimeoutResultsBlock(i) is returned
immediately — exactly mirroring the waitTimeMs <= 0 guard in mergeResults(). A stalled segment operator will now be bypassed at the next iteration boundary rather than blocking indefinitely.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiangfu0 still waiting on your stamp here, thanks

@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Apr 12, 2026

Codecov Report

❌ Patch coverage is 48.00000% with 13 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.39%. Comparing base (3eff094) to head (c82a4cb).
⚠️ Report is 63 commits behind head on master.

Files with missing lines Patch % Lines
...erator/combine/BaseSingleBlockCombineOperator.java 48.00% 7 Missing and 6 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18146      +/-   ##
============================================
- Coverage     63.48%   63.39%   -0.10%     
- Complexity     1627     1679      +52     
============================================
  Files          3244     3253       +9     
  Lines        197365   198746    +1381     
  Branches      30540    30787     +247     
============================================
+ Hits         125306   125987     +681     
- Misses        62019    62673     +654     
- Partials      10040    10086      +46     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 ?
java-21 63.39% <48.00%> (-0.08%) ⬇️
temurin 63.39% <48.00%> (-0.10%) ⬇️
unittests 63.38% <48.00%> (-0.10%) ⬇️
unittests1 55.34% <48.00%> (-0.13%) ⬇️
unittests2 34.90% <32.00%> (-0.08%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Deep Patel and others added 4 commits April 26, 2026 09:23
…n numTasks=1

When a query runs with a single execution task (e.g. one segment or
maxExecutionThreads=1), BaseSingleBlockCombineOperator still incurred the
full multi-thread overhead: ExecutorService.submit(), Phaser registration/
deregistration, BlockingQueue.poll() with timeout, AtomicInteger, and
AtomicReference.

This adds a single-thread fast path in getNextBlock(): when _numTasks==1
and _resultsBlockMerger is non-null (i.e. the subclass uses the default
merge strategy), segments are processed sequentially on the calling thread
with none of that synchronization overhead. CPU time and memory are still
tracked via ThreadResourceSnapshot.

Subclasses that override mergeResults() with custom logic (e.g.
SequentialSortedGroupByCombineOperator, which passes null for
_resultsBlockMerger) are unaffected and continue using the standard path.

Fixes apache#14617

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ombineOperator

The getNextBlockSingleThread() fast path was missing the timeout protection
present in the original mergeResults() method. A stalled segment operator
could block indefinitely with no deadline enforcement.

Fix: check System.currentTimeMillis() >= endTimeMs before invoking each
segment operator. If the deadline is exceeded, return a timeout results block
immediately. This mirrors the waitTimeMs <= 0 guard in mergeResults().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…hread fast path

Two bugs in getNextBlockSingleThread():

1. Resource stats double-counting:
   The method accumulated _totalWorkerThreadCpuTimeNs on the calling thread,
   but InstanceResponseOperator.getBaseBlock() already captures the same
   thread's CPU time as mainThreadCpuTimeNs. The calSystemActivitiesCpuTimeNs
   formula (wallClock - mainThread - workerThread/N) then subtracted the same
   work twice, producing a negative value clamped to 0. This broke
   testResourceUsageStats in the CPU/memory query killing integration tests.
   Fix: remove the ThreadResourceSnapshot tracking; the main thread's snapshot
   already accounts for all work done in the single-thread path.

2. Exception handling for operator failures:
   wrapOperatorException() throws (not returns) a new QueryException for most
   RuntimeExceptions. Calling it as an argument to createExceptionResultsBlock-
   AndAttachExecutionStats caused the thrown exception to escape the catch block
   uncaught, propagating to callers instead of being wrapped in an error block.
   This broke JsonExtractScalarTransformFunctionTest.mvWithNullsWithoutDefault.
   Fix: wrap in try/throw/catch to capture whatever wrapOperatorException
   throws or returns, then convert to an error block — mirroring the multi-
   thread path's onProcessSegmentsException handler.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@deeppatel710 deeppatel710 force-pushed the perf/single-thread-combine-operator branch from ce8cec5 to c82a4cb Compare April 26, 2026 16:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add special combine operator for single thread case

3 participants