Skip to content

Commit 4747d5f

Browse files
authored
Optimize local load TsFile piece dispatch (#17851)
1 parent 90055d5 commit 4747d5f

5 files changed

Lines changed: 215 additions & 7 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
3333
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
3434
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
35-
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
3635
import org.apache.iotdb.db.conf.IoTDBDescriptor;
3736
import org.apache.iotdb.db.exception.load.LoadFileException;
3837
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
@@ -161,12 +160,7 @@ public void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDi
161160
PlanNode planNode = instance.getFragment().getPlanNodeTree();
162161

163162
if (planNode instanceof LoadTsFilePieceNode) { // split
164-
LoadTsFilePieceNode pieceNode =
165-
(LoadTsFilePieceNode) PlanNodeType.deserialize(planNode.serializeToByteBuffer());
166-
if (pieceNode == null) {
167-
throw new FragmentInstanceDispatchException(
168-
new TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
169-
}
163+
LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) planNode;
170164
TSStatus resultStatus =
171165
StorageEngine.getInstance().writeLoadTsFileNode((DataRegionId) groupId, pieceNode, uuid);
172166

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ public void writeDecodeValuePage(
290290
}
291291

292292
protected void writeTsFileData(TsFileIOWriter writer) throws IOException, PageException {
293+
ensureDataReadyForWriting();
293294
final InputStream stream = new LoadTsFilePieceNode.ByteBufferInputStream(chunkData);
294295
if (needDecodeChunk) {
295296
writeChunkToWriter(stream, writer);
@@ -298,6 +299,14 @@ protected void writeTsFileData(TsFileIOWriter writer) throws IOException, PageEx
298299
}
299300
}
300301

302+
private void ensureDataReadyForWriting() throws IOException {
303+
if (chunkData != null) {
304+
chunkData.rewind();
305+
return;
306+
}
307+
chunkData = ByteBuffer.wrap(byteStream.getBuf(), 0, byteStream.size());
308+
}
309+
301310
protected void deserializeTsFileDataByte(final InputStream stream) throws IOException {
302311
final int size = ReadWriteIOUtils.readInt(stream);
303312
if (stream instanceof LoadTsFilePieceNode.ByteBufferInputStream) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
2323
import org.apache.iotdb.commons.utils.TimePartitionUtils;
24+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
2425

2526
import org.apache.tsfile.exception.write.PageException;
2627
import org.apache.tsfile.file.header.ChunkHeader;
@@ -114,13 +115,28 @@ public boolean isAligned() {
114115

115116
@Override
116117
public void writeToFileWriter(final TsFileIOWriter writer) throws IOException {
118+
ensureDataReadyForWriting();
117119
if (chunk != null) {
118120
writer.writeChunk(chunk);
119121
} else {
120122
chunkWriter.writeToFileWriter(writer);
121123
}
122124
}
123125

126+
private void ensureDataReadyForWriting() throws IOException {
127+
if (chunk != null || chunkWriter != null) {
128+
return;
129+
}
130+
131+
try {
132+
deserializeTsFileData(
133+
new LoadTsFilePieceNode.ByteBufferInputStream(
134+
ByteBuffer.wrap(byteStream.getBuf(), 0, byteStream.size())));
135+
} catch (final PageException e) {
136+
throw new IOException(e);
137+
}
138+
}
139+
124140
@Override
125141
public void serialize(final DataOutputStream stream) throws IOException {
126142
ReadWriteIOUtils.write(getType().ordinal(), stream);
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.plan.scheduler.load;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23+
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
24+
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
25+
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
26+
import org.apache.iotdb.commons.consensus.DataRegionId;
27+
import org.apache.iotdb.commons.partition.StorageExecutor;
28+
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
29+
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
30+
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
31+
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
32+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
33+
import org.apache.iotdb.db.storageengine.StorageEngine;
34+
import org.apache.iotdb.rpc.RpcUtils;
35+
36+
import org.junit.Test;
37+
import org.junit.runner.RunWith;
38+
import org.mockito.Mockito;
39+
import org.powermock.api.mockito.PowerMockito;
40+
import org.powermock.core.classloader.annotations.PowerMockIgnore;
41+
import org.powermock.core.classloader.annotations.PrepareForTest;
42+
import org.powermock.modules.junit4.PowerMockRunner;
43+
44+
import java.io.File;
45+
import java.util.Collections;
46+
47+
@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
48+
@RunWith(PowerMockRunner.class)
49+
@PrepareForTest(StorageEngine.class)
50+
public class LoadTsFileDispatcherImplTest {
51+
52+
@Test
53+
public void testDispatchLocallyPieceNodeSkipsSerdeRoundTrip() throws Exception {
54+
final StorageEngine storageEngine = Mockito.mock(StorageEngine.class);
55+
PowerMockito.mockStatic(StorageEngine.class);
56+
PowerMockito.when(StorageEngine.getInstance()).thenReturn(storageEngine);
57+
58+
final LoadTsFileDispatcherImpl dispatcher = new LoadTsFileDispatcherImpl(null, false);
59+
dispatcher.setUuid("test-uuid");
60+
61+
final LoadTsFilePieceNode pieceNode =
62+
new LoadTsFilePieceNode(new PlanNodeId("piece"), new File("test.tsfile"));
63+
final FragmentInstance instance = createFragmentInstance(pieceNode);
64+
65+
Mockito.when(
66+
storageEngine.writeLoadTsFileNode(
67+
Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), Mockito.eq("test-uuid")))
68+
.thenReturn(RpcUtils.SUCCESS_STATUS);
69+
70+
dispatcher.dispatchLocally(instance);
71+
72+
Mockito.verify(storageEngine)
73+
.writeLoadTsFileNode(
74+
Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), Mockito.eq("test-uuid"));
75+
}
76+
77+
private static FragmentInstance createFragmentInstance(final LoadTsFilePieceNode pieceNode) {
78+
final PlanFragmentId fragmentId = new PlanFragmentId("test", 0);
79+
final FragmentInstance instance =
80+
new FragmentInstance(
81+
new PlanFragment(fragmentId, pieceNode),
82+
fragmentId.genFragmentInstanceId(),
83+
null,
84+
null,
85+
0,
86+
null,
87+
false,
88+
false);
89+
final TConsensusGroupId consensusGroupId = new DataRegionId(1).convertToTConsensusGroupId();
90+
instance.setExecutorAndHost(
91+
new StorageExecutor(
92+
new TRegionReplicaSet(
93+
consensusGroupId,
94+
Collections.singletonList(
95+
new TDataNodeLocation().setInternalEndPoint(new TEndPoint("127.0.0.1", 1))))));
96+
return instance;
97+
}
98+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.storageengine.load.splitter;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
23+
24+
import org.apache.tsfile.enums.TSDataType;
25+
import org.apache.tsfile.file.header.ChunkHeader;
26+
import org.apache.tsfile.file.metadata.IChunkMetadata;
27+
import org.apache.tsfile.file.metadata.IDeviceID;
28+
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
29+
import org.apache.tsfile.file.metadata.enums.CompressionType;
30+
import org.apache.tsfile.file.metadata.enums.TSEncoding;
31+
import org.apache.tsfile.file.metadata.statistics.Statistics;
32+
import org.apache.tsfile.read.common.Chunk;
33+
import org.apache.tsfile.write.writer.TsFileIOWriter;
34+
import org.junit.Test;
35+
import org.mockito.Mockito;
36+
37+
import java.nio.ByteBuffer;
38+
39+
public class ChunkDataDirectWriteTest {
40+
41+
@Test
42+
public void testNonAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws Exception {
43+
final NonAlignedChunkData chunkData = createNonAlignedChunkData();
44+
chunkData.setNotDecode();
45+
final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class);
46+
Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics();
47+
chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
48+
49+
final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class);
50+
chunkData.writeToFileWriter(writer);
51+
52+
Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class));
53+
}
54+
55+
@Test
56+
public void testAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws Exception {
57+
final AlignedChunkData chunkData = createAlignedChunkData();
58+
chunkData.setNotDecode();
59+
final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class);
60+
Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics();
61+
chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
62+
63+
final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class);
64+
chunkData.writeToFileWriter(writer);
65+
66+
Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class));
67+
}
68+
69+
private static Statistics<?> createInt32Statistics() {
70+
final Statistics<?> statistics = Statistics.getStatsByType(TSDataType.INT32);
71+
statistics.update(1L, 1);
72+
return statistics;
73+
}
74+
75+
private static NonAlignedChunkData createNonAlignedChunkData() {
76+
final IDeviceID device = new StringArrayDeviceID("root", "sg", "d1");
77+
return (NonAlignedChunkData)
78+
ChunkData.createChunkData(false, device, createChunkHeader(), new TTimePartitionSlot(0L));
79+
}
80+
81+
private static AlignedChunkData createAlignedChunkData() {
82+
final IDeviceID device = new StringArrayDeviceID("root", "sg", "d1");
83+
return (AlignedChunkData)
84+
ChunkData.createChunkData(true, device, createChunkHeader(), new TTimePartitionSlot(0L));
85+
}
86+
87+
private static ChunkHeader createChunkHeader() {
88+
return new ChunkHeader(
89+
"temperature", 0, TSDataType.INT32, CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, 0);
90+
}
91+
}

0 commit comments

Comments
 (0)