Skip to content

Commit 4d55d51

Browse files
authored
Optimize load partition routing (#17863)
1 parent d09785e commit 4d55d51

3 files changed

Lines changed: 90 additions & 72 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -273,28 +273,28 @@ public DataPartition getOrCreateDataPartition(
273273
@Override
274274
public DataPartition getOrCreateDataPartition(
275275
final List<DataPartitionQueryParam> dataPartitionQueryParams, final String userName) {
276-
DataPartition dataPartition;
276+
final Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams =
277+
splitDataPartitionQueryParam(
278+
dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(), userName);
279+
DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
280+
if (null != dataPartition) {
281+
return dataPartition;
282+
}
283+
277284
try (final ConfigNodeClient client =
278285
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
279-
final Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams =
280-
splitDataPartitionQueryParam(
281-
dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(), userName);
282-
dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
283-
284-
if (null == dataPartition) {
285-
final TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
286-
final TDataPartitionTableResp dataPartitionTableResp =
287-
client.getOrCreateDataPartitionTable(req);
286+
final TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
287+
final TDataPartitionTableResp dataPartitionTableResp =
288+
client.getOrCreateDataPartitionTable(req);
288289

289-
if (dataPartitionTableResp.getStatus().getCode()
290-
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
291-
dataPartition = parseDataPartitionResp(dataPartitionTableResp);
292-
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
293-
} else {
294-
throw new IoTDBRuntimeException(
295-
dataPartitionTableResp.getStatus().getMessage(),
296-
dataPartitionTableResp.getStatus().getCode());
297-
}
290+
if (dataPartitionTableResp.getStatus().getCode()
291+
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
292+
dataPartition = parseDataPartitionResp(dataPartitionTableResp);
293+
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
294+
} else {
295+
throw new IoTDBRuntimeException(
296+
dataPartitionTableResp.getStatus().getMessage(),
297+
dataPartitionTableResp.getStatus().getCode());
298298
}
299299
} catch (final ClientManagerException | TException e) {
300300
throw new StatementAnalyzeException(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -96,27 +96,31 @@ public boolean needDecodeTsFile(
9696
return true;
9797
}
9898

99-
List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>();
100-
resource
101-
.getDevices()
102-
.forEach(
103-
o -> {
104-
// iterating the index, must present
105-
slotList.add(
106-
new Pair<>(
107-
o, TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o).get())));
108-
slotList.add(
109-
new Pair<>(
110-
o, TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o).get())));
111-
});
99+
List<Pair<IDeviceID, TTimePartitionSlot>> slotList =
100+
new ArrayList<>(resource.getDevices().size() << 1);
101+
for (final IDeviceID device : resource.getDevices()) {
102+
// iterating the index, must present
103+
final TTimePartitionSlot startSlot =
104+
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(device).get());
105+
final TTimePartitionSlot endSlot =
106+
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(device).get());
107+
slotList.add(new Pair<>(device, startSlot));
108+
if (!startSlot.equals(endSlot)) {
109+
slotList.add(new Pair<>(device, endSlot));
110+
}
111+
}
112112

113113
if (slotList.isEmpty()) {
114114
throw new IllegalStateException(
115115
String.format("Devices in TsFile %s is empty, this should not happen here.", tsFile));
116-
} else if (slotList.stream()
117-
.anyMatch(slotPair -> !slotPair.getRight().equals(slotList.get(0).right))) {
118-
needDecodeTsFile = true;
119116
} else {
117+
final TTimePartitionSlot firstSlot = slotList.get(0).right;
118+
for (int i = 1, size = slotList.size(); i < size; i++) {
119+
if (!slotList.get(i).right.equals(firstSlot)) {
120+
needDecodeTsFile = true;
121+
return true;
122+
}
123+
}
120124
needDecodeTsFile = !isDispatchedToLocal(new HashSet<>(partitionFetcher.apply(slotList)));
121125
}
122126

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 51 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -782,14 +782,29 @@ private void routeChunkData() throws LoadFileException {
782782
return;
783783
}
784784

