Skip to content

Commit 50cefa2

Browse files
authored
Pipe: Fixed the OOM bug of parser for large aligned pages (#17639)
* chunk * shop * Update TsFileInsertionEventScanParser.java * Fix
1 parent 9348cb8 commit 50cefa2

5 files changed

Lines changed: 345 additions & 24 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan;
2121

22+
import org.apache.tsfile.compress.IUnCompressor;
2223
import org.apache.tsfile.encoding.decoder.Decoder;
2324
import org.apache.tsfile.encrypt.EncryptParameter;
2425
import org.apache.tsfile.encrypt.IDecryptor;
@@ -31,6 +32,7 @@
3132
import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
3233
import org.apache.tsfile.read.reader.chunk.ChunkReader;
3334
import org.apache.tsfile.read.reader.page.AlignedPageReader;
35+
import org.apache.tsfile.read.reader.page.LazyLoadPageData;
3436

3537
import java.io.IOException;
3638
import java.io.Serializable;
@@ -43,7 +45,8 @@
4345
* The {@link AlignedSinglePageWholeChunkReader} is used to read a whole single page aligned chunk
4446
* with need to pass in the statistics.
4547
*/
46-
public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
48+
public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader
49+
implements EstimatedMemoryChunkReader {
4750

4851
// chunk header of the time column
4952
private final ChunkHeader timeChunkHeader;
@@ -58,6 +61,7 @@ public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
5861
private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>();
5962
// deleted intervals of all the sub sensors
6063
private final List<List<TimeRange>> valueDeleteIntervalsList = new ArrayList<>();
64+
private final long pageEstimatedMemoryUsageInBytes;
6165

6266
public AlignedSinglePageWholeChunkReader(
6367
Chunk timeChunk, List<Chunk> valueChunkList, LongConsumer filteredRowsRecord)
@@ -66,6 +70,8 @@ public AlignedSinglePageWholeChunkReader(
6670
this.timeChunkHeader = timeChunk.getHeader();
6771
this.timeChunkDataBuffer = timeChunk.getData();
6872
this.encryptParam = timeChunk.getEncryptParam();
73+
this.pageEstimatedMemoryUsageInBytes =
74+
calculatePageEstimatedMemoryUsageInBytes(timeChunk, valueChunkList);
6975

7076
valueChunkList.forEach(
7177
chunk -> {
@@ -133,7 +139,7 @@ private AlignedPageReader constructAlignedPageReader(
133139
timePageHeader, timeChunkDataBuffer, timeChunkHeader, decryptor);
134140

135141
List<PageHeader> valuePageHeaderList = new ArrayList<>();
136-
List<ByteBuffer> valuePageDataList = new ArrayList<>();
142+
LazyLoadPageData[] valuePageDataArray = new LazyLoadPageData[rawValuePageHeaderList.size()];
137143
List<TSDataType> valueDataTypeList = new ArrayList<>();
138144
List<Decoder> valueDecoderList = new ArrayList<>();
139145

@@ -144,15 +150,22 @@ private AlignedPageReader constructAlignedPageReader(
144150
if (valuePageHeader == null || valuePageHeader.getUncompressedSize() == 0) {
145151
// Empty Page
146152
valuePageHeaderList.add(null);
147-
valuePageDataList.add(null);
153+
valuePageDataArray[i] = null;
148154
valueDataTypeList.add(null);
149155
valueDecoderList.add(null);
150156
} else {
151157
ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i);
158+
int currentPagePosition = valueChunkDataBufferList.get(i).position();
159+
valueChunkDataBufferList
160+
.get(i)
161+
.position(currentPagePosition + valuePageHeader.getCompressedSize());
152162
valuePageHeaderList.add(valuePageHeader);
153-
valuePageDataList.add(
154-
ChunkReader.deserializePageData(
155-
valuePageHeader, valueChunkDataBufferList.get(i), valueChunkHeader, decryptor));
163+
valuePageDataArray[i] =
164+
new LazyLoadPageData(
165+
valueChunkDataBufferList.get(i).array(),
166+
currentPagePosition,
167+
IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()),
168+
encryptParam);
156169
valueDataTypeList.add(valueChunkHeader.getDataType());
157170
valueDecoderList.add(
158171
Decoder.getDecoderByType(
@@ -169,11 +182,38 @@ private AlignedPageReader constructAlignedPageReader(
169182
timePageData,
170183
defaultTimeDecoder,
171184
valuePageHeaderList,
172-
valuePageDataList,
185+
valuePageDataArray,
173186
valueDataTypeList,
174187
valueDecoderList,
175188
queryFilter);
176189
alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList);
177190
return alignedPageReader;
178191
}
192+
193+
@Override
194+
public long getCurrentPageEstimatedMemoryUsageInBytes() {
195+
return pageEstimatedMemoryUsageInBytes;
196+
}
197+
198+
public static long calculatePageEstimatedMemoryUsageInBytes(
199+
final Chunk timeChunk, final List<Chunk> valueChunkList) throws IOException {
200+
final ByteBuffer timeChunkDataBuffer = timeChunk.getData().duplicate();
201+
long estimatedMemoryUsageInBytes =
202+
PageHeader.deserializeFrom(timeChunkDataBuffer, (Statistics<? extends Serializable>) null)
203+
.getUncompressedSize();
204+
205+
for (final Chunk valueChunk : valueChunkList) {
206+
if (valueChunk == null) {
207+
continue;
208+
}
209+
210+
final ByteBuffer valueChunkDataBuffer = valueChunk.getData().duplicate();
211+
estimatedMemoryUsageInBytes +=
212+
PageHeader.deserializeFrom(
213+
valueChunkDataBuffer, (Statistics<? extends Serializable>) null)
214+
.getUncompressedSize();
215+
}
216+
217+
return estimatedMemoryUsageInBytes;
218+
}
179219
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan;
21+
22+
interface EstimatedMemoryChunkReader {
23+
24+
long getCurrentPageEstimatedMemoryUsageInBytes();
25+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.tsfile.file.metadata.statistics.Statistics;
3030
import org.apache.tsfile.read.common.Chunk;
3131
import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
32+
import org.apache.tsfile.read.reader.page.LazyLoadPageData;
3233
import org.apache.tsfile.read.reader.page.PageReader;
3334

3435
import java.io.IOException;
@@ -37,17 +38,20 @@
3738

3839
import static org.apache.tsfile.file.metadata.enums.CompressionType.UNCOMPRESSED;
3940

40-
public class SinglePageWholeChunkReader extends AbstractChunkReader {
41+
public class SinglePageWholeChunkReader extends AbstractChunkReader
42+
implements EstimatedMemoryChunkReader {
4143
private final ChunkHeader chunkHeader;
4244
private final ByteBuffer chunkDataBuffer;
4345
private final EncryptParameter encryptParam;
46+
private final long pageEstimatedMemoryUsageInBytes;
4447

4548
public SinglePageWholeChunkReader(Chunk chunk) throws IOException {
4649
super(Long.MIN_VALUE, null, null);
4750

4851
this.chunkHeader = chunk.getHeader();
4952
this.chunkDataBuffer = chunk.getData();
5053
this.encryptParam = chunk.getEncryptParam();
54+
this.pageEstimatedMemoryUsageInBytes = calculatePageEstimatedMemoryUsageInBytes(chunk);
5155
initAllPageReaders();
5256
}
5357

@@ -62,16 +66,34 @@ private void initAllPageReaders() throws IOException {
6266
}
6367

6468
private PageReader constructPageReader(PageHeader pageHeader) throws IOException {
65-
IDecryptor decryptor = IDecryptor.getDecryptor(encryptParam);
69+
final int currentPagePosition = chunkDataBuffer.position();
70+
chunkDataBuffer.position(currentPagePosition + pageHeader.getCompressedSize());
6671
return new PageReader(
6772
pageHeader,
68-
deserializePageData(pageHeader, chunkDataBuffer, chunkHeader, decryptor),
73+
new LazyLoadPageData(
74+
chunkDataBuffer.array(),
75+
currentPagePosition,
76+
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
77+
encryptParam),
6978
chunkHeader.getDataType(),
7079
Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()),
7180
defaultTimeDecoder,
7281
null);
7382
}
7483

84+
@Override
85+
public long getCurrentPageEstimatedMemoryUsageInBytes() {
86+
return pageEstimatedMemoryUsageInBytes;
87+
}
88+
89+
public static long calculatePageEstimatedMemoryUsageInBytes(final Chunk chunk)
90+
throws IOException {
91+
final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
92+
final PageHeader pageHeader =
93+
PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends Serializable>) null);
94+
return pageHeader.getUncompressedSize();
95+
}
96+
7597
/////////////////////////////////////////////////////////////////////////////////////////////////
7698
// util methods
7799
/////////////////////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)