Skip to content

Commit 2be0d8b

Browse files
committed
PARQUET-3479: Add configurable threshold to delay dictionary compression check
1 parent 9d9ddca commit 2be0d8b

7 files changed

Lines changed: 362 additions & 11 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class ParquetProperties {
6767
public static final boolean DEFAULT_STATISTICS_ENABLED = true;
6868
public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true;
6969

70+
public static final long DEFAULT_DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES = 0;
7071
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
7172

7273
/**
@@ -131,6 +132,7 @@ public static WriterVersion fromString(String name) {
131132
private final int rowGroupRowCountLimit;
132133
private final int pageRowCountLimit;
133134
private final boolean pageWriteChecksumEnabled;
135+
private final long dictionaryCheckThresholdRawSizeBytes;
134136
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
135137
private final Map<String, String> extraMetaData;
136138
private final ColumnProperty<Boolean> statistics;
@@ -163,6 +165,7 @@ private ParquetProperties(Builder builder) {
163165
this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit;
164166
this.pageRowCountLimit = builder.pageRowCountLimit;
165167
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
168+
this.dictionaryCheckThresholdRawSizeBytes = builder.dictionaryCheckThresholdRawSizeBytes;
166169
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
167170
this.extraMetaData = builder.extraMetaData;
168171
this.statistics = builder.statistics.build();
@@ -322,6 +325,17 @@ public boolean getPageWriteChecksumEnabled() {
322325
return pageWriteChecksumEnabled;
323326
}
324327

328+
/**
329+
* Returns the byte threshold after which the dictionary compression check is performed.
330+
* A value of 0 means check on the first page (backward compatible default). Higher values
331+
* delay the check until that many raw bytes have been accumulated across pages.
332+
*
333+
* @return the byte threshold for the dictionary compression check
334+
*/
335+
public long getDictionaryCheckThresholdRawSizeBytes() {
336+
return dictionaryCheckThresholdRawSizeBytes;
337+
}
338+
325339
public OptionalLong getBloomFilterNDV(ColumnDescriptor column) {
326340
Long ndv = bloomFilterNDVs.getValue(column);
327341
return ndv == null ? OptionalLong.empty() : OptionalLong.of(ndv);
@@ -415,6 +429,7 @@ public static class Builder {
415429
private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT;
416430
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
417431
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
432+
private long dictionaryCheckThresholdRawSizeBytes = DEFAULT_DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES;
418433
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
419434
private Map<String, String> extraMetaData = new HashMap<>();
420435
private final ColumnProperty.Builder<Boolean> statistics;
@@ -450,6 +465,7 @@ private Builder(ParquetProperties toCopy) {
450465
this.allocator = toCopy.allocator;
451466
this.pageRowCountLimit = toCopy.pageRowCountLimit;
452467
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
468+
this.dictionaryCheckThresholdRawSizeBytes = toCopy.dictionaryCheckThresholdRawSizeBytes;
453469
this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs);
454470
this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs);
455471
this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled);
@@ -709,6 +725,20 @@ public Builder withPageWriteChecksumEnabled(boolean val) {
709725
return this;
710726
}
711727

728+
/**
729+
* Set the raw data byte threshold after which the dictionary compression check is performed.
730+
* A value of 0 means check on the first page (backward compatible default). Higher values
731+
* delay the check until that many raw bytes have been accumulated across pages.
732+
*
733+
* @param val byte threshold (default: 0)
734+
* @return this builder for method chaining
735+
*/
736+
public Builder withDictionaryCheckThresholdRawSizeBytes(long val) {
737+
Preconditions.checkArgument(val >= 0, "dictionaryCheckThresholdRawSizeBytes must be >= 0");
738+
this.dictionaryCheckThresholdRawSizeBytes = val;
739+
return this;
740+
}
741+
712742
public Builder withExtraMetaData(Map<String, String> extraMetaData) {
713743
this.extraMetaData = extraMetaData;
714744
return this;

parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ static ValuesWriter dictWriterWithFallBack(
111111
ValuesWriter writerToFallBackTo) {
112112
if (parquetProperties.isDictionaryEnabled(path)) {
113113
return FallbackValuesWriter.of(
114-
dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding), writerToFallBackTo);
114+
dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding),
115+
writerToFallBackTo,
116+
parquetProperties.getDictionaryCheckThresholdRawSizeBytes());
115117
} else {
116118
return writerToFallBackTo;
117119
}

parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@ public class FallbackValuesWriter<I extends ValuesWriter & RequiresFallback, F e
3030