785+
final List<Pair<IDeviceID, TTimePartitionSlot>> partitionSlotList = new ArrayList<>();
786+
final int[] chunkPartitionIndexes = new int[nonDirectionalChunkData.size()];
787+
final Map<IDeviceID, Map<TTimePartitionSlot, Integer>> partitionSlotIndexes = new HashMap<>();
788+
for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
789+
final ChunkData chunkData = nonDirectionalChunkData.get(i);
790+
final IDeviceID device = chunkData.getDevice();
791+
final TTimePartitionSlot timePartitionSlot = chunkData.getTimePartitionSlot();
792+
final Map<TTimePartitionSlot, Integer> slotIndexes =
793+
partitionSlotIndexes.computeIfAbsent(device, key -> new HashMap<>());
794+
Integer partitionSlotIndex = slotIndexes.get(timePartitionSlot);
795+
if (partitionSlotIndex == null) {
796+
partitionSlotIndex = partitionSlotList.size();
797+
slotIndexes.put(timePartitionSlot, partitionSlotIndex);
798+
partitionSlotList.add(new Pair<>(device, timePartitionSlot));
799+
}
800+
chunkPartitionIndexes[i] = partitionSlotIndex;
801+
}
802+
785803
List<TRegionReplicaSet> replicaSets =
786804
scheduler.partitionFetcher.queryDataPartition(
787-
nonDirectionalChunkData.stream()
788-
.map(data -> new Pair<>(data.getDevice(), data.getTimePartitionSlot()))
789-
.collect(Collectors.toList()),
790-
scheduler.queryContext.getSession().getUserName());
791-
for (int i = 0; i < replicaSets.size(); i++) {
792-
final TRegionReplicaSet replicaSet = replicaSets.get(i);
805+
partitionSlotList, scheduler.queryContext.getSession().getUserName());
806+
for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
807+
final TRegionReplicaSet replicaSet = replicaSets.get(chunkPartitionIndexes[i]);
793808
final TConsensusGroupId regionId = replicaSet.getRegionId();
794809
if (regionId2ReplicaSetAndNode.containsKey(regionId)
795810
&& !Objects.equals(regionId2ReplicaSetAndNode.get(regionId).getLeft(), replicaSet)) {
@@ -864,50 +879,49 @@ public void setDatabase(String database) {
864879

865880
public List<TRegionReplicaSet> queryDataPartition(
866881
List<Pair<IDeviceID, TTimePartitionSlot>> slotList, String userName) {
867-
List<TRegionReplicaSet> replicaSets = new ArrayList<>();
882+
List<TRegionReplicaSet> replicaSets = new ArrayList<>(slotList.size());
868883
int size = slotList.size();
869884

870885
for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
871886
List<Pair<IDeviceID, TTimePartitionSlot>> subSlotList =
872887
slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
873888
DataPartition dataPartition =
874889
fetcher.getOrCreateDataPartition(toQueryParam(subSlotList), userName);
875-
replicaSets.addAll(
876-
subSlotList.stream()
877-
.map(
878-
pair ->
879-
// database is an explicit database hint for table-model loads and
880-
// pipe-generated tree-model loads.
881-
database != null
882-
? dataPartition.getDataRegionReplicaSetForWriting(
883-
pair.left, pair.right, database)
884-
: dataPartition.getDataRegionReplicaSetForWriting(
885-
pair.left, pair.right))
886-
.collect(Collectors.toList()));
890+
for (final Pair<IDeviceID, TTimePartitionSlot> pair : subSlotList) {
891+
// database is an explicit database hint for table-model loads and
892+
// pipe-generated tree-model loads.
893+
replicaSets.add(
894+
database != null
895+
? dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right, database)
896+
: dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right));
897+
}
887898
}
888899
return replicaSets;
889900
}
890901

891902
private List<DataPartitionQueryParam> toQueryParam(
892903
List<Pair<IDeviceID, TTimePartitionSlot>> slots) {
893-
return slots.stream()
894-
.collect(
895-
Collectors.groupingBy(
896-
Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())))
897-
.entrySet()
898-
.stream()
899-
.map(
900-
entry -> {
901-
DataPartitionQueryParam queryParam =
902-
new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue()));
903-
// database is an explicit database hint for table-model loads and
904-
// pipe-generated tree-model loads.
905-
if (database != null) {
906-
queryParam.setDatabaseName(database);
907-
}
908-
return queryParam;
909-
})
910-
.collect(Collectors.toList());
904+
final Map<IDeviceID, Set<TTimePartitionSlot>> device2TimePartitionSlots = new HashMap<>();
905+
for (final Pair<IDeviceID, TTimePartitionSlot> slot : slots) {
906+
device2TimePartitionSlots
907+
.computeIfAbsent(slot.left, key -> new HashSet<>())
908+
.add(slot.right);
909+
}
910+
911+
final List<DataPartitionQueryParam> queryParams =
912+
new ArrayList<>(device2TimePartitionSlots.size());
913+
for (final Map.Entry<IDeviceID, Set<TTimePartitionSlot>> entry :
914+
device2TimePartitionSlots.entrySet()) {
915+
final DataPartitionQueryParam queryParam =
916+
new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue()));
917+
// database is an explicit database hint for table-model loads and
918+
// pipe-generated tree-model loads.
919+
if (database != null) {
920+
queryParam.setDatabaseName(database);
921+
}
922+
queryParams.add(queryParam);
923+
}
924+
return queryParams;
911925
}
912926
}
913927
}

0 commit comments

Comments
 (0)