Skip to content

Commit 2be334f

Browse files
author
zhongheng.gy
committed
[common] Fix RowHelper internal buffer never shrinking for large records
1 parent 1e1c9f4 commit 2be334f

3 files changed

Lines changed: 154 additions & 2 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ public class RowHelper implements Serializable {
3636

3737
private static final long serialVersionUID = 1L;
3838

39+
/**
40+
* Threshold in bytes for releasing the internal reuse buffer. When big records are written, the
41+
* BinaryRowWriter's internal segment can grow very large via grow(). The {@link
42+
* #resetIfTooLarge()} method releases the bloated reuseRow/reuseWriter once the segment exceeds
43+
* this threshold and the last record is below it, to avoid holding oversized buffers indefinitely.
44+
*/
45+
private static final int REUSE_RELEASE_THRESHOLD = 4 * 1024 * 1024; // 4MB
46+
3947
private final FieldGetter[] fieldGetters;
4048
private final ValueSetter[] valueSetters;
4149
private final boolean[] writeNulls;
@@ -81,6 +89,21 @@ public void copyInto(InternalRow row) {
8189
reuseWriter.complete();
8290
}
8391

92+
/**
93+
* Releases the internal reuse buffer if the segment exceeds REUSE_RELEASE_THRESHOLD AND the last
94+
* written record is smaller than it; a no-op while no reuse writer (or segment) exists. This
95+
* hysteresis avoids thrashing when records are consistently large, while still reclaiming memory
* when the workload transitions back to small records.
*
* <p>Releasing means both reuseRow and reuseWriter are set to {@code null}, so {@link
* #reuseRow()} returns {@code null} afterwards.
96+
*/
97+
public void resetIfTooLarge() {
98+
if (reuseWriter != null
99+
&& reuseWriter.getSegments() != null
100+
&& reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD
101+
&& reuseRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD) {
102+
reuseRow = null;
103+
reuseWriter = null;
104+
}
105+
}
106+
84107
/**
* Returns the current reuse row. May be {@code null}: RowHelperTest shows it is {@code null}
* before the first copyInto and again after resetIfTooLarge() has released the buffer.
*/
public BinaryRow reuseRow() {
85108
return reuseRow;
86109
}

paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,16 @@ public InternalRowSerializer duplicate() {
5959

6060
/**
* Serializes {@code row} to {@code target} by converting it with toBinaryRow() and delegating
* to binarySerializer. Whether this returns normally or throws, rowHelper.resetIfTooLarge() is
* invoked afterwards so an oversized reuse buffer can be released.
*
* @throws IOException propagated from the underlying serialization
*/
@Override
6161
public void serialize(InternalRow row, DataOutputView target) throws IOException {
62-
binarySerializer.serialize(toBinaryRow(row), target);
62+
try {
63+
binarySerializer.serialize(toBinaryRow(row), target);
64+
} finally {
65+
// Must use finally here: toBinaryRow() may inflate RowHelper's internal buffer
66+
// for large records (e.g. 100MB+). The serialization can exit via EOFException
67+
// thrown by SimpleCollectingOutputView.nextSegment() when the sort buffer is
68+
// full, which is caught by BinaryInMemorySortBuffer.write() as a normal signal.
69+
// Without finally, the bloated buffer would never be released on that path.
70+
rowHelper.resetIfTooLarge();
71+
}
6372
}
6473

6574
@Override
@@ -132,7 +141,13 @@ public InternalRow createReuseInstance() {
132141
/**
* Serializes {@code row} into the pages of {@code target} by converting it with toBinaryRow()
* and delegating to binarySerializer.serializeToPages. Whether this returns normally or throws,
* rowHelper.resetIfTooLarge() is invoked afterwards so an oversized reuse buffer can be released.
*
* @return the value returned by binarySerializer.serializeToPages
* @throws IOException propagated from the underlying serialization
*/
@Override
133142
public int serializeToPages(InternalRow row, AbstractPagedOutputView target)
134143
throws IOException {
135-
return binarySerializer.serializeToPages(toBinaryRow(row), target);
144+
try {
145+
return binarySerializer.serializeToPages(toBinaryRow(row), target);
146+
} finally {
147+
// Must use finally (as in serialize()): an EOFException raised when the sort buffer is full
148+
// bypasses the normal return, and the oversized reuse buffer would then not be released.
149+
rowHelper.resetIfTooLarge();
150+
}
136151
}
137152

138153
@Override
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.data;
20+
21+
import org.apache.paimon.types.DataTypes;
22+
import org.apache.paimon.types.RowKind;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.util.Arrays;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge() behavior. */
31+
class RowHelperTest {
32+
// Verifies the hysteresis: no release right after a large record, release once a small record
// follows while the buffer is still oversized.
33+
@Test
34+
void testResetIfTooLargeReleasesAfterTransitionToSmallRecord() {
35+
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
36+
37+
// Write a large record (> 4MB) to inflate the internal buffer
38+
byte[] largePayload = new byte[5 * 1024 * 1024]; // 5MB
39+
Arrays.fill(largePayload, (byte) 'x');
40+
GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
41+
largeRow.setRowKind(RowKind.INSERT);
42+
helper.copyInto(largeRow);
43+
44+
assertThat(helper.reuseRow()).isNotNull();
45+
46+
// Hysteresis: resetIfTooLarge() should NOT release when last record is large
47+
helper.resetIfTooLarge();
48+
assertThat(helper.reuseRow()).isNotNull();
49+
50+
// Now write a small record — buffer is still oversized from the large record
51+
GenericRow smallRow = GenericRow.of(BinaryString.fromString("s"), new byte[10]);
52+
smallRow.setRowKind(RowKind.INSERT);
53+
helper.copyInto(smallRow);
54+
55+
// resetIfTooLarge() should release now: buffer > 4MB but last record < 4MB
56+
helper.resetIfTooLarge();
57+
assertThat(helper.reuseRow()).isNull();
58+
}
59+
// Verifies that a helper which has only written a small record keeps its reuse row.
60+
@Test
61+
void testResetIfTooLargeKeepsSmallBuffer() {
62+
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT()));
63+
64+
// Write a small record (< 4MB)
65+
GenericRow smallRow = GenericRow.of(BinaryString.fromString("hello"), 42);
66+
smallRow.setRowKind(RowKind.INSERT);
67+
helper.copyInto(smallRow);
68+
69+
assertThat(helper.reuseRow()).isNotNull();
70+
71+
// resetIfTooLarge() should NOT release the small buffer
72+
helper.resetIfTooLarge();
73+
assertThat(helper.reuseRow()).isNotNull();
74+
}
75+
// Verifies that resetIfTooLarge() can be called on a fresh helper without throwing and leaves
// reuseRow() null.
76+
@Test
77+
void testResetIfTooLargeBeforeCopyInto() {
78+
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING()));
79+
80+
// reuseRow is null before any copyInto
81+
assertThat(helper.reuseRow()).isNull();
82+
83+
// resetIfTooLarge() should be safe to call when reuseRow is null
84+
helper.resetIfTooLarge();
85+
assertThat(helper.reuseRow()).isNull();
86+
}
87+
// Verifies that copyInto() recreates the reuse row after a release and that the recreated row
// is kept by a subsequent resetIfTooLarge().
// NOTE(review): only the nullity of reuseRow() is asserted; the contents of the recreated row
// are not checked here.
88+
@Test
89+
void testReuseIsRecreatedAfterRelease() {
90+
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
91+
92+
// Write a large record to inflate the buffer, then a small record to trigger release
93+
byte[] largePayload = new byte[5 * 1024 * 1024]; // 5MB
94+
GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
95+
largeRow.setRowKind(RowKind.INSERT);
96+
helper.copyInto(largeRow);
97+
98+
GenericRow smallRow = GenericRow.of(BinaryString.fromString("small"), new byte[10]);
99+
smallRow.setRowKind(RowKind.INSERT);
100+
helper.copyInto(smallRow);
101+
102+
// Buffer is oversized + last record is small → release
103+
helper.resetIfTooLarge();
104+
assertThat(helper.reuseRow()).isNull();
105+
106+
// Write another small record — reuseRow should be recreated
107+
helper.copyInto(smallRow);
108+
assertThat(helper.reuseRow()).isNotNull();
109+
110+
// Small buffer should survive resetIfTooLarge()
111+
helper.resetIfTooLarge();
112+
assertThat(helper.reuseRow()).isNotNull();
113+
}
114+
}

0 commit comments

Comments
 (0)