[common] Fix RowHelper internal buffer never shrinking for large records #8159

Merged
JingsongLi merged 3 commits into apache:master from yugan95:fix/rowhelper-buffer-release
Jun 11, 2026
Conversation

@yugan95 yugan95 commented Jun 8, 2026

Contributor

Purpose

Linked issue: #7620

RowHelper.reuseWriter grows its internal MemorySegment for large records (e.g. 100MB+), but BinaryRowWriter.reset() only resets the cursor without releasing the oversized segment. Additionally, InternalRowSerializer.serialize() can exit via EOFException — a normal signal when the sort buffer is full (SimpleCollectingOutputView.nextSegment() throws it, caught by BinaryInMemorySortBuffer.write()) — skipping any cleanup of the bloated buffer.

With many buckets (e.g. 256), each bucket's writer independently retains an inflated buffer: 256 × 100MB+ = tens of GB, causing OOM.
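The arithmetic above can be checked with a small sketch. The class and method names are hypothetical and exist only for illustration; the bucket count and buffer sizes are the ones quoted in this description.

```java
/** Back-of-the-envelope estimate of heap pinned by per-bucket reuse buffers. */
final class RetainedHeapEstimate {
    static final long MB = 1024L * 1024L;

    /** Worst case: every bucket writer keeps one buffer of the given size. */
    static long retainedBytes(int buckets, long bytesPerBuffer) {
        return Math.multiplyExact((long) buckets, bytesPerBuffer);
    }

    /** Converts a byte count to whole megabytes (1MB = 1024 * 1024 bytes). */
    static long toMegabytes(long bytes) {
        return bytes / MB;
    }
}
```

With 256 buckets, `retainedBytes(256, 100 * MB)` comes to 25,600MB, while a 4MB buffer per bucket comes to 1,024MB.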

Changes

  • RowHelper: add resetIfTooLarge() with hysteresis — release internal buffer only when the segment exceeds 4MB and the last written record (reuseRow.getSizeInBytes()) is smaller than 4MB
    • Sustained large records (5–10MB): buffer retained, no thrashing
    • Occasional large record → back to small records: buffer released, OOM protection
    • Normal small records (< 4MB): buffer never exceeds threshold, the check is a no-op
  • InternalRowSerializer: call resetIfTooLarge() in finally blocks of serialize() and serializeToPages() to handle the EOFException exit path
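A minimal sketch of the hysteresis rule described above, using a plain `byte[]` as a stand-in for the internal segment. `FixedThresholdBuffer` and its methods are hypothetical names for illustration and are not the actual RowHelper code.

```java
/** Toy stand-in for a reusable serialization buffer; not Paimon code. */
final class FixedThresholdBuffer {
    static final int THRESHOLD = 4 * 1024 * 1024; // 4MB, as in the description

    private byte[] buffer;      // models the writer's internal segment
    private int lastRecordSize; // models reuseRow.getSizeInBytes()

    /** Grows the buffer if the record does not fit, then remembers its size. */
    void write(int recordSize) {
        if (buffer == null || buffer.length < recordSize) {
            buffer = new byte[recordSize];
        }
        lastRecordSize = recordSize;
    }

    /** Drops the buffer only if it is oversized AND the last record was small. */
    void resetIfTooLarge() {
        if (buffer != null && buffer.length > THRESHOLD && lastRecordSize < THRESHOLD) {
            buffer = null; // the next write allocates a right-sized buffer
        }
    }

    /** Current capacity in bytes, or 0 if the buffer has been released. */
    int capacity() {
        return buffer == null ? 0 : buffer.length;
    }
}
```

In this model a run of 8MB records keeps the 8MB buffer, and the first small record after the run releases it.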

Why 4MB and why not configurable

The threshold is derived from the production scenario: 256 buckets × 4MB = 1GB baseline, leaving reasonable headroom in a typical 4–8GB TaskManager heap. Making it configurable would require plumbing config through the entire serializer chain: RowHelper and InternalRowSerializer live in paimon-common, which is config-free by design (all 27 serializer classes are parameterized solely by schema types). The full chain from CoreOptions → KeyValueFileStoreWrite → MergeTreeWriter → SortBufferWriteBuffer → BinaryInMemorySortBuffer → InternalRowSerializer → RowHelper would need changes across modules. A fixed value with hysteresis is sufficient for the initial fix.

Tests

RowHelperTest — 4 test cases covering:

  • Hysteresis: buffer retained when last record is large
  • Buffer released when workload transitions to small records
  • Safe to call before any copyInto
  • Reuse recreated after release, small buffer survives resetIfTooLarge()

API and Format

N/A

Documentation

N/A

@JingsongLi JingsongLi left a comment

Contributor

Thanks for tackling this. The direction makes sense to me, but I think the current cleanup is still a bit too tied to specific call paths and can miss some retained-buffer cases.

A few concerns:

  1. RowHelper.resetIfTooLarge() checks reuseRow.getSizeInBytes(), but InternalRowSerializer.toBinaryRow() returns immediately for BinaryRow inputs. If a large GenericRow inflates the helper and subsequent small rows are already BinaryRow, the helper state is not refreshed, so the oversized buffer may remain retained.

  2. Nested row conversion has the same problem. AbstractBinaryWriter.writeRow() calls the nested InternalRowSerializer.toBinaryRow(input), which may inflate the nested serializer’s RowHelper, but the new finally blocks only reset the outer serializer’s helper. A large nested ROW field can therefore still retain a large nested buffer.

  3. Some callers use toBinaryRow() directly and then serialize manually, for example the initial-buffer path in DataPagedOutputSerializer.write(). Those paths do not go through the new serialize() / serializeToPages() finally blocks, so they can also skip cleanup.

Could we make the lifecycle more explicit inside InternalRowSerializer / RowHelper, so that cleanup is tied to the actual reusable BinaryRow borrowed for the current conversion rather than relying on each serialization entry point to remember a finally block? At minimum, I think this needs regression coverage for BinaryRow input after a large generic row, nested row fields, and a direct toBinaryRow() caller path.
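The first concern can be reproduced in a toy model. Everything below is a hypothetical simplification (the `Row` type, the `binary` flag, the field names); it only mirrors the early return for BinaryRow inputs described in concern 1, not the real InternalRowSerializer.

```java
/** Toy model of the stale-state case; the types here are not Paimon classes. */
final class StaleStateDemo {
    static final int THRESHOLD = 4 * 1024 * 1024; // 4MB

    /** Minimal row: either already binary (passed through) or generic. */
    static final class Row {
        final boolean binary;
        final int sizeInBytes;

        Row(boolean binary, int sizeInBytes) {
            this.binary = binary;
            this.sizeInBytes = sizeInBytes;
        }
    }

    private int bufferCapacity;    // capacity of the helper's reusable buffer
    private int lastConvertedSize; // size of the last row the helper produced

    /** Binary inputs return immediately, so helper state is left untouched. */
    Row toBinaryRow(Row row) {
        if (row.binary) {
            return row;
        }
        bufferCapacity = Math.max(bufferCapacity, row.sizeInBytes);
        lastConvertedSize = row.sizeInBytes;
        return new Row(true, row.sizeInBytes);
    }

    /** First-revision rule: consults only the helper's own, possibly stale, size. */
    boolean wouldRelease() {
        return bufferCapacity > THRESHOLD && lastConvertedSize < THRESHOLD;
    }

    int bufferCapacity() {
        return bufferCapacity;
    }
}
```

After one large generic row, any number of small binary rows leaves `wouldRelease()` false in this model, because the recorded size never drops below the threshold.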

@JingsongLi

Contributor

To make the suggestion more concrete, the shape I had in mind is something like this: let the serializer keep cleanup next to the conversion result, and make RowHelper release only when the row being serialized is actually its reusable row.

@Override
public void serialize(InternalRow row, DataOutputView target) throws IOException {
    BinaryRow binaryRow = toBinaryRow(row);
    try {
        binarySerializer.serialize(binaryRow, target);
    } finally {
        rowHelper.resetIfTooLarge(binaryRow);
    }
}

@Override
public int serializeToPages(InternalRow row, AbstractPagedOutputView target) throws IOException {
    BinaryRow binaryRow = toBinaryRow(row);
    try {
        return binarySerializer.serializeToPages(binaryRow, target);
    } finally {
        rowHelper.resetIfTooLarge(binaryRow);
    }
}

And in RowHelper:

public void resetIfTooLarge(BinaryRow currentRow) {
    if (currentRow == reuseRow
            && reuseWriter != null
            && reuseWriter.getSegments() != null
            && reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD
            && currentRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD) {
        reuseRow = null;
        reuseWriter = null;
    }
}

This does not solve every path by itself, but it avoids using stale reuseRow.getSizeInBytes() when the current input is already a BinaryRow. For nested rows, AbstractBinaryWriter.writeRow() would need a similar after-use hook on the nested serializer, for example:

BinaryRow row = serializer.toBinaryRow(input);
try {
    writeSegmentsToVarLenPart(pos, row.getSegments(), row.getOffset(), row.getSizeInBytes());
} finally {
    serializer.resetIfTooLarge(row);
}

The exact API name can be different, but I think this makes the intended ownership clearer: only the serializer/helper that produced the reusable row is responsible for deciding whether to drop that reusable buffer after the current use.
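The ownership idea can be exercised in isolation with a toy model. The class below is a hypothetical sketch: `Row`, `convert` and the `targetFull` flag are illustrative stand-ins, and only the shape of the identity check plus the finally block follows the snippets above.

```java
/** Toy model of "only the owner cleans up"; the types here are not Paimon classes. */
final class OwnedCleanupDemo {
    static final int THRESHOLD = 4 * 1024 * 1024; // 4MB

    /** Minimal stand-in for a binary row. */
    static final class Row {
        final int sizeInBytes;

        Row(int sizeInBytes) {
            this.sizeInBytes = sizeInBytes;
        }
    }

    private Row reuseRow;       // the row this helper hands out after a conversion
    private int bufferCapacity; // capacity backing reuseRow

    /** Models converting a generic row: the result is the helper's own row. */
    Row convert(int sizeInBytes) {
        bufferCapacity = Math.max(bufferCapacity, sizeInBytes);
        reuseRow = new Row(sizeInBytes);
        return reuseRow;
    }

    /** Releases only when the row just used is the one this helper produced. */
    void resetIfTooLarge(Row currentRow) {
        if (currentRow == reuseRow
                && bufferCapacity > THRESHOLD
                && currentRow.sizeInBytes < THRESHOLD) {
            reuseRow = null;
            bufferCapacity = 0;
        }
    }

    /** Cleanup sits in finally, so it also runs when the target signals "full". */
    void serialize(Row row, boolean targetFull) throws java.io.EOFException {
        try {
            if (targetFull) {
                throw new java.io.EOFException("target full");
            }
        } finally {
            resetIfTooLarge(row);
        }
    }

    int bufferCapacity() {
        return bufferCapacity;
    }
}
```

In this model a row that did not come from the helper never triggers a release, while the helper's own small row does, even when `serialize` exits with an exception.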

@yugan95 yugan95 force-pushed the fix/rowhelper-buffer-release branch from 8f3be7d to 618fb54 on June 8, 2026 06:35
yugan95 commented Jun 8, 2026

Contributor Author

@JingsongLi Thanks for the concrete suggestion! I've adopted the approach in the updated commit:

  1. resetIfTooLarge(BinaryRow currentRow) with identity check

RowHelper.resetIfTooLarge() now accepts the BinaryRow that was actually serialized. The first condition is currentRow == reuseRow — if toBinaryRow() returned the input BinaryRow directly (no conversion), the identity check fails and no cleanup occurs, avoiding the stale-state issue you identified.

  2. InternalRowSerializer captures binaryRow before try

Both serialize() and serializeToPages() now follow the pattern you suggested:

  BinaryRow binaryRow = toBinaryRow(row);
  try {
      binarySerializer.serialize(binaryRow, target);
  } finally {
      rowHelper.resetIfTooLarge(binaryRow);
  }

  3. Test coverage: added testSkipsWhenCurrentRowIsNotReuseRow() to verify that passing an external BinaryRow (simulating the BinaryRow input path) does NOT trigger buffer release even when the helper's buffer is oversized.

Regarding nested row fields and direct toBinaryRow() callers — I agree those are real gaps. For this PR I'd like to keep the scope focused on the serialize/serializeToPages entry points which are the primary OOM path. The nested AbstractBinaryWriter.writeRow() hook and coverage for DataPagedOutputSerializer.write() can be addressed in a follow-up. Would that work for you?

@yugan95 yugan95 requested a review from JingsongLi June 8, 2026 08:46
@JingsongLi

Contributor

Thanks for updating the cleanup ownership check. I think the current fixed 4MB threshold is acceptable as a first guardrail, especially because paimon-common does not have configuration context and making this configurable would add a lot of plumbing for a focused bug fix.

One possible improvement to the release strategy: instead of only releasing when the current row is smaller than 4MB, we can combine the fixed cap with a relative hysteresis check, for example:

bufferCapacity > MAX_RETAINED_REUSE_BUFFER_SIZE
        && bufferCapacity > currentRow.getSizeInBytes() * SHRINK_RATIO

This keeps the good parts of the current approach:

  • normal small rows never pay for cleanup unless the buffer was actually inflated;
  • sustained large rows do not thrash by reallocating every record;
  • a 100MB spike followed by tiny rows gets released.

But it also handles the case where a 100MB spike is followed by medium-size rows, e.g. 5MB rows. With the current currentRow < 4MB condition, the helper may keep retaining the 100MB buffer even though the active workload has dropped far below that size. A fixed cap plus ratio check would release only when the retained buffer is clearly oversized relative to the row currently being serialized.

I do not think this needs to become configurable in this PR. If we want to keep the patch small, I am also okay with merging the current version and treating the relative hysteresis as a follow-up refinement.
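To compare the two release strategies side by side, here is a sketch with both predicates written as pure functions. The class name and the ratio value of 4 are assumptions made for illustration; the comment above does not fix a value for SHRINK_RATIO.

```java
/** Side-by-side comparison of the two release predicates discussed above. */
final class ReleaseRules {
    static final long MB = 1024L * 1024L;
    static final long CAP = 4 * MB;     // fixed 4MB cap from the discussion
    static final long SHRINK_RATIO = 4; // assumed value, for illustration only

    /** Current rule: oversized buffer and the current row is below the cap. */
    static boolean fixedRule(long bufferCapacity, long rowSize) {
        return bufferCapacity > CAP && rowSize < CAP;
    }

    /** Proposed rule: oversized buffer that is also far larger than the row. */
    static boolean ratioRule(long bufferCapacity, long rowSize) {
        return bufferCapacity > CAP && bufferCapacity > rowSize * SHRINK_RATIO;
    }
}
```

For a 100MB buffer and a 5MB row, `fixedRule` returns false and `ratioRule` returns true under these assumptions; for a 10MB buffer and a 5MB row both return false.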

yugan95 commented Jun 10, 2026

Contributor Author

@JingsongLi Thanks for the suggestion! Updated to use a combined fixed-cap and ratio check:

  private static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB
  private static final int SHRINK_RATIO = 4;

  public void resetIfTooLarge(BinaryRow currentRow) {
      if (currentRow == reuseRow
              && reuseWriter != null
              && reuseWriter.getSegments() != null) {
          int bufferCapacity = reuseWriter.getSegments().size();
          if (bufferCapacity > MAX_RETAINED_REUSE_BUFFER_SIZE
                  && bufferCapacity > (long) currentRow.getSizeInBytes() * SHRINK_RATIO) {
              reuseRow = null;
              reuseWriter = null;
          }
      }
  }
  • 100MB buffer + 5MB row → 20x > 4x → release
  • 10MB buffer + 5MB row → 2x < 4x → retain
  • 3MB buffer + tiny row → below 4MB cap → retain

SHRINK_RATIO = 4 is well above the 1.5x grow factor (AbstractBinaryWriter.grow()), so normal buffer growth will never trigger a false release.
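The claim about the grow factor can be sanity-checked with a small simulation. This is a sketch under an assumption: the growth policy is modeled as "1.5x the old capacity, or the required size if that is larger", collapsed into a single step per record. The names are hypothetical and the real AbstractBinaryWriter.grow() may differ in detail.

```java
/** Checks that a buffer just grown for a record is never 4x larger than it. */
final class GrowthVsShrink {
    static final int SHRINK_RATIO = 4;

    /** Assumed policy: 1.5x the old capacity, or the required size if larger. */
    static long grow(long oldCapacity, long required) {
        long candidate = oldCapacity + (oldCapacity >> 1);
        return Math.max(candidate, required);
    }

    /** Capacity after writing one record into a buffer of the given capacity. */
    static long capacityAfterWrite(long capacity, long recordSize) {
        return recordSize > capacity ? grow(capacity, recordSize) : capacity;
    }

    /** The ratio half of the release condition. */
    static boolean ratioTriggered(long capacity, long recordSize) {
        return capacity > recordSize * SHRINK_RATIO;
    }
}
```

Under this model, growth happens only when the record exceeds the old capacity, so the new capacity stays below 1.5x the record size and the ratio condition cannot fire for the record that caused the growth.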

Tests updated to cover the ratio-based logic.

@JingsongLi JingsongLi left a comment

Contributor

+1

@JingsongLi JingsongLi merged commit 62d2422 into apache:master Jun 11, 2026
12 checks passed

2 participants