Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "load/delta_writer/delta_writer.h"
#include "load/memtable/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"

namespace doris {
Expand Down Expand Up @@ -64,10 +66,22 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
return cloud::bthread_fork_join(tasks, 10);
}

Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>& row_idxs) {
Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>& row_idxs,
bool* memtable_flushed) {
if (memtable_flushed != nullptr) {
*memtable_flushed = false;
}
if (row_idxs.empty()) [[unlikely]] {
return Status::OK();
}
if (_req.enable_table_memtable_backpressure) {
Comment thread
sollhui marked this conversation as resolved.
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure(
[this]() {
std::lock_guard lock(_mtx);
return _is_cancelled;
},
table_id());
}
std::lock_guard lock(_mtx);
CHECK(_is_init || _is_cancelled);
{
Expand All @@ -77,7 +91,7 @@ Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>&
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
return _memtable_writer->write(block, row_idxs);
return _memtable_writer->write(block, row_idxs, memtable_flushed);
}

Status CloudDeltaWriter::close() {
Expand All @@ -86,6 +100,11 @@ Status CloudDeltaWriter::close() {
return _memtable_writer->close();
}

// Forwards to BaseDeltaWriter::flush_memtable_async() while holding _mtx, the
// same mutex write() and cancel_with_status() take, so the call is serialized
// with those operations on this writer.
Status CloudDeltaWriter::flush_memtable_async() {
    std::lock_guard guard(_mtx);
    Status flush_status = BaseDeltaWriter::flush_memtable_async();
    return flush_status;
}

Status CloudDeltaWriter::cancel_with_status(const Status& st) {
std::lock_guard lock(_mtx);
return BaseDeltaWriter::cancel_with_status(st);
Expand Down
5 changes: 4 additions & 1 deletion be/src/cloud/cloud_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ class CloudDeltaWriter final : public BaseDeltaWriter {
const UniqueId& load_id);
~CloudDeltaWriter() override;

Status write(const Block* block, const DorisVector<uint32_t>& row_idxs) override;
Status write(const Block* block, const DorisVector<uint32_t>& row_idxs,
bool* memtable_flushed = nullptr) override;

Status close() override;

Status flush_memtable_async() override;

Status cancel_with_status(const Status& st) override;

Status build_rowset() override;
Expand Down
18 changes: 18 additions & 0 deletions be/src/cloud/cloud_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
return Status::OK();
}

if (request.is_receiver_side_random_bucket()) {
std::unordered_map<int64_t, DorisVector<uint32_t>> partition_to_rowidxs;
_build_partition_to_rowidxs_for_receiver_side_random_bucket(request, &partition_to_rowidxs);
if (!partition_to_rowidxs.empty()) {
std::unordered_set<int64_t> partition_ids;
partition_ids.reserve(partition_to_rowidxs.size());
for (const auto& [partition_id, _] : partition_to_rowidxs) {
partition_ids.insert(partition_id);
}
{
std::lock_guard<std::mutex> l(_tablet_writers_lock);
RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids));
}
}
return _write_block_data_for_receiver_side_random_bucket(request, cur_seq,
partition_to_rowidxs, response);
}

std::unordered_map<int64_t, DorisVector<uint32_t>> tablet_to_rowidxs;
_build_tablet_to_rowidxs(request, &tablet_to_rowidxs);

Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,8 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500");
// max write buffer size before flush, default 200MB
DEFINE_mInt64(write_buffer_size, "209715200");
DEFINE_mBool(enable_adaptive_write_buffer_size, "true");
// Whether random bucket load rotates to the next local bucket when memtable flushes.
DEFINE_mBool(enable_adaptive_random_bucket_load_bucket_rotation, "true");
// max buffer size used in memtable for the aggregated table, default 100MB
DEFINE_mInt64(write_buffer_size_for_agg, "104857600");
DEFINE_mInt64(min_write_buffer_size_for_partial_update, "1048576");
Expand Down Expand Up @@ -858,6 +860,11 @@ DEFINE_mDouble(min_flush_thread_num_per_cpu, "0.5");
// Whether to enable adaptive flush thread adjustment
DEFINE_mBool(enable_adaptive_flush_threads, "true");

// Whether to block writes when one table has too many pending flush memtables on this BE.
DEFINE_mBool(enable_table_memtable_flush_backpressure, "true");
// Max pending flush memtables for one table on this BE before blocking new writes.
DEFINE_mInt32(table_memtable_flush_pending_count_limit, "10");

