Skip to content

Commit 52466bb

Browse files
mrhhsg and Copilot
committed
[improvement](be,fe) Address adaptive batch size review comments
### What problem does this PR solve? Issue Number: close #xxx Related PR: #61535 Problem Summary: Address review comments from velodb-robyang on the adaptive batch size PR. Changes include: 1. Move block_budget.h from common/ to util/ and update 8 include paths 2. Add doc comments to block.h explaining bytes()/allocated_bytes()/columns_byte_size() 3. Remove unused default parameter values from join_hash_table.h and vdata_stream_recvr.h 4. Add per-operator output block byte profile counters (OutputBlockBytes, MaxOutputBlockBytes, MinOutputBlockBytes) in the operator base class 5. Add comment to new_json_reader.h explaining row-based reader batch size 6. Fix preferred_block_size_bytes() to return 8MB default instead of 0; simplify block_max_rows() condition; add relationship documentation 7. Add bounds checking for session variables: preferredBlockSizeBytes [1MB,512MB], preferredBlockSizeRows [1,1000000], preferredMaxColumnInBlockSizeBytes [256KB,128MB] 8. Simplify file_scanner.cpp adaptive batch size condition ### Release note Added per-operator output block byte size profile counters (OutputBlockBytes, MaxOutputBlockBytes, MinOutputBlockBytes) for better observability of block sizes flowing through the query pipeline. Added bounds checking for adaptive batch size session variables. ### Check List (For Author) - Test: Regression test / Unit Test / Manual test - FE UT: JniScannerTest#testSetBatchSize passed - BE+FE build succeeded - Behavior changed: No - Does this need documentation: No Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent b892a78 commit 52466bb

File tree

59 files changed

+317
-187
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+317
-187
lines changed

