diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp index c54d26921d6ae7..507a1a8f99a87c 100644 --- a/be/src/cloud/cloud_delta_writer.cpp +++ b/be/src/cloud/cloud_delta_writer.cpp @@ -22,6 +22,8 @@ #include "cloud/cloud_storage_engine.h" #include "cloud/config.h" #include "load/delta_writer/delta_writer.h" +#include "load/memtable/memtable_memory_limiter.h" +#include "runtime/exec_env.h" #include "runtime/thread_context.h" namespace doris { @@ -64,10 +66,22 @@ Status CloudDeltaWriter::batch_init(std::vector writers) { return cloud::bthread_fork_join(tasks, 10); } -Status CloudDeltaWriter::write(const Block* block, const DorisVector& row_idxs) { +Status CloudDeltaWriter::write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed) { + if (memtable_flushed != nullptr) { + *memtable_flushed = false; + } if (row_idxs.empty()) [[unlikely]] { return Status::OK(); } + if (_req.enable_table_memtable_backpressure) { + ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure( + [this]() { + std::lock_guard lock(_mtx); + return _is_cancelled; + }, + table_id()); + } std::lock_guard lock(_mtx); CHECK(_is_init || _is_cancelled); { @@ -77,7 +91,7 @@ Status CloudDeltaWriter::write(const Block* block, const DorisVector& std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } - return _memtable_writer->write(block, row_idxs); + return _memtable_writer->write(block, row_idxs, memtable_flushed); } Status CloudDeltaWriter::close() { @@ -86,6 +100,11 @@ Status CloudDeltaWriter::close() { return _memtable_writer->close(); } +Status CloudDeltaWriter::flush_memtable_async() { + std::lock_guard lock(_mtx); + return BaseDeltaWriter::flush_memtable_async(); +} + Status CloudDeltaWriter::cancel_with_status(const Status& st) { std::lock_guard lock(_mtx); return BaseDeltaWriter::cancel_with_status(st); diff --git a/be/src/cloud/cloud_delta_writer.h b/be/src/cloud/cloud_delta_writer.h index 614bfd0f16af0b..6ab9251213fdbe 100644 --- a/be/src/cloud/cloud_delta_writer.h +++ b/be/src/cloud/cloud_delta_writer.h @@ -33,10 +33,13 @@ class CloudDeltaWriter final : public BaseDeltaWriter { const UniqueId& load_id); ~CloudDeltaWriter() override; - Status write(const Block* block, const DorisVector& row_idxs) override; + Status write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed = nullptr) override; Status close() override; + Status flush_memtable_async() override; + Status cancel_with_status(const Status& st) override; Status build_rowset() override; diff --git a/be/src/cloud/cloud_tablets_channel.cpp b/be/src/cloud/cloud_tablets_channel.cpp index 35ba9d964f4201..a8f3c6ee24cab6 100644 --- a/be/src/cloud/cloud_tablets_channel.cpp +++ b/be/src/cloud/cloud_tablets_channel.cpp @@ -64,6 +64,25 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques return Status::OK(); } + if (request.is_receiver_side_random_bucket()) { + std::unordered_map> partition_to_rowidxs; + RETURN_IF_ERROR(_build_partition_to_rowidxs_for_receiver_side_random_bucket( + request, &partition_to_rowidxs)); + if (!partition_to_rowidxs.empty()) { + std::unordered_set partition_ids; + partition_ids.reserve(partition_to_rowidxs.size()); + for (const auto& [partition_id, _] : partition_to_rowidxs) { + partition_ids.insert(partition_id); + } + { + std::lock_guard l(_tablet_writers_lock); + RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids)); + } + } + return _write_block_data_for_receiver_side_random_bucket(request, cur_seq, + partition_to_rowidxs, response); + } + std::unordered_map> tablet_to_rowidxs; _build_tablet_to_rowidxs(request, &tablet_to_rowidxs); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 86b7d12e0aa9df..32d0483f33e779 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -749,6 +749,8 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500"); // max write buffer size before flush, default 200MB DEFINE_mInt64(write_buffer_size, "209715200"); DEFINE_mBool(enable_adaptive_write_buffer_size, "true"); +// Whether random bucket load rotates to the next local bucket when memtable flushes. +DEFINE_mBool(enable_adaptive_random_bucket_load_bucket_rotation, "true"); // max buffer size used in memtable for the aggregated table, default 400MB DEFINE_mInt64(write_buffer_size_for_agg, "104857600"); DEFINE_mInt64(min_write_buffer_size_for_partial_update, "1048576"); @@ -858,6 +860,11 @@ DEFINE_mDouble(min_flush_thread_num_per_cpu, "0.5"); // Whether to enable adaptive flush thread adjustment DEFINE_mBool(enable_adaptive_flush_threads, "true"); +// Whether to block writes when one table has too many pending flush memtables on this BE. +DEFINE_mBool(enable_table_memtable_flush_backpressure, "true"); +// Max pending flush memtables for one table on this BE before blocking new writes. +DEFINE_mInt32(table_memtable_flush_pending_count_limit, "10"); + // config for tablet meta checkpoint DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10"); DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 0b415ed5d2c4ae..50c99a1b1f009f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -807,6 +807,8 @@ DECLARE_mInt32(memory_gc_sleep_time_ms); // max write buffer size before flush, default 200MB DECLARE_mInt64(write_buffer_size); DECLARE_mBool(enable_adaptive_write_buffer_size); +// Whether random bucket load rotates to the next local bucket when memtable flushes. +DECLARE_mBool(enable_adaptive_random_bucket_load_bucket_rotation); // max buffer size used in memtable for the aggregated table, default 400MB DECLARE_mInt64(write_buffer_size_for_agg); @@ -913,6 +915,11 @@ DECLARE_mDouble(min_flush_thread_num_per_cpu); // Whether to enable adaptive flush thread adjustment DECLARE_mBool(enable_adaptive_flush_threads); +// Whether to block writes when one table has too many pending flush memtables on this BE. +DECLARE_mBool(enable_table_memtable_flush_backpressure); +// Max pending flush memtables for one table on this BE before blocking new writes. +DECLARE_mInt32(table_memtable_flush_pending_count_limit); + // config for tablet meta checkpoint DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num); DECLARE_mInt32(tablet_meta_checkpoint_min_interval_secs); diff --git a/be/src/exec/sink/delta_writer_v2_pool.cpp b/be/src/exec/sink/delta_writer_v2_pool.cpp index de03011bf2eaa7..9cfcdf87b61078 100644 --- a/be/src/exec/sink/delta_writer_v2_pool.cpp +++ b/be/src/exec/sink/delta_writer_v2_pool.cpp @@ -17,6 +17,7 @@ #include "exec/sink/delta_writer_v2_pool.h" +#include "exec/sink/vtablet_finder.h" #include "load/delta_writer/delta_writer_v2.h" #include "runtime/runtime_profile.h" diff --git a/be/src/exec/sink/vrow_distribution.cpp b/be/src/exec/sink/vrow_distribution.cpp index 222dc7f8bd2989..ac3ce5d8715061 100644 --- a/be/src/exec/sink/vrow_distribution.cpp +++ b/be/src/exec/sink/vrow_distribution.cpp @@ -112,6 +112,7 @@ Status VRowDistribution::automatic_create_partition() { request.__set_be_endpoint(be_endpoint); request.__set_write_single_replica(_write_single_replica); request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink()); + request.__set_enable_adaptive_random_bucket(_tablet_finder->is_adaptive_random_bucket()); if (_state && _state->get_query_ctx()) { // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator request.__set_query_id(_state->get_query_ctx()->query_id()); @@ -146,13 +147,14 @@ Status VRowDistribution::automatic_create_partition() { Status status(Status::create(result.status)); VLOG_NOTICE << "automatic partition rpc end response " << result; if (result.status.status_code == TStatusCode::OK) { - RETURN_IF_ERROR(_create_partition_callback(_caller, &result)); - // add new created partitions + // Add new partitions before incremental open because adaptive random bucket builds + // sender/receiver routing params from _vpartition. RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); for (const auto& part : result.partitions) { _new_partition_ids.insert(part.id); VLOG_TRACE << "record new id: " << part.id; } + RETURN_IF_ERROR(_create_partition_callback(_caller, &result)); } // Record this request's elapsed time @@ -214,6 +216,7 @@ Status VRowDistribution::_replace_overwriting_partition() { std::string be_endpoint = BackendOptions::get_be_endpoint(); request.__set_be_endpoint(be_endpoint); request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink()); + request.__set_enable_adaptive_random_bucket(_tablet_finder->is_adaptive_random_bucket()); if (_state && _state->get_query_ctx()) { // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator request.__set_query_id(_state->get_query_ctx()->query_id()); @@ -248,17 +251,18 @@ Status VRowDistribution::_replace_overwriting_partition() { Status status(Status::create(result.status)); VLOG_NOTICE << "auto detect replace partition result: " << result; if (result.status.status_code == TStatusCode::OK) { - // Reuse the function as the args' structure are same. It adds nodes/locations - // and waits for incremental_open before the new tablets become routable. - auto result_as_create = cast_as_create_result(result); - RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); // record new partitions for (const auto& part : result.partitions) { _new_partition_ids.insert(part.id); VLOG_TRACE << "record new id: " << part.id; } // replace data in _partitions + // Adaptive random bucket builds sender/receiver routing params from _vpartition during + // incremental open, so the replacement must be visible first. RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions)); + // Reuse the function as the args' structure are same. It adds nodes/locations. + auto result_as_create = cast_as_create_result(result); + RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); } return status; @@ -292,7 +296,9 @@ void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row if (!_skip[i]) { row_ids.emplace_back(i); partition_ids.emplace_back(_partitions[i]->id); - tablet_ids.emplace_back(_tablet_ids[i]); + if (!_tablet_finder->is_adaptive_random_bucket()) { + tablet_ids.emplace_back(_tablet_ids[i]); + } } } } @@ -315,7 +321,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( if (nullable_column->get_bool_inline(i) && !_skip[i]) { row_ids.emplace_back(i); partition_ids.emplace_back(_partitions[i]->id); - tablet_ids.emplace_back(_tablet_ids[i]); + if (!_tablet_finder->is_adaptive_random_bucket()) { + tablet_ids.emplace_back(_tablet_ids[i]); + } } } } else if (const auto* const_column = check_and_get_column(*filter_column)) { @@ -334,7 +342,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( if (filter[i] != 0 && !_skip[i]) { row_ids.emplace_back(i); partition_ids.emplace_back(_partitions[i]->id); - tablet_ids.emplace_back(_tablet_ids[i]); + if (!_tablet_finder->is_adaptive_random_bucket()) { + tablet_ids.emplace_back(_tablet_ids[i]); + } } } } @@ -345,7 +355,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause( Status VRowDistribution::_filter_block(Block* block, std::vector& row_part_tablet_ids) { for (int i = 0; i < _schema->indexes().size(); i++) { - _get_tablet_ids(block, i, _tablet_ids); + if (!_tablet_finder->is_adaptive_random_bucket()) { + _get_tablet_ids(block, i, _tablet_ids); + } auto& where_clause = _schema->indexes()[i]->where_clause; if (where_clause != nullptr) { RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause, @@ -525,7 +537,9 @@ void VRowDistribution::_reset_row_part_tablet_ids( // This is important for performance. row_ids.reserve(rows); partition_ids.reserve(rows); - tablet_ids.reserve(rows); + if (!_tablet_finder->is_adaptive_random_bucket()) { + tablet_ids.reserve(rows); + } } } diff --git a/be/src/exec/sink/vrow_distribution.h b/be/src/exec/sink/vrow_distribution.h index afc669f95bc63d..cdd4f5f88cbdbd 100644 --- a/be/src/exec/sink/vrow_distribution.h +++ b/be/src/exec/sink/vrow_distribution.h @@ -57,7 +57,12 @@ class RowPartTabletIds { std::string value; value.reserve(row_ids.size() * 15); for (int i = 0; i < row_ids.size(); i++) { - value.append(fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i])); + if (i < tablet_ids.size()) { + value.append( + fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i])); + } else { + value.append(fmt::format("[{}, {}]", row_ids[i], partition_ids[i])); + } } return value; } diff --git a/be/src/exec/sink/vtablet_finder.cpp b/be/src/exec/sink/vtablet_finder.cpp index edbf6475560d09..189d7f57cf8cc7 100644 --- a/be/src/exec/sink/vtablet_finder.cpp +++ b/be/src/exec/sink/vtablet_finder.cpp @@ -22,16 +22,87 @@ #include #include +#include #include #include #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" #include "common/status.h" #include "core/block/block.h" #include "runtime/runtime_state.h" #include "storage/tablet_info.h" namespace doris { + +void AdaptiveRandomBucketState::init_partition(int32_t sender_id, int64_t partition_id, + const std::vector& tablets, + const std::vector& bucket_seqs, + int32_t start_tablet_idx) { + if (sender_id < 0 || partition_id < 0 || tablets.empty()) { + return; + } + std::lock_guard lock(_mutex); + auto& partition_states = _sender_partition_states[sender_id]; + if (partition_states.contains(partition_id)) { + return; + } + + PartitionState state; + state.partition_id = partition_id; + state.tablets = tablets; + state.bucket_seqs = bucket_seqs; + if (start_tablet_idx >= 0 && start_tablet_idx < state.tablets.size()) { + state.tablet_pos = start_tablet_idx; + } + state.current_tablet_id = state.tablets[state.tablet_pos]; + + partition_states.emplace(partition_id, std::move(state)); + LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", sender_id=" << sender_id + << ", partition=" << partition_id << ", local tablet count=" << tablets.size() + << ", start tablet=" << partition_states.at(partition_id).current_tablet_id; +} + +int64_t AdaptiveRandomBucketState::current_tablet(int32_t sender_id, int64_t partition_id) { + std::lock_guard lock(_mutex); + auto sender_it = _sender_partition_states.find(sender_id); + if (sender_it == _sender_partition_states.end()) { + return -1; + } + auto it = sender_it->second.find(partition_id); + if (it == sender_it->second.end()) { + return -1; + } + return it->second.current_tablet_id; +} + +void AdaptiveRandomBucketState::rotate_by_tablet(int32_t sender_id, int64_t partition_id, + int64_t tablet_id) { + if (!config::enable_adaptive_random_bucket_load_bucket_rotation) { + return; + } + std::lock_guard lock(_mutex); + auto sender_it = _sender_partition_states.find(sender_id); + if (sender_it == _sender_partition_states.end()) { + return; + } + auto state_it = sender_it->second.find(partition_id); + if (state_it == sender_it->second.end()) { + return; + } + auto& state = state_it->second; + if (state.current_tablet_id != tablet_id) { + return; + } + int32_t next_pos = (state.tablet_pos + 1) % static_cast(state.tablets.size()); + LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", sender_id=" << sender_id + << ", partition=" << state.partition_id << " rotate tablet " + << state.current_tablet_id << " -> " << state.tablets[next_pos] + << " after tablet=" << tablet_id << " memtable flushed"; + state.tablet_pos = next_pos; + state.current_tablet_id = state.tablets[next_pos]; +} + Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows, std::vector& partitions, std::vector& tablet_index, std::vector& skip, @@ -82,8 +153,11 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) { _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index); + } else if (_find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET) { + // Receiver-side random bucket mode only needs partition ids on sender side. + // The receiver decides the concrete tablet from its local ordered tablet list. } else { - // for random distribution + // FIND_TABLET_EVERY_BATCH / FIND_TABLET_EVERY_SINK _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index, &_partition_to_tablet_map); if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) { diff --git a/be/src/exec/sink/vtablet_finder.h b/be/src/exec/sink/vtablet_finder.h index 32b173985e61e9..f475dd1b933e93 100644 --- a/be/src/exec/sink/vtablet_finder.h +++ b/be/src/exec/sink/vtablet_finder.h @@ -19,15 +19,45 @@ #include #include +#include +#include +#include +#include #include "common/status.h" #include "core/block/block.h" #include "exec/common/hash_table/phmap_fwd_decl.h" #include "storage/tablet_info.h" #include "util/bitmap.h" +#include "util/uid_util.h" namespace doris { +class AdaptiveRandomBucketState { +public: + explicit AdaptiveRandomBucketState(UniqueId load_id) : _load_id(load_id) {} + + void init_partition(int32_t sender_id, int64_t partition_id, + const std::vector& tablets, + const std::vector& bucket_seqs, int32_t start_tablet_idx); + int64_t current_tablet(int32_t sender_id, int64_t partition_id); + void rotate_by_tablet(int32_t sender_id, int64_t partition_id, int64_t tablet_id); + +private: + struct PartitionState { + int64_t partition_id = -1; + std::vector tablets; + std::vector bucket_seqs; + int32_t tablet_pos = 0; + int64_t current_tablet_id = -1; + }; + + std::mutex _mutex; + UniqueId _load_id; + std::unordered_map> + _sender_partition_states; +}; + class OlapTabletFinder { public: // FIND_TABLET_EVERY_ROW is used for hash distribution info, which indicates that we @@ -37,7 +67,13 @@ class OlapTabletFinder { // FIND_TABLET_EVERY_SINK is used for random distribution info when load_to_single_tablet set to true, // which indicates that we should only compute tablet index in the corresponding partition once for the // whole time in olap table sink - enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK }; + // FIND_TABLET_RANDOM_BUCKET is used for V1 receiver-side random bucket mode. + enum FindTabletMode { + FIND_TABLET_EVERY_ROW, + FIND_TABLET_EVERY_BATCH, + FIND_TABLET_EVERY_SINK, + FIND_TABLET_RANDOM_BUCKET + }; OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode) : _vpartition(vpartition), _find_tablet_mode(mode), _filter_bitmap(1024) {}; @@ -51,7 +87,14 @@ class OlapTabletFinder { return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK; } - bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; } + bool is_adaptive_random_bucket() const { + return _find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET; + } + + bool is_single_tablet() { + return _find_tablet_mode != FindTabletMode::FIND_TABLET_RANDOM_BUCKET && + _partition_to_tablet_map.size() == 1; + } // all partitions for multi find-processes of its relative writer. const flat_hash_set& partition_ids() { return _partition_ids; } diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp b/be/src/exec/sink/writer/vtablet_writer.cpp index 47b67cbfc9e8d1..8caa11e5bef251 100644 --- a/be/src/exec/sink/writer/vtablet_writer.cpp +++ b/be/src/exec/sink/writer/vtablet_writer.cpp @@ -41,22 +41,13 @@ #include #include #include +#include #include #include #include "cloud/config.h" -#include "common/config.h" -#include "core/data_type/data_type.h" -#include "cpp/sync_point.h" -#include "exec/sink/vrow_distribution.h" -#include "exprs/vexpr_fwd.h" -#include "runtime/runtime_profile.h" - -#ifdef DEBUG -#include -#endif - #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" #include "common/logging.h" #include "common/metrics/doris_metrics.h" #include "common/object_pool.h" @@ -65,14 +56,19 @@ #include "core/block/block.h" #include "core/column/column.h" #include "core/column/column_const.h" +#include "core/data_type/data_type.h" #include "core/data_type/data_type_nullable.h" +#include "cpp/sync_point.h" +#include "exec/sink/vrow_distribution.h" #include "exec/sink/vtablet_block_convertor.h" #include "exec/sink/vtablet_finder.h" #include "exprs/vexpr.h" +#include "exprs/vexpr_fwd.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/memory/memory_reclamation.h" #include "runtime/query_context.h" +#include "runtime/runtime_profile.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "service/backend_options.h" @@ -133,6 +129,21 @@ Status IndexChannel::init(RuntimeState* state, const std::vectorsecond; } channel->add_tablet(tablet); + if (_parent->_tablet_finder->is_adaptive_random_bucket() && config::is_cloud_mode()) { + for (const auto* part : _parent->_vpartition->get_partitions()) { + if (part->id != tablet.partition_id) { + continue; + } + const auto bucket_be_id = part->bucket_be_id > 0 + ? part->bucket_be_id + : BackendOptions::get_backend_id(); + if (bucket_be_id != replica_node_id) { + continue; + } + _channels_by_partition.emplace(tablet.partition_id, channel); + break; + } + } if (_parent->_write_single_replica) { auto* slave_location = _parent->_slave_location->find_tablet(tablet.tablet_id); if (slave_location != nullptr) { @@ -573,6 +584,8 @@ Status VNodeChannel::init(RuntimeState* state) { _cur_add_block_request->set_sender_id(_parent->_sender_id); _cur_add_block_request->set_backend_id(_node_id); _cur_add_block_request->set_eos(false); + _cur_add_block_request->set_is_receiver_side_random_bucket( + _parent->_tablet_finder->is_adaptive_random_bucket()); // add block closure // Has to using value to capture _task_exec_ctx because tablet writer may destroyed during callback. @@ -632,6 +645,8 @@ void VNodeChannel::_open_internal(bool is_incremental) { if (_parent->_t_sink.olap_table_sink.__isset.storage_vault_id) { request->set_storage_vault_id(_parent->_t_sink.olap_table_sink.storage_vault_id); } + request->set_is_receiver_side_random_bucket( + _parent->_tablet_finder->is_adaptive_random_bucket()); std::set deduper; for (auto& tablet : _tablets_wait_open) { if (deduper.contains(tablet.tablet_id)) { @@ -657,6 +672,79 @@ void VNodeChannel::_open_internal(bool is_incremental) { request->set_txn_expiration(_parent->_txn_expiration); request->set_write_file_cache(_parent->_write_file_cache); + if (_parent->_tablet_finder->is_adaptive_random_bucket()) { + std::unordered_map> partition_to_ordered_tablets; + std::unordered_map> partition_to_local_tablets; + for (const auto& tablet : _all_tablets) { + partition_to_ordered_tablets[tablet.partition_id].push_back(tablet.tablet_id); + partition_to_local_tablets[tablet.partition_id].insert(tablet.tablet_id); + } + std::unordered_map id_to_partition; + for (const auto* part : _parent->_vpartition->get_partitions()) { + id_to_partition.emplace(part->id, part); + } + for (const auto& [partition_id, ordered_tablets] : partition_to_ordered_tablets) { + auto partition_it = id_to_partition.find(partition_id); + if (partition_it == id_to_partition.end()) { + LOG(WARNING) << "unknown partition for adaptive random bucket, load_id=" + << _parent->_load_id << ", partition_id=" << partition_id; + continue; + } + std::vector selected_ordered_tablets; + const auto& local_bucket_seqs = partition_it->second->local_bucket_seqs; + if (!local_bucket_seqs.empty()) { + const std::vector* full_ordered_tablets = nullptr; + for (const auto& index : partition_it->second->indexes) { + if (index.index_id == _index_channel->_index_id) { + full_ordered_tablets = &index.tablets; + break; + } + } + if (full_ordered_tablets == nullptr) { + LOG(WARNING) << "unknown index for adaptive random bucket, load_id=" + << _parent->_load_id << ", partition_id=" << partition_id + << ", index_id=" << _index_channel->_index_id; + continue; + } + for (auto bucket_seq : local_bucket_seqs) { + if (bucket_seq < 0 || + bucket_seq >= cast_set(full_ordered_tablets->size())) { + LOG(WARNING) + << "invalid local bucket seq, load_id=" << _parent->_load_id + << ", partition_id=" << partition_id + << ", bucket_seq=" << bucket_seq + << ", full_ordered_tablets_size=" << full_ordered_tablets->size(); + continue; + } + auto tablet_id = (*full_ordered_tablets)[bucket_seq]; + if (!partition_to_local_tablets[partition_id].contains(tablet_id)) { + LOG(WARNING) + << "skip non-local tablet selected by local bucket seq, load_id=" + << _parent->_load_id << ", partition_id=" << partition_id + << ", bucket_seq=" << bucket_seq << ", tablet_id=" << tablet_id + << ", node_id=" << _node_id; + continue; + } + selected_ordered_tablets.push_back(tablet_id); + } + } else { + selected_ordered_tablets = ordered_tablets; + } + if (selected_ordered_tablets.empty()) { + VLOG_DEBUG << "skip adaptive random bucket partition without selected local " + "tablet, load_id=" + << _parent->_load_id << ", partition_id=" << partition_id + << ", node_id=" << _node_id; + continue; + } + auto* random_bucket_partition = request->add_random_bucket_partitions(); + random_bucket_partition->set_partition_id(partition_id); + for (auto tablet_id : selected_ordered_tablets) { + random_bucket_partition->add_ordered_tablet_ids(tablet_id); + } + } + } + if (_wg_id > 0) { request->set_workload_group_id(_wg_id); } @@ -728,9 +816,11 @@ Status VNodeChannel::open_wait() { Status VNodeChannel::add_block(Block* block, const Payload* payload) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - if (payload->second.empty()) { + if (payload->row_part_tablet_ids == nullptr || payload->row_ids == nullptr || + payload->row_ids->empty()) { return Status::OK(); } + DCHECK_EQ(payload->row_ids->size(), payload->route_idxs.size()); // If add_block() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. auto st = none_of({_cancelled, _eos_is_produced}); if (!st.ok()) { @@ -777,13 +867,17 @@ Status VNodeChannel::add_block(Block* block, const Payload* payload) { } SCOPED_RAW_TIMER(&_stat.append_node_channel_ns); - st = block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first)); + st = block->append_to_block_by_selector(_cur_mutable_block.get(), *payload->row_ids); if (!st.ok()) { _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); return st; } - for (auto tablet_id : payload->second) { - _cur_add_block_request->add_tablet_ids(tablet_id); + auto* row_part_tablet_ids = payload->row_part_tablet_ids; + for (uint32_t route_idx : payload->route_idxs) { + _cur_add_block_request->add_partition_ids(row_part_tablet_ids->partition_ids[route_idx]); + if (!_parent->_tablet_finder->is_adaptive_random_bucket()) { + _cur_add_block_request->add_tablet_ids(row_part_tablet_ids->tablet_ids[route_idx]); + } } _write_bytes.fetch_add(_cur_mutable_block->bytes()); @@ -808,6 +902,7 @@ Status VNodeChannel::add_block(Block* block, const Payload* payload) { } _cur_mutable_block = MutableBlock::create_unique(block->clone_empty()); _cur_add_block_request->clear_tablet_ids(); + _cur_add_block_request->clear_partition_ids(); } return Status::OK(); @@ -930,9 +1025,18 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { // tablet_ids has already set when add row request->set_packet_seq(_next_packet_seq); auto block = mutable_block->to_block(); - CHECK(block.rows() == request->tablet_ids_size()) - << "block rows: " << block.rows() - << ", tablet_ids_size: " << request->tablet_ids_size(); + int request_rows = request->is_receiver_side_random_bucket() && !request->eos() + ? request->partition_ids_size() + : request->tablet_ids_size(); + if (block.rows() != request_rows) { + cancel( + fmt::format("{}, err: invalid add block request row count, block rows: {}, " + "request rows: {}, receiver_side_random_bucket: {}, eos: {}", + channel_info(), block.rows(), request_rows, + request->is_receiver_side_random_bucket(), request->eos())); + _send_block_callback->clear_in_flight(); + return; + } if (block.rows() > 0) { SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); size_t uncompressed_bytes = 0, compressed_bytes = 0; @@ -973,8 +1077,10 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { } if (request->eos()) { - for (auto pid : _parent->_tablet_finder->partition_ids()) { - request->add_partition_ids(pid); + if (!request->is_receiver_side_random_bucket() || !request->has_block()) { + for (auto pid : _parent->_tablet_finder->partition_ids()) { + request->add_partition_ids(pid); + } } request->set_write_single_replica(_parent->_write_single_replica); @@ -1322,14 +1428,25 @@ void VNodeChannel::mark_close(bool hang_wait) { return; } - _cur_add_block_request->set_eos(true); - _cur_add_block_request->set_hang_wait(hang_wait); + bool need_receiver_side_random_bucket_eos = + _cur_add_block_request->is_receiver_side_random_bucket(); { std::lock_guard l(_pending_batches_lock); if (!_cur_mutable_block) [[unlikely]] { // never had a block arrived. add a dummy block _cur_mutable_block = MutableBlock::create_unique(); } + if (need_receiver_side_random_bucket_eos && _cur_mutable_block->rows() > 0) { + _cur_add_block_request->set_eos(false); + auto tmp_add_block_request = + std::make_shared(*_cur_add_block_request); + _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request); + _pending_batches_num++; + _cur_add_block_request->clear_partition_ids(); + _cur_mutable_block = MutableBlock::create_unique(); + } + _cur_add_block_request->set_eos(true); + _cur_add_block_request->set_hang_wait(hang_wait); auto tmp_add_block_request = std::make_shared(*_cur_add_block_request); // when prepare to close, add block to queue so that try_send_pending_block thread will send it. @@ -1535,13 +1652,21 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { if (table_sink.__isset.send_batch_parallelism && table_sink.send_batch_parallelism > 1) { _send_batch_parallelism = table_sink.send_batch_parallelism; } - // if distributed column list is empty, we can ensure that tablet is with random distribution info - // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition - // for the whole olap table sink + // If distributed column list is empty, the table uses random distribution. + // Mode priority (highest to lowest): + // 1. FIND_TABLET_EVERY_SINK: load_to_single_tablet=true (legacy single-tablet mode). + // 2. FIND_TABLET_RANDOM_BUCKET: FE set enable_adaptive_random_bucket on the sink, + // meaning enable_adaptive_random_bucket_load is ON. Using a sink-level flag (mirroring + // load_to_single_tablet) ensures the mode is fixed correctly when the initial + // partition list is empty (e.g. auto-partition tables on first load). + // 3. FIND_TABLET_EVERY_BATCH: default round-robin per batch. auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; if (table_sink.partition.distributed_columns.empty()) { if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK; + } else if (table_sink.__isset.enable_adaptive_random_bucket && + table_sink.enable_adaptive_random_bucket && config::is_cloud_mode()) { + find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_RANDOM_BUCKET; } else { find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH; } @@ -2012,43 +2137,72 @@ Status VTabletWriter::close(Status exec_status) { return _close_status; } -void VTabletWriter::_generate_one_index_channel_payload( +Status VTabletWriter::_generate_one_index_channel_payload( RowPartTabletIds& row_part_tablet_id, int32_t index_idx, ChannelDistributionPayload& channel_payload) { auto& row_ids = row_part_tablet_id.row_ids; + auto& partition_ids = row_part_tablet_id.partition_ids; auto& tablet_ids = row_part_tablet_id.tablet_ids; size_t row_cnt = row_ids.size(); for (size_t i = 0; i < row_ids.size(); i++) { + if (_tablet_finder->is_adaptive_random_bucket() && config::is_cloud_mode()) { + auto partition_it = _channels[index_idx]->_channels_by_partition.find(partition_ids[i]); + if (partition_it == _channels[index_idx]->_channels_by_partition.end()) { + return Status::InternalError( + "unknown partition channel, load_id={}, index_id={}, partition_id={}", + print_id(_load_id), _channels[index_idx]->_index_id, partition_ids[i]); + } + auto payload_it = + channel_payload.find(partition_it->second.get()); // + if (payload_it == channel_payload.end()) { + auto [tmp_it, _] = channel_payload.emplace( + partition_it->second.get(), + Payload {std::make_unique(), &row_part_tablet_id, + std::vector()}); + payload_it = tmp_it; + payload_it->second.row_ids->reserve(row_cnt); + payload_it->second.route_idxs.reserve(row_cnt); + } + payload_it->second.row_ids->push_back(row_ids[i]); + payload_it->second.route_idxs.push_back(cast_set(i)); + continue; + } + // (tablet_id, VNodeChannel) where this tablet locate auto it = _channels[index_idx]->_channels_by_tablet.find(tablet_ids[i]); - DCHECK(it != _channels[index_idx]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_ids[i]; + if (it == _channels[index_idx]->_channels_by_tablet.end()) { + return Status::InternalError("unknown tablet, load_id={}, index_id={}, tablet_id={}", + print_id(_load_id), _channels[index_idx]->_index_id, + tablet_ids[i]); + } std::vector>& tablet_locations = it->second; for (const auto& locate_node : tablet_locations) { auto payload_it = channel_payload.find(locate_node.get()); // if (payload_it == channel_payload.end()) { auto [tmp_it, _] = channel_payload.emplace( - locate_node.get(), - Payload {std::make_unique(), std::vector()}); + locate_node.get(), Payload {std::make_unique(), + &row_part_tablet_id, std::vector()}); payload_it = tmp_it; - payload_it->second.first->reserve(row_cnt); - payload_it->second.second.reserve(row_cnt); + payload_it->second.row_ids->reserve(row_cnt); + payload_it->second.route_idxs.reserve(row_cnt); } - payload_it->second.first->push_back(row_ids[i]); - payload_it->second.second.push_back(tablet_ids[i]); + payload_it->second.row_ids->push_back(row_ids[i]); + payload_it->second.route_idxs.push_back(cast_set(i)); } } + return Status::OK(); } -void VTabletWriter::_generate_index_channels_payloads( +Status VTabletWriter::_generate_index_channels_payloads( std::vector& row_part_tablet_ids, ChannelDistributionPayloadVec& payload) { for (int i = 0; i < _schema->indexes().size(); i++) { - _generate_one_index_channel_payload(row_part_tablet_ids[i], i, payload[i]); + RETURN_IF_ERROR(_generate_one_index_channel_payload(row_part_tablet_ids[i], i, payload[i])); } + return Status::OK(); } Status VTabletWriter::write(RuntimeState* state, doris::Block& input_block) { @@ -2091,8 +2245,10 @@ Status VTabletWriter::write(RuntimeState* state, doris::Block& input_block) { ChannelDistributionPayloadVec channel_to_payload; channel_to_payload.resize(_channels.size()); - _generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload); + Status generate_payload_status = + _generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload); _row_distribution_watch.stop(); + RETURN_IF_ERROR(generate_payload_status); // Add block to node channel for (size_t i = 0; i < _channels.size(); i++) { diff --git a/be/src/exec/sink/writer/vtablet_writer.h b/be/src/exec/sink/writer/vtablet_writer.h index d3e6e8da0f1af0..8206665a407ae3 100644 --- a/be/src/exec/sink/writer/vtablet_writer.h +++ b/be/src/exec/sink/writer/vtablet_writer.h @@ -220,8 +220,11 @@ struct WriterStats { VNodeChannelStat channel_stat; }; -// pair -using Payload = std::pair, std::vector>; +struct Payload { + std::unique_ptr row_ids; + RowPartTabletIds* row_part_tablet_ids = nullptr; + std::vector route_idxs; +}; // every NodeChannel keeps a data transmission channel with one BE. for multiple times open, it has a dozen of requests and corresponding closures. class VNodeChannel { @@ -594,6 +597,8 @@ class IndexChannel { std::unordered_map> _node_channels; // from tablet_id to backend channel std::unordered_map>> _channels_by_tablet; + // from partition_id to FE-planned bucket owner channel in cloud receiver-side random bucket mode + std::unordered_map> _channels_by_partition; bool _has_inc_node = false; // lock to protect _failed_channels and _failed_channels_msgs @@ -651,12 +656,12 @@ class VTabletWriter final : public AsyncResultWriter { Status _init(RuntimeState* state, RuntimeProfile* profile); - void _generate_one_index_channel_payload(RowPartTabletIds& row_part_tablet_tuple, - int32_t index_idx, - ChannelDistributionPayload& channel_payload); + Status _generate_one_index_channel_payload(RowPartTabletIds& row_part_tablet_tuple, + int32_t index_idx, + ChannelDistributionPayload& channel_payload); - void _generate_index_channels_payloads(std::vector& row_part_tablet_ids, - ChannelDistributionPayloadVec& payload); + Status _generate_index_channels_payloads(std::vector& row_part_tablet_ids, + ChannelDistributionPayloadVec& payload); void _cancel_all_channel(Status status); diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.cpp b/be/src/exec/sink/writer/vtablet_writer_v2.cpp index 06e35210635208..6ee036de1bdf10 100644 --- a/be/src/exec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/exec/sink/writer/vtablet_writer_v2.cpp @@ -158,9 +158,10 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { _location = _pool->add(new OlapTableLocationParam(table_sink.location)); _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); - // if distributed column list is empty, we can ensure that tablet is with random distribution info - // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition - // for the whole olap table sink + // If distributed column list is empty, the table uses random distribution. + // Mode priority (highest to lowest): + // 1. FIND_TABLET_EVERY_SINK: load_to_single_tablet=true (legacy single-tablet mode). + // 2. FIND_TABLET_EVERY_BATCH: default round-robin per batch. auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; if (table_sink.partition.distributed_columns.empty()) { if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { @@ -559,6 +560,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block, int64_t ta .is_high_priority = _is_high_priority, .write_file_cache = _write_file_cache, .storage_vault_id {}, + .enable_table_memtable_backpressure = _tablet_finder->is_adaptive_random_bucket(), }; bool index_not_found = true; for (const auto& index : _schema->indexes()) { @@ -587,13 +589,15 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block, int64_t ta { SCOPED_TIMER(_wait_mem_limit_timer); ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush( - [state = _state]() { return state->is_cancelled(); }); + [state = _state]() { return state->is_cancelled(); }, + _state->workload_group().get()); if (_state->is_cancelled()) { return _state->cancel_reason(); } } SCOPED_TIMER(_write_memtable_timer); - st = delta_writer->write(block.get(), rows.row_idxes); + bool memtable_flushed = false; + st = delta_writer->write(block.get(), rows.row_idxes, &memtable_flushed); return st; } diff --git a/be/src/load/channel/load_channel_mgr.cpp b/be/src/load/channel/load_channel_mgr.cpp index 65ec0ed5ec80af..a5f655b5fb67c0 100644 --- a/be/src/load/channel/load_channel_mgr.cpp +++ b/be/src/load/channel/load_channel_mgr.cpp @@ -166,7 +166,7 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request, // because this may block for a while, which may lead to rpc timeout. SCOPED_TIMER(channel->get_handle_mem_limit_timer()); ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush( - [channel]() { return channel->is_cancelled(); }); + [channel]() { return channel->is_cancelled(); }, channel->workload_group().get()); if (channel->is_cancelled()) { return Status::Cancelled("LoadChannel has been cancelled: {}.", load_id.to_string()); } diff --git a/be/src/load/channel/tablets_channel.cpp b/be/src/load/channel/tablets_channel.cpp index 21737a00303910..7d3f44a13a77c3 100644 --- a/be/src/load/channel/tablets_channel.cpp +++ b/be/src/load/channel/tablets_channel.cpp @@ -136,7 +136,11 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { std::lock_guard l(_lock); // if _state is kOpened, it's a normal case, already open by other sender // if _state is kFinished, already cancelled by other sender - if (_state == kOpened || _state == kFinished) { + if (_state == kOpened) { + RETURN_IF_ERROR(_init_receiver_side_random_bucket_state(request)); + return Status::OK(); + } + if (_state == kFinished) { return Status::OK(); } _txn_id = request.txn_id(); @@ -144,7 +148,6 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { _schema = std::make_shared(); RETURN_IF_ERROR(_schema->init(request.schema())); _tuple_desc = _schema->tuple_desc(); - int max_sender = request.num_senders(); /* * a tablets channel in reciever is related to a bulk of VNodeChannel of sender. each instance one or none. @@ -175,6 +178,7 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { _closed_senders.Reset(max_sender); RETURN_IF_ERROR(_open_all_writers(request)); + RETURN_IF_ERROR(_init_receiver_side_random_bucket_state(request)); _state = kOpened; return Status::OK(); @@ -243,6 +247,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para wrequest.txn_expiration = params.txn_expiration(); // Required by CLOUD. wrequest.write_file_cache = params.write_file_cache(); wrequest.storage_vault_id = params.storage_vault_id(); + wrequest.enable_table_memtable_backpressure = params.is_receiver_side_random_bucket(); auto delta_writer = create_delta_writer(wrequest); { @@ -256,11 +261,43 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para _s_tablet_writer_count += incremental_tablet_num; LOG(INFO) << ss.str(); + RETURN_IF_ERROR(_init_receiver_side_random_bucket_state(params)); _state = kOpened; return Status::OK(); } +Status BaseTabletsChannel::_init_receiver_side_random_bucket_state( + const PTabletWriterOpenRequest& request) { + if (!request.is_receiver_side_random_bucket() || request.random_bucket_partitions_size() == 0) { + return Status::OK(); + } + if (_adaptive_random_bucket_state == nullptr) { + _adaptive_random_bucket_state = std::make_shared(_load_id); + } + for (const auto& partition : request.random_bucket_partitions()) { + if (partition.ordered_tablet_ids_size() == 0) { + return Status::InternalError( + "ordered_tablet_ids is empty for receiver-side random bucket, load_id={}, " + "sender_id={}, partition_id={}", + print_id(_load_id), request.sender_id(), partition.partition_id()); + } + std::vector ordered_positions; + ordered_positions.reserve(partition.ordered_tablet_ids_size()); + for (int i = 0; i < partition.ordered_tablet_ids_size(); ++i) { + ordered_positions.push_back(cast_set(i)); + } + std::vector ordered_tablet_ids; + ordered_tablet_ids.reserve(partition.ordered_tablet_ids_size()); + for (auto tablet_id : partition.ordered_tablet_ids()) { + ordered_tablet_ids.push_back(tablet_id); + } + _adaptive_random_bucket_state->init_partition(request.sender_id(), partition.partition_id(), + ordered_tablet_ids, ordered_positions, 0); + } + return Status::OK(); +} + std::unique_ptr TabletsChannel::create_delta_writer(const WriteRequest& request) { DCHECK(request.write_req_type == WriteRequestType::DATA); DCHECK(request.table_schema_param != nullptr); @@ -561,6 +598,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req .is_high_priority = _is_high_priority, .write_file_cache = request.write_file_cache(), .storage_vault_id = request.storage_vault_id(), + .enable_table_memtable_backpressure = request.is_receiver_side_random_bucket(), }; auto delta_writer = create_delta_writer(wrequest); @@ -614,9 +652,15 @@ Status BaseTabletsChannel::_write_block_data( [[maybe_unused]] size_t uncompressed_size = 0; [[maybe_unused]] int64_t uncompressed_time = 0; RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, &uncompressed_time)); - CHECK(send_data.rows() == request.tablet_ids_size()) - << "block rows: " << send_data.rows() - << ", tablet_ids_size: " << request.tablet_ids_size(); + int request_rows = request.is_receiver_side_random_bucket() ? request.partition_ids_size() + : request.tablet_ids_size(); + if (send_data.rows() != request_rows) { + return Status::InternalError( + "invalid add block request row count, load_id={}, index_id={}, packet_seq={}, " + "block_rows={}, request_rows={}", + print_id(_load_id), _index_id, request.packet_seq(), send_data.rows(), + request_rows); + } g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes(); Defer defer { @@ -658,10 +702,10 @@ Status BaseTabletsChannel::_write_block_data( SCOPED_TIMER(_write_block_timer); auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos(); for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) { + bool memtable_flushed = false; RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](BaseDeltaWriter* writer) { - return writer->write(&send_data, tablet_to_rowidxs_it.second); + return writer->write(&send_data, tablet_to_rowidxs_it.second, &memtable_flushed); })); - auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first); if (tablet_writer_it != _tablet_writers.end()) { tablet_writer_it->second->set_tablet_load_rowset_num_info(tablet_load_infos); @@ -675,6 +719,160 @@ Status BaseTabletsChannel::_write_block_data( return Status::OK(); } +std::shared_ptr BaseTabletsChannel::_get_partition_route_lock(int64_t partition_id) { + std::lock_guard l(_partition_route_locks_lock); + auto& lock = _partition_route_locks[partition_id]; + if (lock == nullptr) { + lock = std::make_shared(); + } + return lock; +} + +Status BaseTabletsChannel::_write_block_data_for_receiver_side_random_bucket( + const PTabletWriterAddBlockRequest& request, int64_t cur_seq, + std::unordered_map>& partition_to_rowidxs, + PTabletWriterAddBlockResult* response) { + Block send_data; + [[maybe_unused]] size_t uncompressed_size = 0; + [[maybe_unused]] int64_t uncompressed_time = 0; + RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, &uncompressed_time)); + if (send_data.rows() != request.partition_ids_size()) { + return Status::InternalError( + "invalid receiver-side random bucket add block request row count, load_id={}, " + "index_id={}, packet_seq={}, block_rows={}, partition_ids_size={}", + print_id(_load_id), _index_id, request.packet_seq(), send_data.rows(), + request.partition_ids_size()); + } + + { + std::lock_guard l(_lock); + for (const auto& [partition_id, _] : partition_to_rowidxs) { + _partition_ids.emplace(partition_id); + } + } + + g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes(); + Defer defer { + [&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }}; + + auto* tablet_errors = response->mutable_tablet_errors(); + auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos(); + + auto write_partition_data = [&](int64_t partition_id, + const DorisVector& row_idxs) -> Status { + auto partition_lock = _get_partition_route_lock(partition_id); + std::lock_guard partition_guard(*partition_lock); + + if (_adaptive_random_bucket_state == nullptr) { + return Status::InternalError( + "receiver-side random bucket state is not initialized, load_id={}, " + "index_id={}, packet_seq={}, partition_id={}", + print_id(_load_id), _index_id, request.packet_seq(), partition_id); + } + int64_t tablet_id = + _adaptive_random_bucket_state->current_tablet(request.sender_id(), partition_id); + if (tablet_id < 0) { + return Status::InternalError( + "invalid current tablet for receiver-side random bucket, load_id={}, " + "index_id={}, sender_id={}, packet_seq={}, partition_id={}", + print_id(_load_id), _index_id, request.sender_id(), request.packet_seq(), + partition_id); + } + VLOG_DEBUG << "FIND_TABLET_RANDOM_BUCKET: route+write begin" + << ", load_id=" << _load_id << ", index_id=" << _index_id + << ", sender_id=" << request.sender_id() + << ", packet_seq=" << request.packet_seq() << ", partition_id=" << partition_id + << ", tablet_id=" << tablet_id << ", row_count=" << row_idxs.size(); + + { + std::shared_lock broken_rlock(_broken_tablets_lock); + if (_is_broken_tablet(tablet_id)) { + return Status::InternalError( + "current tablet is broken for receiver-side random bucket, load_id={}, " + "index_id={}, sender_id={}, packet_seq={}, partition_id={}, tablet_id={}", + print_id(_load_id), _index_id, request.sender_id(), request.packet_seq(), + partition_id, tablet_id); + } + } + + BaseDeltaWriter* tablet_writer = nullptr; + { + std::lock_guard l(_tablet_writers_lock); + auto tablet_writer_it = _tablet_writers.find(tablet_id); + if (tablet_writer_it == _tablet_writers.end()) { + return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id); + } + tablet_writer = tablet_writer_it->second.get(); + } + + bool memtable_flushed = false; + Status st = tablet_writer->write(&send_data, row_idxs, &memtable_flushed); + if (!st.ok()) { + auto err_msg = + fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}", + tablet_id, _txn_id, st.to_string()); + LOG(WARNING) << err_msg; + PTabletError* error = tablet_errors->Add(); + error->set_tablet_id(tablet_id); + error->set_msg(err_msg); + static_cast(tablet_writer->cancel_with_status(st)); + _add_broken_tablet(tablet_id); + return Status::OK(); + } + + VLOG_DEBUG << "FIND_TABLET_RANDOM_BUCKET: route+write done" + << ", load_id=" << _load_id << ", index_id=" << _index_id + << ", sender_id=" << request.sender_id() + << ", packet_seq=" << request.packet_seq() << ", partition_id=" << partition_id + << ", tablet_id=" << tablet_id << ", row_count=" << row_idxs.size() + << ", memtable_flushed=" << memtable_flushed; + if (memtable_flushed) { + _adaptive_random_bucket_state->rotate_by_tablet(request.sender_id(), partition_id, + tablet_id); + } + tablet_writer->set_tablet_load_rowset_num_info(tablet_load_infos); + return Status::OK(); + }; + + SCOPED_TIMER(_write_block_timer); + for (const auto& [partition_id, row_idxs] : partition_to_rowidxs) { + RETURN_IF_ERROR(write_partition_data(partition_id, row_idxs)); + } + + { + std::lock_guard l(_lock); + _next_seqs[request.sender_id()] = cur_seq + 1; + } + return Status::OK(); +} + +Status BaseTabletsChannel::_build_partition_to_rowidxs_for_receiver_side_random_bucket( + const PTabletWriterAddBlockRequest& request, + std::unordered_map>* partition_to_rowidxs) { + if (_adaptive_random_bucket_state == nullptr) { + return Status::InternalError( + "receiver-side random bucket state is not initialized, load_id={}, index_id={}, " + "packet_seq={}", + print_id(_load_id), _index_id, request.packet_seq()); + } + if (request.partition_ids_size() == 0) { + return Status::InternalError( + "empty partition ids for receiver-side random bucket add block, load_id={}, " + "index_id={}, packet_seq={}", + print_id(_load_id), _index_id, request.packet_seq()); + } + for (uint32_t i = 0; i < request.partition_ids_size(); ++i) { + int64_t partition_id = request.partition_ids(i); + auto it = partition_to_rowidxs->find(partition_id); + if (it == partition_to_rowidxs->end()) { + partition_to_rowidxs->emplace(partition_id, std::initializer_list {i}); + } else { + it->second.emplace_back(i); + } + } + return Status::OK(); +} + Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, PTabletWriterAddBlockResult* response) { SCOPED_TIMER(_add_batch_timer); @@ -694,10 +892,18 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, return Status::OK(); } + if (request.is_receiver_side_random_bucket()) { + std::unordered_map /* row index */> + partition_to_rowidxs; + RETURN_IF_ERROR(_build_partition_to_rowidxs_for_receiver_side_random_bucket( + request, &partition_to_rowidxs)); + return _write_block_data_for_receiver_side_random_bucket(request, cur_seq, + partition_to_rowidxs, response); + } + std::unordered_map /* row index */> tablet_to_rowidxs; _build_tablet_to_rowidxs(request, &tablet_to_rowidxs); - return _write_block_data(request, cur_seq, tablet_to_rowidxs, response); } diff --git a/be/src/load/channel/tablets_channel.h b/be/src/load/channel/tablets_channel.h index 289e8d48deabdc..c604e1f8cd7e6f 100644 --- a/be/src/load/channel/tablets_channel.h +++ b/be/src/load/channel/tablets_channel.h @@ -31,6 +31,7 @@ #include "common/status.h" #include "core/custom_allocator.h" +#include "exec/sink/vtablet_finder.h" #include "runtime/runtime_profile.h" #include "util/bitmap.h" #include "util/uid_util.h" @@ -121,9 +122,18 @@ class BaseTabletsChannel { bool is_finished() const { return _state == kFinished; } protected: + Status _init_receiver_side_random_bucket_state(const PTabletWriterOpenRequest& request); Status _write_block_data(const PTabletWriterAddBlockRequest& request, int64_t cur_seq, std::unordered_map>& tablet_to_rowidxs, PTabletWriterAddBlockResult* response); + Status _write_block_data_for_receiver_side_random_bucket( + const PTabletWriterAddBlockRequest& request, int64_t cur_seq, + std::unordered_map>& partition_to_rowidxs, + PTabletWriterAddBlockResult* response); + Status _build_partition_to_rowidxs_for_receiver_side_random_bucket( + const PTabletWriterAddBlockRequest& request, + std::unordered_map>* partition_to_rowidxs); + std::shared_ptr _get_partition_route_lock(int64_t partition_id); Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request); @@ -185,6 +195,9 @@ class BaseTabletsChannel { std::unordered_set _reducing_tablets; std::unordered_set _partition_ids; + std::shared_ptr _adaptive_random_bucket_state; + std::mutex _partition_route_locks_lock; + std::unordered_map> _partition_route_locks; static std::atomic _s_tablet_writer_count; diff --git a/be/src/load/delta_writer/delta_writer.cpp b/be/src/load/delta_writer/delta_writer.cpp index b8ef440d3ca18c..7b6bfb8b3690f2 100644 --- a/be/src/load/delta_writer/delta_writer.cpp +++ b/be/src/load/delta_writer/delta_writer.cpp @@ -36,6 +36,7 @@ #include "core/block/block.h" #include "io/fs/file_writer.h" // IWYU pragma: keep #include "load/memtable/memtable_flush_executor.h" +#include "load/memtable/memtable_memory_limiter.h" #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "service/backend_options.h" @@ -135,6 +136,11 @@ void BaseDeltaWriter::set_tablet_load_rowset_num_info( collect_tablet_load_rowset_num_info(tablet, tablet_infos); } +int64_t BaseDeltaWriter::table_id() const { + DORIS_CHECK(_req.table_schema_param != nullptr); + return _req.table_schema_param->table_id(); +} + DeltaWriter::~DeltaWriter() = default; Status BaseDeltaWriter::init() { @@ -155,10 +161,22 @@ Status BaseDeltaWriter::init() { return Status::OK(); } -Status DeltaWriter::write(const Block* block, const DorisVector& row_idxs) { +Status DeltaWriter::write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed) { + if (memtable_flushed != nullptr) { + *memtable_flushed = false; + } if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } + if (_req.enable_table_memtable_backpressure) { + ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure( + [this]() { + std::lock_guard l(_lock); + return _is_cancelled; + }, + table_id()); + } _lock_watch.start(); std::lock_guard l(_lock); _lock_watch.stop(); @@ -172,13 +190,24 @@ Status DeltaWriter::write(const Block* block, const DorisVector& row_i std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } - return _memtable_writer->write(block, row_idxs); + return _memtable_writer->write(block, row_idxs, memtable_flushed); } Status BaseDeltaWriter::wait_flush() { return _memtable_writer->wait_flush(); } +Status BaseDeltaWriter::flush_memtable_async() { + return _memtable_writer->flush_async(); +} + +Status DeltaWriter::flush_memtable_async() { + _lock_watch.start(); + std::lock_guard l(_lock); + _lock_watch.stop(); + return BaseDeltaWriter::flush_memtable_async(); +} + Status DeltaWriter::close() { _lock_watch.start(); std::lock_guard l(_lock); diff --git a/be/src/load/delta_writer/delta_writer.h b/be/src/load/delta_writer/delta_writer.h index 2e6d180f2ee958..7f19f4500e0dca 100644 --- a/be/src/load/delta_writer/delta_writer.h +++ b/be/src/load/delta_writer/delta_writer.h @@ -61,7 +61,8 @@ class BaseDeltaWriter { virtual ~BaseDeltaWriter(); - virtual Status write(const Block* block, const DorisVector& row_idxs) = 0; + virtual Status write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed = nullptr) = 0; // flush the last memtable to flush queue, must call it before build_rowset() virtual Status close() = 0; @@ -81,8 +82,12 @@ class BaseDeltaWriter { // Wait all memtable in flush queue to be flushed Status wait_flush(); + virtual Status flush_memtable_async(); + int64_t partition_id() const { return _req.partition_id; } + int64_t table_id() const; + int64_t tablet_id() const { return _req.tablet_id; } int64_t txn_id() const { return _req.txn_id; } @@ -130,10 +135,13 @@ class DeltaWriter final : public BaseDeltaWriter { ~DeltaWriter() override; - Status write(const Block* block, const DorisVector& row_idxs) override; + Status write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed = nullptr) override; Status close() override; + Status flush_memtable_async() override; + Status cancel_with_status(const Status& st) override; Status build_rowset() override; diff --git a/be/src/load/delta_writer/delta_writer_context.h b/be/src/load/delta_writer/delta_writer_context.h index 61db94636002d0..bf23bde0fef1dc 100644 --- a/be/src/load/delta_writer/delta_writer_context.h +++ b/be/src/load/delta_writer/delta_writer_context.h @@ -51,6 +51,7 @@ struct WriteRequest { bool write_file_cache = false; WriteRequestType write_req_type = WriteRequestType::DATA; std::string storage_vault_id; + bool enable_table_memtable_backpressure = false; }; } // namespace doris diff --git a/be/src/load/delta_writer/delta_writer_v2.cpp b/be/src/load/delta_writer/delta_writer_v2.cpp index c117e2b3180799..0c6d06d0c96b7a 100644 --- a/be/src/load/delta_writer/delta_writer_v2.cpp +++ b/be/src/load/delta_writer/delta_writer_v2.cpp @@ -35,6 +35,7 @@ #include "core/block/block.h" #include "exec/sink/load_stream_stub.h" #include "io/fs/file_writer.h" // IWYU pragma: keep +#include "load/memtable/memtable_memory_limiter.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" #include "service/backend_options.h" @@ -92,6 +93,11 @@ DeltaWriterV2::~DeltaWriterV2() { static_cast(_memtable_writer->cancel()); } +int64_t DeltaWriterV2::_table_id() const { + DORIS_CHECK(_req.table_schema_param != nullptr); + return _req.table_schema_param->table_id(); +} + Status DeltaWriterV2::init() { if (_is_init) { return Status::OK(); @@ -141,10 +147,18 @@ Status DeltaWriterV2::init() { return Status::OK(); } -Status DeltaWriterV2::write(const Block* block, const DorisVector& row_idxs) { +Status DeltaWriterV2::write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed) { + if (memtable_flushed != nullptr) { + *memtable_flushed = false; + } if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } + if (_req.enable_table_memtable_backpressure) { + ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure( + [state = _state]() { return state->is_cancelled(); }, _table_id()); + } _lock_watch.start(); std::lock_guard l(_lock); _lock_watch.stop(); @@ -164,7 +178,7 @@ Status DeltaWriterV2::write(const Block* block, const DorisVector& row } } SCOPED_RAW_TIMER(&_write_memtable_time); - return _memtable_writer->write(block, row_idxs); + return _memtable_writer->write(block, row_idxs, memtable_flushed); } Status DeltaWriterV2::close() { diff --git a/be/src/load/delta_writer/delta_writer_v2.h b/be/src/load/delta_writer/delta_writer_v2.h index d637c6afcbc2d1..1c284d4a418b4e 100644 --- a/be/src/load/delta_writer/delta_writer_v2.h +++ b/be/src/load/delta_writer/delta_writer_v2.h @@ -68,7 +68,8 @@ class DeltaWriterV2 { Status init(); - Status write(const Block* block, const DorisVector& row_idxs); + Status write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed = nullptr); // flush the last memtable to flush queue, must call it before close_wait() Status close(); @@ -85,6 +86,7 @@ class DeltaWriterV2 { Status _build_current_tablet_schema(int64_t index_id, const OlapTableSchemaParam* table_schema_param, const TabletSchema& ori_tablet_schema); + int64_t _table_id() const; void _update_profile(RuntimeProfile* profile); diff --git a/be/src/load/memtable/memtable_memory_limiter.cpp b/be/src/load/memtable/memtable_memory_limiter.cpp index 8b44e04a2e6622..f98aea05f71937 100644 --- a/be/src/load/memtable/memtable_memory_limiter.cpp +++ b/be/src/load/memtable/memtable_memory_limiter.cpp @@ -24,7 +24,6 @@ #include "common/metrics/metrics.h" #include "load/memtable/memtable.h" #include "load/memtable/memtable_writer.h" -#include "runtime/workload_group/workload_group_manager.h" #include "util/mem_info.h" namespace doris { @@ -34,6 +33,12 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption, Metr bvar::LatencyRecorder g_memtable_memory_limit_latency_ms("mm_limiter_limit_time_ms"); bvar::Adder g_memtable_memory_limit_waiting_threads("mm_limiter_waiting_threads"); +bvar::LatencyRecorder g_memtable_table_backpressure_latency_ms( + "mm_limiter_table_backpressure_time_ms"); +bvar::Adder g_memtable_table_backpressure_waiting_threads( + "mm_limiter_table_backpressure_waiting_threads"); +bvar::Status g_memtable_table_backpressure_pending_count( + "mm_limiter_table_backpressure_pending_count", 0); bvar::Status g_memtable_active_memory("mm_limiter_mem_active", 0); bvar::Status g_memtable_write_memory("mm_limiter_mem_write", 0); bvar::Status g_memtable_flush_memory("mm_limiter_mem_flush", 0); @@ -121,7 +126,69 @@ int64_t MemTableMemoryLimiter::_need_flush() { return need_flush - _queue_mem_usage - _flush_mem_usage; } -void MemTableMemoryLimiter::handle_memtable_flush(std::function cancel_check) { +int64_t MemTableMemoryLimiter::_table_flush_pending_memtable_count(int64_t table_id) { + int64_t pending_memtables = 0; + for (const auto& writer : _writers) { + auto writer_sptr = writer.lock(); + if (writer_sptr == nullptr) { + continue; + } + if (writer_sptr->table_id() == table_id) { + pending_memtables += writer_sptr->flush_pending_memtable_count(); + } + } + return pending_memtables; +} + +void MemTableMemoryLimiter::handle_table_memtable_backpressure(std::function cancel_check, + int64_t table_id) { + if (!config::enable_table_memtable_flush_backpressure) { + return; + } + const int64_t pending_count_limit = config::table_memtable_flush_pending_count_limit; + if (pending_count_limit <= 0) { + return; + } + DORIS_CHECK(table_id > 0); + + std::unique_lock l(_lock); + int64_t pending_count = _table_flush_pending_memtable_count(table_id); + if (pending_count < pending_count_limit) { + return; + } + + MonotonicStopWatch timer; + timer.start(); + g_memtable_table_backpressure_waiting_threads << 1; + while (pending_count >= pending_count_limit) { + g_memtable_table_backpressure_pending_count.set_value(pending_count); + LOG_EVERY_T(INFO, 1) << "table memtable flush backpressure: table_id=" << table_id + << ", pending_memtables=" << pending_count + << ", limit=" << pending_count_limit + << ", memtable writers num: " << _writers.size(); + if (cancel_check && cancel_check()) { + LOG(INFO) << "cancelled when waiting for table memtable flush backpressure" + << ", table_id=" << table_id << ", pending_memtables=" << pending_count + << ", limit=" << pending_count_limit; + g_memtable_table_backpressure_waiting_threads << -1; + return; + } + static_cast(_hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(100))); + pending_count = _table_flush_pending_memtable_count(table_id); + } + g_memtable_table_backpressure_pending_count.set_value(pending_count); + g_memtable_table_backpressure_waiting_threads << -1; + timer.stop(); + int64_t time_ms = timer.elapsed_time() / 1000 / 1000; + g_memtable_table_backpressure_latency_ms << time_ms; + LOG(INFO) << "waited " << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS) + << " for table memtable flush backpressure" + << ", table_id=" << table_id << ", pending_memtables=" << pending_count + << ", limit=" << pending_count_limit << ", memtable writers num: " << _writers.size(); +} + +void MemTableMemoryLimiter::handle_memtable_flush(std::function cancel_check, + WorkloadGroup*) { // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); do { diff --git a/be/src/load/memtable/memtable_memory_limiter.h b/be/src/load/memtable/memtable_memory_limiter.h index 6fbcc3cf8c8073..11c5c0fdd61413 100644 --- a/be/src/load/memtable/memtable_memory_limiter.h +++ b/be/src/load/memtable/memtable_memory_limiter.h @@ -44,7 +44,8 @@ class MemTableMemoryLimiter { // If yes, it will flush memtable to try to reduce memory consumption. // Every write operation will call this API to check if need flush memtable OR hang // when memory is not available. - void handle_memtable_flush(std::function cancel_check); + void handle_memtable_flush(std::function cancel_check, WorkloadGroup* wg = nullptr); + void handle_table_memtable_backpressure(std::function cancel_check, int64_t table_id); void register_writer(std::weak_ptr writer); @@ -64,6 +65,7 @@ class MemTableMemoryLimiter { bool _hard_limit_reached(); bool _load_usage_low(); int64_t _need_flush(); + int64_t _table_flush_pending_memtable_count(int64_t table_id); int64_t _flush_active_memtables(int64_t need_flush); void _refresh_mem_tracker(); std::mutex _lock; diff --git a/be/src/load/memtable/memtable_writer.cpp b/be/src/load/memtable/memtable_writer.cpp index 865751d05e21b7..e97af807975bb2 100644 --- a/be/src/load/memtable/memtable_writer.cpp +++ b/be/src/load/memtable/memtable_writer.cpp @@ -42,6 +42,7 @@ #include "storage/schema_change/schema_change.h" #include "storage/storage_engine.h" #include "storage/tablet/tablet_schema.h" +#include "storage/tablet_info.h" #include "util/mem_info.h" #include "util/stopwatch.hpp" @@ -87,7 +88,11 @@ Status MemTableWriter::init(std::shared_ptr rowset_writer, return Status::OK(); } -Status MemTableWriter::write(const Block* block, const DorisVector& row_idxs) { +Status MemTableWriter::write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed) { + if (memtable_flushed != nullptr) { + *memtable_flushed = false; + } if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } @@ -112,6 +117,9 @@ Status MemTableWriter::write(const Block* block, const DorisVector& ro if (raw_rows + row_idxs.size() > std::numeric_limits::max()) { g_flush_cuz_rowscnt_oveflow << 1; RETURN_IF_ERROR(_flush_memtable()); + if (memtable_flushed != nullptr) { + *memtable_flushed = true; + } } _total_received_rows += row_idxs.size(); @@ -139,6 +147,9 @@ Status MemTableWriter::write(const Block* block, const DorisVector& ro } if (UNLIKELY(_mem_table->need_flush())) { RETURN_IF_ERROR(_flush_memtable()); + if (memtable_flushed != nullptr) { + *memtable_flushed = true; + } } return Status::OK(); @@ -172,7 +183,6 @@ Status MemTableWriter::flush_async() { // 1. call by local, from `VTabletWriterV2::_write_memtable`. // 2. call by remote, from `LoadChannelMgr::_get_load_channel`. // 3. call by daemon thread, from `handle_paused_queries` -> `flush_workload_group_memtables`. - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker()); if (!_is_init || _is_closed) { // This writer is uninitialized or closed before flushing, do nothing. // We return OK instead of NOT_INITIALIZED or ALREADY_CLOSED. @@ -185,6 +195,9 @@ Status MemTableWriter::flush_async() { return _cancel_status; } + DCHECK(_resource_ctx != nullptr); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker()); + VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " << PrettyPrinter::print_bytes(_mem_table->memory_usage()) << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id); @@ -359,6 +372,27 @@ uint64_t MemTableWriter::flush_running_count() const { return _flush_token == nullptr ? 0 : _flush_token->get_stats().flush_running_count.load(); } +int64_t MemTableWriter::table_id() const { + DORIS_CHECK(_req.table_schema_param != nullptr); + return _req.table_schema_param->table_id(); +} + +int64_t MemTableWriter::flush_pending_memtable_count() { + std::lock_guard l(_mem_table_ptr_lock); + int64_t memtable_count = 0; + for (const auto& mem_table : _freezed_mem_tables) { + auto mem_table_sptr = mem_table.lock(); + if (mem_table_sptr == nullptr) { + continue; + } + auto mem_type = mem_table_sptr->get_mem_type(); + if (mem_type == MemType::WRITE_FINISHED || mem_type == MemType::FLUSH) { + memtable_count++; + } + } + return memtable_count; +} + int64_t MemTableWriter::mem_consumption(MemType mem) { if (!_is_init) { // This method may be called before this writer is initialized. diff --git a/be/src/load/memtable/memtable_writer.h b/be/src/load/memtable/memtable_writer.h index 0c3a03926dc70f..30f03bc65f0976 100644 --- a/be/src/load/memtable/memtable_writer.h +++ b/be/src/load/memtable/memtable_writer.h @@ -62,7 +62,8 @@ class MemTableWriter { std::shared_ptr partial_update_info, std::shared_ptr wg_sptr, bool unique_key_mow = false); - Status write(const Block* block, const DorisVector& row_idxs); + Status write(const Block* block, const DorisVector& row_idxs, + bool* memtable_flushed = nullptr); // flush the last memtable to flush queue, must call it before close_wait() Status close(); @@ -83,6 +84,7 @@ class MemTableWriter { int64_t mem_consumption(MemType mem); int64_t active_memtable_mem_consumption(); + int64_t flush_pending_memtable_count(); // Submit current memtable to flush queue, and return without waiting. // This is currently for reducing mem consumption of this memtable writer. @@ -93,6 +95,8 @@ class MemTableWriter { int64_t tablet_id() const { return _req.tablet_id; } + int64_t table_id() const; + int64_t total_received_rows() const { return _total_received_rows; } const FlushStatistic& get_flush_token_stats(); diff --git a/be/src/storage/segment/page_io.cpp b/be/src/storage/segment/page_io.cpp index 0bc86deaf2414c..625d01309d6546 100644 --- a/be/src/storage/segment/page_io.cpp +++ b/be/src/storage/segment/page_io.cpp @@ -61,6 +61,7 @@ Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space if (space_saving > 0 && space_saving >= min_space_saving) { // shrink the buf to fit the len size to avoid taking // up the memory of the size MAX_COMPRESSED_SIZE + buf.shrink_to_fit(); RETURN_IF_CATCH_EXCEPTION(*compressed_body = buf.build()); return Status::OK(); } diff --git a/be/src/storage/tablet_info.cpp b/be/src/storage/tablet_info.cpp index 518dd7baee4878..88d2c79b3ed268 100644 --- a/be/src/storage/tablet_info.cpp +++ b/be/src/storage/tablet_info.cpp @@ -749,10 +749,15 @@ Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartiti part_result = _obj_pool.add(new VOlapTablePartition(&_partition_block)); part_result->id = t_part.id; part_result->is_mutable = t_part.is_mutable; - // only load_to_single_tablet = true will set load_tablet_idx if (t_part.__isset.load_tablet_idx) { part_result->load_tablet_idx = t_part.load_tablet_idx; } + if (t_part.__isset.bucket_be_id) { + part_result->bucket_be_id = t_part.bucket_be_id; + } + if (t_part.__isset.local_bucket_seqs) { + part_result->local_bucket_seqs = t_part.local_bucket_seqs; + } if (_is_in_partition) { for (const auto& keys : t_part.in_keys) { @@ -839,6 +844,15 @@ Status VOlapTablePartitionParam::add_partitions( } part->num_buckets = t_part.num_buckets; + if (t_part.__isset.load_tablet_idx) { + part->load_tablet_idx = t_part.load_tablet_idx; + } + if (t_part.__isset.bucket_be_id) { + part->bucket_be_id = t_part.bucket_be_id; + } + if (t_part.__isset.local_bucket_seqs) { + part->local_bucket_seqs = t_part.local_bucket_seqs; + } auto num_indexes = _schema->indexes().size(); if (t_part.indexes.size() != num_indexes) { return Status::InternalError( @@ -911,6 +925,15 @@ Status VOlapTablePartitionParam::replace_partitions( } part->num_buckets = t_part.num_buckets; + if (t_part.__isset.load_tablet_idx) { + part->load_tablet_idx = t_part.load_tablet_idx; + } + if (t_part.__isset.bucket_be_id) { + part->bucket_be_id = t_part.bucket_be_id; + } + if (t_part.__isset.local_bucket_seqs) { + part->local_bucket_seqs = t_part.local_bucket_seqs; + } auto num_indexes = _schema->indexes().size(); if (t_part.indexes.size() != num_indexes) { return Status::InternalError( diff --git a/be/src/storage/tablet_info.h b/be/src/storage/tablet_info.h index c0caf8bb49afdb..748879caed0363 100644 --- a/be/src/storage/tablet_info.h +++ b/be/src/storage/tablet_info.h @@ -167,6 +167,13 @@ struct VOlapTablePartition { bool is_mutable; // -1 indicates partition with hash distribution int64_t load_tablet_idx = -1; + // FE-selected bucket owner BE for adaptive random bucket mode. -1 means use the current + // execution BE, which preserves the legacy BE-side calculation behaviour. + int64_t bucket_be_id = -1; + // Bucket indices (0-based) used by FIND_TABLET_RANDOM_BUCKET rotation. FE may preselect and + // send them; otherwise BE computes them from tablet location info and bucket_be_id/current BE. + // Empty means fallback to the fixed load_tablet_idx bucket. + std::vector local_bucket_seqs; int total_replica_num = 0; int load_required_replica_num = 0; // tablet_id -> set of backend_ids that have version gaps diff --git a/be/test/exec/sink/vrow_distribution_test.cpp b/be/test/exec/sink/vrow_distribution_test.cpp index a5d862f370b59f..4d81af7db866eb 100644 --- a/be/test/exec/sink/vrow_distribution_test.cpp +++ b/be/test/exec/sink/vrow_distribution_test.cpp @@ -275,8 +275,27 @@ TEST(VRowDistributionTest, AutoPartitionMissingValuesBatchingDedupAndCreateParti schema_index_id, tablet_sink_tuple_id, partition_slot_id); auto tlocation = sink_test_utils::build_location_param(); - auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, tlocation, - tablet_sink_tuple_id, txn_id); + VRowDistributionHarness* harness = nullptr; + bool create_callback_called = false; + std::function create_callback = + [&](TCreatePartitionResult* result) { + create_callback_called = true; + EXPECT_EQ(result->partitions.size(), 1); + + auto new_partition_block = ColumnHelper::create_block({15}); + VOlapTablePartition* new_part = nullptr; + harness->vpartition->find_partition(&new_partition_block, 0, new_part); + if (new_part == nullptr) { + return Status::InternalError("new partition is not found"); + } + EXPECT_EQ(new_part->id, 3); + return Status::OK(); + }; + + auto h = _build_vrow_distribution_harness( + ctx, tschema, tpartition, tlocation, tablet_sink_tuple_id, txn_id, + &_delegated_create_partition_callback, &create_callback); + harness = h.get(); auto input_block = ColumnHelper::create_block({15, 15}); std::shared_ptr converted_block; @@ -334,6 +353,7 @@ TEST(VRowDistributionTest, AutoPartitionMissingValuesBatchingDedupAndCreateParti st = h->row_distribution.automatic_create_partition(); EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_TRUE(injected); + EXPECT_TRUE(create_callback_called); auto check_block = ColumnHelper::create_block({15}); std::vector parts(1, nullptr); @@ -371,21 +391,21 @@ TEST(VRowDistributionTest, ReplaceOverwritingPartitionInjectedRequestDedupAndRep EXPECT_EQ(result->partitions.size(), 2); auto old_partition_block = ColumnHelper::create_block({1}); - VOlapTablePartition* old_part = nullptr; - harness->vpartition->find_partition(&old_partition_block, 0, old_part); - if (old_part == nullptr) { - return Status::InternalError("old partition is not found"); + VOlapTablePartition* new_part = nullptr; + harness->vpartition->find_partition(&old_partition_block, 0, new_part); + if (new_part == nullptr) { + return Status::InternalError("new partition is not found"); } - EXPECT_EQ(old_part->id, 1); + EXPECT_EQ(new_part->id, 11); auto another_old_partition_block = ColumnHelper::create_block({25}); - VOlapTablePartition* another_old_part = nullptr; + VOlapTablePartition* another_new_part = nullptr; harness->vpartition->find_partition(&another_old_partition_block, 0, - another_old_part); - if (another_old_part == nullptr) { - return Status::InternalError("another old partition is not found"); + another_new_part); + if (another_new_part == nullptr) { + return Status::InternalError("another new partition is not found"); } - EXPECT_EQ(another_old_part->id, 2); + EXPECT_EQ(another_new_part->id, 12); return Status::OK(); }; diff --git a/be/test/exec/sink/vtablet_finder_test.cpp b/be/test/exec/sink/vtablet_finder_test.cpp new file mode 100644 index 00000000000000..802ef53c082e8c --- /dev/null +++ b/be/test/exec/sink/vtablet_finder_test.cpp @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/sink/vtablet_finder.h" + +#include + +#include + +namespace doris { +namespace { + +TEST(AdaptiveRandomBucketStateTest, TracksCurrentTabletPerSenderAndPartition) { + AdaptiveRandomBucketState state(UniqueId(1, 2)); + + state.init_partition(0, 10, std::vector {100, 101}, std::vector {0, 1}, 0); + state.init_partition(1, 10, std::vector {200, 201}, std::vector {0, 1}, 1); + + EXPECT_EQ(state.current_tablet(0, 10), 100); + EXPECT_EQ(state.current_tablet(1, 10), 201); + EXPECT_EQ(state.current_tablet(2, 10), -1); + + state.rotate_by_tablet(0, 10, 100); + EXPECT_EQ(state.current_tablet(0, 10), 101); + EXPECT_EQ(state.current_tablet(1, 10), 201); + + state.rotate_by_tablet(1, 10, 100); + EXPECT_EQ(state.current_tablet(1, 10), 201); + + state.rotate_by_tablet(1, 10, 201); + EXPECT_EQ(state.current_tablet(0, 10), 101); + EXPECT_EQ(state.current_tablet(1, 10), 200); +} + +TEST(AdaptiveRandomBucketStateTest, IgnoresDuplicateInitForSameSenderPartition) { + AdaptiveRandomBucketState state(UniqueId(1, 2)); + + state.init_partition(0, 10, std::vector {100, 101}, std::vector {0, 1}, 0); + state.init_partition(0, 10, std::vector {200, 201}, std::vector {0, 1}, 1); + + EXPECT_EQ(state.current_tablet(0, 10), 100); +} + +} // namespace +} // namespace doris diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 232c06ff3040d4..6cb99a672556f2 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3668,4 +3668,15 @@ public static int metaServiceRpcRetryTimes() { + "obtaining partition version information when calculating the delete bitmap. Enabled " + "by default."}) public static boolean calc_delete_bitmap_get_versions_waiting_for_pending_txns = true; + + @ConfField(mutable = true, masterOnly = true, description = { + "Whether to enable adaptive random bucket load. When enabled, each BE computes its own local " + + "bucket set (buckets whose primary replica it hosts) from the tablet location info " + + "sent by FE, and rotates across those buckets once per-tablet write volume exceeds " + + "the threshold (default 200 MB). This reduces import memory pressure and improves " + + "throughput for random-distribution tables. Covers all load types uniformly.", + "是否启用自适应随机桶导入。开启后每个 BE 根据 FE 下发的 tablet 位置信息自行计算本地桶集合" + + "(持有主副本的桶),并在单个 tablet 写入量超过阈值(默认 200 MB)后在本地桶之间轮转。" + + "可降低导入内存压力并提升随机分桶表的吞吐量,覆盖所有导入类型。"}) + public static boolean enable_adaptive_random_bucket_load = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 8f91dcf1462e24..69880de7282c70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -100,6 +100,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -177,6 +178,12 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou "if load_to_single_tablet set to true," + " the olap table must be with random distribution"); } tSink.setLoadToSingleTablet(loadToSingleTablet); + // Signal BE to use FIND_TABLET_RANDOM_BUCKET from the start so that auto-partition + // tables whose initial partition list is empty still enter the correct mode. + if (Config.isCloudMode() && Config.enable_adaptive_random_bucket_load && !loadToSingleTablet + && dstTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo) { + tSink.setEnableAdaptiveRandomBucket(true); + } tSink.setTxnTimeoutS(txnExpirationS); String vaultId = dstTable.getStorageVaultId(); if (vaultId != null && !vaultId.isEmpty()) { @@ -574,6 +581,302 @@ private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table return partitionParam; } + public static final class AdaptiveBucketAssignment { + private final long bucketBeId; + private final int loadTabletIdx; + private final List localBucketSeqs; + + public AdaptiveBucketAssignment(long bucketBeId, int loadTabletIdx, List localBucketSeqs) { + this.bucketBeId = bucketBeId; + this.loadTabletIdx = loadTabletIdx; + this.localBucketSeqs = new ArrayList<>(localBucketSeqs); + } + + public long getBucketBeId() { + return bucketBeId; + } + + public int getLoadTabletIdx() { + return loadTabletIdx; + } + + public List getLocalBucketSeqs() { + return localBucketSeqs; + } + } + + public static boolean shouldAssignAdaptiveRandomBucket(TOlapTableSink sink) { + return sink != null + && sink.isSetEnableAdaptiveRandomBucket() + && sink.isEnableAdaptiveRandomBucket() + && (!sink.isSetLoadToSingleTablet() || !sink.isLoadToSingleTablet()) + && sink.isSetPartition() + && sink.getPartition() != null + && (!sink.getPartition().isSetDistributedColumns() + || sink.getPartition().getDistributedColumns().isEmpty()); + } + + public boolean shouldAssignAdaptiveRandomBucket() { + return tDataSink != null && shouldAssignAdaptiveRandomBucket(tDataSink.getOlapTableSink()); + } + + public static Map> computeAdaptiveRandomBucketAssignments( + List sinkBackendIds, List partitions, + List tabletLocations, int planFragmentNum) { + Map> assignments = new HashMap<>(); + List orderedSinkBackendIds = sinkBackendIds.stream() + .distinct() + .sorted() + .collect(Collectors.toList()); + for (Long sinkBackendId : orderedSinkBackendIds) { + assignments.put(sinkBackendId, new HashMap<>()); + } + if (orderedSinkBackendIds.isEmpty() || partitions == null || tabletLocations == null) { + return assignments; + } + + Map tabletLocationMap = new HashMap<>(tabletLocations.size()); + for (TTabletLocation tabletLocation : tabletLocations) { + tabletLocationMap.put(tabletLocation.getTabletId(), tabletLocation); + } + + for (TOlapTablePartition partition : partitions) { + if (!partition.isSetLoadTabletIdx() || partition.getNumBuckets() <= 0 + || partition.getIndexes().isEmpty()) { + continue; + } + Map> beToBucketSeqs = buildBeToBucketSeqs(partition, tabletLocationMap); + long baseTabletIndex = partition.getLoadTabletIdx(); + int fallbackBucketIdx = (int) Math.floorMod(baseTabletIndex, (long) partition.getNumBuckets()); + int targetBucketNum = Math.min( + Math.min(orderedSinkBackendIds.size(), partition.getNumBuckets()), + Math.max(planFragmentNum, 1)); + if (targetBucketNum <= 0) { + continue; + } + + List rotatedSinkBackendIds = rotateSinkBackendIds(orderedSinkBackendIds, baseTabletIndex); + Map bucketToOwnerBe = buildBucketToOwnerBe(beToBucketSeqs); + List selectedBucketSeqs = selectAdaptiveBucketSeqs(rotatedSinkBackendIds, + beToBucketSeqs, bucketToOwnerBe, baseTabletIndex, partition.getNumBuckets(), targetBucketNum); + if (selectedBucketSeqs.isEmpty()) { + selectedBucketSeqs = Collections.singletonList(fallbackBucketIdx); + } + if (selectedBucketSeqs.size() != targetBucketNum) { + LOG.warn("Adaptive random bucket selected {} buckets instead of target {} for partition {}, " + + "sinkBackendIds={}, beToBucketSeqs={}, selectedBucketSeqs={}, fallbackBucketIdx={}", + selectedBucketSeqs.size(), targetBucketNum, partition.getId(), orderedSinkBackendIds, + beToBucketSeqs, selectedBucketSeqs, fallbackBucketIdx); + } + + Map> openedBeToBucketSeqs = buildBeToBucketSeqs(bucketToOwnerBe, selectedBucketSeqs); + Map bucketUseCounts = new HashMap<>(); + Map sinkAssignments = LOG.isInfoEnabled() ? new HashMap<>() : null; + for (Long sinkBackendId : rotatedSinkBackendIds) { + int bucketSeq = selectLeastUsedBucketSeq( + openedBeToBucketSeqs.get(sinkBackendId), bucketUseCounts, baseTabletIndex); + if (bucketSeq < 0) { + bucketSeq = selectLeastUsedBucketSeq(selectedBucketSeqs, bucketUseCounts, baseTabletIndex); + } + if (bucketSeq < 0) { + bucketSeq = fallbackBucketIdx; + } + long bucketBeId = bucketToOwnerBe.getOrDefault(bucketSeq, -1L); + if (bucketBeId <= 0) { + LOG.warn("Adaptive random bucket falls back to bucket {} without owner BE for partition {}, " + + "sinkBackendId={}, selectedBucketSeqs={}, beToBucketSeqs={}", + bucketSeq, partition.getId(), sinkBackendId, selectedBucketSeqs, beToBucketSeqs); + } + List localBucketSeqs = rotateBucketSeqsForStartBucket( + beToBucketSeqs.get(bucketBeId), bucketSeq); + assignments.get(sinkBackendId).put(partition.getId(), + new AdaptiveBucketAssignment(bucketBeId, bucketSeq, localBucketSeqs)); + bucketUseCounts.merge(bucketSeq, 1, Integer::sum); + if (sinkAssignments != null) { + sinkAssignments.put(sinkBackendId, + "bucket=" + bucketSeq + ",bucketBeId=" + bucketBeId + + ",localBucketSeqs=" + localBucketSeqs); + } + } + if (sinkAssignments != null) { + LOG.info("Adaptive random bucket plan partition={}, baseTabletIndex={}, targetBucketNum={}, " + + "sinkBackendIds={}, rotatedSinkBackendIds={}, beToBucketSeqs={}, " + + "selectedBucketSeqs={}, openedBeToBucketSeqs={}, sinkAssignments={}", + partition.getId(), baseTabletIndex, targetBucketNum, orderedSinkBackendIds, + rotatedSinkBackendIds, beToBucketSeqs, selectedBucketSeqs, openedBeToBucketSeqs, + sinkAssignments); + } + } + return assignments; + } + + public static void applyAdaptiveRandomBucketAssignments(List partitions, + Map partitionAssignments) { + if (partitions == null || partitionAssignments == null || partitionAssignments.isEmpty()) { + return; + } + for (TOlapTablePartition partition : partitions) { + AdaptiveBucketAssignment assignment = partitionAssignments.get(partition.getId()); + if (assignment == null) { + continue; + } + partition.setLoadTabletIdx(assignment.getLoadTabletIdx()); + if (assignment.getBucketBeId() > 0) { + partition.setBucketBeId(assignment.getBucketBeId()); + } else if (partition.isSetBucketBeId()) { + partition.unsetBucketBeId(); + } + if (!assignment.getLocalBucketSeqs().isEmpty()) { + partition.setLocalBucketSeqs(new ArrayList<>(assignment.getLocalBucketSeqs())); + } else if (partition.isSetLocalBucketSeqs()) { + partition.unsetLocalBucketSeqs(); + } + if (LOG.isInfoEnabled()) { + LOG.info("Adaptive random bucket apply partition={}, bucketBeId={}, loadTabletIdx={}, " + + "localBucketSeqs={}", + partition.getId(), assignment.getBucketBeId(), assignment.getLoadTabletIdx(), + assignment.getLocalBucketSeqs()); + } + } + } + + private static Map> buildBeToBucketSeqs(TOlapTablePartition partition, + Map tabletLocationMap) { + Map> beToBucketSeqs = new HashMap<>(); + List tablets = partition.getIndexes().get(0).getTablets(); + for (int bucketSeq = 0; bucketSeq < tablets.size(); bucketSeq++) { + TTabletLocation tabletLocation = tabletLocationMap.get(tablets.get(bucketSeq)); + if (tabletLocation == null || tabletLocation.getNodeIds() == null + || tabletLocation.getNodeIds().isEmpty()) { + continue; + } + List sortedNodeIds = new ArrayList<>(tabletLocation.getNodeIds()); + Collections.sort(sortedNodeIds); + long bucketBeId = sortedNodeIds.get(bucketSeq % sortedNodeIds.size()); + beToBucketSeqs.computeIfAbsent(bucketBeId, ignored -> new ArrayList<>()).add(bucketSeq); + } + return beToBucketSeqs; + } + + private static List rotateSinkBackendIds(List sinkBackendIds, long baseTabletIndex) { + if (sinkBackendIds.isEmpty()) { + return Collections.emptyList(); + } + int startIdx = (int) Math.floorMod(baseTabletIndex, (long) sinkBackendIds.size()); + List rotatedSinkBackendIds = new ArrayList<>(sinkBackendIds.size()); + for (int offset = 0; offset < sinkBackendIds.size(); offset++) { + rotatedSinkBackendIds.add(sinkBackendIds.get((startIdx + offset) % sinkBackendIds.size())); + } + return rotatedSinkBackendIds; + } + + private static Map buildBucketToOwnerBe(Map> beToBucketSeqs) { + Map bucketToOwnerBe = new HashMap<>(); + for (Map.Entry> entry : beToBucketSeqs.entrySet()) { + for (Integer bucketSeq : entry.getValue()) { + bucketToOwnerBe.put(bucketSeq, entry.getKey()); + } + } + return bucketToOwnerBe; + } + + private static List selectAdaptiveBucketSeqs(List rotatedSinkBackendIds, + Map> beToBucketSeqs, Map bucketToOwnerBe, + long baseTabletIndex, int numBuckets, int targetBucketNum) { + List selectedBucketSeqs = new ArrayList<>(targetBucketNum); + HashSet selectedBucketSet = new HashSet<>(targetBucketNum); + for (Long sinkBackendId : rotatedSinkBackendIds) { + int bucketSeq = selectBucketSeq(beToBucketSeqs.get(sinkBackendId), baseTabletIndex); + if (bucketSeq >= 0 && selectedBucketSet.add(bucketSeq)) { + selectedBucketSeqs.add(bucketSeq); + if (selectedBucketSeqs.size() >= targetBucketNum) { + return selectedBucketSeqs; + } + } + } + + HashSet sinkBackendIdSet = new HashSet<>(rotatedSinkBackendIds); + appendRemainingBucketSeqs(selectedBucketSeqs, selectedBucketSet, bucketToOwnerBe, sinkBackendIdSet, + baseTabletIndex, numBuckets, targetBucketNum, true); + appendRemainingBucketSeqs(selectedBucketSeqs, selectedBucketSet, bucketToOwnerBe, sinkBackendIdSet, + baseTabletIndex, numBuckets, targetBucketNum, false); + return selectedBucketSeqs; + } + + private static void appendRemainingBucketSeqs(List selectedBucketSeqs, HashSet selectedBucketSet, + Map bucketToOwnerBe, HashSet sinkBackendIdSet, long baseTabletIndex, + int numBuckets, int targetBucketNum, boolean preferSinkBackendBuckets) { + for (int offset = 0; offset < numBuckets && selectedBucketSeqs.size() < targetBucketNum; offset++) { + int bucketSeq = (int) Math.floorMod(baseTabletIndex + offset, (long) numBuckets); + if (selectedBucketSet.contains(bucketSeq)) { + continue; + } + Long ownerBeId = bucketToOwnerBe.get(bucketSeq); + if (preferSinkBackendBuckets && (ownerBeId == null || !sinkBackendIdSet.contains(ownerBeId))) { + continue; + } + if (!preferSinkBackendBuckets && ownerBeId != null && sinkBackendIdSet.contains(ownerBeId)) { + continue; + } + selectedBucketSet.add(bucketSeq); + selectedBucketSeqs.add(bucketSeq); + } + } + + private static Map> buildBeToBucketSeqs(Map bucketToOwnerBe, + List bucketSeqs) { + Map> beToBucketSeqs = new HashMap<>(); + for (Integer bucketSeq : bucketSeqs) { + Long ownerBeId = bucketToOwnerBe.get(bucketSeq); + if (ownerBeId == null) { + continue; + } + beToBucketSeqs.computeIfAbsent(ownerBeId, ignored -> new ArrayList<>()).add(bucketSeq); + } + return beToBucketSeqs; + } + + private static int selectBucketSeq(List bucketSeqs, long baseTabletIndex) { + if (bucketSeqs == null || bucketSeqs.isEmpty()) { + return -1; + } + int bucketPos = (int) Math.floorMod(baseTabletIndex, (long) bucketSeqs.size()); + return bucketSeqs.get(bucketPos); + } + + private static int selectLeastUsedBucketSeq(List bucketSeqs, Map bucketUseCounts, + long baseTabletIndex) { + if (bucketSeqs == null || bucketSeqs.isEmpty()) { + return -1; + } + int startIdx = (int) Math.floorMod(baseTabletIndex, (long) bucketSeqs.size()); + int selectedBucketSeq = -1; + int selectedUseCount = Integer.MAX_VALUE; + for (int offset = 0; offset < bucketSeqs.size(); offset++) { + int candidateBucketSeq = bucketSeqs.get((startIdx + offset) % bucketSeqs.size()); + int candidateUseCount = bucketUseCounts.getOrDefault(candidateBucketSeq, 0); + if (selectedBucketSeq < 0 || candidateUseCount < selectedUseCount) { + selectedBucketSeq = candidateBucketSeq; + selectedUseCount = candidateUseCount; + } + } + return selectedBucketSeq; + } + + private static List rotateBucketSeqsForStartBucket(List bucketSeqs, int startBucketSeq) { + if (bucketSeqs == null || bucketSeqs.isEmpty()) { + return Collections.singletonList(startBucketSeq); + } + int startIdx = bucketSeqs.indexOf(startBucketSeq); + Preconditions.checkState(startIdx >= 0, + "start bucket %s must exist in bucketSeqs %s", startBucketSeq, bucketSeqs); + List rotatedBucketSeqs = new ArrayList<>(bucketSeqs.size()); + for (int offset = 0; offset < bucketSeqs.size(); offset++) { + rotatedBucketSeqs.add(bucketSeqs.get((startIdx + offset) % bucketSeqs.size())); + } + return rotatedBucketSeqs; + } + private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) throws UserException { TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index b9b12a1abd1a5f..6a81ce97ffa082 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -60,6 +60,7 @@ import org.apache.doris.planner.MultiCastDataSink; import org.apache.doris.planner.MultiCastPlanFragment; import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNode; @@ -96,6 +97,7 @@ import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TAIResource; import org.apache.doris.thrift.TBrokerScanRange; +import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TDescriptorTable; import org.apache.doris.thrift.TErrorTabletInfo; @@ -105,6 +107,7 @@ import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFragmentInstanceReport; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TOlapTableSink; import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPipelineFragmentParamsList; @@ -155,6 +158,7 @@ import java.security.SecureRandom; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -165,6 +169,7 @@ import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -533,6 +538,95 @@ public Map getBeToInstancesNum() { return result; } + public static final class AdaptiveRandomBucketSinkContext { + private final List sinkBackendIds; + private final int planFragmentNum; + + private AdaptiveRandomBucketSinkContext(List sinkBackendIds, int planFragmentNum) { + this.sinkBackendIds = sinkBackendIds; + this.planFragmentNum = planFragmentNum; + } + + public List getSinkBackendIds() { + return sinkBackendIds; + } + + public int getPlanFragmentNum() { + return planFragmentNum; + } + } + + public Optional getAdaptiveRandomBucketSinkContext() { + Set sinkBackendIds = new TreeSet<>(); + int planFragmentNum = 0; + for (PipelineExecContext context : pipelineExecContexts.values()) { + TPipelineFragmentParams params = context.rpcParams; + if (params.getFragment().getOutputSink() == null + || params.getFragment().getOutputSink().getType() != TDataSinkType.OLAP_TABLE_SINK) { + continue; + } + TOlapTableSink sink = params.getFragment().getOutputSink().getOlapTableSink(); + if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) { + continue; + } + sinkBackendIds.add(params.getBackendId()); + planFragmentNum += params.getLocalParamsSize(); + } + if (sinkBackendIds.isEmpty()) { + return Optional.empty(); + } + return Optional.of(new AdaptiveRandomBucketSinkContext( + new ArrayList<>(sinkBackendIds), Math.max(planFragmentNum, 1))); + } + + private static void assignAdaptiveRandomBucketForFragment( + Collection fragmentParamsList) { + List sinkParams = fragmentParamsList.stream() + .filter(param -> param.getFragment().getOutputSink() != null + && param.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) + .collect(Collectors.toList()); + if (sinkParams.isEmpty()) { + return; + } + TOlapTableSink sink = sinkParams.get(0).getFragment().getOutputSink().getOlapTableSink(); + if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) { + return; + } + List sinkBackendIds = sinkParams.stream() + .map(TPipelineFragmentParams::getBackendId) + .distinct() + .sorted() + .collect(Collectors.toList()); + int planFragmentNum = sinkParams.stream() + .mapToInt(TPipelineFragmentParams::getLocalParamsSize) + .sum(); + if (LOG.isInfoEnabled()) { + LOG.info("Adaptive random bucket planning in legacy fragment={}, sinkBackendIds={}, " + + "planFragmentNum={}", + sinkParams.get(0).getFragmentId(), sinkBackendIds, planFragmentNum); + } + Map> assignments = + OlapTableSink.computeAdaptiveRandomBucketAssignments(sinkBackendIds, + sink.getPartition().getPartitions(), sink.getLocation().getTablets(), planFragmentNum); + for (TPipelineFragmentParams sinkParam : sinkParams) { + Map partitionAssignments = + assignments.get(sinkParam.getBackendId()); + if (partitionAssignments == null) { + continue; + } + TOlapTableSink copiedSink = deepCopyOlapTableSinkForCurrentBackend(sinkParam); + OlapTableSink.applyAdaptiveRandomBucketAssignments( + copiedSink.getPartition().getPartitions(), + partitionAssignments); + } + } + + private static TOlapTableSink deepCopyOlapTableSinkForCurrentBackend(TPipelineFragmentParams sinkParam) { + TDataSink copiedOutputSink = sinkParam.getFragment().getOutputSink().deepCopy(); + sinkParam.getFragment().setOutputSink(copiedOutputSink); + return copiedOutputSink.getOlapTableSink(); + } + // Initialize protected void prepare() throws UserException { for (PlanFragment fragment : fragments) { @@ -893,6 +987,7 @@ protected void sendPipelineCtx() throws Exception { } ++backendIdx; } + assignAdaptiveRandomBucketForFragment(tParams.values()); for (Map.Entry entry : tParams.entrySet()) { if (entry.getValue().getFragment().getOutputSink() != null && entry.getValue().getFragment().getOutputSink().getType() diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index 6c6799b51adc6d..3ac7a1f92034b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -54,10 +54,12 @@ import org.apache.doris.qe.CoordinatorContext; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TAIResource; +import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TOlapTableSink; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPipelineFragmentParamsList; import org.apache.doris.thrift.TPipelineInstanceParams; @@ -96,6 +98,7 @@ import java.util.TreeSet; import java.util.function.BiConsumer; import java.util.function.Supplier; +import java.util.stream.Collectors; public class ThriftPlansBuilder { private static final Logger LOG = LogManager.getLogger(ThriftPlansBuilder.class); @@ -247,7 +250,22 @@ private static void setParamsForOlapTableSink(List dist } } + Map> olapSinkParamsByFragmentId = Maps.newLinkedHashMap(); + for (Entry kv : fragmentsGroupByWorker.entrySet()) { + for (TPipelineFragmentParams fragmentParams : kv.getValue().getParamsList()) { + TDataSink outputSink = fragmentParams.getFragment().getOutputSink(); + if (outputSink != null && outputSink.getType() == TDataSinkType.OLAP_TABLE_SINK) { + olapSinkParamsByFragmentId.computeIfAbsent(fragmentParams.getFragmentId(), + ignored -> new ArrayList<>()).add(fragmentParams); + } + } + } + for (List sinkParams : olapSinkParamsByFragmentId.values()) { + assignAdaptiveRandomBucketForSinkParams(sinkParams); + } + ConnectContext connectContext = coordinatorContext.connectContext; + for (Entry kv : fragmentsGroupByWorker.entrySet()) { TPipelineFragmentParamsList fragments = kv.getValue(); for (TPipelineFragmentParams fragmentParams : fragments.getParamsList()) { @@ -266,6 +284,49 @@ private static void setParamsForOlapTableSink(List dist } } + private static void assignAdaptiveRandomBucketForSinkParams(List sinkParams) { + if (sinkParams.isEmpty()) { + return; + } + TOlapTableSink sink = sinkParams.get(0).getFragment().getOutputSink().getOlapTableSink(); + if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) { + return; + } + List sinkBackendIds = sinkParams.stream() + .map(TPipelineFragmentParams::getBackendId) + .distinct() + .sorted() + .collect(Collectors.toList()); + int planFragmentNum = sinkParams.stream() + .mapToInt(TPipelineFragmentParams::getLocalParamsSize) + .sum(); + if (LOG.isInfoEnabled()) { + LOG.info("Adaptive random bucket planning in nereids fragment={}, sinkBackendIds={}, " + + "planFragmentNum={}", + sinkParams.get(0).getFragmentId(), sinkBackendIds, planFragmentNum); + } + Map> assignments = + OlapTableSink.computeAdaptiveRandomBucketAssignments(sinkBackendIds, + sink.getPartition().getPartitions(), sink.getLocation().getTablets(), planFragmentNum); + for (TPipelineFragmentParams sinkParam : sinkParams) { + Map partitionAssignments = + assignments.get(sinkParam.getBackendId()); + if (partitionAssignments == null) { + continue; + } + TOlapTableSink copiedSink = deepCopyOlapTableSinkForCurrentBackend(sinkParam); + OlapTableSink.applyAdaptiveRandomBucketAssignments( + copiedSink.getPartition().getPartitions(), + partitionAssignments); + } + } + + private static TOlapTableSink deepCopyOlapTableSinkForCurrentBackend(TPipelineFragmentParams sinkParam) { + TDataSink copiedOutputSink = sinkParam.getFragment().getOutputSink().deepCopy(); + sinkParam.getFragment().setOutputSink(copiedOutputSink); + return copiedOutputSink.getOlapTableSink(); + } + private static Multiset computeInstanceNumPerWorker( List distributedPlans) { Multiset workerCounter = LinkedHashMultiset.create(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index fb74d7ca29a02a..32c32be0de3f76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -112,6 +112,8 @@ import org.apache.doris.nereids.trees.plans.commands.info.DropPartitionOp; import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; import org.apache.doris.nereids.trees.plans.commands.info.ReplacePartitionOp; +import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.planner.OlapTableSink; import org.apache.doris.qe.ConnectContext; @@ -354,6 +356,7 @@ import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -4537,6 +4540,11 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t List tablets = new ArrayList<>(); List slaveTablets = new ArrayList<>(); List partitions = Lists.newArrayList(); + Backend requestBackend = request.isSetBeEndpoint() ? resolveBeEndpoint(request.getBeEndpoint()) : null; + long adaptiveBucketBeId = requestBackend != null ? requestBackend.getId() : -1L; + TUniqueId queryId = request.isSetQueryId() ? request.getQueryId() : null; + boolean enableAdaptiveRandomBucket = request.isSetEnableAdaptiveRandomBucket() + && request.isEnableAdaptiveRandomBucket(); boolean loadToSingleTablet = request.isSetLoadToSingleTablet() && request.isLoadToSingleTablet(); final boolean hasBeEndpoint = request.isSetBeEndpoint(); // Lazy: resolved on the first CloudTablet that needs it (skipped on cache-hit). @@ -4566,7 +4574,8 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId())); boolean randomDistribution = partition.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM; - boolean cacheLoadTabletIdx = loadToSingleTablet && randomDistribution; + boolean cacheLoadTabletIdx = + (loadToSingleTablet || enableAdaptiveRandomBucket) && randomDistribution; partitions.add(tPartition); // tablet AtomicLong cachedLoadTabletIdx = new AtomicLong(-1); @@ -4577,6 +4586,8 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t if (cacheLoadTabletIdx) { tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get()); } + assignAdaptiveBucketToPartition(tPartition, partitionTablets, adaptiveBucketBeId, queryId, + enableAdaptiveRandomBucket); // fast path, if cached tablets.addAll(partitionTablets); slaveTablets.addAll(partitionSlaveTablets); @@ -4673,6 +4684,8 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t tPartition.setLoadTabletIdx(cachedTabletIdx); } } + assignAdaptiveBucketToPartition(tPartition, partitionTablets, adaptiveBucketBeId, queryId, + enableAdaptiveRandomBucket); tablets.addAll(partitionTablets); slaveTablets.addAll(partitionSlaveTablets); @@ -4889,6 +4902,11 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request List tablets = new ArrayList<>(); List slaveTablets = new ArrayList<>(); PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + Backend requestBackend = request.isSetBeEndpoint() ? resolveBeEndpoint(request.getBeEndpoint()) : null; + long adaptiveBucketBeId = requestBackend != null ? requestBackend.getId() : -1L; + TUniqueId queryId = request.isSetQueryId() ? request.getQueryId() : null; + boolean enableAdaptiveRandomBucket = request.isSetEnableAdaptiveRandomBucket() + && request.isEnableAdaptiveRandomBucket(); boolean loadToSingleTablet = request.isSetLoadToSingleTablet() && request.isLoadToSingleTablet(); final boolean replaceHasBeEndpoint = request.isSetBeEndpoint(); // Lazy: resolved on the first CloudTablet that needs it. @@ -4920,7 +4938,8 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId())); boolean randomDistribution = partition.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM; - boolean cacheLoadTabletIdx = loadToSingleTablet && randomDistribution; + boolean cacheLoadTabletIdx = + (loadToSingleTablet || enableAdaptiveRandomBucket) && randomDistribution; partitions.add(tPartition); // tablet AtomicLong cachedLoadTabletIdx = new AtomicLong(-1); @@ -4931,6 +4950,8 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request if (cacheLoadTabletIdx) { tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get()); } + assignAdaptiveBucketToPartition(tPartition, partitionTablets, adaptiveBucketBeId, queryId, + enableAdaptiveRandomBucket); // fast path, if cached tablets.addAll(partitionTablets); slaveTablets.addAll(partitionSlaveTablets); @@ -5037,6 +5058,8 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request partitionTablets.size(), partitionSlaveTablets.size()); } } + assignAdaptiveBucketToPartition(tPartition, partitionTablets, adaptiveBucketBeId, queryId, + enableAdaptiveRandomBucket); tablets.addAll(partitionTablets); slaveTablets.addAll(partitionSlaveTablets); @@ -5745,4 +5768,129 @@ private TStatus checkMaster() { } return status; } + + private static final class AdaptiveBucketSinkContext { + private final boolean enableAdaptiveRandomBucket; + private final List sinkBackendIds; + private final int planFragmentNum; + + private AdaptiveBucketSinkContext(boolean enableAdaptiveRandomBucket, List sinkBackendIds, + int planFragmentNum) { + this.enableAdaptiveRandomBucket = enableAdaptiveRandomBucket; + this.sinkBackendIds = sinkBackendIds; + this.planFragmentNum = planFragmentNum; + } + } + + private static AdaptiveBucketSinkContext disabledAdaptiveBucketSinkContext() { + return new AdaptiveBucketSinkContext(false, Lists.newArrayList(), 1); + } + + private static AdaptiveBucketSinkContext collectAdaptiveBucketSinkContext(TUniqueId queryId) { + if (queryId == null) { + return disabledAdaptiveBucketSinkContext(); + } + Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(queryId); + if (coordinator == null) { + return disabledAdaptiveBucketSinkContext(); + } + if (!(coordinator instanceof NereidsCoordinator)) { + Optional context = + coordinator.getAdaptiveRandomBucketSinkContext(); + if (context.isPresent()) { + return new AdaptiveBucketSinkContext( + true, context.get().getSinkBackendIds(), context.get().getPlanFragmentNum()); + } + return disabledAdaptiveBucketSinkContext(); + } + Set sinkBackendIds = new TreeSet<>(); + int planFragmentNum = 0; + for (PipelineDistributedPlan distributedPlan : + ((NereidsCoordinator) coordinator).getCoordinatorContext().distributedPlans) { + if (!(distributedPlan.getFragmentJob().getFragment().getSink() instanceof OlapTableSink)) { + continue; + } + OlapTableSink sink = (OlapTableSink) distributedPlan.getFragmentJob().getFragment().getSink(); + if (!sink.shouldAssignAdaptiveRandomBucket()) { + continue; + } + planFragmentNum += distributedPlan.getInstanceJobs().size(); + for (AssignedJob assignedJob : distributedPlan.getInstanceJobs()) { + sinkBackendIds.add(assignedJob.getAssignedWorker().id()); + } + } + if (sinkBackendIds.isEmpty()) { + return disabledAdaptiveBucketSinkContext(); + } + return new AdaptiveBucketSinkContext(true, new ArrayList<>(sinkBackendIds), Math.max(planFragmentNum, 1)); + } + + private static void assignAdaptiveBucketToPartition(TOlapTablePartition partition, + List partitionTablets, long currentBeId, TUniqueId queryId, + boolean enableAdaptiveRandomBucket) { + if (!enableAdaptiveRandomBucket || !Config.enable_adaptive_random_bucket_load + || !partition.isSetLoadTabletIdx() || currentBeId <= 0) { + return; + } + AdaptiveBucketSinkContext sinkContext = collectAdaptiveBucketSinkContext(queryId); + if (!sinkContext.enableAdaptiveRandomBucket) { + sinkContext = new AdaptiveBucketSinkContext(true, Lists.newArrayList(currentBeId), 1); + LOG.warn("Adaptive random bucket sink context not found for runtime partition {}, " + + "fallback to currentBeId={}, queryId={}", + partition.getId(), currentBeId, queryId); + } + if (LOG.isInfoEnabled()) { + LOG.info("Adaptive random bucket replanning partition={}, currentBeId={}, queryId={}, " + + "sinkBackendIds={}, planFragmentNum={}", + partition.getId(), currentBeId, queryId, sinkContext.sinkBackendIds, + sinkContext.planFragmentNum); + } + Map> assignments = + OlapTableSink.computeAdaptiveRandomBucketAssignments( + sinkContext.sinkBackendIds, Lists.newArrayList(partition), partitionTablets, + sinkContext.planFragmentNum); + Map partitionAssignments = assignments.get(currentBeId); + if (partitionAssignments == null || !partitionAssignments.containsKey(partition.getId())) { + LOG.warn("Adaptive random bucket found no partition assignment for partition {}, currentBeId={}, " + + "queryId={}, sinkBackendIds={}, fallback to current BE only", + partition.getId(), currentBeId, queryId, sinkContext.sinkBackendIds); + assignments = OlapTableSink.computeAdaptiveRandomBucketAssignments( + Lists.newArrayList(currentBeId), Lists.newArrayList(partition), partitionTablets, 1); + partitionAssignments = assignments.get(currentBeId); + } + if (partitionAssignments != null && partitionAssignments.containsKey(partition.getId())) { + OlapTableSink.AdaptiveBucketAssignment assignment = partitionAssignments.get(partition.getId()); + if (LOG.isInfoEnabled()) { + LOG.info("Adaptive random bucket replan result partition={}, currentBeId={}, bucketBeId={}, " + + "loadTabletIdx={}, localBucketSeqs={}", + partition.getId(), currentBeId, assignment.getBucketBeId(), + assignment.getLoadTabletIdx(), assignment.getLocalBucketSeqs()); + } + OlapTableSink.applyAdaptiveRandomBucketAssignments( + Lists.newArrayList(partition), partitionAssignments); + } else { + LOG.warn("Adaptive random bucket fallback still found no partition assignment for partition {}, " + + "currentBeId={}, queryId={}", + partition.getId(), currentBeId, queryId); + } + } + + /** + * Resolves a BE endpoint string ("host:heartbeat_port") to a Backend object. + * Returns null if the endpoint is malformed or the backend cannot be found. + */ + private static Backend resolveBeEndpoint(String beEndpoint) { + int colonIdx = beEndpoint.lastIndexOf(':'); + if (colonIdx < 0) { + return null; + } + String host = beEndpoint.substring(0, colonIdx); + try { + int port = Integer.parseInt(beEndpoint.substring(colonIdx + 1)); + return Env.getCurrentSystemInfo().getBackendWithHeartbeatPort(host, port); + } catch (NumberFormatException e) { + return null; + } + } + } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 89a5d64976e29c..ca3a41cd49bd21 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -103,6 +103,11 @@ message PTabletWithPartition { required int64 tablet_id = 2; } +message PRandomBucketPartitionParam { + required int64 partition_id = 1; + repeated int64 ordered_tablet_ids = 2; +} + message PTabletLoadRowsetInfo { required int32 current_rowset_nums = 1; required int32 max_config_rowset_nums = 2; @@ -148,6 +153,8 @@ message PTabletWriterOpenRequest { optional string storage_vault_id = 18; optional int32 sender_id = 19; optional int64 workload_group_id = 20; + optional bool is_receiver_side_random_bucket = 21 [default = false]; + repeated PRandomBucketPartitionParam random_bucket_partitions = 22; }; message PTabletWriterOpenResult { @@ -205,6 +212,7 @@ message PTabletWriterAddBlockRequest { optional bool is_single_tablet_block = 14 [default = false]; // for auto-partition first stage close, we should hang. optional bool hang_wait = 15 [default = false]; + optional bool is_receiver_side_random_bucket = 16 [default = false]; }; message PSlaveTabletNodes { @@ -1265,4 +1273,3 @@ service PBackendService { rpc fetch_peer_data(PFetchPeerDataRequest) returns (PFetchPeerDataResponse); rpc request_cdc_client(PRequestCdcClientRequest) returns (PRequestCdcClientResult); }; - diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 3212313183bcc4..79c7a5d1ce74a0 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -305,6 +305,14 @@ struct TOlapTableSink { 23: optional double max_filter_ratio 24: optional string storage_vault_id + + // When true, FE should assign each sink a bucket_be_id/load_tablet_idx pair for random + // distribution partitions. BE then derives the bucket sequence owned by bucket_be_id from + // the location info and rotates within that sequence once per-tablet write volume exceeds + // the threshold (default 200 MB). This flag is set regardless of whether the initial + // partition list is empty, so auto-partition tables whose first partitions arrive at + // runtime still enter the correct mode from the start. + 25: optional bool enable_adaptive_random_bucket } struct THiveLocationParams { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index a9fd978a9bd978..4b535896195906 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -279,13 +279,20 @@ struct TOlapTablePartition { 9: optional bool is_mutable = true // only used in List Partition 10: optional bool is_default_partition; - // only used in random distribution scenario to make data distributed even + // only used in random distribution scenario: + // - legacy mode: global round-robin index / fixed bucket index + // - adaptive random bucket mode: FE-selected starting bucket seq for this sink 11: optional i64 load_tablet_idx 12: optional i32 total_replica_num 13: optional i32 load_required_replica_num // tablet_id -> list of backend_ids that have version gaps (lastFailedVersion >= 0) // used by BE to exclude these backends from success counting in majority write 14: optional map> tablet_version_gap_backends + // only used in adaptive random bucket mode: FE-selected bucket owner BE for this sink + 15: optional i64 bucket_be_id + // only used in adaptive random bucket mode: FE-selected bucket seqs for this sink. + // When set, BE uses them directly and skips recomputing from tablet locations. + 16: optional list local_bucket_seqs } struct TOlapTablePartitionParam { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index b7111df7066908..a2e42af778d804 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1405,8 +1405,11 @@ struct TCreatePartitionRequest { 6: optional bool write_single_replica = false // query_id to identify the coordinator, if coordinator exists, it means this is a multi-instance load 7: optional Types.TUniqueId query_id - // Whether the caller's table sink is using load_to_single_tablet mode. + // Request-side sink mode. FE uses it to decide whether to populate + // TOlapTablePartition.load_tablet_idx in the result for runtime auto partitions. 8: optional bool load_to_single_tablet = false + // Whether the caller's table sink is using receiver-side adaptive random bucket routing. + 9: optional bool enable_adaptive_random_bucket = false } struct TCreatePartitionResult { @@ -1427,8 +1430,11 @@ struct TReplacePartitionRequest { 5: optional string be_endpoint 6: optional bool write_single_replica = false 7: optional Types.TUniqueId query_id - // Whether the caller's table sink is using load_to_single_tablet mode. + // Request-side sink mode. FE uses it to decide whether to populate + // TOlapTablePartition.load_tablet_idx in the result for runtime auto partitions. 8: optional bool load_to_single_tablet = false + // Whether the caller's table sink is using receiver-side adaptive random bucket routing. + 9: optional bool enable_adaptive_random_bucket = false } struct TReplacePartitionResult {