Skip to content

Commit 7cffde5

Browse files
authored
Optimize load partition routing (#17863) (#17884)
1 parent d39d22f commit 7cffde5

3 files changed

Lines changed: 88 additions & 64 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -258,26 +258,28 @@ public DataPartition getOrCreateDataPartition(
258258
dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(), userName);
259259
DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
260260

261-
if (null == dataPartition) {
262-
try (ConfigNodeClient client =
263-
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
264-
TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
265-
TDataPartitionTableResp dataPartitionTableResp = client.getOrCreateDataPartitionTable(req);
261+
if (null != dataPartition) {
262+
return dataPartition;
263+
}
266264

267-
if (dataPartitionTableResp.getStatus().getCode()
268-
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
269-
dataPartition = parseDataPartitionResp(dataPartitionTableResp);
270-
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
271-
} else {
272-
throw new RuntimeException(
273-
new IoTDBException(
274-
dataPartitionTableResp.getStatus().getMessage(),
275-
dataPartitionTableResp.getStatus().getCode()));
276-
}
277-
} catch (ClientManagerException | TException e) {
278-
throw new StatementAnalyzeException(
279-
"An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
265+
try (ConfigNodeClient client =
266+
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
267+
TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
268+
TDataPartitionTableResp dataPartitionTableResp = client.getOrCreateDataPartitionTable(req);
269+
270+
if (dataPartitionTableResp.getStatus().getCode()
271+
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
272+
dataPartition = parseDataPartitionResp(dataPartitionTableResp);
273+
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
274+
} else {
275+
throw new RuntimeException(
276+
new IoTDBException(
277+
dataPartitionTableResp.getStatus().getMessage(),
278+
dataPartitionTableResp.getStatus().getCode()));
280279
}
280+
} catch (ClientManagerException | TException e) {
281+
throw new StatementAnalyzeException(
282+
"An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
281283
}
282284
return dataPartition;
283285
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -79,27 +79,35 @@ public boolean isTsFileEmpty() {
7979
public boolean needDecodeTsFile(
8080
Function<List<Pair<IDeviceID, TTimePartitionSlot>>, List<TRegionReplicaSet>>
8181
partitionFetcher) {
82-
List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>();
83-
resource
84-
.getDevices()
85-
.forEach(
86-
o -> {
87-
// iterating the index, must present
88-
slotList.add(
89-
new Pair<>(
90-
o, TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o).get())));
91-
slotList.add(
92-
new Pair<>(
93-
o, TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o).get())));
94-
});
82+
if (needDecodeTsFile) {
83+
return true;
84+
}
85+
86+
List<Pair<IDeviceID, TTimePartitionSlot>> slotList =
87+
new ArrayList<>(resource.getDevices().size() << 1);
88+
for (final IDeviceID device : resource.getDevices()) {
89+
// iterating the index, must present
90+
final TTimePartitionSlot startSlot =
91+
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(device).get());
92+
final TTimePartitionSlot endSlot =
93+
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(device).get());
94+
slotList.add(new Pair<>(device, startSlot));
95+
if (!startSlot.equals(endSlot)) {
96+
slotList.add(new Pair<>(device, endSlot));
97+
}
98+
}
9599

96100
if (slotList.isEmpty()) {
97101
throw new IllegalStateException(
98102
String.format("Devices in TsFile %s is empty, this should not happen here.", tsFile));
99-
} else if (slotList.stream()
100-
.anyMatch(slotPair -> !slotPair.getRight().equals(slotList.get(0).right))) {
101-
needDecodeTsFile = true;
102103
} else {
104+
final TTimePartitionSlot firstSlot = slotList.get(0).right;
105+
for (int i = 1, size = slotList.size(); i < size; i++) {
106+
if (!slotList.get(i).right.equals(firstSlot)) {
107+
needDecodeTsFile = true;
108+
return true;
109+
}
110+
}
103111
needDecodeTsFile = !isDispatchedToLocal(new HashSet<>(partitionFetcher.apply(slotList)));
104112
}
105113

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -709,18 +709,29 @@ private void routeChunkData() throws LoadFileException {
709709
return;
710710
}
711711

712+
final List<Pair<IDeviceID, TTimePartitionSlot>> partitionSlotList = new ArrayList<>();
713+
final int[] chunkPartitionIndexes = new int[nonDirectionalChunkData.size()];
714+
final Map<IDeviceID, Map<TTimePartitionSlot, Integer>> partitionSlotIndexes = new HashMap<>();
715+
for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
716+
final ChunkData chunkData = nonDirectionalChunkData.get(i);
717+
final IDeviceID device = new PlainDeviceID(chunkData.getDevice());
718+
final TTimePartitionSlot timePartitionSlot = chunkData.getTimePartitionSlot();
719+
final Map<TTimePartitionSlot, Integer> slotIndexes =
720+
partitionSlotIndexes.computeIfAbsent(device, key -> new HashMap<>());
721+
Integer partitionSlotIndex = slotIndexes.get(timePartitionSlot);
722+
if (partitionSlotIndex == null) {
723+
partitionSlotIndex = partitionSlotList.size();
724+
slotIndexes.put(timePartitionSlot, partitionSlotIndex);
725+
partitionSlotList.add(new Pair<>(device, timePartitionSlot));
726+
}
727+
chunkPartitionIndexes[i] = partitionSlotIndex;
728+
}
729+
712730
List<TRegionReplicaSet> replicaSets =
713731
scheduler.partitionFetcher.queryDataPartition(
714-
nonDirectionalChunkData.stream()
715-
.map(
716-
data ->
717-
new Pair<>(
718-
(IDeviceID) new PlainDeviceID(data.getDevice()),
719-
data.getTimePartitionSlot()))
720-
.collect(Collectors.toList()),
721-
scheduler.queryContext.getSession().getUserName());
722-
for (int i = 0; i < replicaSets.size(); i++) {
723-
final TRegionReplicaSet replicaSet = replicaSets.get(i);
732+
partitionSlotList, scheduler.queryContext.getSession().getUserName());
733+
for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
734+
final TRegionReplicaSet replicaSet = replicaSets.get(chunkPartitionIndexes[i]);
724735
final TConsensusGroupId regionId = replicaSet.getRegionId();
725736
if (regionId2ReplicaSetAndNode.containsKey(regionId)
726737
&& !Objects.equals(regionId2ReplicaSetAndNode.get(regionId).getLeft(), replicaSet)) {
@@ -790,39 +801,42 @@ public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
790801

791802
public List<TRegionReplicaSet> queryDataPartition(
792803
List<Pair<IDeviceID, TTimePartitionSlot>> slotList, String userName) {
793-
List<TRegionReplicaSet> replicaSets = new ArrayList<>();
804+
List<TRegionReplicaSet> replicaSets = new ArrayList<>(slotList.size());
794805
int size = slotList.size();
795806

796807
for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
797808
List<Pair<IDeviceID, TTimePartitionSlot>> subSlotList =
798809
slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
799810
DataPartition dataPartition =
800811
fetcher.getOrCreateDataPartition(toQueryParam(subSlotList), userName);
801-
replicaSets.addAll(
802-
subSlotList.stream()
803-
.map(
804-
pair ->
805-
dataPartition.getDataRegionReplicaSetForWriting(
806-
((PlainDeviceID) pair.left).toStringID(), pair.right))
807-
.collect(Collectors.toList()));
812+
for (final Pair<IDeviceID, TTimePartitionSlot> pair : subSlotList) {
813+
replicaSets.add(
814+
dataPartition.getDataRegionReplicaSetForWriting(
815+
((PlainDeviceID) pair.left).toStringID(), pair.right));
816+
}
808817
}
809818
return replicaSets;
810819
}
811820

