From 3a5a85bae792ec764aab91289dd5ea4919b5431f Mon Sep 17 00:00:00 2001 From: "zhongheng.gy" Date: Mon, 8 Jun 2026 11:32:00 +0800 Subject: [PATCH 1/3] [common] fix BinaryRowSerializer reuse buffer never shrinking --- .../data/serializer/BinaryRowSerializer.java | 14 ++ .../BinaryRowSerializerShrinkTest.java | 151 ++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java index 49dcee73ef27..6b5f70a0d2b7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java @@ -80,6 +80,13 @@ public BinaryRow deserialize(DataInputView source) throws IOException { return row; } + /** + * Threshold above which we consider a reuse buffer "oversized" and eligible for shrinking. This + * prevents accumulation of large byte arrays when a few large records inflate the reuse buffer + * and subsequent small records never trigger reallocation. + */ + private static final int REUSE_SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB + public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOException { MemorySegment[] segments = reuse.getSegments(); checkArgument( @@ -88,6 +95,13 @@ public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOExc int length = source.readInt(); if (segments == null || segments[0].size() < length) { + // Need a larger buffer + segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])}; + } else if (segments[0].size() > REUSE_SHRINK_THRESHOLD && length < REUSE_SHRINK_THRESHOLD) { + // Hysteresis: only shrink when the buffer is oversized AND the current record is + // small. This avoids thrashing (release-and-rebuild on every record) when records + // are consistently large (e.g. 5-10MB), while still reclaiming memory when the + // workload transitions back to small records. segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])}; } source.readFully(segments[0].getArray(), 0, length); diff --git a/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java b/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java new file mode 100644 index 000000000000..e2faf4774018 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.serializer; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.io.DataInputDeserializer; +import org.apache.paimon.io.DataOutputSerializer; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link BinaryRowSerializer#deserialize(BinaryRow, org.apache.paimon.io.DataInputView)}, + * focusing on the REUSE_SHRINK_THRESHOLD behavior. + */ +class BinaryRowSerializerShrinkTest { + + private static final int SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB + + @Test + void testDeserializeShrinksOversizedReuseBuffer() throws Exception { + BinaryRowSerializer serializer = new BinaryRowSerializer(1); + + // Serialize a large record (> 4MB) + BinaryRow largeRow = createRowWithPayload(5 * 1024 * 1024); + byte[] largeBytes = serializeRow(serializer, largeRow); + + // Deserialize into a fresh reuse row — buffer grows to hold the large record + BinaryRow reuse = serializer.createInstance(); + DataInputDeserializer largeInput = new DataInputDeserializer(largeBytes); + reuse = serializer.deserialize(reuse, largeInput); + int largeBufferSize = reuse.getSegments()[0].size(); + assertThat(largeBufferSize).isGreaterThanOrEqualTo(5 * 1024 * 1024); + + // Serialize a small record + BinaryRow smallRow = createRowWithPayload(100); + byte[] smallBytes = serializeRow(serializer, smallRow); + + // Deserialize the small record into the same reuse row + // The oversized buffer (> 4MB) should be shrunk to the exact size needed + DataInputDeserializer smallInput = new DataInputDeserializer(smallBytes); + reuse = serializer.deserialize(reuse, smallInput); + int shrunkBufferSize = reuse.getSegments()[0].size(); + assertThat(shrunkBufferSize).isLessThan(SHRINK_THRESHOLD); + } + + @Test + void testDeserializeKeepsSmallReuseBuffer() throws Exception { + BinaryRowSerializer serializer = new BinaryRowSerializer(1); + + // Serialize a small record (< 4MB) + BinaryRow row1 = createRowWithPayload(1024); + byte[] bytes1 = serializeRow(serializer, row1); + + BinaryRow reuse = serializer.createInstance(); + DataInputDeserializer input1 = new DataInputDeserializer(bytes1); + reuse = serializer.deserialize(reuse, input1); + int bufferSize1 = reuse.getSegments()[0].size(); + + // Serialize an even smaller record + BinaryRow row2 = createRowWithPayload(100); + byte[] bytes2 = serializeRow(serializer, row2); + + // Deserialize — buffer should be reused (not shrunk), since it's < 4MB + DataInputDeserializer input2 = new DataInputDeserializer(bytes2); + reuse = serializer.deserialize(reuse, input2); + int bufferSize2 = reuse.getSegments()[0].size(); + assertThat(bufferSize2).isEqualTo(bufferSize1); + } + + @Test + void testDeserializeRetainsBufferForConsecutiveLargeRecords() throws Exception { + BinaryRowSerializer serializer = new BinaryRowSerializer(1); + + // Serialize a large record (> 4MB) to inflate the buffer + BinaryRow largeRow1 = createRowWithPayload(5 * 1024 * 1024); + byte[] largeBytes1 = serializeRow(serializer, largeRow1); + + BinaryRow reuse = serializer.createInstance(); + DataInputDeserializer input1 = new DataInputDeserializer(largeBytes1); + reuse = serializer.deserialize(reuse, input1); + int bufferAfterFirst = reuse.getSegments()[0].size(); + assertThat(bufferAfterFirst).isGreaterThanOrEqualTo(5 * 1024 * 1024); + + // Deserialize another large record (also > 4MB) + // Hysteresis: buffer should NOT be shrunk because the incoming record is also large + BinaryRow largeRow2 = createRowWithPayload(SHRINK_THRESHOLD + 100); + byte[] largeBytes2 = serializeRow(serializer, largeRow2); + + DataInputDeserializer input2 = new DataInputDeserializer(largeBytes2); + reuse = serializer.deserialize(reuse, input2); + int bufferAfterSecond = reuse.getSegments()[0].size(); + assertThat(bufferAfterSecond).isEqualTo(bufferAfterFirst); + } + + @Test + void testDeserializeGrowsBufferWhenNeeded() throws Exception { + BinaryRowSerializer serializer = new BinaryRowSerializer(1); + + // Start with a small record + BinaryRow smallRow = createRowWithPayload(100); + byte[] smallBytes = serializeRow(serializer, smallRow); + + BinaryRow reuse = serializer.createInstance(); + DataInputDeserializer smallInput = new DataInputDeserializer(smallBytes); + reuse = serializer.deserialize(reuse, smallInput); + + // Deserialize a larger record — buffer should grow + BinaryRow largerRow = createRowWithPayload(2048); + byte[] largerBytes = serializeRow(serializer, largerRow); + + DataInputDeserializer largerInput = new DataInputDeserializer(largerBytes); + reuse = serializer.deserialize(reuse, largerInput); + assertThat(reuse.getSegments()[0].size()).isGreaterThanOrEqualTo(2048); + } + + private static BinaryRow createRowWithPayload(int payloadSize) { + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row, payloadSize + 32); + byte[] payload = new byte[payloadSize]; + writer.writeString(0, BinaryString.fromBytes(payload)); + writer.complete(); + return row; + } + + private static byte[] serializeRow(BinaryRowSerializer serializer, BinaryRow row) + throws Exception { + DataOutputSerializer output = new DataOutputSerializer(row.getSizeInBytes() + 4); + serializer.serialize(row, output); + return output.getCopyOfBuffer(); + } +} From 075a825a8f3658d9977f24a1a65988354738cd3d Mon Sep 17 00:00:00 2001 From: "zhongheng.gy" Date: Wed, 10 Jun 2026 09:59:22 +0800 Subject: [PATCH 2/3] use combined cap and ratio hysteresis for buffer shrink decision --- .../data/serializer/BinaryRowSerializer.java | 21 +-- .../BinaryRowSerializerShrinkTest.java | 123 ++++++++++-------- 2 files changed, 82 insertions(+), 62 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java index 6b5f70a0d2b7..42947b576cec 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java @@ -81,11 +81,17 @@ public BinaryRow deserialize(DataInputView source) throws IOException { } /** - * Threshold above which we consider a reuse buffer "oversized" and eligible for shrinking. This - * prevents accumulation of large byte arrays when a few large records inflate the reuse buffer - * and subsequent small records never trigger reallocation. + * Maximum retained reuse buffer size in bytes. Buffers exceeding this cap are eligible for + * shrinking when the shrink ratio condition is also met. */ - private static final int REUSE_SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB + private static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB + + /** + * Shrink ratio. The buffer is reallocated only when its size exceeds {@link + * #MAX_RETAINED_REUSE_BUFFER_SIZE} AND is more than {@code SHRINK_RATIO} times the current + * record length. + */ + private static final int SHRINK_RATIO = 4; public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOException { MemorySegment[] segments = reuse.getSegments(); @@ -97,11 +103,8 @@ public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOExc if (segments == null || segments[0].size() < length) { // Need a larger buffer segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])}; - } else if (segments[0].size() > REUSE_SHRINK_THRESHOLD && length < REUSE_SHRINK_THRESHOLD) { - // Hysteresis: only shrink when the buffer is oversized AND the current record is - // small. This avoids thrashing (release-and-rebuild on every record) when records - // are consistently large (e.g. 5-10MB), while still reclaiming memory when the - // workload transitions back to small records. + } else if (segments[0].size() > MAX_RETAINED_REUSE_BUFFER_SIZE + && segments[0].size() > (long) length * SHRINK_RATIO) { segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])}; } source.readFully(segments[0].getArray(), 0, length); diff --git a/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java b/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java index e2faf4774018..c1eac7e47a67 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java @@ -30,109 +30,126 @@ /** * Tests for {@link BinaryRowSerializer#deserialize(BinaryRow, org.apache.paimon.io.DataInputView)}, - * focusing on the REUSE_SHRINK_THRESHOLD behavior. + * focusing on the combined cap + ratio shrink behavior. */ class BinaryRowSerializerShrinkTest { - private static final int SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB + private static final int MAX_RETAINED = 4 * 1024 * 1024; // 4MB @Test - void testDeserializeShrinksOversizedReuseBuffer() throws Exception { + void testShrinksWhenSpikeFollowedBySmallRecord() throws Exception { BinaryRowSerializer serializer = new BinaryRowSerializer(1); - // Serialize a large record (> 4MB) + // Inflate buffer with a large record (> 4MB) BinaryRow largeRow = createRowWithPayload(5 * 1024 * 1024); byte[] largeBytes = serializeRow(serializer, largeRow); - // Deserialize into a fresh reuse row — buffer grows to hold the large record BinaryRow reuse = serializer.createInstance(); - DataInputDeserializer largeInput = new DataInputDeserializer(largeBytes); - reuse = serializer.deserialize(reuse, largeInput); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes)); int largeBufferSize = reuse.getSegments()[0].size(); assertThat(largeBufferSize).isGreaterThanOrEqualTo(5 * 1024 * 1024); - // Serialize a small record + // Deserialize a small record — buffer > 4MB and ratio huge > 4x → shrink BinaryRow smallRow = createRowWithPayload(100); byte[] smallBytes = serializeRow(serializer, smallRow); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(smallBytes)); + assertThat(reuse.getSegments()[0].size()).isLessThan(MAX_RETAINED); + } + + @Test + void testShrinksWhenSpikeFollowedByMediumRecord() throws Exception { + BinaryRowSerializer serializer = new BinaryRowSerializer(1); + + // Inflate buffer with a very large record (100MB) + BinaryRow hugeRow = createRowWithPayload(100 * 1024 * 1024); + byte[] hugeBytes = serializeRow(serializer, hugeRow); - // Deserialize the small record into the same reuse row - // The oversized buffer (> 4MB) should be shrunk to the exact size needed - DataInputDeserializer smallInput = new DataInputDeserializer(smallBytes); - reuse = serializer.deserialize(reuse, smallInput); - int shrunkBufferSize = reuse.getSegments()[0].size(); - assertThat(shrunkBufferSize).isLessThan(SHRINK_THRESHOLD); + BinaryRow reuse = serializer.createInstance(); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(hugeBytes)); + int hugeBufferSize = reuse.getSegments()[0].size(); + assertThat(hugeBufferSize).isGreaterThanOrEqualTo(100 * 1024 * 1024); + + // Deserialize a 5MB record — buffer ~100MB, ratio ~20x > 4x → shrink + BinaryRow mediumRow = createRowWithPayload(5 * 1024 * 1024); + byte[] mediumBytes = serializeRow(serializer, mediumRow); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(mediumBytes)); + assertThat(reuse.getSegments()[0].size()).isLessThan(hugeBufferSize); } @Test - void testDeserializeKeepsSmallReuseBuffer() throws Exception { + void testRetainsWhenBufferProportionalToRecordSize() throws Exception { BinaryRowSerializer serializer = new BinaryRowSerializer(1); - // Serialize a small record (< 4MB) - BinaryRow row1 = createRowWithPayload(1024); + // Inflate buffer with a 5MB record + BinaryRow row1 = createRowWithPayload(5 * 1024 * 1024); byte[] bytes1 = serializeRow(serializer, row1); BinaryRow reuse = serializer.createInstance(); - DataInputDeserializer input1 = new DataInputDeserializer(bytes1); - reuse = serializer.deserialize(reuse, input1); - int bufferSize1 = reuse.getSegments()[0].size(); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes1)); + int bufferAfterFirst = reuse.getSegments()[0].size(); + assertThat(bufferAfterFirst).isGreaterThan(MAX_RETAINED); - // Serialize an even smaller record - BinaryRow row2 = createRowWithPayload(100); + // Deserialize another record just above threshold — ratio ~1.2x < 4x → retain + BinaryRow row2 = createRowWithPayload(MAX_RETAINED + 100); byte[] bytes2 = serializeRow(serializer, row2); - - // Deserialize — buffer should be reused (not shrunk), since it's < 4MB - DataInputDeserializer input2 = new DataInputDeserializer(bytes2); - reuse = serializer.deserialize(reuse, input2); - int bufferSize2 = reuse.getSegments()[0].size(); - assertThat(bufferSize2).isEqualTo(bufferSize1); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes2)); + assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferAfterFirst); } @Test - void testDeserializeRetainsBufferForConsecutiveLargeRecords() throws Exception { + void testKeepsSmallBuffer() throws Exception { BinaryRowSerializer serializer = new BinaryRowSerializer(1); - // Serialize a large record (> 4MB) to inflate the buffer - BinaryRow largeRow1 = createRowWithPayload(5 * 1024 * 1024); - byte[] largeBytes1 = serializeRow(serializer, largeRow1); + BinaryRow row1 = createRowWithPayload(1024); + byte[] bytes1 = serializeRow(serializer, row1); BinaryRow reuse = serializer.createInstance(); - DataInputDeserializer input1 = new DataInputDeserializer(largeBytes1); - reuse = serializer.deserialize(reuse, input1); - int bufferAfterFirst = reuse.getSegments()[0].size(); - assertThat(bufferAfterFirst).isGreaterThanOrEqualTo(5 * 1024 * 1024); - - // Deserialize another large record (also > 4MB) - // Hysteresis: buffer should NOT be shrunk because the incoming record is also large - BinaryRow largeRow2 = createRowWithPayload(SHRINK_THRESHOLD + 100); - byte[] largeBytes2 = serializeRow(serializer, largeRow2); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes1)); + int bufferSize1 = reuse.getSegments()[0].size(); - DataInputDeserializer input2 = new DataInputDeserializer(largeBytes2); - reuse = serializer.deserialize(reuse, input2); - int bufferAfterSecond = reuse.getSegments()[0].size(); - assertThat(bufferAfterSecond).isEqualTo(bufferAfterFirst); + // Smaller record — buffer < 4MB, should reuse without shrinking + BinaryRow row2 = createRowWithPayload(100); + byte[] bytes2 = serializeRow(serializer, row2); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes2)); + assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferSize1); } @Test - void testDeserializeGrowsBufferWhenNeeded() throws Exception { + void testGrowsBufferWhenNeeded() throws Exception { BinaryRowSerializer serializer = new BinaryRowSerializer(1); - // Start with a small record BinaryRow smallRow = createRowWithPayload(100); byte[] smallBytes = serializeRow(serializer, smallRow); BinaryRow reuse = serializer.createInstance(); - DataInputDeserializer smallInput = new DataInputDeserializer(smallBytes); - reuse = serializer.deserialize(reuse, smallInput); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(smallBytes)); - // Deserialize a larger record — buffer should grow + // Larger record arrives — buffer must grow BinaryRow largerRow = createRowWithPayload(2048); byte[] largerBytes = serializeRow(serializer, largerRow); - - DataInputDeserializer largerInput = new DataInputDeserializer(largerBytes); - reuse = serializer.deserialize(reuse, largerInput); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(largerBytes)); assertThat(reuse.getSegments()[0].size()).isGreaterThanOrEqualTo(2048); } + @Test + void testRetainsBufferForConsecutiveLargeRecords() throws Exception { + BinaryRowSerializer serializer = new BinaryRowSerializer(1); + + // Inflate buffer with 5MB record + BinaryRow largeRow1 = createRowWithPayload(5 * 1024 * 1024); + byte[] largeBytes1 = serializeRow(serializer, largeRow1); + + BinaryRow reuse = serializer.createInstance(); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes1)); + int bufferAfterFirst = reuse.getSegments()[0].size(); + + // Another 5MB record — ratio ~1x < 4x → retain + BinaryRow largeRow2 = createRowWithPayload(5 * 1024 * 1024); + byte[] largeBytes2 = serializeRow(serializer, largeRow2); + reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes2)); + assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferAfterFirst); + } + private static BinaryRow createRowWithPayload(int payloadSize) { BinaryRow row = new BinaryRow(1); BinaryRowWriter writer = new BinaryRowWriter(row, payloadSize + 32); From c55d98b08e5000ab5ff068f10d4ee349e5fa0def Mon Sep 17 00:00:00 2001 From: "zhongheng.gy" Date: Thu, 11 Jun 2026 15:02:45 +0800 Subject: [PATCH 3/3] reuse shrink constants from RowHelper --- .../java/org/apache/paimon/data/RowHelper.java | 4 ++-- .../data/serializer/BinaryRowSerializer.java | 18 +++--------------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java index f18e2d219f26..5f5224465e39 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java @@ -40,7 +40,7 @@ public class RowHelper implements Serializable { * Maximum retained reuse buffer size in bytes. Buffers exceeding this cap are eligible for * release when the shrink ratio condition is also met. */ - private static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB + public static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB /** * Shrink ratio for hysteresis. The buffer is released only when its capacity exceeds {@link @@ -48,7 +48,7 @@ public class RowHelper implements Serializable { * row's size. This avoids thrashing for sustained medium-to-large records while still releasing * after a spike (e.g. 100MB buffer with 5MB rows → 20x > 4x → release). */ - private static final int SHRINK_RATIO = 4; + public static final int SHRINK_RATIO = 4; private final FieldGetter[] fieldGetters; private final ValueSetter[] valueSetters; diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java index 42947b576cec..dbf37f520f4e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.AbstractPagedInputView; import org.apache.paimon.data.AbstractPagedOutputView; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.RowHelper; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataOutputView; import org.apache.paimon.memory.MemorySegment; @@ -80,19 +81,6 @@ public BinaryRow deserialize(DataInputView source) throws IOException { return row; } - /** - * Maximum retained reuse buffer size in bytes. Buffers exceeding this cap are eligible for - * shrinking when the shrink ratio condition is also met. - */ - private static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB - - /** - * Shrink ratio. The buffer is reallocated only when its size exceeds {@link - * #MAX_RETAINED_REUSE_BUFFER_SIZE} AND is more than {@code SHRINK_RATIO} times the current - * record length. - */ - private static final int SHRINK_RATIO = 4; - public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOException { MemorySegment[] segments = reuse.getSegments(); checkArgument( @@ -103,8 +91,8 @@ public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOExc if (segments == null || segments[0].size() < length) { // Need a larger buffer segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])}; - } else if (segments[0].size() > MAX_RETAINED_REUSE_BUFFER_SIZE - && segments[0].size() > (long) length * SHRINK_RATIO) { + } else if (segments[0].size() > RowHelper.MAX_RETAINED_REUSE_BUFFER_SIZE + && segments[0].size() > (long) length * RowHelper.SHRINK_RATIO) { segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])}; } source.readFully(segments[0].getArray(), 0, length);