3131
public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(
3232
I initialWriter, F fallBackWriter) {
33-
return new FallbackValuesWriter<>(initialWriter, fallBackWriter);
33+
return new FallbackValuesWriter<>(initialWriter, fallBackWriter, /*checkAfterBytes=*/ 0);
34+
}
35+
36+
public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(
37+
I initialWriter, F fallBackWriter, long checkAfterBytes) {
38+
return new FallbackValuesWriter<>(initialWriter, fallBackWriter, checkAfterBytes);
3439
}
3540

3641
/**
@@ -43,6 +48,11 @@ public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter
4348
public final F fallBackWriter;
4449

4550
private boolean fellBackAlready = false;
51+
private boolean compressionChecked = false;
52+
private final long checkAfterBytes;
53+
/** Accumulates raw bytes across pages (only reset in resetDictionary) so the
54+
* threshold check works even when individual pages are smaller than checkAfterBytes. */
55+
private long cumulativeRawBytes = 0;
4656

4757
/**
4858
* writer currently written to
@@ -57,16 +67,16 @@ public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter
5767
*/
5868
private long rawDataByteSize = 0;
5969

60-
/**
61-
* indicates if this is the first page being processed
62-
*/
63-
private boolean firstPage = true;
64-
6570
public FallbackValuesWriter(I initialWriter, F fallBackWriter) {
71+
this(initialWriter, fallBackWriter, /*checkAfterBytes=*/ 0);
72+
}
73+
74+
public FallbackValuesWriter(I initialWriter, F fallBackWriter, long checkAfterBytes) {
6675
super();
6776
this.initialWriter = initialWriter;
6877
this.fallBackWriter = fallBackWriter;
6978
this.currentWriter = initialWriter;
79+
this.checkAfterBytes = checkAfterBytes;
7080
}
7181