812821
private List<DataPartitionQueryParam> toQueryParam(
813822
List<Pair<IDeviceID, TTimePartitionSlot>> slots) {
814-
return slots.stream()
815-
.collect(
816-
Collectors.groupingBy(
817-
Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())))
818-
.entrySet()
819-
.stream()
820-
.map(
821-
entry ->
822-
new DataPartitionQueryParam(
823-
((PlainDeviceID) entry.getKey()).toStringID(),
824-
new ArrayList<>(entry.getValue())))
825-
.collect(Collectors.toList());
823+
final Map<IDeviceID, Set<TTimePartitionSlot>> device2TimePartitionSlots = new HashMap<>();
824+
for (final Pair<IDeviceID, TTimePartitionSlot> slot : slots) {
825+
device2TimePartitionSlots
826+
.computeIfAbsent(slot.left, key -> new HashSet<>())
827+
.add(slot.right);
828+
}
829+
830+
final List<DataPartitionQueryParam> queryParams =
831+
new ArrayList<>(device2TimePartitionSlots.size());
832+
for (final Map.Entry<IDeviceID, Set<TTimePartitionSlot>> entry :
833+
device2TimePartitionSlots.entrySet()) {
834+
final DataPartitionQueryParam queryParam =
835+
new DataPartitionQueryParam(
836+
((PlainDeviceID) entry.getKey()).toStringID(), new ArrayList<>(entry.getValue()));
837+
queryParams.add(queryParam);
838+
}
839+
return queryParams;
826840
}
827841
}
828842
}

0 commit comments

Comments
 (0)