diff --git a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
index a5df6cdf7875..f18e2d219f26 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
@@ -36,6 +36,20 @@ public class RowHelper implements Serializable {
private static final long serialVersionUID = 1L;
+ /**
+ * Maximum retained reuse buffer size in bytes. Buffers exceeding this cap are eligible for
+ * release when the shrink ratio condition is also met.
+ */
+ private static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB
+
+ /**
+ * Shrink ratio for hysteresis. The buffer is released only when its capacity exceeds {@link
+ * #MAX_RETAINED_REUSE_BUFFER_SIZE} AND is more than {@code SHRINK_RATIO} times the current
+ * row's size. This avoids thrashing for sustained medium-to-large records while still releasing
+ * after a spike (e.g. 100MB buffer with 5MB rows → 20x > 4x → release).
+ */
+ private static final int SHRINK_RATIO = 4;
+
private final FieldGetter[] fieldGetters;
private final ValueSetter[] valueSetters;
private final boolean[] writeNulls;
@@ -81,6 +95,32 @@ public void copyInto(InternalRow row) {
reuseWriter.complete();
}
+ /**
+ * Release the internal reuse buffer if the given row is the reuse row produced by this helper,
+ * the backing segment exceeds the maximum retained size, and the buffer is clearly oversized
+ * relative to the current record. The identity check ({@code currentRow == reuseRow}) ensures
+ * we only act when the caller actually used this helper's buffer.
+ *
+ *
The release condition combines a fixed cap with a relative ratio check:
+ *
+ *
+ * - bufferCapacity > {@link #MAX_RETAINED_REUSE_BUFFER_SIZE} — the buffer was inflated
+ * beyond the baseline
+ *
- bufferCapacity > currentRow.getSizeInBytes() * {@link #SHRINK_RATIO} — the buffer is
+ * significantly larger than the current workload needs
+ *
+ */
+ public void resetIfTooLarge(BinaryRow currentRow) {
+ if (currentRow == reuseRow && reuseWriter != null && reuseWriter.getSegments() != null) {
+ int bufferCapacity = reuseWriter.getSegments().size();
+ if (bufferCapacity > MAX_RETAINED_REUSE_BUFFER_SIZE
+ && bufferCapacity > (long) currentRow.getSizeInBytes() * SHRINK_RATIO) {
+ reuseRow = null;
+ reuseWriter = null;
+ }
+ }
+ }
+
public BinaryRow reuseRow() {
return reuseRow;
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index c3abc2f4cee6..b77f1ae39fb0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -59,7 +59,12 @@ public InternalRowSerializer duplicate() {
@Override
public void serialize(InternalRow row, DataOutputView target) throws IOException {
- binarySerializer.serialize(toBinaryRow(row), target);
+ BinaryRow binaryRow = toBinaryRow(row);
+ try {
+ binarySerializer.serialize(binaryRow, target);
+ } finally {
+ rowHelper.resetIfTooLarge(binaryRow);
+ }
}
@Override
@@ -132,7 +137,12 @@ public InternalRow createReuseInstance() {
@Override
public int serializeToPages(InternalRow row, AbstractPagedOutputView target)
throws IOException {
- return binarySerializer.serializeToPages(toBinaryRow(row), target);
+ BinaryRow binaryRow = toBinaryRow(row);
+ try {
+ return binarySerializer.serializeToPages(binaryRow, target);
+ } finally {
+ rowHelper.resetIfTooLarge(binaryRow);
+ }
}
@Override
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java
new file mode 100644
index 000000000000..2fa26251395f
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge(BinaryRow) behavior. */
+class RowHelperTest {
+
+ @Test
+ void testReleasesWhenSpikeFollowedBySmallRecords() {
+ RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
+
+ // Write a large record (> 4MB) to inflate the internal buffer
+ byte[] largePayload = new byte[5 * 1024 * 1024];
+ GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
+ largeRow.setRowKind(RowKind.INSERT);
+ helper.copyInto(largeRow);
+ BinaryRow reuseAfterLarge = helper.reuseRow();
+ assertThat(reuseAfterLarge).isNotNull();
+
+ // buffer ~8MB, row ~5MB → ratio ~1.6x < 4x → should NOT release
+ helper.resetIfTooLarge(reuseAfterLarge);
+ assertThat(helper.reuseRow()).isNotNull();
+
+ // Write a small record — buffer still oversized
+ GenericRow smallRow = GenericRow.of(BinaryString.fromString("s"), new byte[10]);
+ smallRow.setRowKind(RowKind.INSERT);
+ helper.copyInto(smallRow);
+
+ // buffer ~8MB, row ~50B → ratio huge > 4x, buffer > 4MB → release
+ helper.resetIfTooLarge(helper.reuseRow());
+ assertThat(helper.reuseRow()).isNull();
+ }
+
+ @Test
+ void testReleasesWhenSpikeFollowedByMediumRecords() {
+ RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
+
+ // Write a very large record (100MB) to inflate the buffer significantly
+ byte[] hugePayload = new byte[100 * 1024 * 1024];
+ GenericRow hugeRow = GenericRow.of(BinaryString.fromString("key"), hugePayload);
+ hugeRow.setRowKind(RowKind.INSERT);
+ helper.copyInto(hugeRow);
+ assertThat(helper.reuseRow()).isNotNull();
+
+ // Write a medium record (5MB) — buffer is ~150MB (from grow), row is ~5MB
+ // ratio ~30x > 4x, buffer > 4MB → should release
+ byte[] mediumPayload = new byte[5 * 1024 * 1024];
+ GenericRow mediumRow = GenericRow.of(BinaryString.fromString("m"), mediumPayload);
+ mediumRow.setRowKind(RowKind.INSERT);
+ helper.copyInto(mediumRow);
+
+ helper.resetIfTooLarge(helper.reuseRow());
+ assertThat(helper.reuseRow()).isNull();
+ }
+
+ @Test
+ void testRetainsWhenBufferProportionalToRecordSize() {
+ RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
+
+ // Write a 5MB record — buffer grows to ~8MB via grow() (1.5x strategy)
+ byte[] payload = new byte[5 * 1024 * 1024];
+ GenericRow row = GenericRow.of(BinaryString.fromString("key"), payload);
+ row.setRowKind(RowKind.INSERT);
+ helper.copyInto(row);
+
+ // buffer ~8MB, row ~5MB → ratio ~1.6x < 4x → should NOT release
+ // even though buffer > 4MB
+ helper.resetIfTooLarge(helper.reuseRow());
+ assertThat(helper.reuseRow()).isNotNull();
+ }
+
+ @Test
+ void testKeepsSmallBuffer() {
+ RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT()));
+
+ GenericRow smallRow = GenericRow.of(BinaryString.fromString("hello"), 42);
+ smallRow.setRowKind(RowKind.INSERT);
+ helper.copyInto(smallRow);
+ BinaryRow reuse = helper.reuseRow();
+ assertThat(reuse).isNotNull();
+
+ // Small buffer (< 4MB) — should NOT be released regardless of ratio
+ helper.resetIfTooLarge(reuse);
+ assertThat(helper.reuseRow()).isNotNull();
+ }
+
+ @Test
+ void testSkipsWhenCurrentRowIsNotReuseRow() {
+ RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
+
+ // Write a large record to inflate the buffer
+ byte[] largePayload = new byte[5 * 1024 * 1024];
+ GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
+ largeRow.setRowKind(RowKind.INSERT);
+ helper.copyInto(largeRow);
+ assertThat(helper.reuseRow()).isNotNull();
+
+ // Pass a different BinaryRow — simulates toBinaryRow() returning input directly
+ BinaryRow externalRow = new BinaryRow(2);
+ externalRow.pointTo(MemorySegment.wrap(new byte[32]), 0, 32);
+
+ // Should NOT release because externalRow != reuseRow
+ helper.resetIfTooLarge(externalRow);
+ assertThat(helper.reuseRow()).isNotNull();
+ }
+
+ @Test
+ void testSafeToCallWithNullReuseRow() {
+ RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING()));
+ assertThat(helper.reuseRow()).isNull();
+
+ BinaryRow someRow = new BinaryRow(1);
+ someRow.pointTo(MemorySegment.wrap(new byte[16]), 0, 16);
+ helper.resetIfTooLarge(someRow);
+ assertThat(helper.reuseRow()).isNull();
+ }
+
+ @Test
+ void testReuseRecreatedAfterRelease() {
+ RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
+
+ // Inflate buffer, then transition to small
+ byte[] largePayload = new byte[5 * 1024 * 1024];
+ GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
+ largeRow.setRowKind(RowKind.INSERT);
+ helper.copyInto(largeRow);
+
+ GenericRow smallRow = GenericRow.of(BinaryString.fromString("small"), new byte[10]);
+ smallRow.setRowKind(RowKind.INSERT);
+ helper.copyInto(smallRow);
+
+ helper.resetIfTooLarge(helper.reuseRow());
+ assertThat(helper.reuseRow()).isNull();
+
+ // Write another small record — reuseRow should be recreated
+ helper.copyInto(smallRow);
+ assertThat(helper.reuseRow()).isNotNull();
+
+ // Small buffer should survive
+ helper.resetIfTooLarge(helper.reuseRow());
+ assertThat(helper.reuseRow()).isNotNull();
+ }
+}