7282
@Override
@@ -79,8 +89,9 @@ public long getBufferedSize() {
7989

8090
@Override
8191
public BytesInput getBytes() {
82-
if (!fellBackAlready && firstPage) {
83-
// we use the first page to decide if we're going to use this encoding
92+
cumulativeRawBytes += rawDataByteSize;
93+
if (!fellBackAlready && !compressionChecked && cumulativeRawBytes >= checkAfterBytes) {
94+
compressionChecked = true;
8495
BytesInput bytes = initialWriter.getBytes();
8596
if (!initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) {
8697
fallBack();
@@ -103,7 +114,6 @@ public Encoding getEncoding() {
103114
@Override
104115
public void reset() {
105116
rawDataByteSize = 0;
106-
firstPage = false;
107117
currentWriter.reset();
108118
}
109119

@@ -131,8 +141,9 @@ public void resetDictionary() {
131141
}
132142
currentWriter = initialWriter;
133143
fellBackAlready = false;
144+
compressionChecked = false;
145+
cumulativeRawBytes = 0;
134146
initialUsedAndHadDictionary = false;
135-
firstPage = true;
136147
}
137148

138149
@Override
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.column.values.fallback;
20+
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import org.apache.parquet.bytes.DirectByteBufferAllocator;
25+
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
26+
import org.apache.parquet.column.Encoding;
27+
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
28+
import org.apache.parquet.column.values.plain.PlainValuesWriter;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
33+
public class TestFallbackValuesWriter {
34+
35+
private TrackingByteBufferAllocator allocator;
36+
37+
@Before
38+
public void initAllocator() {
39+
allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
40+
}
41+
42+
@After
43+
public void closeAllocator() {
44+
allocator.close();
45+
}
46+
47+
/**
48+
* With threshold=0, the check fires on the first page and falls back for high-cardinality data.
49+
*/
50+
@Test
51+
public void testThresholdZeroFallsBackImmediately() throws Exception {
52+
int dictPageSize = 1024 * 1024;
53+
54+
PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter(
55+
dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator);
56+
PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator);
57+
FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
58+
FallbackValuesWriter.of(dictWriter, plainWriter, 0);
59+
60+
try {
61+
for (int i = 0; i < 1000; i++) {
62+
writer.writeInteger(i);
63+
}
64+
writer.getBytes();
65+
66+
assertFalse(
67+
"Should fall back to plain encoding with threshold=0 and high cardinality",
68+
writer.getEncoding().usesDictionary());
69+
} finally {
70+
writer.close();
71+
}
72+
}
73+
74+
/**
75+
* With a large threshold, the check never fires and dictionary encoding is preserved.
76+
*/
77+
@Test
78+
public void testLargeThresholdPreservesDictionary() throws Exception {
79+
int dictPageSize = 1024 * 1024;
80+
81+
PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter(
82+
dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator);
83+
PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator);
84+
FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
85+
FallbackValuesWriter.of(dictWriter, plainWriter, Long.MAX_VALUE);
86+
87+
try {
88+
for (int i = 0; i < 1000; i++) {
89+
writer.writeInteger(i);
90+
}
91+
writer.getBytes();
92+
93+
assertTrue(
94+
"Dictionary encoding should be preserved with large threshold",
95+
writer.getEncoding().usesDictionary());
96+
} finally {
97+
writer.close();
98+
}
99+
}
100+
101+
/**
102+
* Threshold is crossed only after a reset() (page flush). cumulativeRawBytes accumulates
103+
* across pages while rawDataByteSize resets per page.
104+
*/
105+
@Test
106+
public void testThresholdCrossedAfterReset() throws Exception {
107+
int dictPageSize = 1024 * 1024;
108+
long threshold = 500;
109+
110+
PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter(
111+
dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator);
112+
PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator);
113+
FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
114+
FallbackValuesWriter.of(dictWriter, plainWriter, threshold);
115+
116+
try {
117+
// Write ~300 bytes (75 ints * 4 bytes = 300) — below threshold
118+
for (int i = 0; i < 75; i++) {
119+
writer.writeInteger(i);
120+
}
121+
// Simulate page flush — check should NOT fire (cumulative = 300 < 500)
122+
writer.getBytes();
123+
assertTrue(
124+
"Should still use dictionary before threshold is crossed",
125+
writer.getEncoding().usesDictionary());
126+
writer.reset();
127+
128+
// Write another ~300 bytes (75 ints * 4 = 300, cumulative now 600 > 500)
129+
for (int i = 75; i < 150; i++) {
130+
writer.writeInteger(i);
131+
}
132+
// Check SHOULD fire now and fall back (high cardinality, bad compression)
133+
writer.getBytes();
134+
assertFalse(
135+
"Should fall back after cumulative bytes cross threshold",
136+
writer.getEncoding().usesDictionary());
137+
} finally {
138+
writer.close();
139+
}
140+
}
141+
}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,15 @@ public static enum JobSummaryLevel {
161161
public static final String BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit";
162162
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
163163
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
164+
/**
165+
* Raw data byte threshold after which the dictionary compression check is performed.
166+
* Once cumulative raw bytes (excluding nulls) written to a column chunk reach this value,
167+
* the writer evaluates whether dictionary encoding is effective. If not, it falls back to
168+
* plain encoding. A value of 0 means check on the first page (backward compatible default).
169+
*/
170+
public static final String DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES =
171+
"parquet.dictionary.check.threshold.raw.size.bytes";
172+
164173
public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled";
165174
public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled";
166175

@@ -412,6 +421,16 @@ public static boolean getPageWriteChecksumEnabled(Configuration conf) {
412421
return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
413422
}
414423

424+
public static void setDictionaryCheckThresholdRawSizeBytes(Configuration conf, long val) {
425+
conf.setLong(DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES, val);
426+
}
427+
428+
public static long getDictionaryCheckThresholdRawSizeBytes(Configuration conf) {
429+
return conf.getLong(
430+
DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES,
431+
ParquetProperties.DEFAULT_DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES);
432+
}
433+
415434
public static void setStatisticsEnabled(JobContext jobContext, boolean enabled) {
416435
getConfiguration(jobContext).setBoolean(STATISTICS_ENABLED, enabled);
417436
}
@@ -526,6 +545,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
526545
.withRowGroupRowCountLimit(getBlockRowCountLimit(conf))
527546
.withPageRowCountLimit(getPageRowCountLimit(conf))
528547
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
548+
.withDictionaryCheckThresholdRawSizeBytes(getDictionaryCheckThresholdRawSizeBytes(conf))
529549
.withStatisticsEnabled(getStatisticsEnabled(conf));
530550
new ColumnConfigParser()
531551
.withColumnConfig(

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,17 @@ public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) {
771771
return self();
772772
}
773773

774+
/**
775+
* Set the raw data byte threshold after which the dictionary compression check is performed.
776+
*
777+
* @param val byte threshold (0 means check on the first page / preserve previous behavior)
778+
* @return this builder for method chaining.
779+
*/
780+
public SELF withDictionaryCheckThresholdRawSizeBytes(long val) {
781+
encodingPropsBuilder.withDictionaryCheckThresholdRawSizeBytes(val);
782+
return self();
783+
}
784+
774785
/**
775786
* Set max Bloom filter bytes for related columns.
776787
*

0 commit comments

Comments
 (0)