Skip to content

Commit badb0b8

Browse files
author
zhongheng.gy
committed
use combined cap and ratio hysteresis for buffer shrink decision
1 parent 631e441 commit badb0b8

2 files changed

Lines changed: 82 additions & 62 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,17 @@ public BinaryRow deserialize(DataInputView source) throws IOException {
8181
}
8282

8383
/**
84-
* Threshold above which we consider a reuse buffer "oversized" and eligible for shrinking. This
85-
* prevents accumulation of large byte arrays when a few large records inflate the reuse buffer
86-
* and subsequent small records never trigger reallocation.
84+
* Maximum retained reuse buffer size in bytes. Buffers exceeding this cap are eligible for
85+
* shrinking when the shrink ratio condition is also met.
8786
*/
88-
private static final int REUSE_SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB
87+
private static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB
88+
89+
/**
90+
* Shrink ratio. The buffer is reallocated only when its size exceeds {@link
91+
* #MAX_RETAINED_REUSE_BUFFER_SIZE} AND is more than {@code SHRINK_RATIO} times the current
92+
* record length.
93+
*/
94+
private static final int SHRINK_RATIO = 4;
8995

9096
public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOException {
9197
MemorySegment[] segments = reuse.getSegments();
@@ -97,11 +103,8 @@ public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOExc
97103
if (segments == null || segments[0].size() < length) {
98104
// Need a larger buffer
99105
segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])};
100-
} else if (segments[0].size() > REUSE_SHRINK_THRESHOLD && length < REUSE_SHRINK_THRESHOLD) {
101-
// Hysteresis: only shrink when the buffer is oversized AND the current record is
102-
// small. This avoids thrashing (release-and-rebuild on every record) when records
103-
// are consistently large (e.g. 5-10MB), while still reclaiming memory when the
104-
// workload transitions back to small records.
106+
} else if (segments[0].size() > MAX_RETAINED_REUSE_BUFFER_SIZE
107+
&& segments[0].size() > (long) length * SHRINK_RATIO) {
105108
segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])};
106109
}
107110
source.readFully(segments[0].getArray(), 0, length);

paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java

Lines changed: 70 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -30,109 +30,126 @@
3030

3131
/**
3232
* Tests for {@link BinaryRowSerializer#deserialize(BinaryRow, org.apache.paimon.io.DataInputView)},
33-
* focusing on the REUSE_SHRINK_THRESHOLD behavior.
33+
* focusing on the combined cap + ratio shrink behavior.
3434
*/
3535
class BinaryRowSerializerShrinkTest {
3636

37-
private static final int SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB
37+
private static final int MAX_RETAINED = 4 * 1024 * 1024; // 4MB
3838

3939
@Test
40-
void testDeserializeShrinksOversizedReuseBuffer() throws Exception {
40+
void testShrinksWhenSpikeFollowedBySmallRecord() throws Exception {
4141
BinaryRowSerializer serializer = new BinaryRowSerializer(1);
4242

43-
// Serialize a large record (> 4MB)
43+
// Inflate buffer with a large record (> 4MB)
4444
BinaryRow largeRow = createRowWithPayload(5 * 1024 * 1024);
4545
byte[] largeBytes = serializeRow(serializer, largeRow);
4646

47-
// Deserialize into a fresh reuse row — buffer grows to hold the large record
4847
BinaryRow reuse = serializer.createInstance();
49-
DataInputDeserializer largeInput = new DataInputDeserializer(largeBytes);
50-
reuse = serializer.deserialize(reuse, largeInput);
48+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes));
5149
int largeBufferSize = reuse.getSegments()[0].size();
5250
assertThat(largeBufferSize).isGreaterThanOrEqualTo(5 * 1024 * 1024);
5351

54-
// Serialize a small record
52+
// Deserialize a small record — buffer > 4MB and ratio huge > 4x → shrink
5553
BinaryRow smallRow = createRowWithPayload(100);
5654
byte[] smallBytes = serializeRow(serializer, smallRow);
55+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(smallBytes));
56+
assertThat(reuse.getSegments()[0].size()).isLessThan(MAX_RETAINED);
57+
}
58+
59+
@Test
60+
void testShrinksWhenSpikeFollowedByMediumRecord() throws Exception {
61+
BinaryRowSerializer serializer = new BinaryRowSerializer(1);
62+
63+
// Inflate buffer with a very large record (100MB)
64+
BinaryRow hugeRow = createRowWithPayload(100 * 1024 * 1024);
65+
byte[] hugeBytes = serializeRow(serializer, hugeRow);
5766

58-
// Deserialize the small record into the same reuse row
59-
// The oversized buffer (> 4MB) should be shrunk to the exact size needed
60-
DataInputDeserializer smallInput = new DataInputDeserializer(smallBytes);
61-
reuse = serializer.deserialize(reuse, smallInput);
62-
int shrunkBufferSize = reuse.getSegments()[0].size();
63-
assertThat(shrunkBufferSize).isLessThan(SHRINK_THRESHOLD);
67+
BinaryRow reuse = serializer.createInstance();
68+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(hugeBytes));
69+
int hugeBufferSize = reuse.getSegments()[0].size();
70+
assertThat(hugeBufferSize).isGreaterThanOrEqualTo(100 * 1024 * 1024);
71+
72+
// Deserialize a 5MB record — buffer ~100MB, ratio ~20x > 4x → shrink
73+
BinaryRow mediumRow = createRowWithPayload(5 * 1024 * 1024);
74+
byte[] mediumBytes = serializeRow(serializer, mediumRow);
75+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(mediumBytes));
76+
assertThat(reuse.getSegments()[0].size()).isLessThan(hugeBufferSize);
6477
}
6578

