Skip to content

Commit 32910ff

Browse files
authored
Load: Fix LoadPieceNode reading aligned Chunk causing OOM (#16132)
* Load: Fix LoadPieceNode reading aligned Chunk causing OOM * update alignedChunkData * add ByteBufferInputStream
1 parent b1dd87a commit 32910ff

6 files changed

Lines changed: 85 additions & 49 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.slf4j.Logger;
3535
import org.slf4j.LoggerFactory;
3636

37-
import java.io.ByteArrayInputStream;
3837
import java.io.ByteArrayOutputStream;
3938
import java.io.DataOutputStream;
4039
import java.io.File;
@@ -152,7 +151,9 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
152151
}
153152

154153
public static PlanNode deserialize(ByteBuffer buffer) {
155-
InputStream stream = new ByteArrayInputStream(buffer.array());
154+
buffer = buffer.duplicate();
155+
buffer.position(0);
156+
ByteBufferInputStream stream = new ByteBufferInputStream(buffer);
156157
try {
157158
ReadWriteIOUtils.readShort(stream); // read PlanNodeType
158159
final File tsFile = new File(ReadWriteIOUtils.readString(stream));
@@ -193,4 +194,41 @@ public int hashCode() {
193194
public String toString() {
194195
return "LoadTsFilePieceNode{" + "tsFile=" + tsFile + ", dataSize=" + dataSize + '}';
195196
}
197+
198+
public static class ByteBufferInputStream extends InputStream {
199+
private final ByteBuffer buffer;
200+
201+
public ByteBufferInputStream(ByteBuffer buffer) {
202+
this.buffer = buffer;
203+
}
204+
205+
@Override
206+
public int read() {
207+
if (!buffer.hasRemaining()) {
208+
return -1;
209+
}
210+
return buffer.get() & 0xFF;
211+
}
212+
213+
@Override
214+
public int read(byte[] b, int off, int len) {
215+
if (!buffer.hasRemaining()) {
216+
return -1;
217+
}
218+
int toRead = Math.min(len, buffer.remaining());
219+
buffer.get(b, off, toRead);
220+
return toRead;
221+
}
222+
223+
public ByteBuffer read(int length) {
224+
if (length < 0 || length > buffer.remaining()) {
225+
throw new IllegalArgumentException("Invalid length for slicing: " + length);
226+
}
227+
ByteBuffer slicedBuffer = buffer.slice();
228+
slicedBuffer.limit(length);
229+
230+
buffer.position(buffer.position() + length);
231+
return slicedBuffer;
232+
}
233+
}
196234
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.apache.iotdb.rpc.TSStatusCode;
8282

8383
import org.apache.commons.io.FileUtils;
84+
import org.apache.tsfile.exception.write.PageException;
8485
import org.apache.tsfile.utils.FilePathUtils;
8586
import org.slf4j.Logger;
8687
import org.slf4j.LoggerFactory;
@@ -947,7 +948,7 @@ public TSStatus writeLoadTsFileNode(
947948

948949
try {
949950
loadTsFileManager.writeToDataRegion(dataRegion, pieceNode, uuid);
950-
} catch (IOException e) {
951+
} catch (IOException | PageException e) {
951952
LOGGER.warn(
952953
"IO error when writing piece node of TsFile {} to DataRegion {}.",
953954
pieceNode.getTsFile(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656

5757
import org.apache.tsfile.common.constant.TsFileConstant;
5858
import org.apache.tsfile.enums.TSDataType;
59+
import org.apache.tsfile.exception.write.PageException;
5960
import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
6061
import org.apache.tsfile.file.metadata.ChunkMetadata;
6162
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -203,7 +204,7 @@ private void recover() {
203204
}
204205

205206
public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid)
206-
throws IOException {
207+
throws IOException, PageException {
207208
if (!uuid2WriterManager.containsKey(uuid)) {
208209
synchronized (uuid2CleanupTask) {
209210
final CleanupTask cleanupTask =
@@ -414,7 +415,8 @@ private void clearDir(File dir) {
414415
* BatchedAlignedChunkData, it may result in no data for the time column in the new file.
415416
*/
416417
@SuppressWarnings("squid:S3824")
417-
private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws IOException {
418+
private void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
419+
throws IOException, PageException {
418420
if (isClosed) {
419421
throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
420422
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
2323
import org.apache.iotdb.commons.utils.TimePartitionUtils;
24+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
2425

2526
import org.apache.tsfile.enums.TSDataType;
2627
import org.apache.tsfile.exception.write.PageException;
@@ -66,16 +67,15 @@ public class AlignedChunkData implements ChunkData {
6667
protected final IDeviceID device;
6768
protected List<ChunkHeader> chunkHeaderList;
6869

69-
protected final PublicBAOS byteStream;
70-
protected final DataOutputStream stream;
70+
protected PublicBAOS byteStream;
71+
protected DataOutputStream stream;
7172
protected List<long[]> timeBatch;
7273
protected long dataSize;
7374
protected boolean needDecodeChunk;
7475
protected List<Integer> pageNumbers;
7576
protected Queue<Integer> satisfiedLengthQueue;
7677

77-
private AlignedChunkWriterImpl chunkWriter;
78-
protected List<Chunk> chunkList;
78+
protected ByteBuffer chunkData;
7979

8080
public AlignedChunkData(
8181
@Nonnull final IDeviceID device,
@@ -143,14 +143,8 @@ public boolean isAligned() {
143143
}
144144

145145
@Override
146-
public void writeToFileWriter(final TsFileIOWriter writer) throws IOException {
147-
if (chunkList != null) {
148-
for (final Chunk chunk : chunkList) {
149-
writer.writeChunk(chunk);
150-
}
151-
} else {
152-
chunkWriter.writeToFileWriter(writer);
153-
}
146+
public void writeToFileWriter(final TsFileIOWriter writer) throws IOException, PageException {
147+
writeTsFileData(writer);
154148
}
155149

156150
public void addValueChunk(final ChunkHeader chunkHeader) {
@@ -167,6 +161,7 @@ public void serialize(final DataOutputStream stream) throws IOException {
167161
ReadWriteIOUtils.write(getType().ordinal(), stream);
168162
ReadWriteIOUtils.write(isAligned(), stream);
169163
serializeAttr(stream);
164+
ReadWriteIOUtils.write(byteStream.size(), stream);
170165
byteStream.writeTo(stream);
171166
close();
172167
}
@@ -291,26 +286,41 @@ public void writeDecodeValuePage(
291286
}
292287
}
293288

294-
protected void deserializeTsFileData(final InputStream stream) throws IOException, PageException {
289+
protected void writeTsFileData(TsFileIOWriter writer) throws IOException, PageException {
290+
final InputStream stream = new LoadTsFilePieceNode.ByteBufferInputStream(chunkData);
295291
if (needDecodeChunk) {
296-
buildChunkWriter(stream);
292+
writeChunkToWriter(stream, writer);
293+
} else {
294+
writeEntireChunkToWriter(stream, writer);
295+
}
296+
}
297+
298+
protected void deserializeTsFileDataByte(final InputStream stream) throws IOException {
299+
final int size = ReadWriteIOUtils.readInt(stream);
300+
if (stream instanceof LoadTsFilePieceNode.ByteBufferInputStream) {
301+
this.chunkData = ((LoadTsFilePieceNode.ByteBufferInputStream) stream).read(size);
297302
} else {
298-
deserializeEntireChunk(stream);
303+
byte[] data = new byte[size];
304+
if (size != stream.read(data)) {
305+
throw new IOException("TsFileData byte array read error, size mismatch.");
306+
}
307+
this.chunkData = ByteBuffer.wrap(data);
299308
}
300309
}
301310

302-
private void deserializeEntireChunk(final InputStream stream) throws IOException {
303-
chunkList = new ArrayList<>();
311+
private void writeEntireChunkToWriter(final InputStream stream, final TsFileIOWriter writer)
312+
throws IOException {
304313
for (final ChunkHeader chunkHeader : chunkHeaderList) {
305314
final ByteBuffer chunkData =
306315
ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(stream));
307316
final Statistics<? extends Serializable> statistics =
308317
Statistics.deserialize(stream, chunkHeader.getDataType());
309-
chunkList.add(new Chunk(chunkHeader, chunkData, null, statistics));
318+
writer.writeChunk(new Chunk(chunkHeader, chunkData, null, statistics));
310319
}
311320
}
312321

313-
protected void buildChunkWriter(final InputStream stream) throws IOException, PageException {
322+
protected void writeChunkToWriter(final InputStream stream, final TsFileIOWriter writer)
323+
throws IOException, PageException {
314324
final List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
315325
IMeasurementSchema timeSchema = null;
316326
for (final ChunkHeader chunkHeader : chunkHeaderList) {
@@ -330,17 +340,20 @@ protected void buildChunkWriter(final InputStream stream) throws IOException, Pa
330340
chunkHeader.getEncodingType(),
331341
chunkHeader.getCompressionType()));
332342
}
333-
chunkWriter = new AlignedChunkWriterImpl(timeSchema, measurementSchemaList);
343+
AlignedChunkWriterImpl chunkWriter =
344+
new AlignedChunkWriterImpl(timeSchema, measurementSchemaList);
334345
timeBatch = new ArrayList<>();
335346
final int chunkHeaderSize = chunkHeaderList.size();
336347
for (int i = 0; i < chunkHeaderSize; i++) {
337-
buildChunk(stream, chunkHeaderList.get(i), pageNumbers.get(i), i - 1, i == 0);
348+
buildChunk(chunkWriter, stream, chunkHeaderList.get(i), pageNumbers.get(i), i - 1, i == 0);
338349
}
339350
timeBatch = null;
351+
chunkWriter.writeToFileWriter(writer);
340352
}
341353

342354
@SuppressWarnings({"squid:S6541", "squid:S3776"})
343355
private void buildChunk(
356+
final AlignedChunkWriterImpl chunkWriter,
344357
final InputStream stream,
345358
final ChunkHeader chunkHeader,
346359
final int pageNumber,
@@ -463,7 +476,7 @@ public static AlignedChunkData deserialize(final InputStream stream)
463476
chunkData.needDecodeChunk = needDecodeChunk;
464477
chunkData.chunkHeaderList = chunkHeaderList;
465478
chunkData.pageNumbers = pageNumbers;
466-
chunkData.deserializeTsFileData(stream);
479+
chunkData.deserializeTsFileDataByte(stream);
467480
chunkData.dataSize = dataSize;
468481
chunkData.close();
469482
return chunkData;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.tsfile.file.header.PageHeader;
2929
import org.apache.tsfile.file.metadata.IDeviceID;
3030
import org.apache.tsfile.file.metadata.statistics.Statistics;
31-
import org.apache.tsfile.read.common.Chunk;
3231
import org.apache.tsfile.utils.Binary;
3332
import org.apache.tsfile.utils.ReadWriteIOUtils;
3433
import org.apache.tsfile.utils.TsPrimitiveType;
@@ -41,8 +40,6 @@
4140
import java.io.InputStream;
4241
import java.io.Serializable;
4342
import java.nio.ByteBuffer;
44-
import java.util.ArrayList;
45-
import java.util.List;
4643

4744
/**
4845
* This class is used to be compatible with the new distribution of aligned series in chunk group.
@@ -52,8 +49,6 @@
5249
*/
5350
public class BatchedAlignedValueChunkData extends AlignedChunkData {
5451

55-
private List<ValueChunkWriter> valueChunkWriters;
56-
5752
// Used for splitter
5853
public BatchedAlignedValueChunkData(AlignedChunkData alignedChunkData) {
5954
super(alignedChunkData);
@@ -62,7 +57,6 @@ public BatchedAlignedValueChunkData(AlignedChunkData alignedChunkData) {
6257
// Used for deserialize
6358
public BatchedAlignedValueChunkData(IDeviceID device, TTimePartitionSlot timePartitionSlot) {
6459
super(device, timePartitionSlot);
65-
valueChunkWriters = new ArrayList<>();
6660
}
6761

6862
@Override
@@ -130,7 +124,8 @@ public void writeDecodeValuePage(long[] times, TsPrimitiveType[] values, TSDataT
130124
}
131125

132126
@Override
133-
protected void buildChunkWriter(final InputStream stream) throws IOException, PageException {
127+
protected void writeChunkToWriter(final InputStream stream, final TsFileIOWriter writer)
128+
throws IOException, PageException {
134129
for (int i = 0; i < chunkHeaderList.size(); i++) {
135130
ChunkHeader chunkHeader = chunkHeaderList.get(i);
136131
MeasurementSchema measurementSchema =
@@ -146,8 +141,8 @@ protected void buildChunkWriter(final InputStream stream) throws IOException, Pa
146141
measurementSchema.getType(),
147142
measurementSchema.getEncodingType(),
148143
measurementSchema.getValueEncoder());
149-
valueChunkWriters.add(valueChunkWriter);
150144
buildValueChunkWriter(stream, chunkHeader, pageNumbers.get(i), valueChunkWriter);
145+
valueChunkWriter.writeToFileWriter(writer);
151146
}
152147
}
153148

@@ -226,17 +221,4 @@ private void buildValueChunkWriter(
226221
valueChunkWriter.sealCurrentPage();
227222
}
228223
}
229-
230-
@Override
231-
public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
232-
if (chunkList != null) {
233-
for (final Chunk chunk : chunkList) {
234-
writer.writeChunk(chunk);
235-
}
236-
} else {
237-
for (ValueChunkWriter valueChunkWriter : valueChunkWriters) {
238-
valueChunkWriter.writeToFileWriter(writer);
239-
}
240-
}
241-
}
242224
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public interface ChunkData extends TsFileData {
4848

4949
void writeDecodePage(long[] times, Object[] values, int satisfiedLength) throws IOException;
5050

51-
void writeToFileWriter(TsFileIOWriter writer) throws IOException;
51+
void writeToFileWriter(TsFileIOWriter writer) throws IOException, PageException;
5252

5353
@Override
5454
default TsFileDataType getType() {

0 commit comments

Comments
 (0)