// config for tablet meta checkpoint
DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,8 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
// max write buffer size before flush, default 200MB
DECLARE_mInt64(write_buffer_size);
DECLARE_mBool(enable_adaptive_write_buffer_size);
// Whether random bucket load rotates to the next local bucket when memtable flushes.
DECLARE_mBool(enable_adaptive_random_bucket_load_bucket_rotation);
// max buffer size used in memtable for the aggregated table, default 100MB
DECLARE_mInt64(write_buffer_size_for_agg);

Expand Down Expand Up @@ -913,6 +915,11 @@ DECLARE_mDouble(min_flush_thread_num_per_cpu);
// Whether to enable adaptive flush thread adjustment
DECLARE_mBool(enable_adaptive_flush_threads);

// Whether to block writes when one table has too many pending flush memtables on this BE.
DECLARE_mBool(enable_table_memtable_flush_backpressure);
// Max pending flush memtables for one table on this BE before blocking new writes.
DECLARE_mInt32(table_memtable_flush_pending_count_limit);

// config for tablet meta checkpoint
DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
DECLARE_mInt32(tablet_meta_checkpoint_min_interval_secs);
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/sink/delta_writer_v2_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/sink/delta_writer_v2_pool.h"

#include "exec/sink/vtablet_finder.h"
#include "load/delta_writer/delta_writer_v2.h"
#include "runtime/runtime_profile.h"

