Skip to content

Commit ef4582d

Browse files
authored
branch-4.0: [fix](load) fix load_to_single_tablet routing for auto partition (#64615)
pick #64356
1 parent 78454d9 commit ef4582d

5 files changed

Lines changed: 151 additions & 10 deletions

File tree

be/src/vec/sink/vrow_distribution.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ Status VRowDistribution::automatic_create_partition() {
112112
request.__set_partitionValues(_partitions_need_create);
113113
request.__set_be_endpoint(be_endpoint);
114114
request.__set_write_single_replica(_write_single_replica);
115+
request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
115116
if (_state && _state->get_query_ctx()) {
116117
// Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
117118
request.__set_query_id(_state->get_query_ctx()->query_id());
@@ -208,6 +209,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
208209

209210
std::string be_endpoint = BackendOptions::get_be_endpoint();
210211
request.__set_be_endpoint(be_endpoint);
212+
request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
211213
if (_state && _state->get_query_ctx()) {
212214
// Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
213215
request.__set_query_id(_state->get_query_ctx()->query_id());

fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.doris.catalog.Column;
3939
import org.apache.doris.catalog.Database;
4040
import org.apache.doris.catalog.DatabaseIf;
41+
import org.apache.doris.catalog.DistributionInfo;
4142
import org.apache.doris.catalog.Env;
4243
import org.apache.doris.catalog.MaterializedIndex;
4344
import org.apache.doris.catalog.OlapTable;
@@ -340,6 +341,7 @@
340341
import java.util.concurrent.TimeUnit;
341342
import java.util.concurrent.TimeoutException;
342343
import java.util.concurrent.atomic.AtomicInteger;
344+
import java.util.concurrent.atomic.AtomicLong;
343345
import java.util.concurrent.locks.ReentrantLock;
344346
import java.util.stream.Collectors;
345347

@@ -3856,6 +3858,7 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
38563858
List<TTabletLocation> tablets = new ArrayList<>();
38573859
List<TTabletLocation> slaveTablets = new ArrayList<>();
38583860
List<TOlapTablePartition> partitions = Lists.newArrayList();
3861+
boolean loadToSingleTablet = request.isSetLoadToSingleTablet() && request.isLoadToSingleTablet();
38593862
for (String partitionName : addPartitionClauseMap.keySet()) {
38603863
Partition partition = table.getPartition(partitionName);
38613864
// For thread safety, we preserve the tablet distribution information of each partition
@@ -3879,17 +3882,36 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
38793882
tPartition.setNumBuckets(index.getTablets().size());
38803883
}
38813884
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
3885+
boolean randomDistribution =
3886+
partition.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM;
3887+
boolean cacheLoadTabletIdx = loadToSingleTablet && randomDistribution;
38823888
partitions.add(tPartition);
38833889
// tablet
3890+
AtomicLong cachedLoadTabletIdx = new AtomicLong(-1);
38843891
if (needUseCache
38853892
&& Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
38863893
.getAutoPartitionInfo(txnId, partition.getId(), partitionTablets,
3887-
partitionSlaveTablets)) {
3894+
partitionSlaveTablets, cachedLoadTabletIdx)) {
3895+
if (cacheLoadTabletIdx) {
3896+
tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get());
3897+
}
38883898
// fast path, if cached
38893899
tablets.addAll(partitionTablets);
38903900
slaveTablets.addAll(partitionSlaveTablets);
38913901
continue;
38923902
}
3903+
if (cacheLoadTabletIdx) {
3904+
try {
3905+
int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
3906+
.getCurrentTabletLoadIndex(dbId, olapTable.getId(), partition);
3907+
tPartition.setLoadTabletIdx(tabletIndex);
3908+
} catch (UserException ex) {
3909+
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
3910+
result.setStatus(errorStatus);
3911+
LOG.warn("send create partition error status: {}", result);
3912+
return result;
3913+
}
3914+
}
38933915
int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2
38943916
+ 1;
38953917
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
@@ -3953,9 +3975,13 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
39533975
}
39543976

39553977
if (needUseCache) {
3956-
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
3978+
long loadTabletIdx = cacheLoadTabletIdx ? tPartition.getLoadTabletIdx() : -1;
3979+
long cachedTabletIdx = Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
39573980
.getOrSetAutoPartitionInfo(txnId, partition.getId(), partitionTablets,
3958-
partitionSlaveTablets);
3981+
partitionSlaveTablets, loadTabletIdx);
3982+
if (cacheLoadTabletIdx) {
3983+
tPartition.setLoadTabletIdx(cachedTabletIdx);
3984+
}
39593985
}
39603986