be/src/core/block/block.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,17 @@ class Block {
183183

184184
Status check_type_and_column() const;
185185

186-
/// Approximate number of bytes in memory - for profiling and limits.
186+
/// Approximate number of bytes used by column data in memory.
187+
/// This reflects the actual data footprint (e.g. string contents, numeric arrays)
188+
/// and is the metric used by adaptive batch size byte budgets.
187189
size_t bytes() const;
188190

191+
/// Returns per-column byte sizes as a comma-separated string (for debugging).
189192
std::string columns_bytes() const;
190193

191-
/// Approximate number of allocated bytes in memory - for profiling and limits.
194+
/// Approximate number of allocated (reserved) bytes in memory.
195+
/// This may be larger than bytes() due to pre-allocated capacity in vectors/arenas.
196+
/// Used for memory tracking and profiling.
192197
MOCK_FUNCTION size_t allocated_bytes() const;
193198

194199
/** Get a list of column names separated by commas. */
@@ -353,6 +358,8 @@ class Block {
353358
bool need_keep_first);
354359

355360
// Helper: sum byte_size() of all mutable columns.
361+
// Unlike Block::bytes() which operates on immutable ColumnPtr,
362+
// this works on MutableColumns during block construction (e.g. in BlockReader).
356363
static inline size_t columns_byte_size(const MutableColumns& cols) {
357364
size_t total = 0;
358365
for (const auto& col : cols) {

be/src/exec/common/hash_table/join_hash_table.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class JoinHashTable {
107107
uint32_t* __restrict probe_idxs, bool& probe_visited,
108108
uint32_t* __restrict build_idxs, const uint8_t* null_map,
109109
bool with_other_conjuncts, bool is_mark_join, bool has_mark_join_conjunct,
110-
int batch_size_limit = 0) {
110+
int batch_size_limit) {
111111
const int effective_batch_size = batch_size_limit > 0 ? batch_size_limit : max_batch_size;
112112

113113
if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||

be/src/exec/exchange/local_exchanger.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ Status ShuffleExchanger::get_block(RuntimeState* state, Block* block, bool* eos,
150150
PartitionedBlock partitioned_block;
151151
MutableBlock mutable_block;
152152

153-
const auto max_batch_size = state->block_max_rows();
154-
const auto max_block_bytes = state->block_max_bytes();
153+
const auto max_batch_size = state->batch_size();
154+
const auto max_block_bytes = state->preferred_block_size_bytes();
155155

156156
auto get_data = [&]() -> Status {
157157
do {

be/src/exec/exchange/vdata_stream_recvr.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {
7979
MOCK_FUNCTION Status create_merger(const VExprContextSPtrs& ordering_expr,
8080
const std::vector<bool>& is_asc_order,
8181
const std::vector<bool>& nulls_first, size_t batch_size,
82-
int64_t limit, size_t offset, size_t block_max_bytes = 0);
82+
int64_t limit, size_t offset, size_t block_max_bytes);
8383

8484
std::vector<SenderQueue*> sender_queues() const { return _sender_queues; }
8585

be/src/exec/exchange/vdata_stream_sender.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -291,8 +291,8 @@ Status Channel::close(RuntimeState* state) {
291291
BlockSerializer::BlockSerializer(ExchangeSinkLocalState* parent, bool is_local)
292292
: _parent(parent),
293293
_is_local(is_local),
294-
_batch_size(parent->state()->block_max_rows()),
295-
_block_max_bytes(parent->state()->block_max_bytes()) {}
294+
_batch_size(parent->state()->batch_size()),
295+
_block_max_bytes(parent->state()->preferred_block_size_bytes()) {}
296296

297297
Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t num_receivers,
298298
bool* serialized, bool eos, const uint32_t* data,
@@ -315,7 +315,7 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t
315315

316316
if (_mutable_block->rows() >= _batch_size || eos ||
317317
(_mutable_block->rows() > 0 && _mutable_block->allocated_bytes() > _buffer_mem_limit) ||
318-
(_block_max_bytes > 0 && _mutable_block->bytes() >= _block_max_bytes)) {
318+
_mutable_block->bytes() >= _block_max_bytes) {
319319
if (!_is_local) {
320320
RETURN_IF_ERROR(_serialize_block(dest, num_receivers));
321321
}

be/src/exec/operator/aggregation_sink_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -814,7 +814,7 @@ size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) co
814814
if constexpr (std::is_same_v<HashTableCtxType, std::monostate>) {
815815
return 0;
816816
} else {
817-
return arg.hash_table->estimate_memory(state->block_max_rows());
817+
return arg.hash_table->estimate_memory(state->batch_size());
818818
}
819819
},
820820
_agg_data->method_variant);

be/src/exec/operator/aggregation_source_operator.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
#include <memory>
2121
#include <string>
2222

23-
#include "common/block_budget.h"
23+
#include "util/block_budget.h"
2424
#include "common/exception.h"
2525
#include "core/column/column_fixed_length_object.h"
2626
#include "exec/operator/operator.h"
@@ -123,7 +123,7 @@ Status AggLocalState::_get_results_with_serialized_key(RuntimeState* state, Bloc
123123
}
124124

125125
// Compute effective max rows based on estimated bytes per row.
126-
const BlockBudget budget(state->block_max_rows(), state->block_max_bytes());
126+
const BlockBudget budget(state->batch_size(), state->preferred_block_size_bytes());
127127
const size_t effective_max_rows = budget.effective_max_rows(_estimated_row_bytes);
128128

129129
std::visit(
@@ -319,7 +319,7 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, Block
319319
SCOPED_TIMER(_get_results_timer);
320320

321321
// Compute effective max rows based on estimated bytes per row.
322-
const BlockBudget budget(state->block_max_rows(), state->block_max_bytes());
322+
const BlockBudget budget(state->batch_size(), state->preferred_block_size_bytes());
323323
const size_t effective_max_rows = budget.effective_max_rows(_estimated_row_bytes);
324324

325325
std::visit(

be/src/exec/operator/distinct_streaming_aggregation_operator.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
#include <memory>
2323
#include <utility>
2424

25-
#include "common/block_budget.h"
25+
#include "util/block_budget.h"
2626
#include "common/compiler_util.h" // IWYU pragma: keep
2727
#include "exec/operator/streaming_agg_min_reduction.h"
2828
#include "exprs/vectorized_agg_fn.h"
@@ -38,8 +38,8 @@ namespace doris {
3838
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* state,
3939
OperatorXBase* parent)
4040
: PipelineXLocalState<FakeSharedState>(state, parent),
41-
batch_size(state->block_max_rows()),
42-
block_max_bytes(state->block_max_bytes()),
41+
batch_size(state->batch_size()),
42+
block_max_bytes(state->preferred_block_size_bytes()),
4343
_agg_data(std::make_unique<DistinctDataVariants>()),
4444
_child_block(Block::create_unique()),
4545
_aggregated_block(Block::create_unique()),
@@ -397,7 +397,7 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, Block* block, bo
397397

398398
bool DistinctStreamingAggOperatorX::need_more_input_data(RuntimeState* state) const {
399399
auto& local_state = get_local_state(state);
400-
const BlockBudget budget(state->block_max_rows(), state->block_max_bytes());
400+
const BlockBudget budget(state->batch_size(), state->preferred_block_size_bytes());
401401
const bool need_batch = local_state._stop_emplace_flag
402402
? local_state._aggregated_block->empty()
403403
: budget.within_budget(local_state._aggregated_block->rows(),

be/src/exec/operator/exchange_source_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, Block* block, boo
155155
SCOPED_TIMER(local_state.create_merger_timer);
156156
RETURN_IF_ERROR(local_state.stream_recvr->create_merger(
157157
local_state.vsort_exec_exprs.ordering_expr_ctxs(), _is_asc_order, _nulls_first,
158-
state->block_max_rows(), _limit, _offset, state->block_max_bytes()));
158+
state->batch_size(), _limit, _offset, state->preferred_block_size_bytes()));
159159
local_state.is_ready = true;
160160
return Status::OK();
161161
}

be/src/exec/operator/hashjoin_build_sink.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,15 +139,15 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo
139139
const auto bytes = _build_side_mutable_block.bytes();
140140
const auto allocated_bytes = _build_side_mutable_block.allocated_bytes();
141141
const auto bytes_per_row = bytes / build_block_rows;
142-
const auto estimated_size_of_next_block = bytes_per_row * state->block_max_rows();
142+
const auto estimated_size_of_next_block = bytes_per_row * state->batch_size();
143143
// If the new size is greater than 85% of allocalted bytes, it maybe need to realloc.
144144
if (((estimated_size_of_next_block + bytes) * 100 / allocated_bytes) >= 85) {
145145
size_to_reserve += static_cast<size_t>(static_cast<double>(allocated_bytes) * 1.15);
146146
}
147147
}
148148

149149
if (eos) {
150-
const size_t rows = build_block_rows + state->block_max_rows();
150+
const size_t rows = build_block_rows + state->batch_size();
151151
const auto bucket_size = hash_join_table_calc_bucket_size(rows);
152152

153153
size_to_reserve += bucket_size * sizeof(uint32_t); // JoinHashTable::first
@@ -606,7 +606,7 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, Blo
606606
using HashTableCtxType = std::decay_t<decltype(arg)>;
607607
using JoinOpType = std::decay_t<decltype(join_op)>;
608608
ProcessHashTableBuild<HashTableCtxType> hash_table_build_process(
609-
rows, raw_ptrs, this, state->block_max_rows(), state);
609+
rows, raw_ptrs, this, state->batch_size(), state);
610610
auto st = hash_table_build_process.template run<JoinOpType::value>(
611611
arg, null_map_val ? &null_map_val->get_data() : nullptr,
612612
&_shared_state->_has_null_in_build_side,

0 commit comments

Comments
 (0)