Expand Down
20 changes: 15 additions & 5 deletions be/src/exec/sink/vrow_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row
if (!_skip[i]) {
row_ids.emplace_back(i);
partition_ids.emplace_back(_partitions[i]->id);
tablet_ids.emplace_back(_tablet_ids[i]);
if (!_tablet_finder->is_adaptive_random_bucket()) {
tablet_ids.emplace_back(_tablet_ids[i]);
}
}
}
}
Expand All @@ -312,7 +314,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
if (nullable_column->get_bool_inline(i) && !_skip[i]) {
row_ids.emplace_back(i);
partition_ids.emplace_back(_partitions[i]->id);
tablet_ids.emplace_back(_tablet_ids[i]);
if (!_tablet_finder->is_adaptive_random_bucket()) {
tablet_ids.emplace_back(_tablet_ids[i]);
}
}
}
} else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
Expand All @@ -331,7 +335,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
if (filter[i] != 0 && !_skip[i]) {
row_ids.emplace_back(i);
partition_ids.emplace_back(_partitions[i]->id);
tablet_ids.emplace_back(_tablet_ids[i]);
if (!_tablet_finder->is_adaptive_random_bucket()) {
tablet_ids.emplace_back(_tablet_ids[i]);
}
}
}
}
Expand All @@ -342,7 +348,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
Status VRowDistribution::_filter_block(Block* block,
std::vector<RowPartTabletIds>& row_part_tablet_ids) {
for (int i = 0; i < _schema->indexes().size(); i++) {
_get_tablet_ids(block, i, _tablet_ids);
if (!_tablet_finder->is_adaptive_random_bucket()) {
_get_tablet_ids(block, i, _tablet_ids);
}
auto& where_clause = _schema->indexes()[i]->where_clause;
if (where_clause != nullptr) {
RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause,
Expand Down Expand Up @@ -522,7 +530,9 @@ void VRowDistribution::_reset_row_part_tablet_ids(
// This is important for performance.
row_ids.reserve(rows);
partition_ids.reserve(rows);
tablet_ids.reserve(rows);
if (!_tablet_finder->is_adaptive_random_bucket()) {
tablet_ids.reserve(rows);
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion be/src/exec/sink/vrow_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ class RowPartTabletIds {
std::string value;
value.reserve(row_ids.size() * 15);
for (int i = 0; i < row_ids.size(); i++) {
value.append(fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i]));
if (i < tablet_ids.size()) {
value.append(
fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i]));
} else {
value.append(fmt::format("[{}, {}]", row_ids[i], partition_ids[i]));
}
}
return value;
}
Expand Down
78 changes: 77 additions & 1 deletion be/src/exec/sink/vtablet_finder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,89 @@
#include <gen_cpp/FrontendService_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <string>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/status.h"
#include "core/block/block.h"
#include "runtime/runtime_state.h"
#include "storage/tablet_info.h"

namespace doris {

// Registers the local tablet list of one partition and selects its initial
// write target.
//
//   partition_id      partition to register; negative ids are ignored.
//   tablets           tablet ids of the partition in rotation order; an empty
//                     list is ignored.
//   bucket_seqs       stored with the partition state; not consulted here.
//   start_tablet_idx  index into `tablets` to start from; values outside
//                     [0, tablets.size()) fall back to position 0.
//
// The first registration of a partition wins: a later call for the same id is
// a no-op, so an already advanced rotation position is never reset.
void AdaptiveRandomBucketState::init_partition(int64_t partition_id,
                                               const std::vector<int64_t>& tablets,
                                               const std::vector<int32_t>& bucket_seqs,
                                               int32_t start_tablet_idx) {
    if (partition_id < 0 || tablets.empty()) {
        return;
    }
    std::lock_guard<std::mutex> lock(_mutex);
    if (_partition_states.contains(partition_id)) {
        return;
    }

    PartitionState state;
    state.partition_id = partition_id;
    state.tablets = tablets;
    state.bucket_seqs = bucket_seqs;

    // Positions are kept as int32_t, so do every bound check in the signed
    // domain instead of comparing int32_t against size_t.
    const auto tablet_count = static_cast<int32_t>(state.tablets.size());
    if (start_tablet_idx >= 0 && start_tablet_idx < tablet_count) {
        state.tablet_pos = start_tablet_idx;
    }
    state.current_tablet_id = state.tablets[state.tablet_pos];

    // Reverse indexes: tablet id -> owning partition and tablet id -> position
    // in the partition's tablet list.
    for (int32_t tablet_pos = 0; tablet_pos < tablet_count; ++tablet_pos) {
        const int64_t tablet_id = state.tablets[tablet_pos];
        _tablet_to_partition[tablet_id] = partition_id;
        _tablet_to_bucket[tablet_id] = tablet_pos;
    }

    // Read the start tablet before `state` is moved away; this avoids a second
    // hash lookup just for the log line.
    const int64_t start_tablet_id = state.current_tablet_id;
    _partition_states.emplace(partition_id, std::move(state));
    LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", partition=" << partition_id
              << ", local tablet count=" << tablets.size()
              << ", start tablet=" << start_tablet_id;
}

// Returns the tablet id currently selected for `partition_id`, or -1 when no
// state has been recorded for that partition. Takes _mutex for the lookup.
int64_t AdaptiveRandomBucketState::current_tablet(int64_t partition_id) {
    std::lock_guard<std::mutex> guard(_mutex);
    if (const auto found = _partition_states.find(partition_id);
        found != _partition_states.end()) {
        return found->second.current_tablet_id;
    }
    return -1;
}

void AdaptiveRandomBucketState::rotate_by_tablet(int64_t tablet_id) {
if (!config::enable_adaptive_random_bucket_load_bucket_rotation) {
return;
}
std::lock_guard<std::mutex> lock(_mutex);
auto partition_it = _tablet_to_partition.find(tablet_id);
if (partition_it == _tablet_to_partition.end()) {
return;
}
auto state_it = _partition_states.find(partition_it->second);
if (state_it == _partition_states.end()) {
return;
}
auto bucket_it = _tablet_to_bucket.find(tablet_id);
if (bucket_it == _tablet_to_bucket.end()) {
return;
}
auto& state = state_it->second;
if (bucket_it->second != state.tablet_pos) {
return;
}
int32_t next_pos = (state.tablet_pos + 1) % static_cast<int32_t>(state.tablets.size());
LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id
<< ", partition=" << state.partition_id << " shared rotate tablet "
<< state.current_tablet_id << " -> " << state.tablets[next_pos]
<< " after tablet=" << tablet_id << " memtable flushed";
state.tablet_pos = next_pos;
state.current_tablet_id = state.tablets[next_pos];
}

Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows,
std::vector<VOlapTablePartition*>& partitions,
std::vector<uint32_t>& tablet_index, std::vector<bool>& skip,
Expand Down Expand Up @@ -82,8 +155,11 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row

if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index);
} else if (_find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET) {
// Receiver-side random bucket mode only needs partition ids on sender side.
// The receiver decides the concrete tablet from its local ordered tablet list.
} else {
// for random distribution
// FIND_TABLET_EVERY_BATCH / FIND_TABLET_EVERY_SINK
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index,
&_partition_to_tablet_map);
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
Expand Down
47 changes: 45 additions & 2 deletions be/src/exec/sink/vtablet_finder.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,45 @@

