[common] Fix RowHelper internal buffer never shrinking for large records#8159
Conversation
JingsongLi
left a comment
There was a problem hiding this comment.
Thanks for tackling this. The direction makes sense to me, but I think the current cleanup is still a bit too tied to specific call paths and can miss some retained-buffer cases.
A few concerns:
-
RowHelper.resetIfTooLarge()checksreuseRow.getSizeInBytes(), butInternalRowSerializer.toBinaryRow()returns immediately forBinaryRowinputs. If a largeGenericRowinflates the helper and subsequent small rows are alreadyBinaryRow, the helper state is not refreshed, so the oversized buffer may remain retained. -
Nested row conversion has the same problem.
AbstractBinaryWriter.writeRow()calls the nestedInternalRowSerializer.toBinaryRow(input), which may inflate the nested serializer’sRowHelper, but the newfinallyblocks only reset the outer serializer’s helper. A large nestedROWfield can therefore still retain a large nested buffer. -
Some callers use
toBinaryRow()directly and then serialize manually, for example the initial-buffer path inDataPagedOutputSerializer.write(). Those paths do not go through the newserialize()/serializeToPages()finally blocks, so they can also skip cleanup.
Could we make the lifecycle more explicit inside InternalRowSerializer / RowHelper, so that cleanup is tied to the actual reusable BinaryRow borrowed for the current conversion rather than relying on each serialization entry point to remember a finally block? At minimum, I think this needs regression coverage for BinaryRow input after a large generic row, nested row fields, and a direct toBinaryRow() caller path.
|
To make the suggestion more concrete, the shape I had in mind is something like this: let the serializer keep cleanup next to the conversion result, and make @Override
public void serialize(InternalRow row, DataOutputView target) throws IOException {
BinaryRow binaryRow = toBinaryRow(row);
try {
binarySerializer.serialize(binaryRow, target);
} finally {
rowHelper.resetIfTooLarge(binaryRow);
}
}
@Override
public int serializeToPages(InternalRow row, AbstractPagedOutputView target) throws IOException {
BinaryRow binaryRow = toBinaryRow(row);
try {
return binarySerializer.serializeToPages(binaryRow, target);
} finally {
rowHelper.resetIfTooLarge(binaryRow);
}
}And in public void resetIfTooLarge(BinaryRow currentRow) {
if (currentRow == reuseRow
&& reuseWriter != null
&& reuseWriter.getSegments() != null
&& reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD
&& currentRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD) {
reuseRow = null;
reuseWriter = null;
}
}This does not solve every path by itself, but it avoids using stale BinaryRow row = serializer.toBinaryRow(input);
try {
writeSegmentsToVarLenPart(pos, row.getSegments(), row.getOffset(), row.getSizeInBytes());
} finally {
serializer.resetIfTooLarge(row);
}The exact API name can be different, but I think this makes the intended ownership clearer: only the serializer/helper that produced the reusable row is responsible for deciding whether to drop that reusable buffer after the current use. |
8f3be7d to
618fb54
Compare
|
@JingsongLi Thanks for the concrete suggestion! I've adopted the approach in the updated commit:
Both
Regarding nested row fields and direct |
|
Thanks for updating the cleanup ownership check. I think the current fixed 4MB threshold is acceptable as a first guardrail, especially because One possible improvement to the release strategy: instead of only releasing when the current row is smaller than 4MB, we can combine the fixed cap with a relative hysteresis check, for example: bufferCapacity > MAX_RETAINED_REUSE_BUFFER_SIZE
&& bufferCapacity > currentRow.getSizeInBytes() * SHRINK_RATIOThis keeps the good parts of the current approach:
But it also handles the case where a 100MB spike is followed by medium-size rows, e.g. 5MB rows. With the current I do not think this needs to become configurable in this PR. If we want to keep the patch small, I am also okay with merging the current version and treating the relative hysteresis as a follow-up refinement. |
|
@JingsongLi Thanks for the suggestion! Updated to use a combined fixed-cap and ratio check:
SHRINK_RATIO = 4 is well above the 1.5x grow factor ( Tests updated to cover the ratio-based logic. |
Purpose
Linked issue: #7620
RowHelper.reuseWritergrows its internalMemorySegmentfor large records (e.g. 100MB+), butBinaryRowWriter.reset()only resets the cursor without releasing the oversized segment. Additionally,InternalRowSerializer.serialize()can exit viaEOFException— a normal signal when the sort buffer is full (SimpleCollectingOutputView.nextSegment()throws it, caught byBinaryInMemorySortBuffer.write()) — skipping any cleanup of the bloated buffer.With many buckets (e.g. 256), each bucket's writer independently retains an inflated buffer: 256 × 100MB+ = tens of GB, causing OOM.
Changes
RowHelper: addresetIfTooLarge()with hysteresis — release internal buffer only when the segment exceeds 4MB and the last written record (reuseRow.getSizeInBytes()) is smaller than 4MBInternalRowSerializer: callresetIfTooLarge()infinallyblocks ofserialize()andserializeToPages()to handle theEOFExceptionexit pathWhy 4MB and why not configurable
The threshold is derived from the production scenario: 256 buckets × 4MB = 1GB baseline, leaving reasonable headroom in a typical 4–8GB TaskManager heap. Making it configurable would require plumbing config through the entire serializer chain —
RowHelperandInternalRowSerializerlive in paimon-common which is config-free by design (all 27 serializer classes are parameterized solely by schema types). The full chain fromCoreOptions→KeyValueFileStoreWrite→MergeTreeWriter→SortBufferWriteBuffer→BinaryInMemorySortBuffer→InternalRowSerializer→RowHelperwould need changes across modules. A fixed value with hysteresis is sufficient for the initial fix.Tests
RowHelperTest— 4 test cases covering:copyIntoresetIfTooLarge()API and Format
N/A
Documentation
N/A