From c56db1ec1f9f690e763e765837a1a72ddec36f7b Mon Sep 17 00:00:00 2001 From: laihui Date: Mon, 8 Jun 2026 20:13:42 +0800 Subject: [PATCH 1/2] introduce adaptive random bucket load routing --- be/src/cloud/cloud_delta_writer.cpp | 23 +- be/src/cloud/cloud_delta_writer.h | 5 +- be/src/cloud/cloud_tablets_channel.cpp | 18 ++ be/src/common/config.cpp | 7 + be/src/common/config.h | 7 + be/src/exec/sink/delta_writer_v2_pool.cpp | 1 + be/src/exec/sink/vrow_distribution.cpp | 20 +- be/src/exec/sink/vrow_distribution.h | 7 +- be/src/exec/sink/vtablet_finder.cpp | 78 ++++- be/src/exec/sink/vtablet_finder.h | 47 ++- be/src/exec/sink/writer/vtablet_writer.cpp | 158 ++++++++-- be/src/exec/sink/writer/vtablet_writer.h | 9 +- be/src/exec/sink/writer/vtablet_writer_v2.cpp | 14 +- be/src/load/channel/load_channel_mgr.cpp | 2 +- be/src/load/channel/tablets_channel.cpp | 189 ++++++++++- be/src/load/channel/tablets_channel.h | 18 ++ be/src/load/delta_writer/delta_writer.cpp | 29 +- be/src/load/delta_writer/delta_writer.h | 12 +- .../load/delta_writer/delta_writer_context.h | 1 + be/src/load/delta_writer/delta_writer_v2.cpp | 18 +- be/src/load/delta_writer/delta_writer_v2.h | 4 +- .../load/memtable/memtable_memory_limiter.cpp | 71 ++++- .../load/memtable/memtable_memory_limiter.h | 4 +- be/src/load/memtable/memtable_writer.cpp | 34 +- be/src/load/memtable/memtable_writer.h | 6 +- be/src/storage/tablet_info.cpp | 25 +- be/src/storage/tablet_info.h | 7 + .../java/org/apache/doris/common/Config.java | 11 + .../apache/doris/planner/OlapTableSink.java | 298 ++++++++++++++++++ .../java/org/apache/doris/qe/Coordinator.java | 53 ++++ .../doris/qe/runtime/ThriftPlansBuilder.java | 61 ++++ .../doris/service/FrontendServiceImpl.java | 130 ++++++++ gensrc/proto/internal_service.proto | 9 +- gensrc/thrift/DataSinks.thrift | 8 + gensrc/thrift/Descriptors.thrift | 9 +- 35 files changed, 1328 insertions(+), 65 deletions(-) diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp index d51d5d8b5763bc..8b558de938293e 100644 --- a/be/src/cloud/cloud_delta_writer.cpp +++ b/be/src/cloud/cloud_delta_writer.cpp @@ -22,6 +22,8 @@ #include "cloud/cloud_storage_engine.h" #include "cloud/config.h" #include "load/delta_writer/delta_writer.h" +#include "load/memtable/memtable_memory_limiter.h" +#include "runtime/exec_env.h" #include "runtime/thread_context.h" namespace doris { @@ -64,10 +66,22 @@ Status CloudDeltaWriter::batch_init(std::vector writers) { return cloud::bthread_fork_join(tasks, 10); } -Status CloudDeltaWriter::write(const Block* block, const DorisVector& row_idxs) { +Status CloudDeltaWriter::write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed) { + if (memtable_flushed != nullptr) { + *memtable_flushed = false; + } if (row_idxs.empty()) [[unlikely]] { return Status::OK(); } + if (_req.enable_table_memtable_backpressure) { + ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure( + [this]() { + std::lock_guard lock(_mtx); + return _is_cancelled; + }, + table_id()); + } std::lock_guard lock(_mtx); CHECK(_is_init || _is_cancelled); { @@ -77,7 +91,7 @@ Status CloudDeltaWriter::write(const Block* block, const DorisVector& std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } - return _memtable_writer->write(block, row_idxs); + return _memtable_writer->write(block, row_idxs, memtable_flushed); } Status CloudDeltaWriter::close() { @@ -86,6 +100,11 @@ Status CloudDeltaWriter::close() { return _memtable_writer->close(); } +Status CloudDeltaWriter::flush_memtable_async() { + std::lock_guard lock(_mtx); + return BaseDeltaWriter::flush_memtable_async(); +} + Status CloudDeltaWriter::cancel_with_status(const Status& st) { std::lock_guard lock(_mtx); return BaseDeltaWriter::cancel_with_status(st); diff --git a/be/src/cloud/cloud_delta_writer.h b/be/src/cloud/cloud_delta_writer.h index 614bfd0f16af0b..6ab9251213fdbe 100644 --- a/be/src/cloud/cloud_delta_writer.h +++ b/be/src/cloud/cloud_delta_writer.h @@ -33,10 +33,13 @@ class CloudDeltaWriter final : public BaseDeltaWriter { const UniqueId& load_id); ~CloudDeltaWriter() override; - Status write(const Block* block, const DorisVector& row_idxs) override; + Status write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed = nullptr) override; Status close() override; + Status flush_memtable_async() override; + Status cancel_with_status(const Status& st) override; Status build_rowset() override; diff --git a/be/src/cloud/cloud_tablets_channel.cpp b/be/src/cloud/cloud_tablets_channel.cpp index 590677a38d7de0..02caf8084fe87e 100644 --- a/be/src/cloud/cloud_tablets_channel.cpp +++ b/be/src/cloud/cloud_tablets_channel.cpp @@ -60,6 +60,24 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques return Status::OK(); } + if (request.is_receiver_side_random_bucket()) { + std::unordered_map> partition_to_rowidxs; + _build_partition_to_rowidxs_for_receiver_side_random_bucket(request, &partition_to_rowidxs); + if (!partition_to_rowidxs.empty()) { + std::unordered_set partition_ids; + partition_ids.reserve(partition_to_rowidxs.size()); + for (const auto& [partition_id, _] : partition_to_rowidxs) { + partition_ids.insert(partition_id); + } + { + std::lock_guard l(_tablet_writers_lock); + RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids)); + } + } + return _write_block_data_for_receiver_side_random_bucket(request, cur_seq, + partition_to_rowidxs, response); + } + std::unordered_map> tablet_to_rowidxs; _build_tablet_to_rowidxs(request, &tablet_to_rowidxs); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 8b9158d8f67a7d..1811c4a424c6cb 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -726,6 +726,8 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500"); // max write buffer size before flush, default 200MB DEFINE_mInt64(write_buffer_size, "209715200"); DEFINE_mBool(enable_adaptive_write_buffer_size, "true"); +// Whether random bucket load rotates to the next local bucket when memtable flushes. +DEFINE_mBool(enable_adaptive_random_bucket_load_bucket_rotation, "true"); // max buffer size used in memtable for the aggregated table, default 400MB DEFINE_mInt64(write_buffer_size_for_agg, "104857600"); DEFINE_mInt64(min_write_buffer_size_for_partial_update, "1048576"); @@ -838,6 +840,11 @@ DEFINE_mDouble(min_flush_thread_num_per_cpu, "0.5"); // Whether to enable adaptive flush thread adjustment DEFINE_mBool(enable_adaptive_flush_threads, "true"); +// Whether to block writes when one table has too many pending flush memtables on this BE. +DEFINE_mBool(enable_table_memtable_flush_backpressure, "true"); +// Max pending flush memtables for one table on this BE before blocking new writes. +DEFINE_mInt32(table_memtable_flush_pending_count_limit, "10"); + // config for tablet meta checkpoint DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10"); DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 930e9f67fb6ffe..144420c67f5564 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -785,6 +785,8 @@ DECLARE_mInt32(memory_gc_sleep_time_ms); // max write buffer size before flush, default 200MB DECLARE_mInt64(write_buffer_size); DECLARE_mBool(enable_adaptive_write_buffer_size); +// Whether random bucket load rotates to the next local bucket when memtable flushes. +DECLARE_mBool(enable_adaptive_random_bucket_load_bucket_rotation); // max buffer size used in memtable for the aggregated table, default 400MB DECLARE_mInt64(write_buffer_size_for_agg); @@ -894,6 +896,11 @@ DECLARE_mDouble(min_flush_thread_num_per_cpu); // Whether to enable adaptive flush thread adjustment DECLARE_mBool(enable_adaptive_flush_threads); +// Whether to block writes when one table has too many pending flush memtables on this BE. +DECLARE_mBool(enable_table_memtable_flush_backpressure); +// Max pending flush memtables for one table on this BE before blocking new writes. +DECLARE_mInt32(table_memtable_flush_pending_count_limit); + // config for tablet meta checkpoint DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num); DECLARE_mInt32(tablet_meta_checkpoint_min_interval_secs); diff --git a/be/src/exec/sink/delta_writer_v2_pool.cpp b/be/src/exec/sink/delta_writer_v2_pool.cpp index de03011bf2eaa7..9cfcdf87b61078 100644 --- a/be/src/exec/sink/delta_writer_v2_pool.cpp +++ b/be/src/exec/sink/delta_writer_v2_pool.cpp @@ -17,6 +17,7 @@ #include "exec/sink/delta_writer_v2_pool.h" +#include "exec/sink/vtablet_finder.h" #include "load/delta_writer/delta_writer_v2.h" #include "runtime/runtime_profile.h" diff --git a/be/src/exec/sink/vrow_distribution.cpp b/be/src/exec/sink/vrow_distribution.cpp index 40a20c08d5a1a6..6d2b82ae5ba3df 100644 --- a/be/src/exec/sink/vrow_distribution.cpp +++ b/be/src/exec/sink/vrow_distribution.cpp @@ -289,7 +289,9 @@ void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row if (!_skip[i]) { row_ids.emplace_back(i); partition_ids.emplace_back(_partitions[i]->id); - tablet_ids.emplace_back(_tablet_ids[i]); + if (!_tablet_finder->is_adaptive_random_bucket()) { + tablet_ids.emplace_back(_tablet_ids[i]); + } } } } @@ -312,7 +314,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( if (nullable_column->get_bool_inline(i) && !_skip[i]) { row_ids.emplace_back(i); partition_ids.emplace_back(_partitions[i]->id); - tablet_ids.emplace_back(_tablet_ids[i]); + if (!_tablet_finder->is_adaptive_random_bucket()) { + tablet_ids.emplace_back(_tablet_ids[i]); + } } } } else if (const auto* const_column = check_and_get_column(*filter_column)) { @@ -331,7 +335,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( if (filter[i] != 0 && !_skip[i]) { row_ids.emplace_back(i); partition_ids.emplace_back(_partitions[i]->id); - tablet_ids.emplace_back(_tablet_ids[i]); + if (!_tablet_finder->is_adaptive_random_bucket()) { + tablet_ids.emplace_back(_tablet_ids[i]); + } } } } @@ -342,7 +348,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( Status VRowDistribution::_filter_block(Block* block, std::vector& row_part_tablet_ids) { for (int i = 0; i < _schema->indexes().size(); i++) { - _get_tablet_ids(block, i, _tablet_ids); + if (!_tablet_finder->is_adaptive_random_bucket()) { + _get_tablet_ids(block, i, _tablet_ids); + } auto& where_clause = _schema->indexes()[i]->where_clause; if (where_clause != nullptr) { RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause, @@ -524,7 +532,9 @@ void VRowDistribution::_reset_row_part_tablet_ids( // This is important for performance. row_ids.reserve(rows); partition_ids.reserve(rows); - tablet_ids.reserve(rows); + if (!_tablet_finder->is_adaptive_random_bucket()) { + tablet_ids.reserve(rows); + } } } diff --git a/be/src/exec/sink/vrow_distribution.h b/be/src/exec/sink/vrow_distribution.h index afc669f95bc63d..cdd4f5f88cbdbd 100644 --- a/be/src/exec/sink/vrow_distribution.h +++ b/be/src/exec/sink/vrow_distribution.h @@ -57,7 +57,12 @@ class RowPartTabletIds { std::string value; value.reserve(row_ids.size() * 15); for (int i = 0; i < row_ids.size(); i++) { - value.append(fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i])); + if (i < tablet_ids.size()) { + value.append( + fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i])); + } else { + value.append(fmt::format("[{}, {}]", row_ids[i], partition_ids[i])); + } } return value; } diff --git a/be/src/exec/sink/vtablet_finder.cpp b/be/src/exec/sink/vtablet_finder.cpp index edbf6475560d09..5a5f95c70f1343 100644 --- a/be/src/exec/sink/vtablet_finder.cpp +++ b/be/src/exec/sink/vtablet_finder.cpp @@ -22,16 +22,89 @@ #include #include +#include #include #include #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" #include "common/status.h" #include "core/block/block.h" #include "runtime/runtime_state.h" #include "storage/tablet_info.h" namespace doris { + +void AdaptiveRandomBucketState::init_partition(int64_t partition_id, + const std::vector& tablets, + const std::vector& bucket_seqs, + int32_t start_tablet_idx) { + if (partition_id < 0 || tablets.empty()) { + return; + } + std::lock_guard lock(_mutex); + if (_partition_states.contains(partition_id)) { + return; + } + + PartitionState state; + state.partition_id = partition_id; + state.tablets = tablets; + state.bucket_seqs = bucket_seqs; + if (start_tablet_idx >= 0 && start_tablet_idx < state.tablets.size()) { + state.tablet_pos = start_tablet_idx; + } + state.current_tablet_id = state.tablets[state.tablet_pos]; + + for (int32_t tablet_pos = 0; tablet_pos < state.tablets.size(); ++tablet_pos) { + _tablet_to_partition[state.tablets[tablet_pos]] = partition_id; + _tablet_to_bucket[state.tablets[tablet_pos]] = tablet_pos; + } + _partition_states.emplace(partition_id, std::move(state)); + LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", partition=" << partition_id + << ", local tablet count=" << tablets.size() + << ", start tablet=" << _partition_states.at(partition_id).current_tablet_id; +} + +int64_t AdaptiveRandomBucketState::current_tablet(int64_t partition_id) { + std::lock_guard lock(_mutex); + auto it = _partition_states.find(partition_id); + if (it == _partition_states.end()) { + return -1; + } + return it->second.current_tablet_id; +} + +void AdaptiveRandomBucketState::rotate_by_tablet(int64_t tablet_id) { + if (!config::enable_adaptive_random_bucket_load_bucket_rotation) { + return; + } + std::lock_guard lock(_mutex); + auto partition_it = _tablet_to_partition.find(tablet_id); + if (partition_it == _tablet_to_partition.end()) { + return; + } + auto state_it = _partition_states.find(partition_it->second); + if (state_it == _partition_states.end()) { + return; + } + auto bucket_it = _tablet_to_bucket.find(tablet_id); + if (bucket_it == _tablet_to_bucket.end()) { + return; + } + auto& state = state_it->second; + if (bucket_it->second != state.tablet_pos) { + return; + } + int32_t next_pos = (state.tablet_pos + 1) % static_cast(state.tablets.size()); + LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id + << ", partition=" << state.partition_id << " shared rotate tablet " + << state.current_tablet_id << " -> " << state.tablets[next_pos] + << " after tablet=" << tablet_id << " memtable flushed"; + state.tablet_pos = next_pos; + state.current_tablet_id = state.tablets[next_pos]; +} + Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows, std::vector& partitions, std::vector& tablet_index, std::vector& skip, @@ -82,8 +155,11 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) { _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index); + } else if (_find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET) { + // Receiver-side random bucket mode only needs partition ids on sender side. + // The receiver decides the concrete tablet from its local ordered tablet list. } else { - // for random distribution + // FIND_TABLET_EVERY_BATCH / FIND_TABLET_EVERY_SINK _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index, &_partition_to_tablet_map); if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) { diff --git a/be/src/exec/sink/vtablet_finder.h b/be/src/exec/sink/vtablet_finder.h index 32b173985e61e9..ce3b2c317b7539 100644 --- a/be/src/exec/sink/vtablet_finder.h +++ b/be/src/exec/sink/vtablet_finder.h @@ -19,15 +19,45 @@ #include #include +#include +#include +#include +#include #include "common/status.h" #include "core/block/block.h" #include "exec/common/hash_table/phmap_fwd_decl.h" #include "storage/tablet_info.h" #include "util/bitmap.h" +#include "util/uid_util.h" namespace doris { +class AdaptiveRandomBucketState { +public: + explicit AdaptiveRandomBucketState(UniqueId load_id) : _load_id(load_id) {} + + void init_partition(int64_t partition_id, const std::vector& tablets, + const std::vector& bucket_seqs, int32_t start_tablet_idx); + int64_t current_tablet(int64_t partition_id); + void rotate_by_tablet(int64_t tablet_id); + +private: + struct PartitionState { + int64_t partition_id = -1; + std::vector tablets; + std::vector bucket_seqs; + int32_t tablet_pos = 0; + int64_t current_tablet_id = -1; + }; + + std::mutex _mutex; + UniqueId _load_id; + std::unordered_map _partition_states; + std::unordered_map _tablet_to_partition; + std::unordered_map _tablet_to_bucket; +}; + class OlapTabletFinder { public: // FIND_TABLET_EVERY_ROW is used for hash distribution info, which indicates that we @@ -37,7 +67,13 @@ class OlapTabletFinder { // FIND_TABLET_EVERY_SINK is used for random distribution info when load_to_single_tablet set to true, // which indicates that we should only compute tablet index in the corresponding partition once for the // whole time in olap table sink - enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK }; + // FIND_TABLET_RANDOM_BUCKET is used for V1 receiver-side random bucket mode. + enum FindTabletMode { + FIND_TABLET_EVERY_ROW, + FIND_TABLET_EVERY_BATCH, + FIND_TABLET_EVERY_SINK, + FIND_TABLET_RANDOM_BUCKET + }; OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode) : _vpartition(vpartition), _find_tablet_mode(mode), _filter_bitmap(1024) {}; @@ -51,7 +87,14 @@ class OlapTabletFinder { return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK; } - bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; } + bool is_adaptive_random_bucket() const { + return _find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET; + } + + bool is_single_tablet() { + return _find_tablet_mode != FindTabletMode::FIND_TABLET_RANDOM_BUCKET && + _partition_to_tablet_map.size() == 1; + } // all partitions for multi find-processes of its relative writer. const flat_hash_set& partition_ids() { return _partition_ids; } diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp b/be/src/exec/sink/writer/vtablet_writer.cpp index c77a94c501585b..f8d06dc3a4e1ef 100644 --- a/be/src/exec/sink/writer/vtablet_writer.cpp +++ b/be/src/exec/sink/writer/vtablet_writer.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include @@ -52,10 +53,6 @@ #include "exprs/vexpr_fwd.h" #include "runtime/runtime_profile.h" -#ifdef DEBUG -#include -#endif - #include "common/compiler_util.h" // IWYU pragma: keep #include "common/logging.h" #include "common/metrics/doris_metrics.h" @@ -133,6 +130,15 @@ Status IndexChannel::init(RuntimeState* state, const std::vectorsecond; } channel->add_tablet(tablet); + if (_parent->_tablet_finder->is_adaptive_random_bucket() && config::is_cloud_mode()) { + for (const auto* part : _parent->_vpartition->get_partitions()) { + if (part->id != tablet.partition_id || part->bucket_be_id != replica_node_id) { + continue; + } + _channels_by_partition.emplace(tablet.partition_id, channel); + break; + } + } if (_parent->_write_single_replica) { auto* slave_location = _parent->_slave_location->find_tablet(tablet.tablet_id); if (slave_location != nullptr) { @@ -573,6 +579,8 @@ Status VNodeChannel::init(RuntimeState* state) { _cur_add_block_request->set_sender_id(_parent->_sender_id); _cur_add_block_request->set_backend_id(_node_id); _cur_add_block_request->set_eos(false); + _cur_add_block_request->set_is_receiver_side_random_bucket( + _parent->_tablet_finder->is_adaptive_random_bucket()); // add block closure // Has to using value to capture _task_exec_ctx because tablet writer may destroyed during callback. @@ -632,6 +640,8 @@ void VNodeChannel::_open_internal(bool is_incremental) { if (_parent->_t_sink.olap_table_sink.__isset.storage_vault_id) { request->set_storage_vault_id(_parent->_t_sink.olap_table_sink.storage_vault_id); } + request->set_is_receiver_side_random_bucket( + _parent->_tablet_finder->is_adaptive_random_bucket()); std::set deduper; for (auto& tablet : _tablets_wait_open) { if (deduper.contains(tablet.tablet_id)) { @@ -657,6 +667,69 @@ void VNodeChannel::_open_internal(bool is_incremental) { request->set_txn_expiration(_parent->_txn_expiration); request->set_write_file_cache(_parent->_write_file_cache); + if (_parent->_tablet_finder->is_adaptive_random_bucket()) { + std::unordered_map> partition_to_ordered_tablets; + std::unordered_map> partition_to_local_tablets; + for (const auto& tablet : _all_tablets) { + partition_to_ordered_tablets[tablet.partition_id].push_back(tablet.tablet_id); + partition_to_local_tablets[tablet.partition_id].insert(tablet.tablet_id); + } + std::unordered_map id_to_partition; + for (const auto* part : _parent->_vpartition->get_partitions()) { + id_to_partition.emplace(part->id, part); + } + for (const auto& [partition_id, ordered_tablets] : partition_to_ordered_tablets) { + auto* random_bucket_partition = request->add_random_bucket_partitions(); + random_bucket_partition->set_partition_id(partition_id); + auto partition_it = id_to_partition.find(partition_id); + if (partition_it == id_to_partition.end()) { + LOG(WARNING) << "unknown partition for adaptive random bucket, load_id=" + << _parent->_load_id << ", partition_id=" << partition_id; + continue; + } + const auto& local_bucket_seqs = partition_it->second->local_bucket_seqs; + if (!local_bucket_seqs.empty()) { + const std::vector* full_ordered_tablets = nullptr; + for (const auto& index : partition_it->second->indexes) { + if (index.index_id == _index_channel->_index_id) { + full_ordered_tablets = &index.tablets; + break; + } + } + if (full_ordered_tablets == nullptr) { + LOG(WARNING) << "unknown index for adaptive random bucket, load_id=" + << _parent->_load_id << ", partition_id=" << partition_id + << ", index_id=" << _index_channel->_index_id; + continue; + } + for (auto bucket_seq : local_bucket_seqs) { + if (bucket_seq < 0 || + bucket_seq >= cast_set(full_ordered_tablets->size())) { + LOG(WARNING) << "invalid local bucket seq, load_id=" << _parent->_load_id + << ", partition_id=" << partition_id + << ", bucket_seq=" << bucket_seq + << ", full_ordered_tablets_size=" + << full_ordered_tablets->size(); + continue; + } + auto tablet_id = (*full_ordered_tablets)[bucket_seq]; + if (!partition_to_local_tablets[partition_id].contains(tablet_id)) { + LOG(WARNING) << "skip non-local tablet selected by local bucket seq, load_id=" + << _parent->_load_id << ", partition_id=" << partition_id + << ", bucket_seq=" << bucket_seq + << ", tablet_id=" << tablet_id << ", node_id=" << _node_id; + continue; + } + random_bucket_partition->add_ordered_tablet_ids(tablet_id); + } + continue; + } + for (auto tablet_id : ordered_tablets) { + random_bucket_partition->add_ordered_tablet_ids(tablet_id); + } + } + } + if (_wg_id > 0) { request->set_workload_group_id(_wg_id); } @@ -728,9 +801,11 @@ Status VNodeChannel::open_wait() { Status VNodeChannel::add_block(Block* block, const Payload* payload) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - if (payload->second.empty()) { + if (payload->row_part_tablet_ids == nullptr || payload->row_ids == nullptr || + payload->row_ids->empty()) { return Status::OK(); } + DCHECK_EQ(payload->row_ids->size(), payload->route_idxs.size()); // If add_block() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. auto st = none_of({_cancelled, _eos_is_produced}); if (!st.ok()) { @@ -777,13 +852,17 @@ Status VNodeChannel::add_block(Block* block, const Payload* payload) { } SCOPED_RAW_TIMER(&_stat.append_node_channel_ns); - st = block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first)); + st = block->append_to_block_by_selector(_cur_mutable_block.get(), *payload->row_ids); if (!st.ok()) { _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); return st; } - for (auto tablet_id : payload->second) { - _cur_add_block_request->add_tablet_ids(tablet_id); + auto* row_part_tablet_ids = payload->row_part_tablet_ids; + for (uint32_t route_idx : payload->route_idxs) { + _cur_add_block_request->add_partition_ids(row_part_tablet_ids->partition_ids[route_idx]); + if (!_parent->_tablet_finder->is_adaptive_random_bucket()) { + _cur_add_block_request->add_tablet_ids(row_part_tablet_ids->tablet_ids[route_idx]); + } } _write_bytes.fetch_add(_cur_mutable_block->bytes()); @@ -808,6 +887,7 @@ Status VNodeChannel::add_block(Block* block, const Payload* payload) { } _cur_mutable_block = MutableBlock::create_unique(block->clone_empty()); _cur_add_block_request->clear_tablet_ids(); + _cur_add_block_request->clear_partition_ids(); } return Status::OK(); @@ -930,9 +1010,10 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { // tablet_ids has already set when add row request->set_packet_seq(_next_packet_seq); auto block = mutable_block->to_block(); - CHECK(block.rows() == request->tablet_ids_size()) - << "block rows: " << block.rows() - << ", tablet_ids_size: " << request->tablet_ids_size(); + int request_rows = request->is_receiver_side_random_bucket() ? request->partition_ids_size() + : request->tablet_ids_size(); + CHECK(block.rows() == request_rows) + << "block rows: " << block.rows() << ", request_rows: " << request_rows; if (block.rows() > 0) { SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); size_t uncompressed_bytes = 0, compressed_bytes = 0; @@ -973,8 +1054,10 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { } if (request->eos()) { - for (auto pid : _parent->_tablet_finder->partition_ids()) { - request->add_partition_ids(pid); + if (!request->is_receiver_side_random_bucket()) { + for (auto pid : _parent->_tablet_finder->partition_ids()) { + request->add_partition_ids(pid); + } } request->set_write_single_replica(_parent->_write_single_replica); @@ -1535,12 +1618,20 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { if (table_sink.__isset.send_batch_parallelism && table_sink.send_batch_parallelism > 1) { _send_batch_parallelism = table_sink.send_batch_parallelism; } - // if distributed column list is empty, we can ensure that tablet is with random distribution info - // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition - // for the whole olap table sink + // If distributed column list is empty, the table uses random distribution. + // Mode priority (highest to lowest): + // 1. FIND_TABLET_RANDOM_BUCKET: FE set enable_adaptive_random_bucket on the sink, + // meaning enable_adaptive_random_bucket_load is ON. Using a sink-level flag (mirroring + // load_to_single_tablet) ensures the mode is fixed correctly even when the initial + // partition list is empty (e.g. auto-partition tables on first load). + // 2. FIND_TABLET_EVERY_SINK: load_to_single_tablet=true (legacy single-tablet mode). + // 3. FIND_TABLET_EVERY_BATCH: default round-robin per batch. auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; if (table_sink.partition.distributed_columns.empty()) { - if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { + if (table_sink.__isset.enable_adaptive_random_bucket && + table_sink.enable_adaptive_random_bucket && config::is_cloud_mode()) { + find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_RANDOM_BUCKET; + } else if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK; } else { find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH; @@ -2014,11 +2105,32 @@ void VTabletWriter::_generate_one_index_channel_payload( RowPartTabletIds& row_part_tablet_id, int32_t index_idx, ChannelDistributionPayload& channel_payload) { auto& row_ids = row_part_tablet_id.row_ids; + auto& partition_ids = row_part_tablet_id.partition_ids; auto& tablet_ids = row_part_tablet_id.tablet_ids; size_t row_cnt = row_ids.size(); for (size_t i = 0; i < row_ids.size(); i++) { + if (_tablet_finder->is_adaptive_random_bucket() && config::is_cloud_mode()) { + auto partition_it = _channels[index_idx]->_channels_by_partition.find(partition_ids[i]); + DCHECK(partition_it != _channels[index_idx]->_channels_by_partition.end()) + << "unknown partition channel, partition_id=" << partition_ids[i]; + auto payload_it = + channel_payload.find(partition_it->second.get()); // + if (payload_it == channel_payload.end()) { + auto [tmp_it, _] = channel_payload.emplace( + partition_it->second.get(), + Payload {std::make_unique(), &row_part_tablet_id, + std::vector()}); + payload_it = tmp_it; + payload_it->second.row_ids->reserve(row_cnt); + payload_it->second.route_idxs.reserve(row_cnt); + } + payload_it->second.row_ids->push_back(row_ids[i]); + payload_it->second.route_idxs.push_back(cast_set(i)); + continue; + } + // (tablet_id, VNodeChannel) where this tablet locate auto it = _channels[index_idx]->_channels_by_tablet.find(tablet_ids[i]); DCHECK(it != _channels[index_idx]->_channels_by_tablet.end()) @@ -2029,14 +2141,14 @@ void VTabletWriter::_generate_one_index_channel_payload( auto payload_it = channel_payload.find(locate_node.get()); // if (payload_it == channel_payload.end()) { auto [tmp_it, _] = channel_payload.emplace( - locate_node.get(), - Payload {std::make_unique(), std::vector()}); + locate_node.get(), Payload {std::make_unique(), + &row_part_tablet_id, std::vector()}); payload_it = tmp_it; - payload_it->second.first->reserve(row_cnt); - payload_it->second.second.reserve(row_cnt); + payload_it->second.row_ids->reserve(row_cnt); + payload_it->second.route_idxs.reserve(row_cnt); } - payload_it->second.first->push_back(row_ids[i]); - payload_it->second.second.push_back(tablet_ids[i]); + payload_it->second.row_ids->push_back(row_ids[i]); + payload_it->second.route_idxs.push_back(cast_set(i)); } } } diff --git a/be/src/exec/sink/writer/vtablet_writer.h b/be/src/exec/sink/writer/vtablet_writer.h index d3e6e8da0f1af0..790f06506a6eca 100644 --- a/be/src/exec/sink/writer/vtablet_writer.h +++ b/be/src/exec/sink/writer/vtablet_writer.h @@ -220,8 +220,11 @@ struct WriterStats { VNodeChannelStat channel_stat; }; -// pair -using Payload = std::pair, std::vector>; +struct Payload { + std::unique_ptr row_ids; + RowPartTabletIds* row_part_tablet_ids = nullptr; + std::vector route_idxs; +}; // every NodeChannel keeps a data transmission channel with one BE. for multiple times open, it has a dozen of requests and corresponding closures. class VNodeChannel { @@ -594,6 +597,8 @@ class IndexChannel { std::unordered_map> _node_channels; // from tablet_id to backend channel std::unordered_map>> _channels_by_tablet; + // from partition_id to FE-planned bucket owner channel in cloud receiver-side random bucket mode + std::unordered_map> _channels_by_partition; bool _has_inc_node = false; // lock to protect _failed_channels and _failed_channels_msgs diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.cpp b/be/src/exec/sink/writer/vtablet_writer_v2.cpp index 17f41063c6a33d..31f1be6cd823c2 100644 --- a/be/src/exec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/exec/sink/writer/vtablet_writer_v2.cpp @@ -158,9 +158,10 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { _location = _pool->add(new OlapTableLocationParam(table_sink.location)); _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); - // if distributed column list is empty, we can ensure that tablet is with random distribution info - // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition - // for the whole olap table sink + // If distributed column list is empty, the table uses random distribution. + // Mode priority (highest to lowest): + // 1. FIND_TABLET_EVERY_SINK: load_to_single_tablet=true (legacy single-tablet mode). + // 2. FIND_TABLET_EVERY_BATCH: default round-robin per batch. auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; if (table_sink.partition.distributed_columns.empty()) { if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { @@ -556,6 +557,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block, int64_t ta .is_high_priority = _is_high_priority, .write_file_cache = _write_file_cache, .storage_vault_id {}, + .enable_table_memtable_backpressure = _tablet_finder->is_adaptive_random_bucket(), }; bool index_not_found = true; for (const auto& index : _schema->indexes()) { @@ -584,13 +586,15 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block, int64_t ta { SCOPED_TIMER(_wait_mem_limit_timer); ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush( - [state = _state]() { return state->is_cancelled(); }); + [state = _state]() { return state->is_cancelled(); }, + _state->workload_group().get()); if (_state->is_cancelled()) { return _state->cancel_reason(); } } SCOPED_TIMER(_write_memtable_timer); - st = delta_writer->write(block.get(), rows.row_idxes); + bool memtable_flushed = false; + st = delta_writer->write(block.get(), rows.row_idxes, &memtable_flushed); return st; } diff --git a/be/src/load/channel/load_channel_mgr.cpp b/be/src/load/channel/load_channel_mgr.cpp index 65ec0ed5ec80af..a5f655b5fb67c0 100644 --- a/be/src/load/channel/load_channel_mgr.cpp +++ b/be/src/load/channel/load_channel_mgr.cpp @@ -166,7 +166,7 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request, // because this may block for a while, which may lead to rpc timeout. SCOPED_TIMER(channel->get_handle_mem_limit_timer()); ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush( - [channel]() { return channel->is_cancelled(); }); + [channel]() { return channel->is_cancelled(); }, channel->workload_group().get()); if (channel->is_cancelled()) { return Status::Cancelled("LoadChannel has been cancelled: {}.", load_id.to_string()); } diff --git a/be/src/load/channel/tablets_channel.cpp b/be/src/load/channel/tablets_channel.cpp index 9b730e8874eaa1..dcbbecd049487e 100644 --- a/be/src/load/channel/tablets_channel.cpp +++ b/be/src/load/channel/tablets_channel.cpp @@ -143,7 +143,6 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { _schema = std::make_shared(); RETURN_IF_ERROR(_schema->init(request.schema())); _tuple_desc = _schema->tuple_desc(); - int max_sender = request.num_senders(); /* * a tablets channel in reciever is related to a bulk of VNodeChannel of sender. each instance one or none. @@ -174,6 +173,7 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { _closed_senders.Reset(max_sender); RETURN_IF_ERROR(_open_all_writers(request)); + _init_receiver_side_random_bucket_state(request); _state = kOpened; return Status::OK(); @@ -241,6 +241,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para wrequest.table_schema_param = _schema; wrequest.txn_expiration = params.txn_expiration(); // Required by CLOUD. wrequest.storage_vault_id = params.storage_vault_id(); + wrequest.enable_table_memtable_backpressure = params.is_receiver_side_random_bucket(); auto delta_writer = create_delta_writer(wrequest); { @@ -254,11 +255,45 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para _s_tablet_writer_count += incremental_tablet_num; LOG(INFO) << ss.str(); + _init_receiver_side_random_bucket_state(params); _state = kOpened; return Status::OK(); } +void BaseTabletsChannel::_init_receiver_side_random_bucket_state( + const PTabletWriterOpenRequest& request) { + if (!request.is_receiver_side_random_bucket() || request.tablets().empty()) { + return; + } + if (_adaptive_random_bucket_state == nullptr) { + _adaptive_random_bucket_state = std::make_shared(_load_id); + } + _random_bucket_partition_params.clear(); + _random_bucket_partition_params.reserve(request.random_bucket_partitions_size()); + for (const auto& partition : request.random_bucket_partitions()) { + RandomBucketPartitionParam params; + params.ordered_tablet_ids.reserve(partition.ordered_tablet_ids_size()); + for (auto tablet_id : partition.ordered_tablet_ids()) { + params.ordered_tablet_ids.push_back(tablet_id); + } + _random_bucket_partition_params.emplace(partition.partition_id(), std::move(params)); + } + + for (const auto& [partition_id, params] : _random_bucket_partition_params) { + CHECK(!params.ordered_tablet_ids.empty()) + << "ordered_tablet_ids is empty, load_id=" << _load_id + << ", partition_id=" << partition_id; + std::vector ordered_positions; + ordered_positions.reserve(params.ordered_tablet_ids.size()); + for (size_t i = 0; i < params.ordered_tablet_ids.size(); ++i) { + ordered_positions.push_back(cast_set(i)); + } + _adaptive_random_bucket_state->init_partition(partition_id, params.ordered_tablet_ids, + ordered_positions, 0); + } +} + std::unique_ptr TabletsChannel::create_delta_writer(const WriteRequest& request) { return std::make_unique(_engine, request, _profile, _load_id); } @@ -525,6 +560,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req .is_high_priority = _is_high_priority, .write_file_cache = request.write_file_cache(), .storage_vault_id = request.storage_vault_id(), + .enable_table_memtable_backpressure = request.is_receiver_side_random_bucket(), }; auto delta_writer = create_delta_writer(wrequest); @@ -578,9 +614,10 @@ Status BaseTabletsChannel::_write_block_data( [[maybe_unused]] size_t uncompressed_size = 0; [[maybe_unused]] int64_t uncompressed_time = 0; RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, &uncompressed_time)); - CHECK(send_data.rows() == request.tablet_ids_size()) - << "block rows: " << send_data.rows() - << ", tablet_ids_size: " << request.tablet_ids_size(); + int request_rows = request.is_receiver_side_random_bucket() ? request.partition_ids_size() + : request.tablet_ids_size(); + CHECK(send_data.rows() == request_rows) + << "block rows: " << send_data.rows() << ", request_rows: " << request_rows; g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes(); Defer defer { @@ -622,9 +659,14 @@ Status BaseTabletsChannel::_write_block_data( SCOPED_TIMER(_write_block_timer); auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos(); for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) { + bool memtable_flushed = false; RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](BaseDeltaWriter* writer) { - return writer->write(&send_data, tablet_to_rowidxs_it.second); + return writer->write(&send_data, tablet_to_rowidxs_it.second, &memtable_flushed); })); + if (memtable_flushed && request.is_receiver_side_random_bucket()) { + CHECK(_adaptive_random_bucket_state != nullptr); + _adaptive_random_bucket_state->rotate_by_tablet(tablet_to_rowidxs_it.first); + } auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first); if (tablet_writer_it != _tablet_writers.end()) { @@ -639,6 +681,134 @@ Status BaseTabletsChannel::_write_block_data( return Status::OK(); } +std::shared_ptr BaseTabletsChannel::_get_partition_route_lock(int64_t partition_id) { + std::lock_guard l(_partition_route_locks_lock); + auto& lock = _partition_route_locks[partition_id]; + if (lock == nullptr) { + lock = std::make_shared(); + } + return lock; +} + +Status BaseTabletsChannel::_write_block_data_for_receiver_side_random_bucket( + const PTabletWriterAddBlockRequest& request, int64_t cur_seq, + std::unordered_map>& partition_to_rowidxs, + PTabletWriterAddBlockResult* response) { + Block send_data; + [[maybe_unused]] size_t uncompressed_size = 0; + [[maybe_unused]] int64_t uncompressed_time = 0; + RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, &uncompressed_time)); + CHECK(send_data.rows() == request.partition_ids_size()) + << "block rows: " << send_data.rows() + << ", partition_ids_size: " << request.partition_ids_size(); + + { + std::lock_guard l(_lock); + for (const auto& [partition_id, _] : partition_to_rowidxs) { + _partition_ids.emplace(partition_id); + } + } + + g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes(); + Defer defer { + [&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }}; + + auto* tablet_errors = response->mutable_tablet_errors(); + auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos(); + + auto write_partition_data = [&](int64_t partition_id, + const DorisVector& row_idxs) -> Status { + auto partition_lock = _get_partition_route_lock(partition_id); + std::lock_guard partition_guard(*partition_lock); + + CHECK(_adaptive_random_bucket_state != nullptr); + int64_t tablet_id = _adaptive_random_bucket_state->current_tablet(partition_id); + CHECK(tablet_id >= 0) << "invalid current tablet, load_id=" << _load_id + << ", partition_id=" << partition_id; + LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: route+write begin" + << ", load_id=" << _load_id << ", index_id=" << _index_id + << ", sender_id=" << request.sender_id() + << ", packet_seq=" << request.packet_seq() << ", partition_id=" << partition_id + << ", tablet_id=" << tablet_id << ", row_count=" << row_idxs.size(); + + { + std::shared_lock broken_rlock(_broken_tablets_lock); + if (_is_broken_tablet(tablet_id)) { + LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: skip broken tablet" + << ", load_id=" << _load_id << ", index_id=" << _index_id + << ", sender_id=" << request.sender_id() + << ", packet_seq=" << request.packet_seq() + << ", partition_id=" << partition_id << ", tablet_id=" << tablet_id; + return Status::OK(); + } + } + + BaseDeltaWriter* tablet_writer = nullptr; + { + std::lock_guard l(_tablet_writers_lock); + auto tablet_writer_it = _tablet_writers.find(tablet_id); + if (tablet_writer_it == _tablet_writers.end()) { + return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id); + } + tablet_writer = tablet_writer_it->second.get(); + } + + bool memtable_flushed = false; + Status st = tablet_writer->write(&send_data, row_idxs, &memtable_flushed); + if (!st.ok()) { + auto err_msg = + fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}", + tablet_id, _txn_id, st.to_string()); + LOG(WARNING) << err_msg; + PTabletError* error = tablet_errors->Add(); + error->set_tablet_id(tablet_id); + error->set_msg(err_msg); + static_cast(tablet_writer->cancel_with_status(st)); + _add_broken_tablet(tablet_id); + return Status::OK(); + } + + LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: route+write done" + << ", load_id=" << _load_id << ", index_id=" << _index_id + << ", sender_id=" << request.sender_id() + << ", packet_seq=" << request.packet_seq() << ", partition_id=" << partition_id + << ", tablet_id=" << tablet_id << ", row_count=" << row_idxs.size() + << ", memtable_flushed=" << memtable_flushed; + if (memtable_flushed) { + _adaptive_random_bucket_state->rotate_by_tablet(tablet_id); + } + tablet_writer->set_tablet_load_rowset_num_info(tablet_load_infos); + return Status::OK(); + }; + + SCOPED_TIMER(_write_block_timer); + for (const auto& [partition_id, row_idxs] : partition_to_rowidxs) { + RETURN_IF_ERROR(write_partition_data(partition_id, row_idxs)); + } + + { + std::lock_guard l(_lock); + _next_seqs[request.sender_id()] = cur_seq + 1; + } + return Status::OK(); +} + +void BaseTabletsChannel::_build_partition_to_rowidxs_for_receiver_side_random_bucket( + const PTabletWriterAddBlockRequest& request, + std::unordered_map>* partition_to_rowidxs) { + CHECK(_adaptive_random_bucket_state != nullptr); + CHECK(request.partition_ids_size() > 0); + for (uint32_t i = 0; i < request.partition_ids_size(); ++i) { + int64_t partition_id = request.partition_ids(i); + auto it = partition_to_rowidxs->find(partition_id); + if (it == partition_to_rowidxs->end()) { + partition_to_rowidxs->emplace(partition_id, std::initializer_list {i}); + } else { + it->second.emplace_back(i); + } + } +} + Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, PTabletWriterAddBlockResult* response) { SCOPED_TIMER(_add_batch_timer); @@ -658,10 +828,17 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, return Status::OK(); } + if (request.is_receiver_side_random_bucket()) { + std::unordered_map /* row index */> + partition_to_rowidxs; + _build_partition_to_rowidxs_for_receiver_side_random_bucket(request, &partition_to_rowidxs); + return _write_block_data_for_receiver_side_random_bucket(request, cur_seq, + partition_to_rowidxs, response); + } + std::unordered_map /* row index */> tablet_to_rowidxs; _build_tablet_to_rowidxs(request, &tablet_to_rowidxs); - return _write_block_data(request, cur_seq, tablet_to_rowidxs, response); } diff --git a/be/src/load/channel/tablets_channel.h b/be/src/load/channel/tablets_channel.h index 289e8d48deabdc..45022b68662608 100644 --- a/be/src/load/channel/tablets_channel.h +++ b/be/src/load/channel/tablets_channel.h @@ -31,6 +31,7 @@ #include "common/status.h" #include "core/custom_allocator.h" +#include "exec/sink/vtablet_finder.h" #include "runtime/runtime_profile.h" #include "util/bitmap.h" #include "util/uid_util.h" @@ -121,9 +122,22 @@ class BaseTabletsChannel { bool is_finished() const { return _state == kFinished; } protected: + struct RandomBucketPartitionParam { + std::vector ordered_tablet_ids; + }; + + void _init_receiver_side_random_bucket_state(const PTabletWriterOpenRequest& request); Status _write_block_data(const PTabletWriterAddBlockRequest& request, int64_t cur_seq, std::unordered_map>& tablet_to_rowidxs, PTabletWriterAddBlockResult* response); + Status _write_block_data_for_receiver_side_random_bucket( + const PTabletWriterAddBlockRequest& request, int64_t cur_seq, + std::unordered_map>& partition_to_rowidxs, + PTabletWriterAddBlockResult* response); + void _build_partition_to_rowidxs_for_receiver_side_random_bucket( + const PTabletWriterAddBlockRequest& request, + std::unordered_map>* partition_to_rowidxs); + std::shared_ptr _get_partition_route_lock(int64_t partition_id); Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request); @@ -185,6 +199,10 @@ class BaseTabletsChannel { std::unordered_set _reducing_tablets; std::unordered_set _partition_ids; + std::unordered_map _random_bucket_partition_params; + std::shared_ptr _adaptive_random_bucket_state; + std::mutex _partition_route_locks_lock; + std::unordered_map> _partition_route_locks; static std::atomic _s_tablet_writer_count; diff --git a/be/src/load/delta_writer/delta_writer.cpp b/be/src/load/delta_writer/delta_writer.cpp index 2fd053f765ebe9..3b408793061d96 100644 --- a/be/src/load/delta_writer/delta_writer.cpp +++ b/be/src/load/delta_writer/delta_writer.cpp @@ -36,6 +36,7 @@ #include "core/block/block.h" #include "io/fs/file_writer.h" // IWYU pragma: keep #include "load/memtable/memtable_flush_executor.h" +#include "load/memtable/memtable_memory_limiter.h" #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "service/backend_options.h" @@ -123,6 +124,11 @@ void BaseDeltaWriter::set_tablet_load_rowset_num_info( collect_tablet_load_rowset_num_info(tablet, tablet_infos); } +int64_t BaseDeltaWriter::table_id() const { + DORIS_CHECK(_req.table_schema_param != nullptr); + return _req.table_schema_param->table_id(); +} + DeltaWriter::~DeltaWriter() = default; Status BaseDeltaWriter::init() { @@ -143,10 +149,18 @@ Status BaseDeltaWriter::init() { return Status::OK(); } -Status DeltaWriter::write(const Block* block, const DorisVector& row_idxs) { +Status DeltaWriter::write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed) { + if (memtable_flushed != nullptr) { + *memtable_flushed = false; + } if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } + if (_req.enable_table_memtable_backpressure) { + ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure( + nullptr, table_id()); + } _lock_watch.start(); std::lock_guard l(_lock); _lock_watch.stop(); @@ -160,13 +174,24 @@ Status DeltaWriter::write(const Block* block, const DorisVector& row_i std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } - return _memtable_writer->write(block, row_idxs); + return _memtable_writer->write(block, row_idxs, memtable_flushed); } Status BaseDeltaWriter::wait_flush() { return _memtable_writer->wait_flush(); } +Status BaseDeltaWriter::flush_memtable_async() { + return _memtable_writer->flush_async(); +} + +Status DeltaWriter::flush_memtable_async() { + _lock_watch.start(); + std::lock_guard l(_lock); + _lock_watch.stop(); + return BaseDeltaWriter::flush_memtable_async(); +} + Status DeltaWriter::close() { _lock_watch.start(); std::lock_guard l(_lock); diff --git a/be/src/load/delta_writer/delta_writer.h b/be/src/load/delta_writer/delta_writer.h index ab9715b74b4d52..826231cd29222c 100644 --- a/be/src/load/delta_writer/delta_writer.h +++ b/be/src/load/delta_writer/delta_writer.h @@ -61,7 +61,8 @@ class BaseDeltaWriter { virtual ~BaseDeltaWriter(); - virtual Status write(const Block* block, const DorisVector& row_idxs) = 0; + virtual Status write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed = nullptr) = 0; // flush the last memtable to flush queue, must call it before build_rowset() virtual Status close() = 0; @@ -81,8 +82,12 @@ class BaseDeltaWriter { // Wait all memtable in flush queue to be flushed Status wait_flush(); + virtual Status flush_memtable_async(); + int64_t partition_id() const { return _req.partition_id; } + int64_t table_id() const; + int64_t tablet_id() const { return _req.tablet_id; } int64_t txn_id() const { return _req.txn_id; } @@ -127,10 +132,13 @@ class DeltaWriter final : public BaseDeltaWriter { ~DeltaWriter() override; - Status write(const Block* block, const DorisVector& row_idxs) override; + Status write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed = nullptr) override; Status close() override; + Status flush_memtable_async() override; + Status cancel_with_status(const Status& st) override; Status build_rowset() override; diff --git a/be/src/load/delta_writer/delta_writer_context.h b/be/src/load/delta_writer/delta_writer_context.h index 21d436c86358a1..218a8b0a2f1391 100644 --- a/be/src/load/delta_writer/delta_writer_context.h +++ b/be/src/load/delta_writer/delta_writer_context.h @@ -51,6 +51,7 @@ struct WriteRequest { bool write_file_cache = false; WriteRequestType write_req_type = WriteRequestType::DATA; std::string storage_vault_id; + bool enable_table_memtable_backpressure = false; }; struct GroupWriteRequest : public WriteRequest { diff --git a/be/src/load/delta_writer/delta_writer_v2.cpp b/be/src/load/delta_writer/delta_writer_v2.cpp index 78271f2a48202e..60e8019564e624 100644 --- a/be/src/load/delta_writer/delta_writer_v2.cpp +++ b/be/src/load/delta_writer/delta_writer_v2.cpp @@ -35,6 +35,7 @@ #include "core/block/block.h" #include "exec/sink/load_stream_stub.h" #include "io/fs/file_writer.h" // IWYU pragma: keep +#include "load/memtable/memtable_memory_limiter.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" #include "service/backend_options.h" @@ -91,6 +92,11 @@ DeltaWriterV2::~DeltaWriterV2() { static_cast(_memtable_writer->cancel()); } +int64_t DeltaWriterV2::_table_id() const { + DORIS_CHECK(_req.table_schema_param != nullptr); + return _req.table_schema_param->table_id(); +} + Status DeltaWriterV2::init() { if (_is_init) { return Status::OK(); @@ -138,10 +144,18 @@ Status DeltaWriterV2::init() { return Status::OK(); } -Status DeltaWriterV2::write(const Block* block, const DorisVector& row_idxs) { +Status DeltaWriterV2::write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed) { + if (memtable_flushed != nullptr) { + *memtable_flushed = false; + } if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } + if (_req.enable_table_memtable_backpressure) { + ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure( + [state = _state]() { return state->is_cancelled(); }, _table_id()); + } _lock_watch.start(); std::lock_guard l(_lock); _lock_watch.stop(); @@ -161,7 +175,7 @@ Status DeltaWriterV2::write(const Block* block, const DorisVector& row } } SCOPED_RAW_TIMER(&_write_memtable_time); - return _memtable_writer->write(block, row_idxs); + return _memtable_writer->write(block, row_idxs, memtable_flushed); } Status DeltaWriterV2::close() { diff --git a/be/src/load/delta_writer/delta_writer_v2.h b/be/src/load/delta_writer/delta_writer_v2.h index d637c6afcbc2d1..1c284d4a418b4e 100644 --- a/be/src/load/delta_writer/delta_writer_v2.h +++ b/be/src/load/delta_writer/delta_writer_v2.h @@ -68,7 +68,8 @@ class DeltaWriterV2 { Status init(); - Status write(const Block* block, const DorisVector& row_idxs); + Status write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed = nullptr); // flush the last memtable to flush queue, must call it before close_wait() Status close(); @@ -85,6 +86,7 @@ class DeltaWriterV2 { Status _build_current_tablet_schema(int64_t index_id, const OlapTableSchemaParam* table_schema_param, const TabletSchema& ori_tablet_schema); + int64_t _table_id() const; void _update_profile(RuntimeProfile* profile); diff --git a/be/src/load/memtable/memtable_memory_limiter.cpp b/be/src/load/memtable/memtable_memory_limiter.cpp index 8b44e04a2e6622..f98aea05f71937 100644 --- a/be/src/load/memtable/memtable_memory_limiter.cpp +++ b/be/src/load/memtable/memtable_memory_limiter.cpp @@ -24,7 +24,6 @@ #include "common/metrics/metrics.h" #include "load/memtable/memtable.h" #include "load/memtable/memtable_writer.h" -#include "runtime/workload_group/workload_group_manager.h" #include "util/mem_info.h" namespace doris { @@ -34,6 +33,12 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption, Metr bvar::LatencyRecorder g_memtable_memory_limit_latency_ms("mm_limiter_limit_time_ms"); bvar::Adder g_memtable_memory_limit_waiting_threads("mm_limiter_waiting_threads"); +bvar::LatencyRecorder g_memtable_table_backpressure_latency_ms( + "mm_limiter_table_backpressure_time_ms"); +bvar::Adder g_memtable_table_backpressure_waiting_threads( + "mm_limiter_table_backpressure_waiting_threads"); +bvar::Status g_memtable_table_backpressure_pending_count( + "mm_limiter_table_backpressure_pending_count", 0); bvar::Status g_memtable_active_memory("mm_limiter_mem_active", 0); bvar::Status g_memtable_write_memory("mm_limiter_mem_write", 0); bvar::Status g_memtable_flush_memory("mm_limiter_mem_flush", 0); @@ -121,7 +126,69 @@ int64_t MemTableMemoryLimiter::_need_flush() { return need_flush - _queue_mem_usage - _flush_mem_usage; } -void MemTableMemoryLimiter::handle_memtable_flush(std::function cancel_check) { +int64_t MemTableMemoryLimiter::_table_flush_pending_memtable_count(int64_t table_id) { + int64_t pending_memtables = 0; + for (const auto& writer : _writers) { + auto writer_sptr = writer.lock(); + if (writer_sptr == nullptr) { + continue; + } + if (writer_sptr->table_id() == table_id) { + pending_memtables += writer_sptr->flush_pending_memtable_count(); + } + } + return pending_memtables; +} + +void MemTableMemoryLimiter::handle_table_memtable_backpressure(std::function cancel_check, + int64_t table_id) { + if (!config::enable_table_memtable_flush_backpressure) { + return; + } + const int64_t pending_count_limit = config::table_memtable_flush_pending_count_limit; + if (pending_count_limit <= 0) { + return; + } + DORIS_CHECK(table_id > 0); + + std::unique_lock l(_lock); + int64_t pending_count = _table_flush_pending_memtable_count(table_id); + if (pending_count < pending_count_limit) { + return; + } + + MonotonicStopWatch timer; + timer.start(); + g_memtable_table_backpressure_waiting_threads << 1; + while (pending_count >= pending_count_limit) { + g_memtable_table_backpressure_pending_count.set_value(pending_count); + LOG_EVERY_T(INFO, 1) << "table memtable flush backpressure: table_id=" << table_id + << ", pending_memtables=" << pending_count + << ", limit=" << pending_count_limit + << ", memtable writers num: " << _writers.size(); + if (cancel_check && cancel_check()) { + LOG(INFO) << "cancelled when waiting for table memtable flush backpressure" + << ", table_id=" << table_id << ", pending_memtables=" << pending_count + << ", limit=" << pending_count_limit; + g_memtable_table_backpressure_waiting_threads << -1; + return; + } + static_cast(_hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(100))); + pending_count = _table_flush_pending_memtable_count(table_id); + } + g_memtable_table_backpressure_pending_count.set_value(pending_count); + g_memtable_table_backpressure_waiting_threads << -1; + timer.stop(); + int64_t time_ms = timer.elapsed_time() / 1000 / 1000; + g_memtable_table_backpressure_latency_ms << time_ms; + LOG(INFO) << "waited " << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS) + << " for table memtable flush backpressure" + << ", table_id=" << table_id << ", pending_memtables=" << pending_count + << ", limit=" << pending_count_limit << ", memtable writers num: " << _writers.size(); +} + +void MemTableMemoryLimiter::handle_memtable_flush(std::function cancel_check, + WorkloadGroup*) { // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); do { diff --git a/be/src/load/memtable/memtable_memory_limiter.h b/be/src/load/memtable/memtable_memory_limiter.h index 6fbcc3cf8c8073..11c5c0fdd61413 100644 --- a/be/src/load/memtable/memtable_memory_limiter.h +++ b/be/src/load/memtable/memtable_memory_limiter.h @@ -44,7 +44,8 @@ class MemTableMemoryLimiter { // If yes, it will flush memtable to try to reduce memory consumption. // Every write operation will call this API to check if need flush memtable OR hang // when memory is not available. - void handle_memtable_flush(std::function cancel_check); + void handle_memtable_flush(std::function cancel_check, WorkloadGroup* wg = nullptr); + void handle_table_memtable_backpressure(std::function cancel_check, int64_t table_id); void register_writer(std::weak_ptr writer); @@ -64,6 +65,7 @@ class MemTableMemoryLimiter { bool _hard_limit_reached(); bool _load_usage_low(); int64_t _need_flush(); + int64_t _table_flush_pending_memtable_count(int64_t table_id); int64_t _flush_active_memtables(int64_t need_flush); void _refresh_mem_tracker(); std::mutex _lock; diff --git a/be/src/load/memtable/memtable_writer.cpp b/be/src/load/memtable/memtable_writer.cpp index 3179939a1d6061..7904f70addfad9 100644 --- a/be/src/load/memtable/memtable_writer.cpp +++ b/be/src/load/memtable/memtable_writer.cpp @@ -86,7 +86,11 @@ Status MemTableWriter::init(std::shared_ptr rowset_writer, return Status::OK(); } -Status MemTableWriter::write(const Block* block, const DorisVector& row_idxs) { +Status MemTableWriter::write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed) { + if (memtable_flushed != nullptr) { + *memtable_flushed = false; + } if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } @@ -138,6 +142,9 @@ Status MemTableWriter::write(const Block* block, const DorisVector& ro } if (UNLIKELY(_mem_table->need_flush())) { RETURN_IF_ERROR(_flush_memtable()); + if (memtable_flushed != nullptr) { + *memtable_flushed = true; + } } return Status::OK(); @@ -171,7 +178,6 @@ Status MemTableWriter::flush_async() { // 1. call by local, from `VTabletWriterV2::_write_memtable`. // 2. call by remote, from `LoadChannelMgr::_get_load_channel`. // 3. call by daemon thread, from `handle_paused_queries` -> `flush_workload_group_memtables`. - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker()); if (!_is_init || _is_closed) { // This writer is uninitialized or closed before flushing, do nothing. // We return OK instead of NOT_INITIALIZED or ALREADY_CLOSED. @@ -184,6 +190,9 @@ Status MemTableWriter::flush_async() { return _cancel_status; } + DCHECK(_resource_ctx != nullptr); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker()); + VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " << PrettyPrinter::print_bytes(_mem_table->memory_usage()) << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id); @@ -358,6 +367,27 @@ uint64_t MemTableWriter::flush_running_count() const { return _flush_token == nullptr ? 0 : _flush_token->get_stats().flush_running_count.load(); } +int64_t MemTableWriter::table_id() const { + DORIS_CHECK(_req.table_schema_param != nullptr); + return _req.table_schema_param->table_id(); +} + +int64_t MemTableWriter::flush_pending_memtable_count() { + std::lock_guard l(_mem_table_ptr_lock); + int64_t memtable_count = 0; + for (const auto& mem_table : _freezed_mem_tables) { + auto mem_table_sptr = mem_table.lock(); + if (mem_table_sptr == nullptr) { + continue; + } + auto mem_type = mem_table_sptr->get_mem_type(); + if (mem_type == MemType::WRITE_FINISHED || mem_type == MemType::FLUSH) { + memtable_count++; + } + } + return memtable_count; +} + int64_t MemTableWriter::mem_consumption(MemType mem) { if (!_is_init) { // This method may be called before this writer is initialized. diff --git a/be/src/load/memtable/memtable_writer.h b/be/src/load/memtable/memtable_writer.h index 0c3a03926dc70f..30f03bc65f0976 100644 --- a/be/src/load/memtable/memtable_writer.h +++ b/be/src/load/memtable/memtable_writer.h @@ -62,7 +62,8 @@ class MemTableWriter { std::shared_ptr partial_update_info, std::shared_ptr wg_sptr, bool unique_key_mow = false); - Status write(const Block* block, const DorisVector& row_idxs); + Status write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed = nullptr); // flush the last memtable to flush queue, must call it before close_wait() Status close(); @@ -83,6 +84,7 @@ class MemTableWriter { int64_t mem_consumption(MemType mem); int64_t active_memtable_mem_consumption(); + int64_t flush_pending_memtable_count(); // Submit current memtable to flush queue, and return without waiting. // This is currently for reducing mem consumption of this memtable writer. @@ -93,6 +95,8 @@ class MemTableWriter { int64_t tablet_id() const { return _req.tablet_id; } + int64_t table_id() const; + int64_t total_received_rows() const { return _total_received_rows; } const FlushStatistic& get_flush_token_stats(); diff --git a/be/src/storage/tablet_info.cpp b/be/src/storage/tablet_info.cpp index 34abb1136e8e1d..5865faf062f6fa 100644 --- a/be/src/storage/tablet_info.cpp +++ b/be/src/storage/tablet_info.cpp @@ -700,10 +700,15 @@ Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartiti part_result = _obj_pool.add(new VOlapTablePartition(&_partition_block)); part_result->id = t_part.id; part_result->is_mutable = t_part.is_mutable; - // only load_to_single_tablet = true will set load_tablet_idx if (t_part.__isset.load_tablet_idx) { part_result->load_tablet_idx = t_part.load_tablet_idx; } + if (t_part.__isset.bucket_be_id) { + part_result->bucket_be_id = t_part.bucket_be_id; + } + if (t_part.__isset.local_bucket_seqs) { + part_result->local_bucket_seqs = t_part.local_bucket_seqs; + } if (_is_in_partition) { for (const auto& keys : t_part.in_keys) { @@ -790,6 +795,15 @@ Status VOlapTablePartitionParam::add_partitions( } part->num_buckets = t_part.num_buckets; + if (t_part.__isset.load_tablet_idx) { + part->load_tablet_idx = t_part.load_tablet_idx; + } + if (t_part.__isset.bucket_be_id) { + part->bucket_be_id = t_part.bucket_be_id; + } + if (t_part.__isset.local_bucket_seqs) { + part->local_bucket_seqs = t_part.local_bucket_seqs; + } auto num_indexes = _schema->indexes().size(); if (t_part.indexes.size() != num_indexes) { return Status::InternalError( @@ -862,6 +876,15 @@ Status VOlapTablePartitionParam::replace_partitions( } part->num_buckets = t_part.num_buckets; + if (t_part.__isset.load_tablet_idx) { + part->load_tablet_idx = t_part.load_tablet_idx; + } + if (t_part.__isset.bucket_be_id) { + part->bucket_be_id = t_part.bucket_be_id; + } + if (t_part.__isset.local_bucket_seqs) { + part->local_bucket_seqs = t_part.local_bucket_seqs; + } auto num_indexes = _schema->indexes().size(); if (t_part.indexes.size() != num_indexes) { return Status::InternalError( diff --git a/be/src/storage/tablet_info.h b/be/src/storage/tablet_info.h index 0982df56b8c526..079acc5bcb34f2 100644 --- a/be/src/storage/tablet_info.h +++ b/be/src/storage/tablet_info.h @@ -164,6 +164,13 @@ struct VOlapTablePartition { bool is_mutable; // -1 indicates partition with hash distribution int64_t load_tablet_idx = -1; + // FE-selected bucket owner BE for adaptive random bucket mode. -1 means use the current + // execution BE, which preserves the legacy BE-side calculation behaviour. + int64_t bucket_be_id = -1; + // Bucket indices (0-based) used by FIND_TABLET_RANDOM_BUCKET rotation. FE may preselect and + // send them; otherwise BE computes them from tablet location info and bucket_be_id/current BE. + // Empty means fallback to the fixed load_tablet_idx bucket. + std::vector local_bucket_seqs; int total_replica_num = 0; int load_required_replica_num = 0; // tablet_id -> set of backend_ids that have version gaps diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 63cec327c207b8..ec2e69a0482b1b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3678,4 +3678,15 @@ public static int metaServiceRpcRetryTimes() { + "obtaining partition version information when calculating the delete bitmap. Enabled " + "by default."}) public static boolean calc_delete_bitmap_get_versions_waiting_for_pending_txns = true; + + @ConfField(mutable = true, masterOnly = true, description = { + "Whether to enable adaptive random bucket load. When enabled, each BE computes its own local " + + "bucket set (buckets whose primary replica it hosts) from the tablet location info " + + "sent by FE, and rotates across those buckets once per-tablet write volume exceeds " + + "the threshold (default 200 MB). This reduces import memory pressure and improves " + + "throughput for random-distribution tables. Covers all load types uniformly.", + "是否启用自适应随机桶导入。开启后每个 BE 根据 FE 下发的 tablet 位置信息自行计算本地桶集合" + + "(持有主副本的桶),并在单个 tablet 写入量超过阈值(默认 200 MB)后在本地桶之间轮转。" + + "可降低导入内存压力并提升随机分桶表的吞吐量,覆盖所有导入类型。"}) + public static boolean enable_adaptive_random_bucket_load = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index dc845d4451ba94..25ef0987c8c717 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -98,6 +98,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -175,6 +176,12 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou "if load_to_single_tablet set to true," + " the olap table must be with random distribution"); } tSink.setLoadToSingleTablet(loadToSingleTablet); + // Signal BE to use FIND_TABLET_RANDOM_BUCKET from the start so that auto-partition + // tables whose initial partition list is empty still enter the correct mode. + if (Config.isCloudMode() && Config.enable_adaptive_random_bucket_load + && dstTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo) { + tSink.setEnableAdaptiveRandomBucket(true); + } tSink.setTxnTimeoutS(txnExpirationS); String vaultId = dstTable.getStorageVaultId(); if (vaultId != null && !vaultId.isEmpty()) { @@ -555,6 +562,297 @@ private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table return partitionParam; } + public static final class AdaptiveBucketAssignment { + private final long bucketBeId; + private final int loadTabletIdx; + private final List localBucketSeqs; + + public AdaptiveBucketAssignment(long bucketBeId, int loadTabletIdx, List localBucketSeqs) { + this.bucketBeId = bucketBeId; + this.loadTabletIdx = loadTabletIdx; + this.localBucketSeqs = new ArrayList<>(localBucketSeqs); + } + + public long getBucketBeId() { + return bucketBeId; + } + + public int getLoadTabletIdx() { + return loadTabletIdx; + } + + public List getLocalBucketSeqs() { + return localBucketSeqs; + } + } + + public static boolean shouldAssignAdaptiveRandomBucket(TOlapTableSink sink) { + return sink != null + && sink.isSetEnableAdaptiveRandomBucket() + && sink.isEnableAdaptiveRandomBucket() + && sink.isSetPartition() + && sink.getPartition() != null + && (!sink.getPartition().isSetDistributedColumns() + || sink.getPartition().getDistributedColumns().isEmpty()); + } + + public static Map> computeAdaptiveRandomBucketAssignments( + List sinkBackendIds, List partitions, + List tabletLocations, int planFragmentNum) { + Map> assignments = new HashMap<>(); + List orderedSinkBackendIds = sinkBackendIds.stream() + .distinct() + .sorted() + .collect(Collectors.toList()); + for (Long sinkBackendId : orderedSinkBackendIds) { + assignments.put(sinkBackendId, new HashMap<>()); + } + if (orderedSinkBackendIds.isEmpty() || partitions == null || tabletLocations == null) { + return assignments; + } + + Map tabletLocationMap = new HashMap<>(tabletLocations.size()); + for (TTabletLocation tabletLocation : tabletLocations) { + tabletLocationMap.put(tabletLocation.getTabletId(), tabletLocation); + } + + for (TOlapTablePartition partition : partitions) { + if (!partition.isSetLoadTabletIdx() || partition.getNumBuckets() <= 0 + || partition.getIndexes().isEmpty()) { + continue; + } + Map> beToBucketSeqs = buildBeToBucketSeqs(partition, tabletLocationMap); + long baseTabletIndex = partition.getLoadTabletIdx(); + int fallbackBucketIdx = (int) Math.floorMod(baseTabletIndex, (long) partition.getNumBuckets()); + int targetBucketNum = Math.min( + Math.min(orderedSinkBackendIds.size(), partition.getNumBuckets()), + Math.max(planFragmentNum, 1)); + if (targetBucketNum <= 0) { + continue; + } + + List rotatedSinkBackendIds = rotateSinkBackendIds(orderedSinkBackendIds, baseTabletIndex); + Map bucketToOwnerBe = buildBucketToOwnerBe(beToBucketSeqs); + List selectedBucketSeqs = selectAdaptiveBucketSeqs(rotatedSinkBackendIds, + beToBucketSeqs, bucketToOwnerBe, baseTabletIndex, partition.getNumBuckets(), targetBucketNum); + if (selectedBucketSeqs.isEmpty()) { + selectedBucketSeqs = Collections.singletonList(fallbackBucketIdx); + } + if (selectedBucketSeqs.size() != targetBucketNum) { + LOG.warn("Adaptive random bucket selected {} buckets instead of target {} for partition {}, " + + "sinkBackendIds={}, beToBucketSeqs={}, selectedBucketSeqs={}, fallbackBucketIdx={}", + selectedBucketSeqs.size(), targetBucketNum, partition.getId(), orderedSinkBackendIds, + beToBucketSeqs, selectedBucketSeqs, fallbackBucketIdx); + } + + Map> openedBeToBucketSeqs = buildBeToBucketSeqs(bucketToOwnerBe, selectedBucketSeqs); + Map bucketUseCounts = new HashMap<>(); + Map sinkAssignments = LOG.isInfoEnabled() ? new HashMap<>() : null; + for (Long sinkBackendId : rotatedSinkBackendIds) { + int bucketSeq = selectLeastUsedBucketSeq( + openedBeToBucketSeqs.get(sinkBackendId), bucketUseCounts, baseTabletIndex); + if (bucketSeq < 0) { + bucketSeq = selectLeastUsedBucketSeq(selectedBucketSeqs, bucketUseCounts, baseTabletIndex); + } + if (bucketSeq < 0) { + bucketSeq = fallbackBucketIdx; + } + long bucketBeId = bucketToOwnerBe.getOrDefault(bucketSeq, -1L); + if (bucketBeId <= 0) { + LOG.warn("Adaptive random bucket falls back to bucket {} without owner BE for partition {}, " + + "sinkBackendId={}, selectedBucketSeqs={}, beToBucketSeqs={}", + bucketSeq, partition.getId(), sinkBackendId, selectedBucketSeqs, beToBucketSeqs); + } + List localBucketSeqs = rotateBucketSeqsForStartBucket( + beToBucketSeqs.get(bucketBeId), bucketSeq); + assignments.get(sinkBackendId).put(partition.getId(), + new AdaptiveBucketAssignment(bucketBeId, bucketSeq, localBucketSeqs)); + bucketUseCounts.merge(bucketSeq, 1, Integer::sum); + if (sinkAssignments != null) { + sinkAssignments.put(sinkBackendId, + "bucket=" + bucketSeq + ",bucketBeId=" + bucketBeId + + ",localBucketSeqs=" + localBucketSeqs); + } + } + if (sinkAssignments != null) { + LOG.info("Adaptive random bucket plan partition={}, baseTabletIndex={}, targetBucketNum={}, " + + "sinkBackendIds={}, rotatedSinkBackendIds={}, beToBucketSeqs={}, " + + "selectedBucketSeqs={}, openedBeToBucketSeqs={}, sinkAssignments={}", + partition.getId(), baseTabletIndex, targetBucketNum, orderedSinkBackendIds, + rotatedSinkBackendIds, beToBucketSeqs, selectedBucketSeqs, openedBeToBucketSeqs, + sinkAssignments); + } + } + return assignments; + } + + public static void applyAdaptiveRandomBucketAssignments(List partitions, + Map partitionAssignments) { + if (partitions == null || partitionAssignments == null || partitionAssignments.isEmpty()) { + return; + } + for (TOlapTablePartition partition : partitions) { + AdaptiveBucketAssignment assignment = partitionAssignments.get(partition.getId()); + if (assignment == null) { + continue; + } + partition.setLoadTabletIdx(assignment.getLoadTabletIdx()); + if (assignment.getBucketBeId() > 0) { + partition.setBucketBeId(assignment.getBucketBeId()); + } else if (partition.isSetBucketBeId()) { + partition.unsetBucketBeId(); + } + if (!assignment.getLocalBucketSeqs().isEmpty()) { + partition.setLocalBucketSeqs(new ArrayList<>(assignment.getLocalBucketSeqs())); + } else if (partition.isSetLocalBucketSeqs()) { + partition.unsetLocalBucketSeqs(); + } + if (LOG.isInfoEnabled()) { + LOG.info("Adaptive random bucket apply partition={}, bucketBeId={}, loadTabletIdx={}, " + + "localBucketSeqs={}", + partition.getId(), assignment.getBucketBeId(), assignment.getLoadTabletIdx(), + assignment.getLocalBucketSeqs()); + } + } + } + + private static Map> buildBeToBucketSeqs(TOlapTablePartition partition, + Map tabletLocationMap) { + Map> beToBucketSeqs = new HashMap<>(); + List tablets = partition.getIndexes().get(0).getTablets(); + for (int bucketSeq = 0; bucketSeq < tablets.size(); bucketSeq++) { + TTabletLocation tabletLocation = tabletLocationMap.get(tablets.get(bucketSeq)); + if (tabletLocation == null || tabletLocation.getNodeIds() == null + || tabletLocation.getNodeIds().isEmpty()) { + continue; + } + List sortedNodeIds = new ArrayList<>(tabletLocation.getNodeIds()); + Collections.sort(sortedNodeIds); + long bucketBeId = sortedNodeIds.get(bucketSeq % sortedNodeIds.size()); + beToBucketSeqs.computeIfAbsent(bucketBeId, ignored -> new ArrayList<>()).add(bucketSeq); + } + return beToBucketSeqs; + } + + private static List rotateSinkBackendIds(List sinkBackendIds, long baseTabletIndex) { + if (sinkBackendIds.isEmpty()) { + return Collections.emptyList(); + } + int startIdx = (int) Math.floorMod(baseTabletIndex, (long) sinkBackendIds.size()); + List rotatedSinkBackendIds = new ArrayList<>(sinkBackendIds.size()); + for (int offset = 0; offset < sinkBackendIds.size(); offset++) { + rotatedSinkBackendIds.add(sinkBackendIds.get((startIdx + offset) % sinkBackendIds.size())); + } + return rotatedSinkBackendIds; + } + + private static Map buildBucketToOwnerBe(Map> beToBucketSeqs) { + Map bucketToOwnerBe = new HashMap<>(); + for (Map.Entry> entry : beToBucketSeqs.entrySet()) { + for (Integer bucketSeq : entry.getValue()) { + bucketToOwnerBe.put(bucketSeq, entry.getKey()); + } + } + return bucketToOwnerBe; + } + + private static List selectAdaptiveBucketSeqs(List rotatedSinkBackendIds, + Map> beToBucketSeqs, Map bucketToOwnerBe, + long baseTabletIndex, int numBuckets, int targetBucketNum) { + List selectedBucketSeqs = new ArrayList<>(targetBucketNum); + HashSet selectedBucketSet = new HashSet<>(targetBucketNum); + for (Long sinkBackendId : rotatedSinkBackendIds) { + int bucketSeq = selectBucketSeq(beToBucketSeqs.get(sinkBackendId), baseTabletIndex); + if (bucketSeq >= 0 && selectedBucketSet.add(bucketSeq)) { + selectedBucketSeqs.add(bucketSeq); + if (selectedBucketSeqs.size() >= targetBucketNum) { + return selectedBucketSeqs; + } + } + } + + HashSet sinkBackendIdSet = new HashSet<>(rotatedSinkBackendIds); + appendRemainingBucketSeqs(selectedBucketSeqs, selectedBucketSet, bucketToOwnerBe, sinkBackendIdSet, + baseTabletIndex, numBuckets, targetBucketNum, true); + appendRemainingBucketSeqs(selectedBucketSeqs, selectedBucketSet, bucketToOwnerBe, sinkBackendIdSet, + baseTabletIndex, numBuckets, targetBucketNum, false); + return selectedBucketSeqs; + } + + private static void appendRemainingBucketSeqs(List selectedBucketSeqs, HashSet selectedBucketSet, + Map bucketToOwnerBe, HashSet sinkBackendIdSet, long baseTabletIndex, + int numBuckets, int targetBucketNum, boolean preferSinkBackendBuckets) { + for (int offset = 0; offset < numBuckets && selectedBucketSeqs.size() < targetBucketNum; offset++) { + int bucketSeq = (int) Math.floorMod(baseTabletIndex + offset, (long) numBuckets); + if (selectedBucketSet.contains(bucketSeq)) { + continue; + } + Long ownerBeId = bucketToOwnerBe.get(bucketSeq); + if (preferSinkBackendBuckets && (ownerBeId == null || !sinkBackendIdSet.contains(ownerBeId))) { + continue; + } + if (!preferSinkBackendBuckets && ownerBeId != null && sinkBackendIdSet.contains(ownerBeId)) { + continue; + } + selectedBucketSet.add(bucketSeq); + selectedBucketSeqs.add(bucketSeq); + } + } + + private static Map> buildBeToBucketSeqs(Map bucketToOwnerBe, + List bucketSeqs) { + Map> beToBucketSeqs = new HashMap<>(); + for (Integer bucketSeq : bucketSeqs) { + Long ownerBeId = bucketToOwnerBe.get(bucketSeq); + if (ownerBeId == null) { + continue; + } + beToBucketSeqs.computeIfAbsent(ownerBeId, ignored -> new ArrayList<>()).add(bucketSeq); + } + return beToBucketSeqs; + } + + private static int selectBucketSeq(List bucketSeqs, long baseTabletIndex) { + if (bucketSeqs == null || bucketSeqs.isEmpty()) { + return -1; + } + int bucketPos = (int) Math.floorMod(baseTabletIndex, (long) bucketSeqs.size()); + return bucketSeqs.get(bucketPos); + } + + private static int selectLeastUsedBucketSeq(List bucketSeqs, Map bucketUseCounts, + long baseTabletIndex) { + if (bucketSeqs == null || bucketSeqs.isEmpty()) { + return -1; + } + int startIdx = (int) Math.floorMod(baseTabletIndex, (long) bucketSeqs.size()); + int selectedBucketSeq = -1; + int selectedUseCount = Integer.MAX_VALUE; + for (int offset = 0; offset < bucketSeqs.size(); offset++) { + int candidateBucketSeq = bucketSeqs.get((startIdx + offset) % bucketSeqs.size()); + int candidateUseCount = bucketUseCounts.getOrDefault(candidateBucketSeq, 0); + if (selectedBucketSeq < 0 || candidateUseCount < selectedUseCount) { + selectedBucketSeq = candidateBucketSeq; + selectedUseCount = candidateUseCount; + } + } + return selectedBucketSeq; + } + + private static List rotateBucketSeqsForStartBucket(List bucketSeqs, int startBucketSeq) { + if (bucketSeqs == null || bucketSeqs.isEmpty()) { + return Collections.singletonList(startBucketSeq); + } + int startIdx = bucketSeqs.indexOf(startBucketSeq); + Preconditions.checkState(startIdx >= 0, + "start bucket %s must exist in bucketSeqs %s", startBucketSeq, bucketSeqs); + List rotatedBucketSeqs = new ArrayList<>(bucketSeqs.size()); + for (int offset = 0; offset < bucketSeqs.size(); offset++) { + rotatedBucketSeqs.add(bucketSeqs.get((startIdx + offset) % bucketSeqs.size())); + } + return rotatedBucketSeqs; + } + private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) throws UserException { TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 5181a77b042f57..486fee7c08ef56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -60,6 +60,7 @@ import org.apache.doris.planner.MultiCastDataSink; import org.apache.doris.planner.MultiCastPlanFragment; import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNode; @@ -96,6 +97,7 @@ import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TAIResource; import org.apache.doris.thrift.TBrokerScanRange; +import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TDescriptorTable; import org.apache.doris.thrift.TErrorTabletInfo; @@ -105,6 +107,7 @@ import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFragmentInstanceReport; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TOlapTableSink; import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPipelineFragmentParamsList; @@ -155,6 +158,7 @@ import java.security.SecureRandom; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -527,6 +531,54 @@ public Map getBeToInstancesNum() { return result; } + private static void assignAdaptiveRandomBucketForFragment( + Collection fragmentParamsList) { + List sinkParams = fragmentParamsList.stream() + .filter(param -> param.getFragment().getOutputSink() != null + && param.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) + .collect(Collectors.toList()); + if (sinkParams.isEmpty()) { + return; + } + TOlapTableSink sink = sinkParams.get(0).getFragment().getOutputSink().getOlapTableSink(); + if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) { + return; + } + List sinkBackendIds = sinkParams.stream() + .map(TPipelineFragmentParams::getBackendId) + .distinct() + .sorted() + .collect(Collectors.toList()); + int planFragmentNum = sinkParams.stream() + .mapToInt(TPipelineFragmentParams::getLocalParamsSize) + .sum(); + if (LOG.isInfoEnabled()) { + LOG.info("Adaptive random bucket planning in legacy fragment={}, sinkBackendIds={}, " + + "planFragmentNum={}", + sinkParams.get(0).getFragmentId(), sinkBackendIds, planFragmentNum); + } + Map> assignments = + OlapTableSink.computeAdaptiveRandomBucketAssignments(sinkBackendIds, + sink.getPartition().getPartitions(), sink.getLocation().getTablets(), planFragmentNum); + for (TPipelineFragmentParams sinkParam : sinkParams) { + Map partitionAssignments = + assignments.get(sinkParam.getBackendId()); + if (partitionAssignments == null) { + continue; + } + TOlapTableSink copiedSink = deepCopyOlapTableSinkForCurrentBackend(sinkParam); + OlapTableSink.applyAdaptiveRandomBucketAssignments( + copiedSink.getPartition().getPartitions(), + partitionAssignments); + } + } + + private static TOlapTableSink deepCopyOlapTableSinkForCurrentBackend(TPipelineFragmentParams sinkParam) { + TDataSink copiedOutputSink = sinkParam.getFragment().getOutputSink().deepCopy(); + sinkParam.getFragment().setOutputSink(copiedOutputSink); + return copiedOutputSink.getOlapTableSink(); + } + // Initialize protected void prepare() throws UserException { for (PlanFragment fragment : fragments) { @@ -887,6 +939,7 @@ protected void sendPipelineCtx() throws Exception { } ++backendIdx; } + assignAdaptiveRandomBucketForFragment(tParams.values()); for (Map.Entry entry : tParams.entrySet()) { if (entry.getValue().getFragment().getOutputSink() != null && entry.getValue().getFragment().getOutputSink().getType() diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index 4f8499970c98dc..8b50a30227eb54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -55,10 +55,12 @@ import org.apache.doris.qe.CoordinatorContext; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TAIResource; +import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TOlapTableSink; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPipelineFragmentParamsList; import org.apache.doris.thrift.TPipelineInstanceParams; @@ -97,6 +99,7 @@ import java.util.TreeSet; import java.util.function.BiConsumer; import java.util.function.Supplier; +import java.util.stream.Collectors; public class ThriftPlansBuilder { private static final Logger LOG = LogManager.getLogger(ThriftPlansBuilder.class); @@ -250,7 +253,22 @@ private static void setParamsForOlapTableSink(List dist } } + Map> olapSinkParamsByFragmentId = Maps.newLinkedHashMap(); + for (Entry kv : fragmentsGroupByWorker.entrySet()) { + for (TPipelineFragmentParams fragmentParams : kv.getValue().getParamsList()) { + TDataSink outputSink = fragmentParams.getFragment().getOutputSink(); + if (outputSink != null && outputSink.getType() == TDataSinkType.OLAP_TABLE_SINK) { + olapSinkParamsByFragmentId.computeIfAbsent(fragmentParams.getFragmentId(), + ignored -> new ArrayList<>()).add(fragmentParams); + } + } + } + for (List sinkParams : olapSinkParamsByFragmentId.values()) { + assignAdaptiveRandomBucketForSinkParams(sinkParams); + } + ConnectContext connectContext = coordinatorContext.connectContext; + for (Entry kv : fragmentsGroupByWorker.entrySet()) { TPipelineFragmentParamsList fragments = kv.getValue(); for (TPipelineFragmentParams fragmentParams : fragments.getParamsList()) { @@ -269,6 +287,49 @@ private static void setParamsForOlapTableSink(List dist } } + private static void assignAdaptiveRandomBucketForSinkParams(List sinkParams) { + if (sinkParams.isEmpty()) { + return; + } + TOlapTableSink sink = sinkParams.get(0).getFragment().getOutputSink().getOlapTableSink(); + if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) { + return; + } + List sinkBackendIds = sinkParams.stream() + .map(TPipelineFragmentParams::getBackendId) + .distinct() + .sorted() + .collect(Collectors.toList()); + int planFragmentNum = sinkParams.stream() + .mapToInt(TPipelineFragmentParams::getLocalParamsSize) + .sum(); + if (LOG.isInfoEnabled()) { + LOG.info("Adaptive random bucket planning in nereids fragment={}, sinkBackendIds={}, " + + "planFragmentNum={}", + sinkParams.get(0).getFragmentId(), sinkBackendIds, planFragmentNum); + } + Map> assignments = + OlapTableSink.computeAdaptiveRandomBucketAssignments(sinkBackendIds, + sink.getPartition().getPartitions(), sink.getLocation().getTablets(), planFragmentNum); + for (TPipelineFragmentParams sinkParam : sinkParams) { + Map partitionAssignments = + assignments.get(sinkParam.getBackendId()); + if (partitionAssignments == null) { + continue; + } + TOlapTableSink copiedSink = deepCopyOlapTableSinkForCurrentBackend(sinkParam); + OlapTableSink.applyAdaptiveRandomBucketAssignments( + copiedSink.getPartition().getPartitions(), + partitionAssignments); + } + } + + private static TOlapTableSink deepCopyOlapTableSinkForCurrentBackend(TPipelineFragmentParams sinkParam) { + TDataSink copiedOutputSink = sinkParam.getFragment().getOutputSink().deepCopy(); + sinkParam.getFragment().setOutputSink(copiedOutputSink); + return copiedOutputSink.getOlapTableSink(); + } + private static Multiset computeInstanceNumPerWorker( List distributedPlans) { Multiset workerCounter = LinkedHashMultiset.create(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 42a63d90961850..17759957ea3892 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; @@ -106,6 +107,8 @@ import org.apache.doris.nereids.trees.plans.commands.info.DropPartitionOp; import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; import org.apache.doris.nereids.trees.plans.commands.info.ReplacePartitionOp; +import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.planner.OlapTableSink; import org.apache.doris.qe.ConnectContext; @@ -346,6 +349,7 @@ import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -4463,6 +4467,9 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t List tablets = new ArrayList<>(); List slaveTablets = new ArrayList<>(); List partitions = Lists.newArrayList(); + Backend requestBackend = request.isSetBeEndpoint() ? resolveBeEndpoint(request.getBeEndpoint()) : null; + long adaptiveBucketBeId = requestBackend != null ? requestBackend.getId() : -1L; + TUniqueId queryId = request.isSetQueryId() ? request.getQueryId() : null; for (String partitionName : addPartitionClauseMap.keySet()) { Partition partition = table.getPartition(partitionName); // For thread safety, we preserve the tablet distribution information of each partition @@ -4486,12 +4493,25 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t tPartition.setNumBuckets(index.getTablets().size()); } tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId())); + if (partition.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM) { + try { + int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr() + .getCurrentTabletLoadIndex(dbId, olapTable.getId(), partition); + tPartition.setLoadTabletIdx(tabletIndex); + } catch (UserException ex) { + errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage())); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + } partitions.add(tPartition); // tablet if (needUseCache && Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr() .getAutoPartitionInfo(txnId, partition.getId(), partitionTablets, partitionSlaveTablets)) { + assignAdaptiveBucketToPartition(tPartition, partitionTablets, adaptiveBucketBeId, queryId); // fast path, if cached tablets.addAll(partitionTablets); slaveTablets.addAll(partitionSlaveTablets); @@ -4559,6 +4579,8 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t } } + assignAdaptiveBucketToPartition(tPartition, partitionTablets, adaptiveBucketBeId, queryId); + if (needUseCache) { Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr() .getOrSetAutoPartitionInfo(txnId, partition.getId(), partitionTablets, @@ -4780,6 +4802,9 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request List tablets = new ArrayList<>(); List slaveTablets = new ArrayList<>(); PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + Backend requestBackend = request.isSetBeEndpoint() ? resolveBeEndpoint(request.getBeEndpoint()) : null; + long adaptiveBucketBeId = requestBackend != null ? requestBackend.getId() : -1L; + TUniqueId queryId = request.isSetQueryId() ? request.getQueryId() : null; for (long partitionId : resultPartitionIds) { Partition partition = olapTable.getPartition(partitionId); // For thread safety, we preserve the tablet distribution information of each partition @@ -4805,12 +4830,25 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request tPartition.setNumBuckets(index.getTablets().size()); } tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId())); + if (partition.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM) { + try { + int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr() + .getCurrentTabletLoadIndex(dbId, olapTable.getId(), partition); + tPartition.setLoadTabletIdx(tabletIndex); + } catch (UserException ex) { + errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage())); + result.setStatus(errorStatus); + LOG.warn("send replace partition error status: {}", result); + return result; + } + } partitions.add(tPartition); // tablet if (needUseCache && txnId != 0 && Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr() .getAutoPartitionInfo(txnId, partition.getId(), partitionTablets, partitionSlaveTablets)) { + assignAdaptiveBucketToPartition(tPartition, partitionTablets, adaptiveBucketBeId, queryId); // fast path, if cached tablets.addAll(partitionTablets); slaveTablets.addAll(partitionSlaveTablets); @@ -4878,6 +4916,8 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request } } + assignAdaptiveBucketToPartition(tPartition, partitionTablets, adaptiveBucketBeId, queryId); + if (DebugPointUtil.isEnable("FE.FrontendServiceImpl.replacePartition.DisableCache")) { needUseCache = false; } @@ -5569,4 +5609,94 @@ private TStatus checkMaster() { } return status; } + + private static final class AdaptiveBucketSinkContext { + private final List sinkBackendIds; + private final int planFragmentNum; + + private AdaptiveBucketSinkContext(List sinkBackendIds, int planFragmentNum) { + this.sinkBackendIds = sinkBackendIds; + this.planFragmentNum = planFragmentNum; + } + } + + private static AdaptiveBucketSinkContext collectAdaptiveBucketSinkContext(TUniqueId queryId, long currentBeId) { + if (queryId == null) { + return new AdaptiveBucketSinkContext(Lists.newArrayList(currentBeId), 1); + } + Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(queryId); + if (!(coordinator instanceof NereidsCoordinator)) { + return new AdaptiveBucketSinkContext(Lists.newArrayList(currentBeId), 1); + } + Set sinkBackendIds = new TreeSet<>(); + int planFragmentNum = 0; + for (PipelineDistributedPlan distributedPlan : + ((NereidsCoordinator) coordinator).getCoordinatorContext().distributedPlans) { + if (!(distributedPlan.getFragmentJob().getFragment().getSink() instanceof OlapTableSink)) { + continue; + } + planFragmentNum += distributedPlan.getInstanceJobs().size(); + for (AssignedJob assignedJob : distributedPlan.getInstanceJobs()) { + sinkBackendIds.add(assignedJob.getAssignedWorker().id()); + } + } + if (sinkBackendIds.isEmpty()) { + sinkBackendIds.add(currentBeId); + } + return new AdaptiveBucketSinkContext(new ArrayList<>(sinkBackendIds), Math.max(planFragmentNum, 1)); + } + + private static void assignAdaptiveBucketToPartition(TOlapTablePartition partition, + List partitionTablets, long currentBeId, TUniqueId queryId) { + if (!Config.enable_adaptive_random_bucket_load + || !partition.isSetLoadTabletIdx() || currentBeId <= 0) { + return; + } + AdaptiveBucketSinkContext sinkContext = collectAdaptiveBucketSinkContext(queryId, currentBeId); + if (LOG.isInfoEnabled()) { + LOG.info("Adaptive random bucket replanning partition={}, currentBeId={}, queryId={}, " + + "sinkBackendIds={}, planFragmentNum={}", + partition.getId(), currentBeId, queryId, sinkContext.sinkBackendIds, + sinkContext.planFragmentNum); + } + Map> assignments = + OlapTableSink.computeAdaptiveRandomBucketAssignments( + sinkContext.sinkBackendIds, Lists.newArrayList(partition), partitionTablets, + sinkContext.planFragmentNum); + Map partitionAssignments = assignments.get(currentBeId); + if (partitionAssignments != null) { + OlapTableSink.AdaptiveBucketAssignment assignment = partitionAssignments.get(partition.getId()); + if (LOG.isInfoEnabled() && assignment != null) { + LOG.info("Adaptive random bucket replan result partition={}, currentBeId={}, bucketBeId={}, " + + "loadTabletIdx={}, localBucketSeqs={}", + partition.getId(), currentBeId, assignment.getBucketBeId(), + assignment.getLoadTabletIdx(), assignment.getLocalBucketSeqs()); + } + OlapTableSink.applyAdaptiveRandomBucketAssignments( + Lists.newArrayList(partition), partitionAssignments); + } else { + LOG.warn("Adaptive random bucket found no partition assignments for partition {}, currentBeId={}, " + + "queryId={}, sinkBackendIds={}", + partition.getId(), currentBeId, queryId, sinkContext.sinkBackendIds); + } + } + + /** + * Resolves a BE endpoint string ("host:heartbeat_port") to a Backend object. + * Returns null if the endpoint is malformed or the backend cannot be found. + */ + private static Backend resolveBeEndpoint(String beEndpoint) { + int colonIdx = beEndpoint.lastIndexOf(':'); + if (colonIdx < 0) { + return null; + } + String host = beEndpoint.substring(0, colonIdx); + try { + int port = Integer.parseInt(beEndpoint.substring(colonIdx + 1)); + return Env.getCurrentSystemInfo().getBackendWithHeartbeatPort(host, port); + } catch (NumberFormatException e) { + return null; + } + } + } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 43e77cd037e3d4..e881940697e6bf 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -103,6 +103,11 @@ message PTabletWithPartition { required int64 tablet_id = 2; } +message PRandomBucketPartitionParam { + required int64 partition_id = 1; + repeated int64 ordered_tablet_ids = 2; +} + message PTabletLoadRowsetInfo { required int32 current_rowset_nums = 1; required int32 max_config_rowset_nums = 2; @@ -148,6 +153,8 @@ message PTabletWriterOpenRequest { optional string storage_vault_id = 18; optional int32 sender_id = 19; optional int64 workload_group_id = 20; + optional bool is_receiver_side_random_bucket = 21 [default = false]; + repeated PRandomBucketPartitionParam random_bucket_partitions = 22; }; message PTabletWriterOpenResult { @@ -205,6 +212,7 @@ message PTabletWriterAddBlockRequest { optional bool is_single_tablet_block = 14 [default = false]; // for auto-partition first stage close, we should hang. optional bool hang_wait = 15 [default = false]; + optional bool is_receiver_side_random_bucket = 16 [default = false]; }; message PSlaveTabletNodes { @@ -1241,4 +1249,3 @@ service PBackendService { rpc fetch_peer_data(PFetchPeerDataRequest) returns (PFetchPeerDataResponse); rpc request_cdc_client(PRequestCdcClientRequest) returns (PRequestCdcClientResult); }; - diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 0d6d9126a06f2a..ac7796ac0ad21d 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -304,6 +304,14 @@ struct TOlapTableSink { 23: optional double max_filter_ratio 24: optional string storage_vault_id + + // When true, FE should assign each sink a bucket_be_id/load_tablet_idx pair for random + // distribution partitions. BE then derives the bucket sequence owned by bucket_be_id from + // the location info and rotates within that sequence once per-tablet write volume exceeds + // the threshold (default 200 MB). This flag is set regardless of whether the initial + // partition list is empty, so auto-partition tables whose first partitions arrive at + // runtime still enter the correct mode from the start. + 25: optional bool enable_adaptive_random_bucket } struct THiveLocationParams { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 432da0d49c3f9a..dd879e13392d0a 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -278,13 +278,20 @@ struct TOlapTablePartition { 9: optional bool is_mutable = true // only used in List Partition 10: optional bool is_default_partition; - // only used in random distribution scenario to make data distributed even + // only used in random distribution scenario: + // - legacy mode: global round-robin index / fixed bucket index + // - adaptive random bucket mode: FE-selected starting bucket seq for this sink 11: optional i64 load_tablet_idx 12: optional i32 total_replica_num 13: optional i32 load_required_replica_num // tablet_id -> list of backend_ids that have version gaps (lastFailedVersion >= 0) // used by BE to exclude these backends from success counting in majority write 14: optional map> tablet_version_gap_backends + // only used in adaptive random bucket mode: FE-selected bucket owner BE for this sink + 15: optional i64 bucket_be_id + // only used in adaptive random bucket mode: FE-selected bucket seqs for this sink. + // When set, BE uses them directly and skips recomputing from tablet locations. + 16: optional list local_bucket_seqs } struct TOlapTablePartitionParam { From 63f90409b4810cf266831c498125b311583d1a07 Mon Sep 17 00:00:00 2001 From: laihui Date: Mon, 8 Jun 2026 22:48:38 +0800 Subject: [PATCH 2/2] [fix](load) Preserve partitions on receiver random bucket close --- be/src/exec/sink/writer/vtablet_writer.cpp | 54 +++++++++++++--------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp b/be/src/exec/sink/writer/vtablet_writer.cpp index a551bbbba26c5e..a8b0e6dfc2dc23 100644 --- a/be/src/exec/sink/writer/vtablet_writer.cpp +++ b/be/src/exec/sink/writer/vtablet_writer.cpp @@ -46,14 +46,8 @@ #include #include "cloud/config.h" -#include "common/config.h" -#include "core/data_type/data_type.h" -#include "cpp/sync_point.h" -#include "exec/sink/vrow_distribution.h" -#include "exprs/vexpr_fwd.h" -#include "runtime/runtime_profile.h" - #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" #include "common/logging.h" #include "common/metrics/doris_metrics.h" #include "common/object_pool.h" @@ -62,14 +56,19 @@ #include "core/block/block.h" #include "core/column/column.h" #include "core/column/column_const.h" +#include "core/data_type/data_type.h" #include "core/data_type/data_type_nullable.h" +#include "cpp/sync_point.h" +#include "exec/sink/vrow_distribution.h" #include "exec/sink/vtablet_block_convertor.h" #include "exec/sink/vtablet_finder.h" #include "exprs/vexpr.h" +#include "exprs/vexpr_fwd.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/memory/memory_reclamation.h" #include "runtime/query_context.h" +#include "runtime/runtime_profile.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "service/backend_options.h" @@ -705,19 +704,20 @@ void VNodeChannel::_open_internal(bool is_incremental) { for (auto bucket_seq : local_bucket_seqs) { if (bucket_seq < 0 || bucket_seq >= cast_set(full_ordered_tablets->size())) { - LOG(WARNING) << "invalid local bucket seq, load_id=" << _parent->_load_id - << ", partition_id=" << partition_id - << ", bucket_seq=" << bucket_seq - << ", full_ordered_tablets_size=" - << full_ordered_tablets->size(); + LOG(WARNING) + << "invalid local bucket seq, load_id=" << _parent->_load_id + << ", partition_id=" << partition_id + << ", bucket_seq=" << bucket_seq + << ", full_ordered_tablets_size=" << full_ordered_tablets->size(); continue; } auto tablet_id = (*full_ordered_tablets)[bucket_seq]; if (!partition_to_local_tablets[partition_id].contains(tablet_id)) { - LOG(WARNING) << "skip non-local tablet selected by local bucket seq, load_id=" - << _parent->_load_id << ", partition_id=" << partition_id - << ", bucket_seq=" << bucket_seq - << ", tablet_id=" << tablet_id << ", node_id=" << _node_id; + LOG(WARNING) + << "skip non-local tablet selected by local bucket seq, load_id=" + << _parent->_load_id << ", partition_id=" << partition_id + << ", bucket_seq=" << bucket_seq << ", tablet_id=" << tablet_id + << ", node_id=" << _node_id; continue; } random_bucket_partition->add_ordered_tablet_ids(tablet_id); @@ -1010,8 +1010,9 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { // tablet_ids has already set when add row request->set_packet_seq(_next_packet_seq); auto block = mutable_block->to_block(); - int request_rows = request->is_receiver_side_random_bucket() ? request->partition_ids_size() - : request->tablet_ids_size(); + int request_rows = request->is_receiver_side_random_bucket() && !request->eos() + ? request->partition_ids_size() + : request->tablet_ids_size(); CHECK(block.rows() == request_rows) << "block rows: " << block.rows() << ", request_rows: " << request_rows; if (block.rows() > 0) { @@ -1054,7 +1055,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { } if (request->eos()) { - if (!request->is_receiver_side_random_bucket()) { + if (!request->is_receiver_side_random_bucket() || !request->has_block()) { for (auto pid : _parent->_tablet_finder->partition_ids()) { request->add_partition_ids(pid); } @@ -1405,14 +1406,25 @@ void VNodeChannel::mark_close(bool hang_wait) { return; } - _cur_add_block_request->set_eos(true); - _cur_add_block_request->set_hang_wait(hang_wait); + bool need_receiver_side_random_bucket_eos = + _cur_add_block_request->is_receiver_side_random_bucket(); { std::lock_guard l(_pending_batches_lock); if (!_cur_mutable_block) [[unlikely]] { // never had a block arrived. add a dummy block _cur_mutable_block = MutableBlock::create_unique(); } + if (need_receiver_side_random_bucket_eos && _cur_mutable_block->rows() > 0) { + _cur_add_block_request->set_eos(false); + auto tmp_add_block_request = + std::make_shared(*_cur_add_block_request); + _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request); + _pending_batches_num++; + _cur_add_block_request->clear_partition_ids(); + _cur_mutable_block = MutableBlock::create_unique(); + } + _cur_add_block_request->set_eos(true); + _cur_add_block_request->set_hang_wait(hang_wait); auto tmp_add_block_request = std::make_shared(*_cur_add_block_request); // when prepare to close, add block to queue so that try_send_pending_block thread will send it.