Skip to content

Commit 14794be

Browse files
committed
[improvement](be) Add table-level memtable flush backpressure
### What problem does this PR solve? Issue Number: None Related PR: #62661 Problem Summary: Adaptive random bucket load can rotate writes across multiple tablets on the same BE. When one table generates too many memtables faster than flush can drain them, new write batches should be throttled at table granularity instead of only relying on per-writer or process memory limits. This change tracks pending WRITE_FINISHED/FLUSH memtables by table id in MemTableMemoryLimiter and blocks new DeltaWriter writes when the per-BE table pending count reaches the configured limit. ### Release note Add BE configs enable_table_memtable_flush_backpressure and table_memtable_flush_pending_count_limit to throttle load writes when one table has too many pending flush memtables on a BE. ### Check List (For Author) - Test: Manual test - git diff --check - build-support/clang-format.sh attempted but failed because llvm@16 is not installed in this environment - ./build.sh --be attempted but failed during CMake configuration before Doris source compilation because OpenMP_C was not found for OpenBLAS - Behavior changed: Yes. When enable_table_memtable_flush_backpressure is true, writes to a table block once its pending flush memtable count on a BE reaches table_memtable_flush_pending_count_limit. - Does this need documentation: No
1 parent 629cc31 commit 14794be

11 files changed

Lines changed: 125 additions & 0 deletions

be/src/cloud/cloud_delta_writer.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "cloud/cloud_storage_engine.h"
2323
#include "cloud/config.h"
2424
#include "load/delta_writer/delta_writer.h"
25+
#include "load/memtable/memtable_memory_limiter.h"
26+
#include "runtime/exec_env.h"
2527
#include "runtime/thread_context.h"
2628

