Skip to content

Commit 618fb54

Browse files
author
zhongheng.gy
committed
pass BinaryRow to resetIfTooLarge for ownership check
1 parent 2be334f commit 618fb54

3 files changed

Lines changed: 66 additions & 44 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class RowHelper implements Serializable {
3939
/**
4040
* Threshold in bytes for releasing the internal reuse buffer. When big records are written, the
4141
* BinaryRowWriter's internal segment can grow very large via grow(). The {@link
42-
* #resetIfTooLarge()} method checks this threshold and releases the bloated
42+
* #resetIfTooLarge(BinaryRow)} method checks this threshold and releases the bloated
4343
* reuseRow/reuseWriter to avoid holding onto oversized buffers indefinitely.
4444
*/
4545
private static final int REUSE_RELEASE_THRESHOLD = 4 * 1024 * 1024; // 4MB
@@ -90,15 +90,22 @@ public void copyInto(InternalRow row) {
9090
}
9191

9292
/**
93-
* Release the internal reuse buffer if the segment exceeds the threshold AND the last written
94-
* record is small. This hysteresis avoids thrashing when records are consistently large, while
95-
* still reclaiming memory when the workload transitions back to small records.
93+
* Release the internal reuse buffer if the given row is the reuse row produced by this helper,
94+
* the backing segment exceeds the threshold, and the current record is small. The identity
95+
* check ({@code currentRow == reuseRow}) ensures we only act when the caller actually used this
96+
* helper's buffer — if the input was already a {@link BinaryRow}, {@code toBinaryRow()} returns
97+
* it directly and the helper state is stale, so we must skip cleanup.
98+
*
99+
* <p>The hysteresis ({@code currentRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD}) avoids thrashing when
100+
* records are consistently large, while still reclaiming memory when the workload transitions
101+
* back to small records.
96102
*/
97-
public void resetIfTooLarge() {
98-
if (reuseWriter != null
103+
public void resetIfTooLarge(BinaryRow currentRow) {
104+
if (currentRow == reuseRow
105+
&& reuseWriter != null
99106
&& reuseWriter.getSegments() != null
100107
&& reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD
101-
&& reuseRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD) {
108+
&& currentRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD) {
102109
reuseRow = null;
103110
reuseWriter = null;
104111
}

paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,11 @@ public InternalRowSerializer duplicate() {
5959

6060
@Override
6161
public void serialize(InternalRow row, DataOutputView target) throws IOException {
62+
BinaryRow binaryRow = toBinaryRow(row);
6263
try {
63-
binarySerializer.serialize(toBinaryRow(row), target);
64+
binarySerializer.serialize(binaryRow, target);
6465
} finally {
65-
// Must use finally here: toBinaryRow() may inflate RowHelper's internal buffer
66-
// for large records (e.g. 100MB+). The serialization can exit via EOFException
67-
// thrown by SimpleCollectingOutputView.nextSegment() when the sort buffer is
68-
// full, which is caught by BinaryInMemorySortBuffer.write() as a normal signal.
69-
// Without finally, the bloated buffer would never be released on that path.
70-
rowHelper.resetIfTooLarge();
66+
rowHelper.resetIfTooLarge(binaryRow);
7167
}
7268
}
7369

@@ -141,12 +137,11 @@ public InternalRow createReuseInstance() {
141137
@Override
142138
public int serializeToPages(InternalRow row, AbstractPagedOutputView target)
143139
throws IOException {
140+
BinaryRow binaryRow = toBinaryRow(row);
144141
try {
145-
return binarySerializer.serializeToPages(toBinaryRow(row), target);
142+
return binarySerializer.serializeToPages(binaryRow, target);
146143
} finally {
147-
// Same as serialize(): must use finally because EOFException may bypass normal
148-
// return when the sort buffer is full.
149-
rowHelper.resetIfTooLarge();
144+
rowHelper.resetIfTooLarge(binaryRow);
150145
}
151146
}
152147

paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.data;
2020

21+
import org.apache.paimon.memory.MemorySegment;
2122
import org.apache.paimon.types.DataTypes;
2223
import org.apache.paimon.types.RowKind;
2324

@@ -27,69 +28,89 @@
2728

2829
import static org.assertj.core.api.Assertions.assertThat;
2930

30-
/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge() behavior. */
31+
/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge(BinaryRow) behavior. */
3132
class RowHelperTest {
3233

3334
@Test
34-
void testResetIfTooLargeReleasesAfterTransitionToSmallRecord() {
35+
void testReleasesWhenTransitionFromLargeToSmallRecord() {
3536
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
3637

3738
// Write a large record (> 4MB) to inflate the internal buffer
38-
byte[] largePayload = new byte[5 * 1024 * 1024]; // 5MB
39-
Arrays.fill(largePayload, (byte) 'x');
39+
byte[] largePayload = new byte[5 * 1024 * 1024];
4040
GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
4141
largeRow.setRowKind(RowKind.INSERT);
4242
helper.copyInto(largeRow);
43+
BinaryRow reuseAfterLarge = helper.reuseRow();
44+
assertThat(reuseAfterLarge).isNotNull();
4345

46+
// Hysteresis: should NOT release when current record is large
47+
helper.resetIfTooLarge(reuseAfterLarge);
4448
assertThat(helper.reuseRow()).isNotNull();
4549

46-
// Hysteresis: resetIfTooLarge() should NOT release when last record is large
47-
helper.resetIfTooLarge();
48-
assertThat(helper.reuseRow()).isNotNull();
49-
50-
// Now write a small record — buffer is still oversized from the large record
50+
// Write a small record — buffer is still oversized from the large record
5151
GenericRow smallRow = GenericRow.of(BinaryString.fromString("s"), new byte[10]);
5252
smallRow.setRowKind(RowKind.INSERT);
5353
helper.copyInto(smallRow);
54+
BinaryRow reuseAfterSmall = helper.reuseRow();
5455

55-
// resetIfTooLarge() should release now: buffer > 4MB but last record < 4MB
56-
helper.resetIfTooLarge();
56+
// Should release now: buffer > 4MB but current record < 4MB
57+
helper.resetIfTooLarge(reuseAfterSmall);
5758
assertThat(helper.reuseRow()).isNull();
5859
}
5960

6061
@Test
61-
void testResetIfTooLargeKeepsSmallBuffer() {
62+
void testKeepsSmallBuffer() {
6263
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT()));
6364

64-
// Write a small record (< 4MB)
6565
GenericRow smallRow = GenericRow.of(BinaryString.fromString("hello"), 42);
6666
smallRow.setRowKind(RowKind.INSERT);
6767
helper.copyInto(smallRow);
68+
BinaryRow reuse = helper.reuseRow();
69+
assertThat(reuse).isNotNull();
6870

71+
// Small buffer should NOT be released
72+
helper.resetIfTooLarge(reuse);
6973
assertThat(helper.reuseRow()).isNotNull();
74+
}
7075

71-
// resetIfTooLarge() should NOT release the small buffer
72-
helper.resetIfTooLarge();
76+
@Test
77+
void testSkipsWhenCurrentRowIsNotReuseRow() {
78+
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
79+
80+
// Write a large record to inflate the buffer
81+
byte[] largePayload = new byte[5 * 1024 * 1024];
82+
GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
83+
largeRow.setRowKind(RowKind.INSERT);
84+
helper.copyInto(largeRow);
85+
assertThat(helper.reuseRow()).isNotNull();
86+
87+
// Simulate the BinaryRow input path: toBinaryRow() returns the input directly,
88+
// not the helper's reuseRow. Pass a different BinaryRow instance.
89+
BinaryRow externalRow = new BinaryRow(2);
90+
externalRow.pointTo(MemorySegment.wrap(new byte[32]), 0, 32);
91+
92+
// Should NOT release because externalRow != reuseRow
93+
helper.resetIfTooLarge(externalRow);
7394
assertThat(helper.reuseRow()).isNotNull();
7495
}
7596

7697
@Test
77-
void testResetIfTooLargeBeforeCopyInto() {
98+
void testSafeToCallWithNullReuseRow() {
7899
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING()));
79-
80-
// reuseRow is null before any copyInto
81100
assertThat(helper.reuseRow()).isNull();
82101

83-
// resetIfTooLarge() should be safe to call when reuseRow is null
84-
helper.resetIfTooLarge();
102+
// Should be safe — reuseRow is null, no NPE
103+
BinaryRow someRow = new BinaryRow(1);
104+
someRow.pointTo(MemorySegment.wrap(new byte[16]), 0, 16);
105+
helper.resetIfTooLarge(someRow);
85106
assertThat(helper.reuseRow()).isNull();
86107
}
87108

88109
@Test
89-
void testReuseIsRecreatedAfterRelease() {
110+
void testReuseRecreatedAfterRelease() {
90111
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
91112

92-
// Write a large record to inflate the buffer, then a small record to trigger release
113+
// Inflate the buffer with a large record, then switch to a small record
93114
byte[] largePayload = new byte[5 * 1024 * 1024];
94115
GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
95116
largeRow.setRowKind(RowKind.INSERT);
@@ -99,16 +120,15 @@ void testReuseIsRecreatedAfterRelease() {
99120
smallRow.setRowKind(RowKind.INSERT);
100121
helper.copyInto(smallRow);
101122

102-
// Buffer is oversized + last record is small → release
103-
helper.resetIfTooLarge();
123+
helper.resetIfTooLarge(helper.reuseRow());
104124
assertThat(helper.reuseRow()).isNull();
105125

106126
// Write another small record — reuseRow should be recreated
107127
helper.copyInto(smallRow);
108128
assertThat(helper.reuseRow()).isNotNull();
109129

110-
// Small buffer should survive resetIfTooLarge()
111-
helper.resetIfTooLarge();
130+
// Small buffer should survive
131+
helper.resetIfTooLarge(helper.reuseRow());
112132
assertThat(helper.reuseRow()).isNotNull();
113133
}
114134
}

0 commit comments

Comments
 (0)