39613987
tablets.addAll(partitionTablets);
@@ -4173,6 +4199,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
41734199
List<TTabletLocation> tablets = new ArrayList<>();
41744200
List<TTabletLocation> slaveTablets = new ArrayList<>();
41754201
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
4202+
boolean loadToSingleTablet = request.isSetLoadToSingleTablet() && request.isLoadToSingleTablet();
41764203
for (long partitionId : resultPartitionIds) {
41774204
Partition partition = olapTable.getPartition(partitionId);
41784205
// For thread safety, we preserve the tablet distribution information of each partition
@@ -4198,17 +4225,36 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
41984225
tPartition.setNumBuckets(index.getTablets().size());
41994226
}
42004227
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
4228+
boolean randomDistribution =
4229+
partition.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM;
4230+
boolean cacheLoadTabletIdx = loadToSingleTablet && randomDistribution;
42014231
partitions.add(tPartition);
42024232
// tablet
4233+
AtomicLong cachedLoadTabletIdx = new AtomicLong(-1);
42034234
if (needUseCache && txnId != 0
42044235
&& Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
42054236
.getAutoPartitionInfo(txnId, partition.getId(), partitionTablets,
4206-
partitionSlaveTablets)) {
4237+
partitionSlaveTablets, cachedLoadTabletIdx)) {
4238+
if (cacheLoadTabletIdx) {
4239+
tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get());
4240+
}
42074241
// fast path, if cached
42084242
tablets.addAll(partitionTablets);
42094243
slaveTablets.addAll(partitionSlaveTablets);
42104244
continue;
42114245
}
4246+
if (cacheLoadTabletIdx) {
4247+
try {
4248+
int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
4249+
.getCurrentTabletLoadIndex(dbId, olapTable.getId(), partition);
4250+
tPartition.setLoadTabletIdx(tabletIndex);
4251+
} catch (UserException ex) {
4252+
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
4253+
result.setStatus(errorStatus);
4254+
LOG.warn("send replace partition error status: {}", result);
4255+
return result;
4256+
}
4257+
}
42124258
int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2
42134259
+ 1;
42144260
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
@@ -4276,9 +4322,13 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
42764322
}
42774323

