Skip to content

Commit 6a59cf4

Browse files
authored
Optimize local load TsFile piece dispatch (#17851) (#17876)
1 parent abcc51c commit 6a59cf4

5 files changed

Lines changed: 209 additions & 8 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
3939
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
4040
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
41-
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
4241
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
4342
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
4443
import org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
@@ -156,12 +155,7 @@ public void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDi
156155
PlanNode planNode = instance.getFragment().getPlanNodeTree();
157156

158157
if (planNode instanceof LoadTsFilePieceNode) { // split
159-
LoadTsFilePieceNode pieceNode =
160-
(LoadTsFilePieceNode) PlanNodeType.deserialize(planNode.serializeToByteBuffer());
161-
if (pieceNode == null) {
162-
throw new FragmentInstanceDispatchException(
163-
new TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
164-
}
158+
LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) planNode;
165159
TSStatus resultStatus =
166160
StorageEngine.getInstance().writeLoadTsFileNode((DataRegionId) groupId, pieceNode, uuid);
167161

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,14 +287,20 @@ public void writeDecodeValuePage(
287287
}
288288

289289
protected void deserializeTsFileData(TsFileIOWriter writer) throws IOException, PageException {
290-
final InputStream stream = new ByteArrayInputStream(chunkData);
290+
final InputStream stream = createTsFileDataInputStream();
291291
if (needDecodeChunk) {
292292
buildChunkWriter(stream, writer);
293293
} else {
294294
deserializeEntireChunk(stream, writer);
295295
}
296296
}
297297

298+
private InputStream createTsFileDataInputStream() {
299+
return chunkData == null
300+
? new ByteArrayInputStream(byteStream.getBuf(), 0, byteStream.size())
301+
: new ByteArrayInputStream(chunkData);
302+
}
303+
298304
protected void deserializeTsFileDataByte(final InputStream stream) throws IOException {
299305
final int size = ReadWriteIOUtils.readInt(stream);
300306
this.chunkData = new byte[size];

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.tsfile.write.schema.MeasurementSchema;
3939
import org.apache.tsfile.write.writer.TsFileIOWriter;
4040

41+
import java.io.ByteArrayInputStream;
4142
import java.io.DataOutputStream;
4243
import java.io.IOException;
4344
import java.io.InputStream;
@@ -113,13 +114,26 @@ public boolean isAligned() {
113114

114115
@Override
115116
public void writeToFileWriter(final TsFileIOWriter writer) throws IOException {
117+
ensureDataReadyForWriting();
116118
if (chunk != null) {
117119
writer.writeChunk(chunk);
118120
} else {
119121
chunkWriter.writeToFileWriter(writer);
120122
}
121123
}
122124

125+
private void ensureDataReadyForWriting() throws IOException {
126+
if (chunk != null || chunkWriter != null) {
127+
return;
128+
}
129+
130+
try {
131+
deserializeTsFileData(new ByteArrayInputStream(byteStream.getBuf(), 0, byteStream.size()));
132+
} catch (final PageException e) {
133+
throw new IOException(e);
134+
}
135+
}
136+
123137
@Override
124138
public void serialize(final DataOutputStream stream) throws IOException {
125139
ReadWriteIOUtils.write(isModification(), stream);
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.plan.scheduler.load;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23+
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
24+
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
25+
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
26+
import org.apache.iotdb.commons.consensus.DataRegionId;
27+
import org.apache.iotdb.commons.partition.StorageExecutor;
28+
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
29+
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
30+
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
31+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
32+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
33+
import org.apache.iotdb.db.storageengine.StorageEngine;
34+
import org.apache.iotdb.rpc.RpcUtils;
35+
36+
import org.junit.Test;
37+
import org.junit.runner.RunWith;
38+
import org.mockito.Mockito;
39+
import org.powermock.api.mockito.PowerMockito;
40+
import org.powermock.core.classloader.annotations.PowerMockIgnore;
41+
import org.powermock.core.classloader.annotations.PrepareForTest;
42+
import org.powermock.modules.junit4.PowerMockRunner;
43+
44+
import java.io.File;
45+
import java.util.Collections;
46+
47+
@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
48+
@RunWith(PowerMockRunner.class)
49+
@PrepareForTest(StorageEngine.class)
50+
public class LoadTsFileDispatcherImplTest {
51+
52+
@Test
53+
public void testDispatchLocallyPieceNodeSkipsSerdeRoundTrip() throws Exception {
54+
final StorageEngine storageEngine = Mockito.mock(StorageEngine.class);
55+
PowerMockito.mockStatic(StorageEngine.class);
56+
PowerMockito.when(StorageEngine.getInstance()).thenReturn(storageEngine);
57+
58+
final LoadTsFileDispatcherImpl dispatcher = new LoadTsFileDispatcherImpl(null, false);
59+
dispatcher.setUuid("test-uuid");
60+
61+
final LoadTsFilePieceNode pieceNode =
62+
new LoadTsFilePieceNode(new PlanNodeId("piece"), new File("test.tsfile"));
63+
final FragmentInstance instance = createFragmentInstance(pieceNode);
64+
65+
Mockito.when(
66+
storageEngine.writeLoadTsFileNode(
67+
Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), Mockito.eq("test-uuid")))
68+
.thenReturn(RpcUtils.SUCCESS_STATUS);
69+
70+
dispatcher.dispatchLocally(instance);
71+
72+
Mockito.verify(storageEngine)
73+
.writeLoadTsFileNode(
74+
Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), Mockito.eq("test-uuid"));
75+
}
76+
77+
private static FragmentInstance createFragmentInstance(final LoadTsFilePieceNode pieceNode) {
78+
final PlanFragmentId fragmentId = new PlanFragmentId("test", 0);
79+
final FragmentInstance instance =
80+
new FragmentInstance(
81+
new PlanFragment(fragmentId, pieceNode),
82+
fragmentId.genFragmentInstanceId(),
83+
null,
84+
null,
85+
0,
86+
null,
87+
false,
88+
false);
89+
final TConsensusGroupId consensusGroupId = new DataRegionId(1).convertToTConsensusGroupId();
90+
instance.setExecutorAndHost(
91+
new StorageExecutor(
92+
new TRegionReplicaSet(
93+
consensusGroupId,
94+
Collections.singletonList(
95+
new TDataNodeLocation().setInternalEndPoint(new TEndPoint("127.0.0.1", 1))))));
96+
return instance;
97+
}
98+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.storageengine.load.splitter;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
23+
24+
import org.apache.tsfile.enums.TSDataType;
25+
import org.apache.tsfile.file.header.ChunkHeader;
26+
import org.apache.tsfile.file.metadata.IChunkMetadata;
27+
import org.apache.tsfile.file.metadata.enums.CompressionType;
28+
import org.apache.tsfile.file.metadata.enums.TSEncoding;
29+
import org.apache.tsfile.file.metadata.statistics.Statistics;
30+
import org.apache.tsfile.read.common.Chunk;
31+
import org.apache.tsfile.write.writer.TsFileIOWriter;
32+
import org.junit.Test;
33+
import org.mockito.Mockito;
34+
35+
import java.nio.ByteBuffer;
36+
37+
public class ChunkDataDirectWriteTest {
38+
39+
@Test
40+
public void testNonAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws Exception {
41+
final NonAlignedChunkData chunkData = createNonAlignedChunkData();
42+
chunkData.setNotDecode();
43+
final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class);
44+
Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics();
45+
chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
46+
47+
final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class);
48+
chunkData.writeToFileWriter(writer);
49+
50+
Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class));
51+
}
52+
53+
@Test
54+
public void testAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws Exception {
55+
final AlignedChunkData chunkData = createAlignedChunkData();
56+
chunkData.setNotDecode();
57+
final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class);
58+
Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics();
59+
chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
60+
61+
final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class);
62+
chunkData.writeToFileWriter(writer);
63+
64+
Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class));
65+
}
66+
67+
private static Statistics<?> createInt32Statistics() {
68+
final Statistics<?> statistics = Statistics.getStatsByType(TSDataType.INT32);
69+
statistics.update(1L, 1);
70+
return statistics;
71+
}
72+
73+
private static NonAlignedChunkData createNonAlignedChunkData() {
74+
return (NonAlignedChunkData)
75+
ChunkData.createChunkData(
76+
false, "root.sg.d1", createChunkHeader(), new TTimePartitionSlot(0L));
77+
}
78+
79+
private static AlignedChunkData createAlignedChunkData() {
80+
return (AlignedChunkData)
81+
ChunkData.createChunkData(
82+
true, "root.sg.d1", createChunkHeader(), new TTimePartitionSlot(0L));
83+
}
84+
85+
private static ChunkHeader createChunkHeader() {
86+
return new ChunkHeader(
87+
"temperature", 0, TSDataType.INT32, CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, 0);
88+
}
89+
}

0 commit comments

Comments
 (0)