Skip to content

Commit 3ad38d0

Browse files
authored
fix(parquet/file): use adaptive batch sizing to avoid panic (#690)
### Rationale for this change Issue reported in #622 (comment) where accumulated data on a given page exceeds the DataPageSize. ### What changes are included in this PR? Removing a broken mid-batch flush in `writeValues`/`writeValuesSpaced`, instead relying back on `encoder.Put()/encoder.PutSpaced()`. Updated `WriteBatch` to use an adaptive batch sizing approach for ByteArray/FLBA writing to properly handle v2 data page row-boundary alignment without breaking on very large individual values. ### Are these changes tested? New tests are added to cover this scenario to ensure test coverage. ### Are there any user-facing changes? No
1 parent 3194e44 commit 3ad38d0

4 files changed

Lines changed: 532 additions & 330 deletions

File tree

parquet/file/column_writer.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package file
1919
import (
2020
"bytes"
2121
"encoding/binary"
22+
"fmt"
2223
"io"
24+
"math"
2325
"strconv"
2426

2527
"github.com/apache/arrow-go/v18/arrow"
@@ -303,7 +305,12 @@ func (w *columnWriter) FlushCurrentPage() error {
303305
repLevelsRLESize = int32(w.repLevelSink.Len())
304306
}
305307

306-
uncompressed := defLevelsRLESize + repLevelsRLESize + int32(values.Len())
308+
uncompressed64 := int64(defLevelsRLESize) + int64(repLevelsRLESize) + int64(values.Len())
309+
if uncompressed64 > math.MaxInt32 {
310+
return fmt.Errorf("parquet: uncompressed page size %d exceeds INT32_MAX (%d)",
311+
uncompressed64, int64(math.MaxInt32))
312+
}
313+
uncompressed := int32(uncompressed64)
307314
if isV1DataPage {
308315
err = w.buildDataPageV1(defLevelsRLESize, repLevelsRLESize, uncompressed, values.Bytes())
309316
} else {
@@ -378,7 +385,7 @@ func (w *columnWriter) buildDataPageV2(defLevelsRLESize, repLevelsRLESize, uncom
378385

379386
// concatenate uncompressed levels and the possibly compressed values
380387
var combined bytes.Buffer
381-
combined.Grow(int(defLevelsRLESize + repLevelsRLESize + int32(len(data))))
388+
combined.Grow(int(int64(defLevelsRLESize) + int64(repLevelsRLESize) + int64(len(data))))
382389
w.concatBuffers(defLevelsRLESize, repLevelsRLESize, data, &combined)
383390

384391
pageStats, err := w.getPageStatistics()

0 commit comments

Comments
 (0)