42784324
if (needUseCache) {
4279-
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
4325+
long loadTabletIdx = cacheLoadTabletIdx ? tPartition.getLoadTabletIdx() : -1;
4326+
long cachedTabletIdx = Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
42804327
.getOrSetAutoPartitionInfo(txnId, partition.getId(), partitionTablets,
4281-
partitionSlaveTablets);
4328+
partitionSlaveTablets, loadTabletIdx);
4329+
if (cacheLoadTabletIdx) {
4330+
tPartition.setLoadTabletIdx(cachedTabletIdx);
4331+
}
42824332
if (LOG.isDebugEnabled()) {
42834333
LOG.debug("Cache auto partition info, txnId: {}, partitionId: {}, "
42844334
+ "tablets: {}, slaveTablets: {}", txnId, partition.getId(),

fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.concurrent.ConcurrentHashMap;
2828
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicLong;
2930

3031
/*
3132
** this class AutoPartitionCacheManager is used for solve the follow question :
@@ -59,10 +60,17 @@ public class AutoPartitionCacheManager {
5960
public static class PartitionTabletCache {
6061
public final List<TTabletLocation> tablets;
6162
public final List<TTabletLocation> slaveTablets;
63+
public final long loadTabletIdx;
6264

6365
public PartitionTabletCache(List<TTabletLocation> tablets, List<TTabletLocation> slaveTablets) {
66+
this(tablets, slaveTablets, -1);
67+
}
68+
69+
public PartitionTabletCache(List<TTabletLocation> tablets, List<TTabletLocation> slaveTablets,
70+
long loadTabletIdx) {
6471
this.tablets = tablets;
6572
this.slaveTablets = slaveTablets;
73+
this.loadTabletIdx = loadTabletIdx;
6674
}
6775
}
6876

@@ -73,6 +81,14 @@ public PartitionTabletCache(List<TTabletLocation> tablets, List<TTabletLocation>
7381
// return true if cached, else false, this function only read cache
7482
public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
7583
List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets) {
84+
return getAutoPartitionInfo(txnId, partitionId, partitionTablets, partitionSlaveTablets,
85+
new AtomicLong(-1));
86+
}
87+
88+
// return true if cached, else false, this function only read cache
89+
public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
90+
List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets,
91+
AtomicLong loadTabletIdx) {
7692
ConcurrentHashMap<Long, PartitionTabletCache> partitionMap = autoPartitionInfo.get(txnId);
7793
if (partitionMap == null) {
7894
return false;
@@ -87,11 +103,18 @@ public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
87103
partitionTablets.addAll(cached.tablets);
88104
partitionSlaveTablets.clear();
89105
partitionSlaveTablets.addAll(cached.slaveTablets);
106+
loadTabletIdx.set(cached.loadTabletIdx);
90107
return true;
91108
}
92109

93110
public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
94111
List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets) {
112+
getOrSetAutoPartitionInfo(txnId, partitionId, partitionTablets, partitionSlaveTablets, -1);
113+
}
114+
115+
public long getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
116+
List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets,
117+
long loadTabletIdx) {
95118
ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
96119
autoPartitionInfo.computeIfAbsent(txnId, k -> new ConcurrentHashMap<>());
97120

@@ -100,7 +123,8 @@ public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
100123
needUpdate.set(true);
101124
return new PartitionTabletCache(
102125
new ArrayList<>(partitionTablets),
103-
new ArrayList<>(partitionSlaveTablets)
126+
new ArrayList<>(partitionSlaveTablets),
127+
loadTabletIdx
104128
);
105129
});
106130

@@ -110,13 +134,13 @@ public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
110134
partitionTablets.addAll(cached.tablets);
111135
partitionSlaveTablets.addAll(cached.slaveTablets);
112136
LOG.debug("Get cached auto partition info from cache, txnId: {}, partitionId: {}, "
113-
+ "tablets: {}, slaveTablets: {}", txnId, partitionId,
114-
cached.tablets.size(), cached.slaveTablets.size());
137+
+ "tablets: {}, slaveTablets: {}, loadTabletIdx: {}", txnId, partitionId,
138+
cached.tablets.size(), cached.slaveTablets.size(), cached.loadTabletIdx);
115139
}
140+
return cached.loadTabletIdx;
116141
}
117142

118143
public void clearAutoPartitionInfo(Long txnId) {
119144
autoPartitionInfo.remove(txnId);
120145
}
121146
}
122-
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.transaction;
19+
20+
import org.apache.doris.thrift.TTabletLocation;
21+
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
import java.util.ArrayList;
26+
import java.util.Arrays;
27+
import java.util.List;
28+
import java.util.concurrent.atomic.AtomicLong;
29+
30+
public class AutoPartitionCacheManagerTest {
31+
@Test
32+
public void testGetOrSetAutoPartitionInfoReturnsCachedLoadTabletIdx() {
33+
AutoPartitionCacheManager cacheManager = new AutoPartitionCacheManager();
34+
List<TTabletLocation> firstTablets = new ArrayList<>();
35+
firstTablets.add(new TTabletLocation(10001L, Arrays.asList(1L)));
36+
List<TTabletLocation> firstSlaveTablets = new ArrayList<>();
37+
38+
long storedLoadTabletIdx = cacheManager.getOrSetAutoPartitionInfo(
39+
10L, 20L, firstTablets, firstSlaveTablets, 3);
40+
Assert.assertEquals(3, storedLoadTabletIdx);
41+
42+
List<TTabletLocation> secondTablets = new ArrayList<>();
43+
secondTablets.add(new TTabletLocation(20001L, Arrays.asList(2L)));
44+
List<TTabletLocation> secondSlaveTablets = new ArrayList<>();
45+
46+
long cachedLoadTabletIdx = cacheManager.getOrSetAutoPartitionInfo(
47+
10L, 20L, secondTablets, secondSlaveTablets, 5);
48+
Assert.assertEquals(3, cachedLoadTabletIdx);
49+
Assert.assertEquals(1, secondTablets.size());
50+
Assert.assertEquals(10001L, secondTablets.get(0).getTabletId());
51+
52+
List<TTabletLocation> cachedTablets = new ArrayList<>();
53+
List<TTabletLocation> cachedSlaveTablets = new ArrayList<>();
54+
AtomicLong readLoadTabletIdx = new AtomicLong(-1);
55+
Assert.assertTrue(cacheManager.getAutoPartitionInfo(
56+
10L, 20L, cachedTablets, cachedSlaveTablets, readLoadTabletIdx));
57+
Assert.assertEquals(3, readLoadTabletIdx.get());
58+
Assert.assertEquals(1, cachedTablets.size());
59+
Assert.assertEquals(10001L, cachedTablets.get(0).getTabletId());
60+
}
61+
}

gensrc/thrift/FrontendService.thrift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,6 +1414,8 @@ struct TCreatePartitionRequest {
14141414
6: optional bool write_single_replica = false
14151415
// query_id to identify the coordinator, if coordinator exists, it means this is a multi-instance load
14161416
7: optional Types.TUniqueId query_id
1417+
// Whether the caller's table sink is using load_to_single_tablet mode.
1418+
8: optional bool load_to_single_tablet = false
14171419
}
14181420

14191421
struct TCreatePartitionResult {
@@ -1434,6 +1436,8 @@ struct TReplacePartitionRequest {
14341436
5: optional string be_endpoint
14351437
6: optional bool write_single_replica = false
14361438
7: optional Types.TUniqueId query_id
1439+
// Whether the caller's table sink is using load_to_single_tablet mode.
1440+
8: optional bool load_to_single_tablet = false
14371441
}
14381442

14391443
struct TReplacePartitionResult {

0 commit comments

Comments
 (0)