Skip to content

Commit 05c9f2d

Browse files
author
zhongheng.gy
committed
[common] Fix BinaryRowSerializer reuse buffer never shrinking
1 parent 1e1c9f4 commit 05c9f2d

2 files changed

Lines changed: 165 additions & 0 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ public BinaryRow deserialize(DataInputView source) throws IOException {
8080
return row;
8181
}
8282

83+
/**
84+
* Threshold (in bytes) above which we consider a reuse buffer "oversized" and eligible for shrinking. This
85+
* prevents accumulation of large byte arrays when a few large records inflate the reuse buffer
86+
* and subsequent small records never trigger reallocation.
87+
*/
88+
private static final int REUSE_SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB
89+
8390
public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOException {
8491
MemorySegment[] segments = reuse.getSegments();
8592
checkArgument(
@@ -88,6 +95,13 @@ public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOExc
8895

8996
int length = source.readInt();
9097
if (segments == null || segments[0].size() < length) {
98+
// Need a larger buffer
99+
segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])};
100+
} else if (segments[0].size() > REUSE_SHRINK_THRESHOLD && length < REUSE_SHRINK_THRESHOLD) {
101+
// Shrink only when the buffer is oversized AND the current record is small. This
102+
// avoids thrashing (release-and-rebuild on every record) when records are
103+
// consistently large (e.g. 5-10MB), while still reclaiming memory when the workload
104+
// transitions back to small records. Note: both checks use the same threshold, so records alternating just above and below it still reallocate on every call.
91105
segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])};
92106
}
93107
source.readFully(segments[0].getArray(), 0, length);
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.data.serializer;
20+
21+
import org.apache.paimon.data.BinaryRow;
22+
import org.apache.paimon.data.BinaryRowWriter;
23+
import org.apache.paimon.data.BinaryString;
24+
import org.apache.paimon.io.DataInputDeserializer;
25+
import org.apache.paimon.io.DataOutputSerializer;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
/**
32+
* Tests for {@link BinaryRowSerializer#deserialize(BinaryRow, org.apache.paimon.io.DataInputView)},
33+
* focusing on the REUSE_SHRINK_THRESHOLD behavior.
34+
*/
35+
class BinaryRowSerializerShrinkTest {
36+
37+
private static final int SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB
38+
39+
@Test
40+
void testDeserializeShrinksOversizedReuseBuffer() throws Exception {
41+
BinaryRowSerializer serializer = new BinaryRowSerializer(1);
42+
43+
// Serialize a large record (> 4MB)
44+
BinaryRow largeRow = createRowWithPayload(5 * 1024 * 1024);
45+
byte[] largeBytes = serializeRow(serializer, largeRow);
46+
47+
// Deserialize into a fresh reuse row — buffer grows to hold the large record
48+
BinaryRow reuse = serializer.createInstance();
49+
DataInputDeserializer largeInput = new DataInputDeserializer(largeBytes);
50+
reuse = serializer.deserialize(reuse, largeInput);
51+
int largeBufferSize = reuse.getSegments()[0].size();
52+
assertThat(largeBufferSize).isGreaterThanOrEqualTo(5 * 1024 * 1024);
53+
54+
// Serialize a small record
55+
BinaryRow smallRow = createRowWithPayload(100);
56+
byte[] smallBytes = serializeRow(serializer, smallRow);
57+
58+
// Deserialize the small record into the same reuse row
59+
// The oversized buffer (> 4MB) should be shrunk to the exact size needed
60+
DataInputDeserializer smallInput = new DataInputDeserializer(smallBytes);
61+
reuse = serializer.deserialize(reuse, smallInput);
62+
int shrunkBufferSize = reuse.getSegments()[0].size();
63+
assertThat(shrunkBufferSize).isLessThan(SHRINK_THRESHOLD);
64+
}
65+
66+
@Test
67+
void testDeserializeKeepsSmallReuseBuffer() throws Exception {
68+
BinaryRowSerializer serializer = new BinaryRowSerializer(1);
69+
70+
// Serialize a small record (< 4MB)
71+
BinaryRow row1 = createRowWithPayload(1024);
72+
byte[] bytes1 = serializeRow(serializer, row1);
73+
74+
BinaryRow reuse = serializer.createInstance();
75+
DataInputDeserializer input1 = new DataInputDeserializer(bytes1);
76+
reuse = serializer.deserialize(reuse, input1);
77+
int bufferSize1 = reuse.getSegments()[0].size();
78+
79+
// Serialize an even smaller record
80+
BinaryRow row2 = createRowWithPayload(100);
81+
byte[] bytes2 = serializeRow(serializer, row2);
82+
83+
// Deserialize — buffer should be reused (not shrunk), since it's < 4MB
84+
DataInputDeserializer input2 = new DataInputDeserializer(bytes2);
85+
reuse = serializer.deserialize(reuse, input2);
86+
int bufferSize2 = reuse.getSegments()[0].size();
87+
assertThat(bufferSize2).isEqualTo(bufferSize1);
88+
}
89+
90+
@Test
91+
void testDeserializeRetainsBufferForConsecutiveLargeRecords() throws Exception {
92+
BinaryRowSerializer serializer = new BinaryRowSerializer(1);
93+
94+
// Serialize a large record (> 4MB) to inflate the buffer
95+
BinaryRow largeRow1 = createRowWithPayload(5 * 1024 * 1024);
96+
byte[] largeBytes1 = serializeRow(serializer, largeRow1);
97+
98+
BinaryRow reuse = serializer.createInstance();
99+
DataInputDeserializer input1 = new DataInputDeserializer(largeBytes1);
100+
reuse = serializer.deserialize(reuse, input1);
101+
int bufferAfterFirst = reuse.getSegments()[0].size();
102+
assertThat(bufferAfterFirst).isGreaterThanOrEqualTo(5 * 1024 * 1024);
103+
104+
// Deserialize another large record (also > 4MB)
105+
// Hysteresis: buffer should NOT be shrunk because the incoming record is also large
106+
BinaryRow largeRow2 = createRowWithPayload(SHRINK_THRESHOLD + 100);
107+
byte[] largeBytes2 = serializeRow(serializer, largeRow2);
108+
109+
DataInputDeserializer input2 = new DataInputDeserializer(largeBytes2);
110+
reuse = serializer.deserialize(reuse, input2);
111+
int bufferAfterSecond = reuse.getSegments()[0].size();
112+
assertThat(bufferAfterSecond).isEqualTo(bufferAfterFirst);
113+
}
114+
115+
@Test
116+
void testDeserializeGrowsBufferWhenNeeded() throws Exception {
117+
BinaryRowSerializer serializer = new BinaryRowSerializer(1);
118+
119+
// Start with a small record
120+
BinaryRow smallRow = createRowWithPayload(100);
121+
byte[] smallBytes = serializeRow(serializer, smallRow);
122+
123+
BinaryRow reuse = serializer.createInstance();
124+
DataInputDeserializer smallInput = new DataInputDeserializer(smallBytes);
125+
reuse = serializer.deserialize(reuse, smallInput);
126+
127+
// Deserialize a larger record — buffer should grow
128+
BinaryRow largerRow = createRowWithPayload(2048);
129+
byte[] largerBytes = serializeRow(serializer, largerRow);
130+
131+
DataInputDeserializer largerInput = new DataInputDeserializer(largerBytes);
132+
reuse = serializer.deserialize(reuse, largerInput);
133+
assertThat(reuse.getSegments()[0].size()).isGreaterThanOrEqualTo(2048);
134+
}
135+
136+
private static BinaryRow createRowWithPayload(int payloadSize) {
137+
BinaryRow row = new BinaryRow(1);
138+
BinaryRowWriter writer = new BinaryRowWriter(row, payloadSize + 32);
139+
byte[] payload = new byte[payloadSize];
140+
writer.writeString(0, BinaryString.fromBytes(payload));
141+
writer.complete();
142+
return row;
143+
}
144+
145+
private static byte[] serializeRow(BinaryRowSerializer serializer, BinaryRow row)
146+
throws Exception {
147+
DataOutputSerializer output = new DataOutputSerializer(row.getSizeInBytes() + 4);
148+
serializer.serialize(row, output);
149+
return output.getCopyOfBuffer();
150+
}
151+
}

0 commit comments

Comments
 (0)