2729
namespace doris {
@@ -72,6 +74,8 @@ Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>&
7274
if (row_idxs.empty()) [[unlikely]] {
7375
return Status::OK();
7476
}
77+
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure(
78+
nullptr, table_id());
7579
std::lock_guard lock(_mtx);
7680
CHECK(_is_init || _is_cancelled);
7781
{

be/src/common/config.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,10 @@ DEFINE_mBool(enable_adaptive_flush_threads, "true");
842842

843843
// Whether to enable queue-based backpressure in MemTableMemoryLimiter.
844844
DEFINE_mBool(enable_memtable_flush_queue_backpressure, "false");
845+
// Whether to block writes when one table has too many pending flush memtables on this BE.
846+
DEFINE_mBool(enable_table_memtable_flush_backpressure, "true");
847+
// Max pending flush memtables for one table on this BE before blocking new writes.
848+
DEFINE_mInt32(table_memtable_flush_pending_count_limit, "10");
845849

846850
// config for tablet meta checkpoint
847851
DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");

be/src/common/config.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -900,6 +900,10 @@ DECLARE_mBool(enable_adaptive_flush_threads);
900900
// When enabled, writes will be throttled if the workload group's flush pool
901901
// queue size exceeds the threshold, even if memory limits are not reached.
902902
DECLARE_mBool(enable_memtable_flush_queue_backpressure);
903+
// Whether to block writes when one table has too many pending flush memtables on this BE.
904+
DECLARE_mBool(enable_table_memtable_flush_backpressure);
905+
// Max pending flush memtables for one table on this BE before blocking new writes.
906+
DECLARE_mInt32(table_memtable_flush_pending_count_limit);
903907

904908
// config for tablet meta checkpoint
905909
DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);

be/src/load/delta_writer/delta_writer.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "core/block/block.h"
3737
#include "io/fs/file_writer.h" // IWYU pragma: keep
3838
#include "load/memtable/memtable_flush_executor.h"
39+
#include "load/memtable/memtable_memory_limiter.h"
3940
#include "runtime/exec_env.h"
4041
#include "runtime/thread_context.h"
4142
#include "service/backend_options.h"
@@ -124,6 +125,11 @@ void BaseDeltaWriter::set_tablet_load_rowset_num_info(
124125
collect_tablet_load_rowset_num_info(tablet, tablet_infos);
125126
}
126127

128+
int64_t BaseDeltaWriter::table_id() const {
129+
DORIS_CHECK(_req.table_schema_param != nullptr);
130+
return _req.table_schema_param->table_id();
131+
}
132+
127133
DeltaWriter::~DeltaWriter() = default;
128134

129135
Status BaseDeltaWriter::init() {
@@ -152,6 +158,8 @@ Status DeltaWriter::write(const Block* block, const DorisVector<uint32_t>& row_i
152158
if (UNLIKELY(row_idxs.empty())) {
153159
return Status::OK();
154160
}
161+
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure(
162+
nullptr, table_id());
155163
_lock_watch.start();
156164
std::lock_guard<std::mutex> l(_lock);
157165
_lock_watch.stop();

be/src/load/delta_writer/delta_writer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ class BaseDeltaWriter {
8686

8787
int64_t partition_id() const { return _req.partition_id; }
8888

89+
int64_t table_id() const;
90+
8991
int64_t tablet_id() const { return _req.tablet_id; }
9092

9193
int64_t txn_id() const { return _req.txn_id; }

be/src/load/delta_writer/delta_writer_v2.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "core/block/block.h"
3636
#include "exec/sink/load_stream_stub.h"
3737
#include "io/fs/file_writer.h" // IWYU pragma: keep
38+
#include "load/memtable/memtable_memory_limiter.h"
3839
#include "runtime/exec_env.h"
3940
#include "runtime/query_context.h"
4041
#include "service/backend_options.h"
@@ -93,6 +94,11 @@ DeltaWriterV2::~DeltaWriterV2() {
9394
static_cast<void>(_memtable_writer->cancel());
9495
}
9596

97+
int64_t DeltaWriterV2::_table_id() const {
98+
DORIS_CHECK(_req.table_schema_param != nullptr);
99+
return _req.table_schema_param->table_id();
100+
}
101+
96102
Status DeltaWriterV2::init() {
97103
if (_is_init) {
98104
return Status::OK();
@@ -148,6 +154,8 @@ Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>& row
148154
if (UNLIKELY(row_idxs.empty())) {
149155
return Status::OK();
150156
}
157+
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure(
158+
[state = _state]() { return state->is_cancelled(); }, _table_id());
151159
_lock_watch.start();
152160
std::lock_guard<std::mutex> l(_lock);
153161
_lock_watch.stop();

be/src/load/delta_writer/delta_writer_v2.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class DeltaWriterV2 {
8686
Status _build_current_tablet_schema(int64_t index_id,
8787
const OlapTableSchemaParam* table_schema_param,
8888
const TabletSchema& ori_tablet_schema);
89+
int64_t _table_id() const;
8990

9091
void _update_profile(RuntimeProfile* profile);
9192

be/src/load/memtable/memtable_memory_limiter.cpp

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption, Metr
3636

3737
bvar::LatencyRecorder g_memtable_memory_limit_latency_ms("mm_limiter_limit_time_ms");
3838
bvar::Adder<int> g_memtable_memory_limit_waiting_threads("mm_limiter_waiting_threads");
39+
bvar::LatencyRecorder g_memtable_table_backpressure_latency_ms(
40+
"mm_limiter_table_backpressure_time_ms");
41+
bvar::Adder<int> g_memtable_table_backpressure_waiting_threads(
42+
"mm_limiter_table_backpressure_waiting_threads");
43+
bvar::Status<int64_t> g_memtable_table_backpressure_pending_count(
44+
"mm_limiter_table_backpressure_pending_count", 0);
3945
bvar::Status<int64_t> g_memtable_active_memory("mm_limiter_mem_active", 0);
4046
bvar::Status<int64_t> g_memtable_write_memory("mm_limiter_mem_write", 0);
4147
bvar::Status<int64_t> g_memtable_flush_memory("mm_limiter_mem_flush", 0);
@@ -123,6 +129,68 @@ int64_t MemTableMemoryLimiter::_need_flush() {
123129
return need_flush - _queue_mem_usage - _flush_mem_usage;
124130
}
125131

132+
int64_t MemTableMemoryLimiter::_table_flush_pending_memtable_count(int64_t table_id) {
133+
int64_t pending_memtables = 0;
134+
for (const auto& writer : _writers) {
135+
auto writer_sptr = writer.lock();
136+
if (writer_sptr == nullptr) {
137+
continue;
138+
}
139+
if (writer_sptr->table_id() == table_id) {
140+
pending_memtables += writer_sptr->flush_pending_memtable_count();
141+
}
142+
}
143+
return pending_memtables;
144+
}
145+
146+
void MemTableMemoryLimiter::handle_table_memtable_backpressure(
147+
std::function<bool()> cancel_check, int64_t table_id) {
148+
if (!config::enable_table_memtable_flush_backpressure) {
149+
return;
150+
}
151+
const int64_t pending_count_limit = config::table_memtable_flush_pending_count_limit;
152+
if (pending_count_limit <= 0) {
153+
return;
154+
}
155+
DORIS_CHECK(table_id > 0);
156+
157+
std::unique_lock<std::mutex> l(_lock);
158+
int64_t pending_count = _table_flush_pending_memtable_count(table_id);
159+
if (pending_count < pending_count_limit) {
160+
return;
161+
}
162+
163+
MonotonicStopWatch timer;
164+
timer.start();
165+
g_memtable_table_backpressure_waiting_threads << 1;
166+
while (pending_count >= pending_count_limit) {
167+
g_memtable_table_backpressure_pending_count.set_value(pending_count);
168+
LOG_EVERY_T(INFO, 1) << "table memtable flush backpressure: table_id=" << table_id
169+
<< ", pending_memtables=" << pending_count
170+
<< ", limit=" << pending_count_limit
171+
<< ", memtable writers num: " << _writers.size();
172+
if (cancel_check && cancel_check()) {
173+
LOG(INFO) << "cancelled when waiting for table memtable flush backpressure"
174+
<< ", table_id=" << table_id << ", pending_memtables=" << pending_count
175+
<< ", limit=" << pending_count_limit;
176+
g_memtable_table_backpressure_waiting_threads << -1;
177+
return;
178+
}
179+
static_cast<void>(_hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(100)));
180+
pending_count = _table_flush_pending_memtable_count(table_id);
181+
}
182+
g_memtable_table_backpressure_pending_count.set_value(pending_count);
183+
g_memtable_table_backpressure_waiting_threads << -1;
184+
timer.stop();
185+
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
186+
g_memtable_table_backpressure_latency_ms << time_ms;
187+
LOG(INFO) << "waited " << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
188+
<< " for table memtable flush backpressure"
189+
<< ", table_id=" << table_id << ", pending_memtables=" << pending_count
190+
<< ", limit=" << pending_count_limit
191+
<< ", memtable writers num: " << _writers.size();
192+
}
193+
126194
void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check,
127195
WorkloadGroup* wg) {
128196
ThreadPool* wg_flush_pool = (wg != nullptr) ? wg->get_memtable_flush_pool() : nullptr;

be/src/load/memtable/memtable_memory_limiter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class MemTableMemoryLimiter {
4545
// Every write operation will call this API to check if need flush memtable OR hang
4646
// when memory is not available.
4747
void handle_memtable_flush(std::function<bool()> cancel_check, WorkloadGroup* wg = nullptr);
48+
void handle_table_memtable_backpressure(std::function<bool()> cancel_check, int64_t table_id);
4849

4950
void register_writer(std::weak_ptr<MemTableWriter> writer);
5051

@@ -64,6 +65,7 @@ class MemTableMemoryLimiter {
6465
bool _hard_limit_reached();
6566
bool _load_usage_low();
6667
int64_t _need_flush();
68+
int64_t _table_flush_pending_memtable_count(int64_t table_id);
6769
int64_t _flush_active_memtables(int64_t need_flush);
6870
void _refresh_mem_tracker();
6971
std::mutex _lock;

be/src/load/memtable/memtable_writer.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,27 @@ uint64_t MemTableWriter::flush_running_count() const {
381381
return _flush_token == nullptr ? 0 : _flush_token->get_stats().flush_running_count.load();
382382
}
383383

384+
int64_t MemTableWriter::table_id() const {
385+
DORIS_CHECK(_req.table_schema_param != nullptr);
386+
return _req.table_schema_param->table_id();
387+
}
388+
389+
int64_t MemTableWriter::flush_pending_memtable_count() {
390+
std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
391+
int64_t memtable_count = 0;
392+
for (const auto& mem_table : _freezed_mem_tables) {
393+
auto mem_table_sptr = mem_table.lock();
394+
if (mem_table_sptr == nullptr) {
395+
continue;
396+
}
397+
auto mem_type = mem_table_sptr->get_mem_type();
398+
if (mem_type == MemType::WRITE_FINISHED || mem_type == MemType::FLUSH) {
399+
memtable_count++;
400+
}
401+
}
402+
return memtable_count;
403+
}
404+
384405
int64_t MemTableWriter::mem_consumption(MemType mem) {
385406
if (!_is_init) {
386407
// This method may be called before this writer is initialized.

0 commit comments

Comments
 (0)