Skip to content

Commit ddf1332

Browse files
committed
PARQUET-3479: Add configuration to disable early dictionary compression check
1 parent 28593d5 commit ddf1332

4 files changed

Lines changed: 131 additions & 3 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class ParquetProperties {
6767
public static final boolean DEFAULT_STATISTICS_ENABLED = true;
6868
public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true;
6969

70+
public static final boolean DEFAULT_DICTIONARY_EARLY_CHECK_ENABLED = true;
7071
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
7172

7273
/**
@@ -131,6 +132,7 @@ public static WriterVersion fromString(String name) {
131132
private final int rowGroupRowCountLimit;
132133
private final int pageRowCountLimit;
133134
private final boolean pageWriteChecksumEnabled;
135+
private final boolean dictionaryEarlyCheckEnabled;
134136
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
135137
private final Map<String, String> extraMetaData;
136138
private final ColumnProperty<Boolean> statistics;
@@ -163,6 +165,7 @@ private ParquetProperties(Builder builder) {
163165
this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit;
164166
this.pageRowCountLimit = builder.pageRowCountLimit;
165167
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
168+
this.dictionaryEarlyCheckEnabled = builder.dictionaryEarlyCheckEnabled;
166169
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
167170
this.extraMetaData = builder.extraMetaData;
168171
this.statistics = builder.statistics.build();
@@ -322,6 +325,10 @@ public boolean getPageWriteChecksumEnabled() {
322325
return pageWriteChecksumEnabled;
323326
}
324327

328+
public boolean isDictionaryEarlyCheckEnabled() {
329+
return dictionaryEarlyCheckEnabled;
330+
}
331+
325332
public OptionalLong getBloomFilterNDV(ColumnDescriptor column) {
326333
Long ndv = bloomFilterNDVs.getValue(column);
327334
return ndv == null ? OptionalLong.empty() : OptionalLong.of(ndv);
@@ -415,6 +422,7 @@ public static class Builder {
415422
private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT;
416423
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
417424
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
425+
private boolean dictionaryEarlyCheckEnabled = DEFAULT_DICTIONARY_EARLY_CHECK_ENABLED;
418426
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
419427
private Map<String, String> extraMetaData = new HashMap<>();
420428
private final ColumnProperty.Builder<Boolean> statistics;
@@ -450,6 +458,7 @@ private Builder(ParquetProperties toCopy) {
450458
this.allocator = toCopy.allocator;
451459
this.pageRowCountLimit = toCopy.pageRowCountLimit;
452460
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
461+
this.dictionaryEarlyCheckEnabled = toCopy.dictionaryEarlyCheckEnabled;
453462
this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs);
454463
this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs);
455464
this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled);
@@ -709,6 +718,11 @@ public Builder withPageWriteChecksumEnabled(boolean val) {
709718
return this;
710719
}
711720

721+
public Builder withDictionaryEarlyCheckEnabled(boolean val) {
722+
this.dictionaryEarlyCheckEnabled = val;
723+
return this;
724+
}
725+
712726
public Builder withExtraMetaData(Map<String, String> extraMetaData) {
713727
this.extraMetaData = extraMetaData;
714728
return this;

parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ static ValuesWriter dictWriterWithFallBack(
111111
ValuesWriter writerToFallBackTo) {
112112
if (parquetProperties.isDictionaryEnabled(path)) {
113113
return FallbackValuesWriter.of(
114-
dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding), writerToFallBackTo);
114+
dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding),
115+
writerToFallBackTo,
116+
parquetProperties.isDictionaryEarlyCheckEnabled());
115117
} else {
116118
return writerToFallBackTo;
117119
}

parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@ public class FallbackValuesWriter<I extends ValuesWriter & RequiresFallback, F e
3030

3131
public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(
3232
I initialWriter, F fallBackWriter) {
33-
return new FallbackValuesWriter<>(initialWriter, fallBackWriter);
33+
return new FallbackValuesWriter<>(initialWriter, fallBackWriter, true);
34+
}
35+
36+
public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(
37+
I initialWriter, F fallBackWriter, boolean checkCompressionOnFirstPage) {
38+
return new FallbackValuesWriter<>(initialWriter, fallBackWriter, checkCompressionOnFirstPage);
3439
}
3540

3641
/**
@@ -44,6 +49,8 @@ public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter
4449

4550
private boolean fellBackAlready = false;
4651

52+
private final boolean checkCompressionOnFirstPage;
53+
4754
/**
4855
* writer currently written to
4956
*/
@@ -63,10 +70,15 @@ public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter
6370
private boolean firstPage = true;
6471

6572
public FallbackValuesWriter(I initialWriter, F fallBackWriter) {
73+
this(initialWriter, fallBackWriter, true);
74+
}
75+
76+
public FallbackValuesWriter(I initialWriter, F fallBackWriter, boolean checkCompressionOnFirstPage) {
6677
super();
6778
this.initialWriter = initialWriter;
6879
this.fallBackWriter = fallBackWriter;
6980
this.currentWriter = initialWriter;
81+
this.checkCompressionOnFirstPage = checkCompressionOnFirstPage;
7082
}
7183

7284
@Override
@@ -82,7 +94,7 @@ public BytesInput getBytes() {
8294
if (!fellBackAlready && firstPage) {
8395
// we use the first page to decide if we're going to use this encoding
8496
BytesInput bytes = initialWriter.getBytes();
85-
if (!initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) {
97+
if (checkCompressionOnFirstPage && !initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) {
8698
fallBack();
8799
} else {
88100
return bytes;
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.column.values.fallback;
20+
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import org.apache.parquet.bytes.DirectByteBufferAllocator;
25+
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
26+
import org.apache.parquet.column.Encoding;
27+
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
28+
import org.apache.parquet.column.values.plain.PlainValuesWriter;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
33+
public class TestFallbackValuesWriter {
34+
35+
private TrackingByteBufferAllocator allocator;
36+
37+
@Before
38+
public void initAllocator() {
39+
allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
40+
}
41+
42+
@After
43+
public void closeAllocator() {
44+
allocator.close();
45+
}
46+
47+
@Test
48+
public void testEarlyCheckDisabledPreservesDictionary() throws Exception {
49+
int dictPageSize = 1024 * 1024;
50+
PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter(
51+
dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator);
52+
PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator);
53+
54+
FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
55+
FallbackValuesWriter.of(dictWriter, plainWriter, false);
56+
57+
try {
58+
// Write many distinct values — would normally cause isCompressionSatisfying to fail
59+
for (int i = 0; i < 1000; i++) {
60+
writer.writeInteger(i);
61+
}
62+
63+
writer.getBytes();
64+
Encoding encoding = writer.getEncoding();
65+
66+
assertTrue(
67+
"Dictionary encoding should be preserved when early check is disabled",
68+
encoding.usesDictionary());
69+
} finally {
70+
writer.close();
71+
}
72+
}
73+
74+
@Test
75+
public void testEarlyCheckEnabledFallsBack() throws Exception {
76+
int dictPageSize = 1024 * 1024;
77+
PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter(
78+
dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator);
79+
PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator);
80+
81+
FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
82+
FallbackValuesWriter.of(dictWriter, plainWriter, true);
83+
84+
try {
85+
// Write many distinct values — encoded size will exceed raw size
86+
for (int i = 0; i < 1000; i++) {
87+
writer.writeInteger(i);
88+
}
89+
90+
writer.getBytes();
91+
Encoding encoding = writer.getEncoding();
92+
93+
assertFalse(
94+
"Should fall back to plain encoding when early check is enabled with high cardinality",
95+
encoding.usesDictionary());
96+
} finally {
97+
writer.close();
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)