6679
@Test
67-
void testDeserializeKeepsSmallReuseBuffer() throws Exception {
80+
void testRetainsWhenBufferProportionalToRecordSize() throws Exception {
6881
BinaryRowSerializer serializer = new BinaryRowSerializer(1);
6982

70-
// Serialize a small record (< 4MB)
71-
BinaryRow row1 = createRowWithPayload(1024);
83+
// Inflate buffer with a 5MB record
84+
BinaryRow row1 = createRowWithPayload(5 * 1024 * 1024);
7285
byte[] bytes1 = serializeRow(serializer, row1);
7386

7487
BinaryRow reuse = serializer.createInstance();
75-
DataInputDeserializer input1 = new DataInputDeserializer(bytes1);
76-
reuse = serializer.deserialize(reuse, input1);
77-
int bufferSize1 = reuse.getSegments()[0].size();
88+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes1));
89+
int bufferAfterFirst = reuse.getSegments()[0].size();
90+
assertThat(bufferAfterFirst).isGreaterThan(MAX_RETAINED);
7891

79-
// Serialize an even smaller record
80-
BinaryRow row2 = createRowWithPayload(100);
92+
// Deserialize another record just above threshold — ratio ~1.2x < 4x → retain
93+
BinaryRow row2 = createRowWithPayload(MAX_RETAINED + 100);
8194
byte[] bytes2 = serializeRow(serializer, row2);
82-
83-
// Deserialize — buffer should be reused (not shrunk), since it's < 4MB
84-
DataInputDeserializer input2 = new DataInputDeserializer(bytes2);
85-
reuse = serializer.deserialize(reuse, input2);
86-
int bufferSize2 = reuse.getSegments()[0].size();
87-
assertThat(bufferSize2).isEqualTo(bufferSize1);
95+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes2));
96+
assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferAfterFirst);
8897
}
8998

