Skip to content

Commit 5a1a6d0

Browse files
committed
Pipe: Fixed the OOM bug of parser for large aligned pages (#17639)
* chunk * shop * Update TsFileInsertionEventScanParser.java * Fix
1 parent 6a1961b commit 5a1a6d0

5 files changed

Lines changed: 336 additions & 22 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
2121

22+
import org.apache.tsfile.compress.IUnCompressor;
2223
import org.apache.tsfile.encoding.decoder.Decoder;
2324
import org.apache.tsfile.enums.TSDataType;
2425
import org.apache.tsfile.file.header.ChunkHeader;
@@ -29,6 +30,7 @@
2930
import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
3031
import org.apache.tsfile.read.reader.chunk.ChunkReader;
3132
import org.apache.tsfile.read.reader.page.AlignedPageReader;
33+
import org.apache.tsfile.read.reader.page.LazyLoadPageData;
3234

3335
import java.io.IOException;
3436
import java.io.Serializable;
@@ -40,7 +42,8 @@
4042
* The {@link AlignedSinglePageWholeChunkReader} is used to read a whole single page aligned chunk
4143
* with need to pass in the statistics.
4244
*/
43-
public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
45+
public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader
46+
implements EstimatedMemoryChunkReader {
4447

4548
// chunk header of the time column
4649
private final ChunkHeader timeChunkHeader;
@@ -53,12 +56,15 @@ public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
5356
private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>();
5457
// deleted intervals of all the sub sensors
5558
private final List<List<TimeRange>> valueDeleteIntervalsList = new ArrayList<>();
59+
private final long pageEstimatedMemoryUsageInBytes;
5660

5761
public AlignedSinglePageWholeChunkReader(Chunk timeChunk, List<Chunk> valueChunkList)
5862
throws IOException {
5963
super(Long.MIN_VALUE, null);
6064
this.timeChunkHeader = timeChunk.getHeader();
6165
this.timeChunkDataBuffer = timeChunk.getData();
66+
this.pageEstimatedMemoryUsageInBytes =
67+
calculatePageEstimatedMemoryUsageInBytes(timeChunk, valueChunkList);
6268

6369
valueChunkList.forEach(
6470
chunk -> {
@@ -124,7 +130,7 @@ private AlignedPageReader constructAlignedPageReader(
124130
ChunkReader.deserializePageData(timePageHeader, timeChunkDataBuffer, timeChunkHeader);
125131

126132
List<PageHeader> valuePageHeaderList = new ArrayList<>();
127-
List<ByteBuffer> valuePageDataList = new ArrayList<>();
133+
LazyLoadPageData[] valuePageDataArray = new LazyLoadPageData[rawValuePageHeaderList.size()];
128134
List<TSDataType> valueDataTypeList = new ArrayList<>();
129135
List<Decoder> valueDecoderList = new ArrayList<>();
130136

@@ -135,15 +141,21 @@ private AlignedPageReader constructAlignedPageReader(
135141
if (valuePageHeader == null || valuePageHeader.getUncompressedSize() == 0) {
136142
// Empty Page
137143
valuePageHeaderList.add(null);
138-
valuePageDataList.add(null);
144+
valuePageDataArray[i] = null;
139145
valueDataTypeList.add(null);
140146
valueDecoderList.add(null);
141147
} else {
142148
ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i);
149+
int currentPagePosition = valueChunkDataBufferList.get(i).position();
150+
valueChunkDataBufferList
151+
.get(i)
152+
.position(currentPagePosition + valuePageHeader.getCompressedSize());
143153
valuePageHeaderList.add(valuePageHeader);
144-
valuePageDataList.add(
145-
ChunkReader.deserializePageData(
146-
valuePageHeader, valueChunkDataBufferList.get(i), valueChunkHeader));
154+
valuePageDataArray[i] =
155+
new LazyLoadPageData(
156+
valueChunkDataBufferList.get(i).array(),
157+
currentPagePosition,
158+
IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()));
147159
valueDataTypeList.add(valueChunkHeader.getDataType());
148160
valueDecoderList.add(
149161
Decoder.getDecoderByType(
@@ -160,11 +172,38 @@ private AlignedPageReader constructAlignedPageReader(
160172
timePageData,
161173
defaultTimeDecoder,
162174
valuePageHeaderList,
163-
valuePageDataList,
175+
valuePageDataArray,
164176
valueDataTypeList,
165177
valueDecoderList,
166178
queryFilter);
167179
alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList);
168180
return alignedPageReader;
169181
}
182+
183+
@Override
184+
public long getCurrentPageEstimatedMemoryUsageInBytes() {
185+
return pageEstimatedMemoryUsageInBytes;
186+
}
187+
188+
public static long calculatePageEstimatedMemoryUsageInBytes(
189+
final Chunk timeChunk, final List<Chunk> valueChunkList) throws IOException {
190+
final ByteBuffer timeChunkDataBuffer = timeChunk.getData().duplicate();
191+
long estimatedMemoryUsageInBytes =
192+
PageHeader.deserializeFrom(timeChunkDataBuffer, (Statistics<? extends Serializable>) null)
193+
.getUncompressedSize();
194+
195+
for (final Chunk valueChunk : valueChunkList) {
196+
if (valueChunk == null) {
197+
continue;
198+
}
199+
200+
final ByteBuffer valueChunkDataBuffer = valueChunk.getData().duplicate();
201+
estimatedMemoryUsageInBytes +=
202+
PageHeader.deserializeFrom(
203+
valueChunkDataBuffer, (Statistics<? extends Serializable>) null)
204+
.getUncompressedSize();
205+
}
206+
207+
return estimatedMemoryUsageInBytes;
208+
}
170209
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
21+
22+
interface EstimatedMemoryChunkReader {
23+
24+
long getCurrentPageEstimatedMemoryUsageInBytes();
25+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,25 @@
2626
import org.apache.tsfile.file.metadata.statistics.Statistics;
2727
import org.apache.tsfile.read.common.Chunk;
2828
import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
29+
import org.apache.tsfile.read.reader.page.LazyLoadPageData;
2930
import org.apache.tsfile.read.reader.page.PageReader;
3031

3132
import java.io.IOException;
3233
import java.io.Serializable;
3334
import java.nio.ByteBuffer;
3435

35-
public class SinglePageWholeChunkReader extends AbstractChunkReader {
36+
public class SinglePageWholeChunkReader extends AbstractChunkReader
37+
implements EstimatedMemoryChunkReader {
3638
private final ChunkHeader chunkHeader;
3739
private final ByteBuffer chunkDataBuffer;
40+
private final long pageEstimatedMemoryUsageInBytes;
3841

3942
public SinglePageWholeChunkReader(Chunk chunk) throws IOException {
4043
super(Long.MIN_VALUE, null);
4144

4245
this.chunkHeader = chunk.getHeader();
4346
this.chunkDataBuffer = chunk.getData();
44-
47+
this.pageEstimatedMemoryUsageInBytes = calculatePageEstimatedMemoryUsageInBytes(chunk);
4548
initAllPageReaders();
4649
}
4750

@@ -56,15 +59,33 @@ private void initAllPageReaders() throws IOException {
5659
}
5760

5861
private PageReader constructPageReader(PageHeader pageHeader) throws IOException {
62+
final int currentPagePosition = chunkDataBuffer.position();
63+
chunkDataBuffer.position(currentPagePosition + pageHeader.getCompressedSize());
5964
return new PageReader(
6065
pageHeader,
61-
deserializePageData(pageHeader, chunkDataBuffer, chunkHeader),
66+
new LazyLoadPageData(
67+
chunkDataBuffer.array(),
68+
currentPagePosition,
69+
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType())),
6270
chunkHeader.getDataType(),
6371
Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()),
6472
defaultTimeDecoder,
6573
null);
6674
}
6775

76+
@Override
77+
public long getCurrentPageEstimatedMemoryUsageInBytes() {
78+
return pageEstimatedMemoryUsageInBytes;
79+
}
80+
81+
public static long calculatePageEstimatedMemoryUsageInBytes(final Chunk chunk)
82+
throws IOException {
83+
final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
84+
final PageHeader pageHeader =
85+
PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends Serializable>) null);
86+
return pageHeader.getUncompressedSize();
87+
}
88+
6889
/////////////////////////////////////////////////////////////////////////////////////////////////
6990
// util methods
7091
/////////////////////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)