Commit 5baa903

GH-3026: Add a fix to ParquetRewriter when you try to nullify and encrypt 2 separate columns (#3027)

1 parent eed26b4 · commit 5baa903

2 files changed: 20 additions & 16 deletions
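For context, the failure this commit addresses is rewriting a file while nullifying one column and encrypting a different column. Below is a minimal usage sketch of that scenario against the public RewriteOptions/ParquetRewriter API, mirroring the column names used in ParquetRewriterTest; the paths and the demo key bytes are placeholders, and the encryption-properties construction is assumed to follow the standard parquet-mr crypto builders rather than anything specific to this commit.

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.crypto.ColumnEncryptionProperties;
    import org.apache.parquet.crypto.FileEncryptionProperties;
    import org.apache.parquet.hadoop.metadata.ColumnPath;
    import org.apache.parquet.hadoop.rewrite.MaskMode;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class NullifyAndEncryptExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path inputPath = new Path("/tmp/in.parquet");   // placeholder
        Path outputPath = new Path("/tmp/out.parquet"); // placeholder

        // Nullify one column...
        Map<String, MaskMode> maskColumns = new HashMap<>();
        maskColumns.put("Links.Forward", MaskMode.NULLIFY);

        // ...while encrypting a different one; this combination failed before the fix (GH-3026).
        byte[] footerKey = "0123456789012345".getBytes(StandardCharsets.UTF_8); // demo key only
        ColumnEncryptionProperties docIdColumn =
            ColumnEncryptionProperties.builder("DocId").build(); // no column key set: footer key is used
        FileEncryptionProperties encryptionProps = FileEncryptionProperties.builder(footerKey)
            .withEncryptedColumns(
                Collections.singletonMap(ColumnPath.fromDotString("DocId"), docIdColumn))
            .build();

        RewriteOptions options = new RewriteOptions.Builder(conf, inputPath, outputPath)
            .mask(maskColumns)
            .encrypt(Arrays.asList("DocId"))
            .encryption(encryptionProps)
            .build();

        // ParquetRewriter implements Closeable, so try-with-resources closes the output file.
        try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
          rewriter.processBlocks();
        }
      }
    }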

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
18 additions & 14 deletions
@@ -144,10 +144,11 @@ public class ParquetRewriter implements Closeable {
   // Reader and relevant states of the in-processing input file
   private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
   private final Queue<TransParquetFileReader> inputFilesToJoin = new LinkedList<>();
-  private MessageType outSchema;
+  private final MessageType outSchema;
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
   private final boolean overwriteInputWithJoinColumns;
+  private final InternalFileEncryptor nullColumnEncryptor;

   public ParquetRewriter(RewriteOptions options) throws IOException {
     this.newCodecName = options.getNewCodecName();
@@ -194,6 +195,17 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
         ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
         options.getFileEncryptionProperties());
     writer.start();
+    // column nullification requires a separate encryptor and forcing other columns encryption initialization
+    if (options.getFileEncryptionProperties() == null) {
+      this.nullColumnEncryptor = null;
+    } else {
+      this.nullColumnEncryptor = new InternalFileEncryptor(options.getFileEncryptionProperties());
+      List<ColumnDescriptor> columns = outSchema.getColumns();
+      for (int i = 0; i < columns.size(); i++) {
+        writer.getEncryptor()
+            .getColumnSetup(ColumnPath.get(columns.get(i).getPath()), true, i);
+      }
+    }
   }

   // TODO: Should we mark it as deprecated to encourage the main constructor usage? it is also used only from
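One reading of the new branch above, restated as a commented sketch (an illustration of the fix, not drop-in library code): nullified pages are written through a second page-write path, so they get their own InternalFileEncryptor, and every column is registered with the main writer's encryptor up front so the two encryptors do not initialize column state inconsistently.

    // Hedged, commented restatement of the constructor branch added above.
    if (options.getFileEncryptionProperties() == null) {
      // No encryption requested: the nullify path needs no encryptor at all.
      this.nullColumnEncryptor = null;
    } else {
      // Dedicated encryptor used only by nullifyColumn(), which writes pages
      // through its own ColumnChunkPageWriteStore rather than the main writer
      // (see the last hunk of this file).
      this.nullColumnEncryptor = new InternalFileEncryptor(options.getFileEncryptionProperties());

      // Pre-create the column setups on the writer's own encryptor, in schema
      // order, so encryption of the remaining columns is initialized before
      // any nullified column is rewritten through the separate encryptor.
      List<ColumnDescriptor> columns = outSchema.getColumns();
      for (int i = 0; i < columns.size(); i++) {
        writer.getEncryptor().getColumnSetup(ColumnPath.get(columns.get(i).getPath()), true, i);
      }
    }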
@@ -226,6 +238,7 @@ public ParquetRewriter(
     this.inputFiles.add(reader);
     this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
     this.overwriteInputWithJoinColumns = false;
+    this.nullColumnEncryptor = null;
   }

   private MessageType getSchema() {
@@ -436,15 +449,7 @@ private void processBlock(
                 "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
           }
           nullifyColumn(
-              reader,
-              blockIdx,
-              descriptor,
-              chunk,
-              writer,
-              outSchema,
-              newCodecName,
-              encryptColumn,
-              originalCreatedBy);
+              reader, blockIdx, descriptor, chunk, writer, newCodecName, encryptColumn, originalCreatedBy);
         } else {
           throw new UnsupportedOperationException("Only nullify is supported for now");
         }
@@ -858,7 +863,6 @@ private void nullifyColumn(
       ColumnDescriptor descriptor,
       ColumnChunkMetaData chunk,
       ParquetFileWriter writer,
-      MessageType schema,
       CompressionCodecName newCodecName,
       boolean encryptColumn,
       String originalCreatedBy)
@@ -871,7 +875,7 @@
     int dMax = descriptor.getMaxDefinitionLevel();
     PageReadStore pageReadStore = reader.readRowGroup(blockIndex);
     ColumnReadStoreImpl crStore =
-        new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), schema, originalCreatedBy);
+        new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), outSchema, originalCreatedBy);
     ColumnReader cReader = crStore.getColumnReader(descriptor);

     ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages()
@@ -883,14 +887,14 @@
     CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(newCodecName);

     // Create new schema that only has the current column
-    MessageType newSchema = newSchema(schema, descriptor);
+    MessageType newSchema = newSchema(outSchema, descriptor);
     ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
         compressor,
         newSchema,
         props.getAllocator(),
         props.getColumnIndexTruncateLength(),
         props.getPageWriteChecksumEnabled(),
-        writer.getEncryptor(),
+        nullColumnEncryptor,
         numBlocksRewritten);
     ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
     ColumnWriter cWriter = cStore.getColumnWriter(descriptor);

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
2 additions & 2 deletions
@@ -455,7 +455,7 @@ public void testRewriteWithoutColumnIndexes() throws Exception {

   private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception {
     Map<String, MaskMode> maskColumns = new HashMap<>();
-    maskColumns.put("DocId", MaskMode.NULLIFY);
+    maskColumns.put("Links.Forward", MaskMode.NULLIFY);

     String[] encryptColumns = {"DocId"};
     FileEncryptionProperties fileEncryptionProperties =
@@ -491,7 +491,7 @@ private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception
     validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false);

     // Verify the page index
-    validatePageIndex(ImmutableSet.of("DocId"), false);
+    validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false);

     // Verify the column is encrypted
     ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
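The test validates the result with helpers internal to ParquetRewriterTest (validateColumnData, validatePageIndex, getFileMetaData). To check the same combination outside the test harness, one can read the rewritten file back with matching decryption properties; the hedged sketch below uses only public reader APIs and reuses the footerKey, outputPath, and conf names from the usage sketch at the top of this page.

    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.crypto.FileDecryptionProperties;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    // Columns encrypted without an explicit column key decrypt with the footer key.
    FileDecryptionProperties decryptionProps = FileDecryptionProperties.builder()
        .withFooterKey(footerKey) // same key bytes used on the encryption side
        .build();
    ParquetReadOptions readOptions = HadoopReadOptions.builder(conf)
        .withDecryption(decryptionProps)
        .build();
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(outputPath, conf), readOptions)) {
      // DocId should decrypt normally; Links.Forward should contain only nulls.
      ParquetMetadata footer = reader.getFooter();
      System.out.println(footer.getBlocks().size() + " row groups read back");
    }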
