Skip to content

Commit 1f1e07b

Browse files
authored
GH-3211: Implement Variant parquet reader (#3212)
1 parent f3535a6 commit 1f1e07b

17 files changed

Lines changed: 3411 additions & 85 deletions

File tree

parquet-avro/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
<artifactId>parquet-common</artifactId>
4949
<version>${project.version}</version>
5050
</dependency>
51+
<dependency>
52+
<groupId>org.apache.parquet</groupId>
53+
<artifactId>parquet-variant</artifactId>
54+
<version>${project.version}</version>
55+
</dependency>
5156
<dependency>
5257
<groupId>org.apache.avro</groupId>
5358
<artifactId>avro</artifactId>

parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.parquet.io.api.GroupConverter;
3636
import org.apache.parquet.io.api.PrimitiveConverter;
3737
import org.apache.parquet.schema.GroupType;
38+
import org.apache.parquet.schema.LogicalTypeAnnotation;
3839
import org.apache.parquet.schema.MessageType;
3940
import org.apache.parquet.schema.Type;
4041

@@ -168,7 +169,11 @@ private static Converter newConverter(Schema schema, Type type, GenericData mode
168169
case MAP:
169170
return new MapConverter(parent, type.asGroupType(), schema, model);
170171
case RECORD:
171-
return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model);
172+
if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
173+
return new AvroVariantConverter(parent, type.asGroupType(), schema, model);
174+
} else {
175+
return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model);
176+
}
172177
case STRING:
173178
return new AvroConverters.FieldStringConverter(parent);
174179
case UNION:

parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.parquet.io.api.Converter;
6363
import org.apache.parquet.io.api.GroupConverter;
6464
import org.apache.parquet.schema.GroupType;
65+
import org.apache.parquet.schema.LogicalTypeAnnotation;
6566
import org.apache.parquet.schema.MessageType;
6667
import org.apache.parquet.schema.Type;
6768
import org.slf4j.Logger;
@@ -394,7 +395,11 @@ private static Converter newConverter(
394395
}
395396
return newStringConverter(schema, model, parent, validator);
396397
case RECORD:
397-
return new AvroRecordConverter(parent, type.asGroupType(), schema, model, validator);
398+
if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
399+
return new AvroVariantConverter(parent, type.asGroupType(), schema, model);
400+
} else {
401+
return new AvroRecordConverter(parent, type.asGroupType(), schema, model, validator);
402+
}
398403
case ENUM:
399404
return new AvroConverters.FieldEnumConverter(parent, schema, model);
400405
case ARRAY:

parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,16 @@ public Optional<Schema> visit(
466466
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
467467
return of(Schema.create(Schema.Type.STRING));
468468
}
469+
470+
@Override
471+
public Optional<Schema> visit(
472+
LogicalTypeAnnotation.VariantLogicalTypeAnnotation variantLogicalType) {
473+
String name = parquetGroupType.getName();
474+
List<Schema.Field> fields = new ArrayList<>();
475+
fields.add(new Schema.Field("metadata", Schema.create(Schema.Type.BYTES)));
476+
fields.add(new Schema.Field("value", Schema.create(Schema.Type.BYTES)));
477+
return of(Schema.createRecord(name, null, namespace(name, names), false, fields));
478+
}
469479
})
470480
.orElseThrow(
471481
() -> new UnsupportedOperationException("Cannot convert Parquet type " + parquetType));
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.avro;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.function.Consumer;
23+
import org.apache.avro.Schema;
24+
import org.apache.avro.generic.GenericData;
25+
import org.apache.parquet.Preconditions;
26+
import org.apache.parquet.io.api.Converter;
27+
import org.apache.parquet.io.api.GroupConverter;
28+
import org.apache.parquet.schema.GroupType;
29+
import org.apache.parquet.variant.ImmutableMetadata;
30+
import org.apache.parquet.variant.VariantBuilder;
31+
import org.apache.parquet.variant.VariantConverters;
32+
33+
/**
34+
* Converter for Variant values.
35+
*/
36+
class AvroVariantConverter extends GroupConverter implements VariantConverters.ParentConverter<VariantBuilder> {
37+
private final ParentValueContainer parent;
38+
private final Schema avroSchema;
39+
private final GenericData model;
40+
private final int metadataPos;
41+
private final int valuePos;
42+
private final GroupConverter wrappedConverter;
43+
44+
private VariantBuilder builder = null;
45+
private ImmutableMetadata metadata = null;
46+
47+
AvroVariantConverter(ParentValueContainer parent, GroupType variantGroup, Schema avroSchema, GenericData model) {
48+
this.parent = parent;
49+
this.avroSchema = avroSchema;
50+
this.metadataPos = avroSchema.getField("metadata").pos();
51+
this.valuePos = avroSchema.getField("value").pos();
52+
this.model = model;
53+
this.wrappedConverter = VariantConverters.newVariantConverter(variantGroup, this::setMetadata, this);
54+
}
55+
56+
@Override
57+
public void build(Consumer<VariantBuilder> consumer) {
58+
Preconditions.checkState(builder != null, "Cannot build variant: builder has not been initialized");
59+
consumer.accept(builder);
60+
}
61+
62+
@Override
63+
public Converter getConverter(int fieldIndex) {
64+
return wrappedConverter.getConverter(fieldIndex);
65+
}
66+
67+
@Override
68+
public void start() {
69+
wrappedConverter.start();
70+
}
71+
72+
@Override
73+
public void end() {
74+
wrappedConverter.end();
75+
76+
Preconditions.checkState(metadata != null, "Cannot build variant: missing metadata");
77+
78+
builder.appendNullIfEmpty();
79+
80+
Object record = model.newRecord(null, avroSchema);
81+
model.setField(record, "metadata", metadataPos, metadata.getEncodedBuffer());
82+
model.setField(record, "value", valuePos, builder.encodedValue());
83+
parent.add(record);
84+
85+
this.builder = null;
86+
}
87+
88+
void setMetadata(ByteBuffer metadataBuffer) {
89+
// If the metadata hasn't changed, we don't need to rebuild the map.
90+
if (metadata == null || metadata.getEncodedBuffer() != metadataBuffer) {
91+
this.metadata = new ImmutableMetadata(metadataBuffer);
92+
}
93+
94+
this.builder = new VariantBuilder(metadata);
95+
}
96+
}

0 commit comments

Comments
 (0)