Skip to content

Commit e6fa997

Browse files
authored
branch-4.0: [enhance](load) only set brokerLoadBatchSize when enableMemtableOnSinkNode is true (#63801)
pick #60301
1 parent 437c850 commit e6fa997

3 files changed

Lines changed: 68 additions & 1 deletion

File tree

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.doris.load.loadv2;
1919

2020
import org.apache.doris.analysis.BrokerDesc;
21+
import org.apache.doris.analysis.IndexDef;
2122
import org.apache.doris.analysis.StorageBackend;
2223
import org.apache.doris.analysis.UserIdentity;
2324
import org.apache.doris.catalog.Database;
@@ -307,6 +308,13 @@ private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachme
307308
}
308309
boolean isEnableMemtableOnSinkNode =
309310
table.getTableProperty().getUseSchemaLightChange() && this.enableMemTableOnSinkNode;
311+
boolean hasInvertedIndexV1 = table.hasIndexOfType(IndexDef.IndexType.INVERTED)
312+
&& table.getInvertedIndexFileStorageFormat()
313+
== org.apache.doris.thrift.TInvertedIndexFileStorageFormat.V1;
314+
if (isPartialUpdate() || hasInvertedIndexV1 || Config.isCloudMode()) {
315+
isEnableMemtableOnSinkNode = false;
316+
}
317+
310318
// Generate loading task and init the plan of task
311319
LoadLoadingTask task = createTask(db, table, brokerFileGroups,
312320
isEnableMemtableOnSinkNode, batchSize, aggKey, attachment);

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ protected void executeOnce() throws Exception {
171171
curCoordinator.setExecMemoryLimit(execMemLimit);
172172

173173
curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);
174-
curCoordinator.setBatchSize(batchSize);
174+
if (enableMemTableOnSinkNode) {
175+
curCoordinator.setBatchSize(batchSize);
176+
}
175177

176178
long leftTimeMs = getLeftTimeMs();
177179
if (leftTimeMs <= 0) {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.load.loadv2;
19+
20+
import org.apache.doris.thrift.TQueryOptions;
21+
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
public class LoadLoadingTaskTest {
26+
27+
/**
28+
* Replays the batch-size guard used in LoadLoadingTask.executeOnce() on a bare TQueryOptions.
29+
* When enableMemTableOnSinkNode = true, setBatchSize() is called and getBatchSize() returns the broker load batch size.
30+
* When enableMemTableOnSinkNode = false, setBatchSize() is NOT called and getBatchSize() stays 0. LoadLoadingTask itself is never invoked here.
31+
*/
32+
@Test
33+
public void testBatchSizeSettingLogic() {
34+
int brokerLoadBatchSize = 16352;
35+
36+
// Case 1: enableMemTableOnSinkNode = true, so the guarded setBatchSize() call runs
37+
TQueryOptions queryOptionsWithMemTable = new TQueryOptions();
38+
boolean enableMemTableOnSinkNode1 = true;
39+
queryOptionsWithMemTable.setEnableMemtableOnSinkNode(enableMemTableOnSinkNode1);
40+
if (enableMemTableOnSinkNode1) {
41+
queryOptionsWithMemTable.setBatchSize(brokerLoadBatchSize);
42+
}
43+
Assert.assertTrue(queryOptionsWithMemTable.isEnableMemtableOnSinkNode());
44+
Assert.assertEquals(brokerLoadBatchSize, queryOptionsWithMemTable.getBatchSize());
45+
46+
// Case 2: enableMemTableOnSinkNode = false, so the guarded setBatchSize() call is skipped
47+
TQueryOptions queryOptionsWithoutMemTable = new TQueryOptions();
48+
boolean enableMemTableOnSinkNode2 = false;
49+
queryOptionsWithoutMemTable.setEnableMemtableOnSinkNode(enableMemTableOnSinkNode2);
50+
if (enableMemTableOnSinkNode2) {
51+
queryOptionsWithoutMemTable.setBatchSize(brokerLoadBatchSize);
52+
}
53+
Assert.assertFalse(queryOptionsWithoutMemTable.isEnableMemtableOnSinkNode());
54+
// batch_size should remain 0 (unset); presumably BE then falls back to DEFAULT_BATCH_SIZE (4062) - TODO confirm against BE source
55+
Assert.assertEquals(0, queryOptionsWithoutMemTable.getBatchSize());
56+
}
57+
}

0 commit comments

Comments
 (0)