9099
@Test
91-
void testDeserializeRetainsBufferForConsecutiveLargeRecords() throws Exception {
100+
void testKeepsSmallBuffer() throws Exception {
92101
BinaryRowSerializer serializer = new BinaryRowSerializer(1);
93102

94-
// Serialize a large record (> 4MB) to inflate the buffer
95-
BinaryRow largeRow1 = createRowWithPayload(5 * 1024 * 1024);
96-
byte[] largeBytes1 = serializeRow(serializer, largeRow1);
103+
BinaryRow row1 = createRowWithPayload(1024);
104+
byte[] bytes1 = serializeRow(serializer, row1);
97105

98106
BinaryRow reuse = serializer.createInstance();
99-
DataInputDeserializer input1 = new DataInputDeserializer(largeBytes1);
100-
reuse = serializer.deserialize(reuse, input1);
101-
int bufferAfterFirst = reuse.getSegments()[0].size();
102-
assertThat(bufferAfterFirst).isGreaterThanOrEqualTo(5 * 1024 * 1024);
103-
104-
// Deserialize another large record (also > 4MB)
105-
// Hysteresis: buffer should NOT be shrunk because the incoming record is also large
106-
BinaryRow largeRow2 = createRowWithPayload(SHRINK_THRESHOLD + 100);
107-
byte[] largeBytes2 = serializeRow(serializer, largeRow2);
107+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes1));
108+
int bufferSize1 = reuse.getSegments()[0].size();
108109

109-
DataInputDeserializer input2 = new DataInputDeserializer(largeBytes2);
110-
reuse = serializer.deserialize(reuse, input2);
111-
int bufferAfterSecond = reuse.getSegments()[0].size();
112-
assertThat(bufferAfterSecond).isEqualTo(bufferAfterFirst);
110+
// Smaller record — buffer < 4MB, should reuse without shrinking
111+
BinaryRow row2 = createRowWithPayload(100);
112+
byte[] bytes2 = serializeRow(serializer, row2);
113+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes2));
114+
assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferSize1);
113115
}
114116

115117
@Test
116-
void testDeserializeGrowsBufferWhenNeeded() throws Exception {
118+
void testGrowsBufferWhenNeeded() throws Exception {
117119
BinaryRowSerializer serializer = new BinaryRowSerializer(1);
118120

119-
// Start with a small record
120121
BinaryRow smallRow = createRowWithPayload(100);
121122
byte[] smallBytes = serializeRow(serializer, smallRow);
122123

123124
BinaryRow reuse = serializer.createInstance();
124-
DataInputDeserializer smallInput = new DataInputDeserializer(smallBytes);
125-
reuse = serializer.deserialize(reuse, smallInput);
125+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(smallBytes));
126126

127-
// Deserialize a larger record — buffer should grow
127+
// Larger record arrives — buffer must grow
128128
BinaryRow largerRow = createRowWithPayload(2048);
129129
byte[] largerBytes = serializeRow(serializer, largerRow);
130-
131-
DataInputDeserializer largerInput = new DataInputDeserializer(largerBytes);
132-
reuse = serializer.deserialize(reuse, largerInput);
130+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(largerBytes));
133131
assertThat(reuse.getSegments()[0].size()).isGreaterThanOrEqualTo(2048);
134132
}
135133

134+
@Test
135+
void testRetainsBufferForConsecutiveLargeRecords() throws Exception {
136+
BinaryRowSerializer serializer = new BinaryRowSerializer(1);
137+
138+
// Inflate buffer with 5MB record
139+
BinaryRow largeRow1 = createRowWithPayload(5 * 1024 * 1024);
140+
byte[] largeBytes1 = serializeRow(serializer, largeRow1);
141+
142+
BinaryRow reuse = serializer.createInstance();
143+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes1));
144+
int bufferAfterFirst = reuse.getSegments()[0].size();
145+
146+
// Another 5MB record — ratio ~1x < 4x → retain
147+
BinaryRow largeRow2 = createRowWithPayload(5 * 1024 * 1024);
148+
byte[] largeBytes2 = serializeRow(serializer, largeRow2);
149+
reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes2));
150+
assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferAfterFirst);
151+
}
152+
136153
private static BinaryRow createRowWithPayload(int payloadSize) {
137154
BinaryRow row = new BinaryRow(1);
138155
BinaryRowWriter writer = new BinaryRowWriter(row, payloadSize + 32);

0 commit comments

Comments
 (0)