From 2be334f96ea12d816e0b56720abc94882c3e19e3 Mon Sep 17 00:00:00 2001 From: "zhongheng.gy" Date: Mon, 8 Jun 2026 11:31:49 +0800 Subject: [PATCH 1/3] [common] Fix RowHelper internal buffer never shrinking for large records --- .../org/apache/paimon/data/RowHelper.java | 23 ++++ .../serializer/InternalRowSerializer.java | 19 ++- .../org/apache/paimon/data/RowHelperTest.java | 114 ++++++++++++++++++ 3 files changed, 154 insertions(+), 2 deletions(-) create mode 100644 paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java index a5df6cdf7875..45c5fd752853 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java @@ -36,6 +36,14 @@ public class RowHelper implements Serializable { private static final long serialVersionUID = 1L; + /** + * Threshold in bytes for releasing the internal reuse buffer. When big records are written, the + * BinaryRowWriter's internal segment can grow very large via grow(). The {@link + * #resetIfTooLarge()} method checks this threshold and releases the bloated + * reuseRow/reuseWriter to avoid holding onto oversized buffers indefinitely. + */ + private static final int REUSE_RELEASE_THRESHOLD = 4 * 1024 * 1024; // 4MB + private final FieldGetter[] fieldGetters; private final ValueSetter[] valueSetters; private final boolean[] writeNulls; @@ -81,6 +89,21 @@ public void copyInto(InternalRow row) { reuseWriter.complete(); } + /** + * Release the internal reuse buffer if the segment exceeds the threshold AND the last written + * record is small. This hysteresis avoids thrashing when records are consistently large, while + * still reclaiming memory when the workload transitions back to small records. 
+ */ + public void resetIfTooLarge() { + if (reuseWriter != null + && reuseWriter.getSegments() != null + && reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD + && reuseRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD) { + reuseRow = null; + reuseWriter = null; + } + } + public BinaryRow reuseRow() { return reuseRow; } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java index c3abc2f4cee6..744e76e631b3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java @@ -59,7 +59,16 @@ public InternalRowSerializer duplicate() { @Override public void serialize(InternalRow row, DataOutputView target) throws IOException { - binarySerializer.serialize(toBinaryRow(row), target); + try { + binarySerializer.serialize(toBinaryRow(row), target); + } finally { + // Must use finally here: toBinaryRow() may inflate RowHelper's internal buffer + // for large records (e.g. 100MB+). The serialization can exit via EOFException + // thrown by SimpleCollectingOutputView.nextSegment() when the sort buffer is + // full, which is caught by BinaryInMemorySortBuffer.write() as a normal signal. + // Without finally, the bloated buffer would never be released on that path. + rowHelper.resetIfTooLarge(); + } } @Override @@ -132,7 +141,13 @@ public InternalRow createReuseInstance() { @Override public int serializeToPages(InternalRow row, AbstractPagedOutputView target) throws IOException { - return binarySerializer.serializeToPages(toBinaryRow(row), target); + try { + return binarySerializer.serializeToPages(toBinaryRow(row), target); + } finally { + // Same as serialize(): must use finally because EOFException may bypass normal + // return when the sort buffer is full. 
+ rowHelper.resetIfTooLarge(); + } } @Override diff --git a/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java new file mode 100644 index 000000000000..bde249ace5ab --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge() behavior. 
*/ +class RowHelperTest { + + @Test + void testResetIfTooLargeReleasesAfterTransitionToSmallRecord() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES())); + + // Write a large record (> 4MB) to inflate the internal buffer + byte[] largePayload = new byte[5 * 1024 * 1024]; // 5MB + Arrays.fill(largePayload, (byte) 'x'); + GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload); + largeRow.setRowKind(RowKind.INSERT); + helper.copyInto(largeRow); + + assertThat(helper.reuseRow()).isNotNull(); + + // Hysteresis: resetIfTooLarge() should NOT release when last record is large + helper.resetIfTooLarge(); + assertThat(helper.reuseRow()).isNotNull(); + + // Now write a small record — buffer is still oversized from the large record + GenericRow smallRow = GenericRow.of(BinaryString.fromString("s"), new byte[10]); + smallRow.setRowKind(RowKind.INSERT); + helper.copyInto(smallRow); + + // resetIfTooLarge() should release now: buffer > 4MB but last record < 4MB + helper.resetIfTooLarge(); + assertThat(helper.reuseRow()).isNull(); + } + + @Test + void testResetIfTooLargeKeepsSmallBuffer() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT())); + + // Write a small record (< 4MB) + GenericRow smallRow = GenericRow.of(BinaryString.fromString("hello"), 42); + smallRow.setRowKind(RowKind.INSERT); + helper.copyInto(smallRow); + + assertThat(helper.reuseRow()).isNotNull(); + + // resetIfTooLarge() should NOT release the small buffer + helper.resetIfTooLarge(); + assertThat(helper.reuseRow()).isNotNull(); + } + + @Test + void testResetIfTooLargeBeforeCopyInto() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING())); + + // reuseRow is null before any copyInto + assertThat(helper.reuseRow()).isNull(); + + // resetIfTooLarge() should be safe to call when reuseRow is null + helper.resetIfTooLarge(); + assertThat(helper.reuseRow()).isNull(); + } + + @Test + void 
testReuseIsRecreatedAfterRelease() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES())); + + // Write a large record to inflate the buffer, then a small record to trigger release + byte[] largePayload = new byte[5 * 1024 * 1024]; + GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload); + largeRow.setRowKind(RowKind.INSERT); + helper.copyInto(largeRow); + + GenericRow smallRow = GenericRow.of(BinaryString.fromString("small"), new byte[10]); + smallRow.setRowKind(RowKind.INSERT); + helper.copyInto(smallRow); + + // Buffer is oversized + last record is small → release + helper.resetIfTooLarge(); + assertThat(helper.reuseRow()).isNull(); + + // Write another small record — reuseRow should be recreated + helper.copyInto(smallRow); + assertThat(helper.reuseRow()).isNotNull(); + + // Small buffer should survive resetIfTooLarge() + helper.resetIfTooLarge(); + assertThat(helper.reuseRow()).isNotNull(); + } +} From 618fb5476a8b4eae4c2bdee9eeaef2a91dd79115 Mon Sep 17 00:00:00 2001 From: "zhongheng.gy" Date: Mon, 8 Jun 2026 14:09:22 +0800 Subject: [PATCH 2/3] pass BinaryRow to resetIfTooLarge for ownership check --- .../org/apache/paimon/data/RowHelper.java | 21 ++++-- .../serializer/InternalRowSerializer.java | 17 ++--- .../org/apache/paimon/data/RowHelperTest.java | 72 ++++++++++++------- 3 files changed, 66 insertions(+), 44 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java index 45c5fd752853..65003a385319 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java @@ -39,7 +39,7 @@ public class RowHelper implements Serializable { /** * Threshold in bytes for releasing the internal reuse buffer. When big records are written, the * BinaryRowWriter's internal segment can grow very large via grow(). 
The {@link - * #resetIfTooLarge()} method checks this threshold and releases the bloated + * #resetIfTooLarge(BinaryRow)} method checks this threshold and releases the bloated * reuseRow/reuseWriter to avoid holding onto oversized buffers indefinitely. */ private static final int REUSE_RELEASE_THRESHOLD = 4 * 1024 * 1024; // 4MB @@ -90,15 +90,22 @@ public void copyInto(InternalRow row) { } /** - * Release the internal reuse buffer if the segment exceeds the threshold AND the last written - * record is small. This hysteresis avoids thrashing when records are consistently large, while - * still reclaiming memory when the workload transitions back to small records. + * Release the internal reuse buffer if the given row is the reuse row produced by this helper, + * the backing segment exceeds the threshold, and the current record is small. The identity + * check ({@code currentRow == reuseRow}) ensures we only act when the caller actually used this + * helper's buffer — if the input was already a {@link BinaryRow}, {@code toBinaryRow()} returns + * it directly and the helper state is stale, so we must skip cleanup. + * + *
<p>
The hysteresis ({@code currentRow.getSizeInBytes() < threshold}) avoids thrashing when + * records are consistently large, while still reclaiming memory when the workload transitions + * back to small records. */ - public void resetIfTooLarge() { - if (reuseWriter != null + public void resetIfTooLarge(BinaryRow currentRow) { + if (currentRow == reuseRow + && reuseWriter != null && reuseWriter.getSegments() != null && reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD - && reuseRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD) { + && currentRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD) { reuseRow = null; reuseWriter = null; } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java index 744e76e631b3..b77f1ae39fb0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java @@ -59,15 +59,11 @@ public InternalRowSerializer duplicate() { @Override public void serialize(InternalRow row, DataOutputView target) throws IOException { + BinaryRow binaryRow = toBinaryRow(row); try { - binarySerializer.serialize(toBinaryRow(row), target); + binarySerializer.serialize(binaryRow, target); } finally { - // Must use finally here: toBinaryRow() may inflate RowHelper's internal buffer - // for large records (e.g. 100MB+). The serialization can exit via EOFException - // thrown by SimpleCollectingOutputView.nextSegment() when the sort buffer is - // full, which is caught by BinaryInMemorySortBuffer.write() as a normal signal. - // Without finally, the bloated buffer would never be released on that path. 
- rowHelper.resetIfTooLarge(); + rowHelper.resetIfTooLarge(binaryRow); } } @@ -141,12 +137,11 @@ public InternalRow createReuseInstance() { @Override public int serializeToPages(InternalRow row, AbstractPagedOutputView target) throws IOException { + BinaryRow binaryRow = toBinaryRow(row); try { - return binarySerializer.serializeToPages(toBinaryRow(row), target); + return binarySerializer.serializeToPages(binaryRow, target); } finally { - // Same as serialize(): must use finally because EOFException may bypass normal - // return when the sort buffer is full. - rowHelper.resetIfTooLarge(); + rowHelper.resetIfTooLarge(binaryRow); } } diff --git a/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java index bde249ace5ab..2ea2bda511e0 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.data; +import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; @@ -27,69 +28,89 @@ import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge() behavior. */ +/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge(BinaryRow) behavior. 
*/ class RowHelperTest { @Test - void testResetIfTooLargeReleasesAfterTransitionToSmallRecord() { + void testReleasesWhenTransitionFromLargeToSmallRecord() { RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES())); // Write a large record (> 4MB) to inflate the internal buffer - byte[] largePayload = new byte[5 * 1024 * 1024]; // 5MB - Arrays.fill(largePayload, (byte) 'x'); + byte[] largePayload = new byte[5 * 1024 * 1024]; GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload); largeRow.setRowKind(RowKind.INSERT); helper.copyInto(largeRow); + BinaryRow reuseAfterLarge = helper.reuseRow(); + assertThat(reuseAfterLarge).isNotNull(); + // Hysteresis: should NOT release when current record is large + helper.resetIfTooLarge(reuseAfterLarge); assertThat(helper.reuseRow()).isNotNull(); - // Hysteresis: resetIfTooLarge() should NOT release when last record is large - helper.resetIfTooLarge(); - assertThat(helper.reuseRow()).isNotNull(); - - // Now write a small record — buffer is still oversized from the large record + // Write a small record — buffer is still oversized from the large record GenericRow smallRow = GenericRow.of(BinaryString.fromString("s"), new byte[10]); smallRow.setRowKind(RowKind.INSERT); helper.copyInto(smallRow); + BinaryRow reuseAfterSmall = helper.reuseRow(); - // resetIfTooLarge() should release now: buffer > 4MB but last record < 4MB - helper.resetIfTooLarge(); + // Should release now: buffer > 4MB but current record < 4MB + helper.resetIfTooLarge(reuseAfterSmall); assertThat(helper.reuseRow()).isNull(); } @Test - void testResetIfTooLargeKeepsSmallBuffer() { + void testKeepsSmallBuffer() { RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT())); - // Write a small record (< 4MB) GenericRow smallRow = GenericRow.of(BinaryString.fromString("hello"), 42); smallRow.setRowKind(RowKind.INSERT); helper.copyInto(smallRow); + BinaryRow reuse = helper.reuseRow(); + 
assertThat(reuse).isNotNull(); + // Small buffer should NOT be released + helper.resetIfTooLarge(reuse); assertThat(helper.reuseRow()).isNotNull(); + } - // resetIfTooLarge() should NOT release the small buffer - helper.resetIfTooLarge(); + @Test + void testSkipsWhenCurrentRowIsNotReuseRow() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES())); + + // Write a large record to inflate the buffer + byte[] largePayload = new byte[5 * 1024 * 1024]; + GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload); + largeRow.setRowKind(RowKind.INSERT); + helper.copyInto(largeRow); + assertThat(helper.reuseRow()).isNotNull(); + + // Simulate the BinaryRow input path: toBinaryRow() returns the input directly, + // not the helper's reuseRow. Pass a different BinaryRow instance. + BinaryRow externalRow = new BinaryRow(2); + externalRow.pointTo(MemorySegment.wrap(new byte[32]), 0, 32); + + // Should NOT release because externalRow != reuseRow + helper.resetIfTooLarge(externalRow); assertThat(helper.reuseRow()).isNotNull(); } @Test - void testResetIfTooLargeBeforeCopyInto() { + void testSafeToCallWithNullReuseRow() { RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING())); - - // reuseRow is null before any copyInto assertThat(helper.reuseRow()).isNull(); - // resetIfTooLarge() should be safe to call when reuseRow is null - helper.resetIfTooLarge(); + // Should be safe — reuseRow is null, no NPE + BinaryRow someRow = new BinaryRow(1); + someRow.pointTo(MemorySegment.wrap(new byte[16]), 0, 16); + helper.resetIfTooLarge(someRow); assertThat(helper.reuseRow()).isNull(); } @Test - void testReuseIsRecreatedAfterRelease() { + void testReuseRecreatedAfterRelease() { RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES())); - // Write a large record to inflate the buffer, then a small record to trigger release + // Inflate buffer, then transition to small byte[] largePayload = new byte[5 
* 1024 * 1024]; GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload); largeRow.setRowKind(RowKind.INSERT); @@ -99,16 +120,15 @@ void testReuseIsRecreatedAfterRelease() { smallRow.setRowKind(RowKind.INSERT); helper.copyInto(smallRow); - // Buffer is oversized + last record is small → release - helper.resetIfTooLarge(); + helper.resetIfTooLarge(helper.reuseRow()); assertThat(helper.reuseRow()).isNull(); // Write another small record — reuseRow should be recreated helper.copyInto(smallRow); assertThat(helper.reuseRow()).isNotNull(); - // Small buffer should survive resetIfTooLarge() - helper.resetIfTooLarge(); + // Small buffer should survive + helper.resetIfTooLarge(helper.reuseRow()); assertThat(helper.reuseRow()).isNotNull(); } } From 8c0f466a8d0bed427cc747af67d235fdaad3bfe5 Mon Sep 17 00:00:00 2001 From: "zhongheng.gy" Date: Wed, 10 Jun 2026 09:37:39 +0800 Subject: [PATCH 3/3] use combined cap and ratio hysteresis for buffer shrink decision --- .../org/apache/paimon/data/RowHelper.java | 48 +++++++++------- .../org/apache/paimon/data/RowHelperTest.java | 55 +++++++++++++++---- 2 files changed, 74 insertions(+), 29 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java index 65003a385319..f18e2d219f26 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java @@ -37,12 +37,18 @@ public class RowHelper implements Serializable { private static final long serialVersionUID = 1L; /** - * Threshold in bytes for releasing the internal reuse buffer. When big records are written, the - * BinaryRowWriter's internal segment can grow very large via grow(). The {@link - * #resetIfTooLarge(BinaryRow)} method checks this threshold and releases the bloated - * reuseRow/reuseWriter to avoid holding onto oversized buffers indefinitely. 
+ * Maximum retained reuse buffer size in bytes. Buffers exceeding this cap are eligible for + * release when the shrink ratio condition is also met. */ - private static final int REUSE_RELEASE_THRESHOLD = 4 * 1024 * 1024; // 4MB + private static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB + + /** + * Shrink ratio for hysteresis. The buffer is released only when its capacity exceeds {@link + * #MAX_RETAINED_REUSE_BUFFER_SIZE} AND is more than {@code SHRINK_RATIO} times the current + * row's size. This avoids thrashing for sustained medium-to-large records while still releasing + * after a spike (e.g. 100MB buffer with 5MB rows → 20x > 4x → release). + */ + private static final int SHRINK_RATIO = 4; private final FieldGetter[] fieldGetters; private final ValueSetter[] valueSetters; @@ -91,23 +97,27 @@ public void copyInto(InternalRow row) { /** * Release the internal reuse buffer if the given row is the reuse row produced by this helper, - * the backing segment exceeds the threshold, and the current record is small. The identity - * check ({@code currentRow == reuseRow}) ensures we only act when the caller actually used this - * helper's buffer — if the input was already a {@link BinaryRow}, {@code toBinaryRow()} returns - * it directly and the helper state is stale, so we must skip cleanup. + * the backing segment exceeds the maximum retained size, and the buffer is clearly oversized + * relative to the current record. The identity check ({@code currentRow == reuseRow}) ensures + * we only act when the caller actually used this helper's buffer. * - *
<p>
The hysteresis ({@code currentRow.getSizeInBytes() < threshold}) avoids thrashing when - * records are consistently large, while still reclaiming memory when the workload transitions - * back to small records. + *
<p>
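The release condition combines a fixed cap with a relative ratio check: + * + *
<ul> + * <li>The buffer capacity exceeds {@link #MAX_RETAINED_REUSE_BUFFER_SIZE}, so small buffers + * are never released. + * <li>The buffer capacity is more than {@link #SHRINK_RATIO} times the current row's size, so + * a buffer proportional to the records being written is retained. + * </ul>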
*/ public void resetIfTooLarge(BinaryRow currentRow) { - if (currentRow == reuseRow - && reuseWriter != null - && reuseWriter.getSegments() != null - && reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD - && currentRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD) { - reuseRow = null; - reuseWriter = null; + if (currentRow == reuseRow && reuseWriter != null && reuseWriter.getSegments() != null) { + int bufferCapacity = reuseWriter.getSegments().size(); + if (bufferCapacity > MAX_RETAINED_REUSE_BUFFER_SIZE + && bufferCapacity > (long) currentRow.getSizeInBytes() * SHRINK_RATIO) { + reuseRow = null; + reuseWriter = null; + } } } diff --git a/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java index 2ea2bda511e0..2fa26251395f 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java @@ -32,7 +32,7 @@ class RowHelperTest { @Test - void testReleasesWhenTransitionFromLargeToSmallRecord() { + void testReleasesWhenSpikeFollowedBySmallRecords() { RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES())); // Write a large record (> 4MB) to inflate the internal buffer @@ -43,21 +43,58 @@ void testReleasesWhenTransitionFromLargeToSmallRecord() { BinaryRow reuseAfterLarge = helper.reuseRow(); assertThat(reuseAfterLarge).isNotNull(); - // Hysteresis: should NOT release when current record is large + // buffer ~8MB, row ~5MB → ratio ~1.6x < 4x → should NOT release helper.resetIfTooLarge(reuseAfterLarge); assertThat(helper.reuseRow()).isNotNull(); - // Write a small record — buffer is still oversized from the large record + // Write a small record — buffer still oversized GenericRow smallRow = GenericRow.of(BinaryString.fromString("s"), new byte[10]); smallRow.setRowKind(RowKind.INSERT); helper.copyInto(smallRow); - BinaryRow reuseAfterSmall = 
helper.reuseRow(); - // Should release now: buffer > 4MB but current record < 4MB - helper.resetIfTooLarge(reuseAfterSmall); + // buffer ~8MB, row ~50B → ratio huge > 4x, buffer > 4MB → release + helper.resetIfTooLarge(helper.reuseRow()); assertThat(helper.reuseRow()).isNull(); } + @Test + void testReleasesWhenSpikeFollowedByMediumRecords() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES())); + + // Write a very large record (100MB) to inflate the buffer significantly + byte[] hugePayload = new byte[100 * 1024 * 1024]; + GenericRow hugeRow = GenericRow.of(BinaryString.fromString("key"), hugePayload); + hugeRow.setRowKind(RowKind.INSERT); + helper.copyInto(hugeRow); + assertThat(helper.reuseRow()).isNotNull(); + + // Write a medium record (5MB) — buffer is ~150MB (from grow), row is ~5MB + // ratio ~30x > 4x, buffer > 4MB → should release + byte[] mediumPayload = new byte[5 * 1024 * 1024]; + GenericRow mediumRow = GenericRow.of(BinaryString.fromString("m"), mediumPayload); + mediumRow.setRowKind(RowKind.INSERT); + helper.copyInto(mediumRow); + + helper.resetIfTooLarge(helper.reuseRow()); + assertThat(helper.reuseRow()).isNull(); + } + + @Test + void testRetainsWhenBufferProportionalToRecordSize() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES())); + + // Write a 5MB record — buffer grows to ~8MB via grow() (1.5x strategy) + byte[] payload = new byte[5 * 1024 * 1024]; + GenericRow row = GenericRow.of(BinaryString.fromString("key"), payload); + row.setRowKind(RowKind.INSERT); + helper.copyInto(row); + + // buffer ~8MB, row ~5MB → ratio ~1.6x < 4x → should NOT release + // even though buffer > 4MB + helper.resetIfTooLarge(helper.reuseRow()); + assertThat(helper.reuseRow()).isNotNull(); + } + @Test void testKeepsSmallBuffer() { RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT())); @@ -68,7 +105,7 @@ void testKeepsSmallBuffer() { BinaryRow reuse = 
helper.reuseRow(); assertThat(reuse).isNotNull(); - // Small buffer should NOT be released + // Small buffer (< 4MB) — should NOT be released regardless of ratio helper.resetIfTooLarge(reuse); assertThat(helper.reuseRow()).isNotNull(); } @@ -84,8 +121,7 @@ void testSkipsWhenCurrentRowIsNotReuseRow() { helper.copyInto(largeRow); assertThat(helper.reuseRow()).isNotNull(); - // Simulate the BinaryRow input path: toBinaryRow() returns the input directly, - // not the helper's reuseRow. Pass a different BinaryRow instance. + // Pass a different BinaryRow — simulates toBinaryRow() returning input directly BinaryRow externalRow = new BinaryRow(2); externalRow.pointTo(MemorySegment.wrap(new byte[32]), 0, 32); @@ -99,7 +135,6 @@ void testSafeToCallWithNullReuseRow() { RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING())); assertThat(helper.reuseRow()).isNull(); - // Should be safe — reuseRow is null, no NPE BinaryRow someRow = new BinaryRow(1); someRow.pointTo(MemorySegment.wrap(new byte[16]), 0, 16); helper.resetIfTooLarge(someRow);