Skip to content

Commit 2b2aa6b

Browse files
authored
fix(parquet): align dictionary fallback with parquet-mr (#786)
### Rationale for this change

On dictionary overflow, arrow-go always flushed the dictionary page and any buffered dict-encoded data pages before switching to PLAIN, even when no dict-encoded data page had been cut. On mid-cardinality columns the result was a 4-encoding chunk layout (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) that bloated output by 20-30% versus parquet-mr.

This was noticed when testing iceberg-go's recently added compaction feature, where some tables with particularly high-cardinality columns would see a 30% size increase after compaction.

### What changes are included in this PR?

Mirror parquet-mr's FallbackValuesWriter:

- Discard the dictionary and re-encode buffered indices as PLAIN when no dict-encoded data page has been flushed yet; only emit the dictionary page once a dict-encoded page is committed.
- Before the first dict-encoded page, fall back to PLAIN if dict + indices >= raw input bytes.
- Size dict-encoded pages by raw input bytes (not the RLE indices' encoded size) so the page cadence matches PLAIN.

Adds DictEncoder.FallBackTo / ObservedRawSize and exposes BinaryMemoTable.Value for the fallback translation.

### Are these changes tested?

Yes, as part of the PR and also e2e while testing compaction in iceberg-go.

### Are there any user-facing changes?

No public API changes; the only observable difference should be the dropped double encoding.
1 parent cb314d6 commit 2b2aa6b

12 files changed

Lines changed: 1003 additions & 126 deletions

parquet/file/column_writer.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ type columnWriter struct {
140140
totalCompressedBytes int64
141141
closed bool
142142
fallbackToNonDict bool
143+
dictPageWritten bool
143144

144145
pages []DataPage
145146

@@ -264,8 +265,20 @@ func (w *columnWriter) commitWriteAndCheckPageLimit(numLevels, numValues int64)
264265
w.numBufferedValues += numLevels
265266
w.numDataValues += numValues
266267

267-
enc := w.currentEncoder.EstimatedDataEncodedSize()
268-
if enc >= w.props.DataPageSize() {
268+
// While dictionary encoding is active we size pages by raw input bytes
269+
// instead of the RLE-indices' encoded size. Mirrors parquet-mr's
270+
// FallbackValuesWriter.getBufferedSize — it keeps dict pages roughly
271+
// the same raw-byte footprint as the PLAIN pages they'd otherwise be,
272+
// which also pulls the first-page compression check into the same
273+
// cadence parquet-mr uses and avoids committing dict pages that only
274+
// look cheap because their RLE indices are tiny.
275+
var bufferedSize int64
276+
if w.hasDict && !w.fallbackToNonDict {
277+
bufferedSize = w.currentEncoder.(encoding.DictEncoder).ObservedRawSize()
278+
} else {
279+
bufferedSize = w.currentEncoder.EstimatedDataEncodedSize()
280+
}
281+
if bufferedSize >= w.props.DataPageSize() {
269282
return w.FlushCurrentPage()
270283
}
271284
return nil
@@ -427,7 +440,14 @@ func (w *columnWriter) FlushBufferedDataPages() (err error) {
427440
return err
428441
}
429442
}
443+
return w.drainBufferedDataPages()
444+
}
430445

446+
// drainBufferedDataPages writes out and releases any pages buffered while
447+
// dictionary encoding was active. Unlike FlushBufferedDataPages, it does not
448+
// touch the current encoder's unflushed values, so the caller can re-encode
449+
// them as PLAIN during a dictionary fallback.
450+
func (w *columnWriter) drainBufferedDataPages() (err error) {
431451
for i, p := range w.pages {
432452
defer p.Release()
433453
if err = w.WriteDataPage(p); err != nil {
@@ -502,6 +522,7 @@ func (w *columnWriter) WriteDictionaryPage() error {
502522
page := NewDictionaryPage(buffer, int32(dictEncoder.NumEntries()), w.props.DictionaryPageEncoding())
503523
written, err := w.pager.WriteDictionaryPage(page)
504524
w.totalBytesWritten += written
525+
w.dictPageWritten = err == nil
505526
return err
506527
}
507528

@@ -620,7 +641,14 @@ func (w *columnWriter) Close() (err error) {
620641
if w.rowsWritten > 0 && chunkStats.IsSet() {
621642
w.metaData.SetStats(chunkStats)
622643
}
623-
err = w.pager.Close(w.hasDict, w.fallbackToNonDict)
644+
// Only advertise PLAIN_DICTIONARY / fallback encodings in the column
645+
// chunk's encoding list when a dictionary page was actually written.
646+
// When fallback discards the dictionary before any dict-encoded page
647+
// is flushed, the column contains only PLAIN data and the list should
648+
// reflect that unambiguously.
649+
advertiseDict := w.hasDict && w.dictPageWritten
650+
advertiseFallback := w.fallbackToNonDict && w.dictPageWritten
651+
err = w.pager.Close(advertiseDict, advertiseFallback)
624652
}
625653
return err
626654
}

parquet/file/column_writer_test.go

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -427,39 +427,20 @@ func (p *PrimitiveWriterTestSuite) testDictionaryFallbackEncoding(version parque
427427
p.EqualValues(VeryLargeSize, valuesRead)
428428
p.Equal(p.Values, p.ValuesOut)
429429

430+
// With the parquet-mr-aligned fallback behavior, a dictionary that overflows
431+
// before any dict-encoded data page has been flushed is discarded entirely.
432+
// With the default page-size parameters used here the dict-encoded
433+
// EstimatedDataEncodedSize (RLE indices) stays under DataPageSize right up
434+
// to the overflow point, so no dict page and no dict-encoded data pages
435+
// appear in the output — the column is pure PLAIN.
430436
encodings := p.metadataEncodings()
431-
if p.Typ.Kind() == reflect.Bool || p.Typ == reflect.TypeOf(parquet.Int96{}) {
432-
// dictionary encoding is not allowed for booleans
433-
// there are 2 encodings (PLAIN, RLE) in a non dictionary encoding case
434-
p.Equal([]parquet.Encoding{parquet.Encodings.Plain, parquet.Encodings.RLE}, encodings)
435-
} else if version == parquet.V1_0 {
436-
// There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case
437-
// for version 1.0
438-
p.Equal([]parquet.Encoding{parquet.Encodings.PlainDict, parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain}, encodings)
439-
} else {
440-
// There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case for
441-
// version 2.0
442-
p.Equal([]parquet.Encoding{parquet.Encodings.RLEDict, parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain}, encodings)
443-
}
437+
p.Equal([]parquet.Encoding{parquet.Encodings.Plain, parquet.Encodings.RLE}, encodings)
444438

445439
encodingStats := p.metadataEncodingStats()
446-
if p.Typ.Kind() == reflect.Bool || p.Typ == reflect.TypeOf(parquet.Int96{}) {
447-
p.Equal(parquet.Encodings.Plain, encodingStats[0].Encoding)
448-
p.Equal(format.PageType_DATA_PAGE, encodingStats[0].PageType)
449-
} else if version == parquet.V1_0 {
450-
expected := []metadata.PageEncodingStats{
451-
{Encoding: parquet.Encodings.PlainDict, PageType: format.PageType_DICTIONARY_PAGE},
452-
{Encoding: parquet.Encodings.Plain, PageType: format.PageType_DATA_PAGE},
453-
{Encoding: parquet.Encodings.PlainDict, PageType: format.PageType_DATA_PAGE}}
454-
p.Equal(expected[0], encodingStats[0])
455-
p.ElementsMatch(expected[1:], encodingStats[1:])
456-
} else {
457-
expected := []metadata.PageEncodingStats{
458-
{Encoding: parquet.Encodings.Plain, PageType: format.PageType_DICTIONARY_PAGE},
459-
{Encoding: parquet.Encodings.Plain, PageType: format.PageType_DATA_PAGE},
460-
{Encoding: parquet.Encodings.RLEDict, PageType: format.PageType_DATA_PAGE}}
461-
p.Equal(expected[0], encodingStats[0])
462-
p.ElementsMatch(expected[1:], encodingStats[1:])
440+
p.NotEmpty(encodingStats)
441+
for _, es := range encodingStats {
442+
p.Equal(parquet.Encodings.Plain, es.Encoding)
443+
p.Equal(format.PageType_DATA_PAGE, es.PageType)
463444
}
464445
}
465446

0 commit comments

Comments
 (0)