#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>

#include "common/status.h"
#include "core/block/block.h"
#include "exec/common/hash_table/phmap_fwd_decl.h"
#include "storage/tablet_info.h"
#include "util/bitmap.h"
#include "util/uid_util.h"

namespace doris {

// Per-load bookkeeping of which tablet each partition is currently writing to
// in random bucket mode. Every public method takes _mutex before touching the
// maps below.
class AdaptiveRandomBucketState {
public:
// load_id is only used to tag the log lines emitted by this class.
explicit AdaptiveRandomBucketState(UniqueId load_id) : _load_id(load_id) {}

// Registers a partition's tablet list and picks its starting tablet. Ignored
// when partition_id is negative, tablets is empty, or the partition is
// already registered. An out-of-range start_tablet_idx falls back to 0.
void init_partition(int64_t partition_id, const std::vector<int64_t>& tablets,
const std::vector<int32_t>& bucket_seqs, int32_t start_tablet_idx);
// Returns the partition's current tablet id, or -1 for an unknown partition.
int64_t current_tablet(int64_t partition_id);
// Moves the partition owning tablet_id to its next tablet (wrapping around),
// but only if tablet_id is that partition's current tablet and
// config::enable_adaptive_random_bucket_load_bucket_rotation is enabled.
void rotate_by_tablet(int64_t tablet_id);

private:
// Rotation state of a single partition.
struct PartitionState {
int64_t partition_id = -1;
// Tablet ids in rotation order.
std::vector<int64_t> tablets;
// Copied from init_partition(); not read by the methods of this class.
std::vector<int32_t> bucket_seqs;
// Index into `tablets` of the current target.
int32_t tablet_pos = 0;
// Cached tablets[tablet_pos]; -1 until the partition is initialized.
int64_t current_tablet_id = -1;
};

// Guards all members below.
std::mutex _mutex;
UniqueId _load_id;
// partition id -> rotation state.
std::unordered_map<int64_t, PartitionState> _partition_states;
// tablet id -> id of the partition that owns it.
std::unordered_map<int64_t, int64_t> _tablet_to_partition;
// tablet id -> position of the tablet inside PartitionState::tablets.
std::unordered_map<int64_t, int32_t> _tablet_to_bucket;
};

class OlapTabletFinder {
public:
// FIND_TABLET_EVERY_ROW is used for hash distribution info, which indicates that we
Expand All @@ -37,7 +67,13 @@ class OlapTabletFinder {
// FIND_TABLET_EVERY_SINK is used for random distribution info when load_to_single_tablet set to true,
// which indicates that we should only compute tablet index in the corresponding partition once for the
// whole time in olap table sink
enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK };
// FIND_TABLET_RANDOM_BUCKET is used for V1 receiver-side random bucket mode.
// Strategy used by find_tablets() to resolve the target tablet of each row.
enum FindTabletMode {
// Hash distribution: the tablet is computed for every row.
FIND_TABLET_EVERY_ROW,
// Random distribution; NOTE(review): presumably recomputed once per batch -- confirm.
FIND_TABLET_EVERY_BATCH,
// Random distribution with load_to_single_tablet: computed once for the whole sink.
FIND_TABLET_EVERY_SINK,
// Receiver-side random bucket (V1): the sender resolves partitions only and
// the receiver picks the concrete tablet.
FIND_TABLET_RANDOM_BUCKET
};

OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
: _vpartition(vpartition), _find_tablet_mode(mode), _filter_bitmap(1024) {};
Expand All @@ -51,7 +87,14 @@ class OlapTabletFinder {
return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
}

bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; }
// True when this finder was constructed in FIND_TABLET_RANDOM_BUCKET mode.
bool is_adaptive_random_bucket() const {
return _find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET;
}

// Reports whether exactly one entry has been recorded in
// _partition_to_tablet_map. Always false in FIND_TABLET_RANDOM_BUCKET mode,
// where the sender does not resolve tablets.
bool is_single_tablet() {
    if (_find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET) {
        return false;
    }
    return _partition_to_tablet_map.size() == 1;
}

// all partitions for multi find-processes of its relative writer.
const flat_hash_set<int64_t>& partition_ids() { return _partition_ids; }
Expand Down
Loading
Loading